- File:
-
- 1 edited
-
libcfa/src/concurrency/io/setup.cfa (modified) (13 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io/setup.cfa
rec19b21 r80444bb 52 52 #include <pthread.h> 53 53 #include <sys/epoll.h> 54 #include <sys/eventfd.h>55 54 #include <sys/mman.h> 56 55 #include <sys/syscall.h> … … 170 169 // Main loop 171 170 while( iopoll.run ) { 172 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");173 174 171 // Wait for events 175 172 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 176 177 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);178 173 179 174 // Check if an error occured … … 186 181 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 187 182 /* paranoid */ verify( io_ctx ); 188 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);183 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx); 189 184 #if !defined( __CFA_NO_STATISTICS__ ) 190 185 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; 191 186 #endif 192 193 eventfd_t v;194 eventfd_read(io_ctx->ring->efd, &v);195 196 187 post( io_ctx->sem ); 197 188 } … … 242 233 $thread & thrd = this.thrd.self; 243 234 if( cluster_context ) { 244 // We are about to do weird things with the threads245 // we don't need interrupts to complicate everything246 disable_interrupts();247 248 // Get cluster info249 235 cluster & cltr = *thrd.curr_cluster; 250 236 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); … … 253 239 // We need to adjust the clean-up based on where the thread is 254 240 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 255 // This is the tricky case256 // The thread was preempted or ready to run and now it is on the ready queue257 // but the cluster is shutting down, so there aren't any processors to run the ready queue258 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along259 241 260 242 ready_schedule_lock(); 261 // The thread should on the list 243 244 // This is the tricky case 245 // The thread was preempted and now it is on the ready queue 246 // The thread should be the last on the list 262 247 /* paranoid */ verify( thrd.link.next != 0p ); 263 248 264 249 // Remove the thread from the ready queue of this cluster 265 // The thread should be the last on the list266 250 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 267 251 /* paranoid */ verify( removed ); … … 279 263 } 280 264 // !!! This is not an else if !!! 281 // Ok, now the thread is blocked (whether we cheated to get here or not)282 265 if( thrd.state == Blocked ) { 266 283 267 // This is the "easy case" 284 268 // The thread is parked and can easily be moved to active cluster … … 290 274 } 291 275 else { 276 292 277 // The thread is in a weird state 293 278 // I don't know what to do here 294 279 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 295 280 } 296 297 // The weird thread kidnapping stuff is over, restore interrupts.298 enable_interrupts( __cfaabi_dbg_ctx );299 281 } else { 300 282 post( this.thrd.sem ); … … 383 365 } 384 366 385 // Step 3 : Initialize the data structure386 367 // Get the pointers from the kernel to fill the structure 387 368 // submit queue … … 398 379 const __u32 num = *sq.num; 399 380 for( i; num ) { 400 __sqe_clean( &sq.sqes[i] );381 sq.sqes[i].user_data = 0ul64; 401 382 } 402 383 } … … 428 409 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes); 429 410 430 // Step 4 : eventfd431 int efd;432 for() {433 efd = eventfd(0, 0);434 if (efd < 0) {435 if (errno == EINTR) continue;436 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));437 }438 break;439 }440 441 int ret;442 for() {443 ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);444 if (ret < 0) {445 if (errno == EINTR) continue;446 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));447 }448 break;449 }450 451 411 // some paranoid checks 452 412 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask ); … … 463 423 this.ring_flags = params.flags; 464 424 this.fd = fd; 465 this.efd = efd;466 425 this.eager_submits = params_in.eager_submits; 467 426 this.poller_submits = params_in.poller_submits; … … 486 445 // close the file descriptor 487 446 close(this.fd); 488 close(this.efd);489 447 490 448 free( this.submit_q.ready ); // Maybe null, doesn't matter … … 494 452 // I/O Context Sleep 495 453 //============================================================================================= 496 #define IOEVENTS EPOLLIN | EPOLLONESHOT 497 498 static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) { 499 struct epoll_event ev; 500 ev.events = IOEVENTS; 454 455 void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) { 456 ev.events = EPOLLIN | EPOLLONESHOT; 501 457 ev.data.u64 = (__u64)&ctx; 502 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);458 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev); 503 459 if (ret < 0) { 504 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 505 } 506 } 507 508 void __ioctx_register($io_ctx_thread & ctx) { 509 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 510 } 511 512 void __ioctx_prepare_block($io_ctx_thread & ctx) { 513 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx); 514 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 460 abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) ); 461 } 462 } 463 464 void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) { 465 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev); 466 if (ret < 0) { 467 abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) ); 468 } 515 469 } 516 470
Note:
See TracChangeset
for help on using the changeset viewer.