Changeset 402658b1 for libcfa/src
- Timestamp: Jan 13, 2021, 10:23:07 PM (5 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 9153e53
- Parents: bace538 (diff), a00bc5b (diff)
  Note: this is a merge changeset, the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- Location: libcfa/src/concurrency
- Files: 4 edited
Legend:
- Unmodified
- Added
- Removed
libcfa/src/concurrency/io.cfa
rbace538 r402658b1 31 31 32 32 extern "C" { 33 #include <sys/epoll.h>34 33 #include <sys/syscall.h> 35 34 … … 41 40 #include "kernel/fwd.hfa" 42 41 #include "io/types.hfa" 42 43 static const char * opcodes[] = { 44 "OP_NOP", 45 "OP_READV", 46 "OP_WRITEV", 47 "OP_FSYNC", 48 "OP_READ_FIXED", 49 "OP_WRITE_FIXED", 50 "OP_POLL_ADD", 51 "OP_POLL_REMOVE", 52 "OP_SYNC_FILE_RANGE", 53 "OP_SENDMSG", 54 "OP_RECVMSG", 55 "OP_TIMEOUT", 56 "OP_TIMEOUT_REMOVE", 57 "OP_ACCEPT", 58 "OP_ASYNC_CANCEL", 59 "OP_LINK_TIMEOUT", 60 "OP_CONNECT", 61 "OP_FALLOCATE", 62 "OP_OPENAT", 63 "OP_CLOSE", 64 "OP_FILES_UPDATE", 65 "OP_STATX", 66 "OP_READ", 67 "OP_WRITE", 68 "OP_FADVISE", 69 "OP_MADVISE", 70 "OP_SEND", 71 "OP_RECV", 72 "OP_OPENAT2", 73 "OP_EPOLL_CTL", 74 "OP_SPLICE", 75 "OP_PROVIDE_BUFFERS", 76 "OP_REMOVE_BUFFERS", 77 "OP_TEE", 78 "INVALID_OP" 79 }; 43 80 44 81 // returns true of acquired as leader or second leader … … 134 171 int ret = 0; 135 172 if( need_sys_to_submit || need_sys_to_complete ) { 173 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); 136 174 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 137 175 if( ret < 0 ) { … … 157 195 static unsigned __collect_submitions( struct __io_data & ring ); 158 196 static __u32 __release_consumed_submission( struct __io_data & ring ); 159 160 static inline void process(struct io_uring_cqe & cqe ) { 197 static inline void __clean( volatile struct io_uring_sqe * sqe ); 198 199 // Process a single completion message from the io_uring 200 // This is NOT thread-safe 201 static inline void process( volatile struct io_uring_cqe & cqe ) { 161 202 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 162 203 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); … … 165 206 } 166 207 167 // Process a single completion message from the io_uring168 // This is NOT thread-safe169 208 static [int, bool] __drain_io( & struct __io_data ring ) { 170 209 /* paranoid */ verify( ! __preemption_enabled() ); … … 192 231 } 193 232 233 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 234 194 235 // Release the consumed SQEs 195 236 __release_consumed_submission( ring ); … … 209 250 for(i; count) { 210 251 unsigned idx = (head + i) & mask; 211 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];252 volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 212 253 213 254 /* paranoid */ verify(&cqe); … … 218 259 // Mark to the kernel that the cqe has been seen 219 260 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 
220 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 221 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 261 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST ); 222 262 223 263 return [count, count > 0 || to_submit > 0]; … … 225 265 226 266 void main( $io_ctx_thread & this ) { 227 epoll_event ev;228 __ioctx_register( this, ev ); 229 230 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring); 231 232 int reset = 0;267 __ioctx_register( this ); 268 269 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this); 270 271 const int reset_cnt = 5; 272 int reset = reset_cnt; 233 273 // Then loop until we need to start 274 LOOP: 234 275 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 235 276 // Drain the io … … 239 280 [count, again] = __drain_io( *this.ring ); 240 281 241 if(!again) reset ++;282 if(!again) reset--; 242 283 243 284 // Update statistics … … 249 290 250 291 // If we got something, just yield and check again 251 if(reset < 5) {292 if(reset > 1) { 252 293 yield(); 253 } 254 // We didn't get anything baton pass to the slow poller 255 else { 294 continue LOOP; 295 } 296 297 // We alread failed to find completed entries a few time. 298 if(reset == 1) { 299 // Rearm the context so it can block 300 // but don't block right away 301 // we need to retry one last time in case 302 // something completed *just now* 303 __ioctx_prepare_block( this ); 304 continue LOOP; 305 } 306 256 307 __STATS__( false, 257 308 io.complete_q.blocks += 1; 258 309 ) 259 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self); 260 reset = 0; 310 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this); 261 311 262 312 // block this thread 263 __ioctx_prepare_block( this, ev );264 313 wait( this.sem ); 265 } 266 } 267 268 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring); 314 315 // restore counter 316 reset = reset_cnt; 317 } 318 319 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this); 269 320 } 270 321 … … 289 340 // 290 341 291 [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 342 // Allocate an submit queue entry. 
343 // The kernel cannot see these entries until they are submitted, but other threads must be 344 // able to see which entries can be used and which are already un used by an other thread 345 // for convenience, return both the index and the pointer to the sqe 346 // sqe == &sqes[idx] 347 [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 292 348 /* paranoid */ verify( data != 0 ); 293 349 … … 304 360 // Look through the list starting at some offset 305 361 for(i; cnt) { 306 __u64 expected = 0;307 __u32 idx = (i + off) & mask; 308 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];362 __u64 expected = 3; 363 __u32 idx = (i + off) & mask; // Get an index from a random 364 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 309 365 volatile __u64 * udata = &sqe->user_data; 310 366 367 // Allocate the entry by CASing the user_data field from 0 to the future address 311 368 if( *udata == expected && 312 369 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) … … 319 376 ) 320 377 378 // debug log 379 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); 321 380 322 381 // Success return the data … … 325 384 verify(expected != data); 326 385 386 // This one was used 327 387 len ++; 328 388 } 329 389 330 390 block++; 391 392 abort( "Kernel I/O : all submit queue entries used, yielding\n" ); 393 331 394 yield(); 332 395 } … … 377 440 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 378 441 __io_data & ring = *ctx->thrd.ring; 442 443 { 444 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 445 __cfadbg_print_safe( io, 446 "Kernel I/O : submitting %u (%p) for %p\n" 447 " data: %p\n" 448 " opcode: %s\n" 449 " fd: %d\n" 450 " flags: %d\n" 451 " prio: %d\n" 452 " off: %p\n" 453 " addr: %p\n" 454 " len: %d\n" 455 " other flags: %d\n" 456 " splice fd: %d\n" 457 " pad[0]: %llu\n" 458 " pad[1]: %llu\n" 459 " pad[2]: %llu\n", 460 idx, sqe, 461 active_thread(), 462 (void*)sqe->user_data, 463 opcodes[sqe->opcode], 464 sqe->fd, 465 sqe->flags, 466 sqe->ioprio, 467 sqe->off, 468 sqe->addr, 469 sqe->len, 470 sqe->accept_flags, 471 sqe->splice_fd_in, 472 sqe->__pad2[0], 473 sqe->__pad2[1], 474 sqe->__pad2[2] 475 ); 476 } 477 478 379 479 // Get now the data we definetely need 380 480 volatile __u32 * const tail = ring.submit_q.tail; … … 443 543 unlock(ring.submit_q.submit_lock); 444 544 #endif 445 if( ret < 0 ) return; 545 if( ret < 0 ) { 546 return; 547 } 446 548 447 549 // Release the consumed SQEs … … 454 556 io.submit_q.submit_avg.cnt += 1; 455 557 ) 456 } 457 else { 558 559 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 560 } 561 else 562 { 458 563 // get mutual exclusion 459 564 #if defined(LEADER_LOCK) … … 463 568 #endif 464 569 465 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,570 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64, 466 571 /* paranoid */ "index %u already reclaimed\n" 467 572 /* paranoid */ "head %u, prev %u, tail %u\n" … … 490 595 } 491 596 597 /* paranoid */ verify(ret == 1); 598 492 599 // update statistics 493 600 __STATS__( false, … … 496 603 ) 497 604 605 { 606 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head; 607 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ]; 608 __attribute__((unused)) volatile 
struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx]; 609 610 __cfadbg_print_safe( io, 611 "Kernel I/O : last submitted is %u (%p)\n" 612 " data: %p\n" 613 " opcode: %s\n" 614 " fd: %d\n" 615 " flags: %d\n" 616 " prio: %d\n" 617 " off: %p\n" 618 " addr: %p\n" 619 " len: %d\n" 620 " other flags: %d\n" 621 " splice fd: %d\n" 622 " pad[0]: %llu\n" 623 " pad[1]: %llu\n" 624 " pad[2]: %llu\n", 625 last_idx, sqe, 626 (void*)sqe->user_data, 627 opcodes[sqe->opcode], 628 sqe->fd, 629 sqe->flags, 630 sqe->ioprio, 631 sqe->off, 632 sqe->addr, 633 sqe->len, 634 sqe->accept_flags, 635 sqe->splice_fd_in, 636 sqe->__pad2[0], 637 sqe->__pad2[1], 638 sqe->__pad2[2] 639 ); 640 } 641 642 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 498 643 // Release the consumed SQEs 499 644 __release_consumed_submission( ring ); 645 // ring.submit_q.sqes[idx].user_data = 3ul64; 500 646 501 647 #if defined(LEADER_LOCK) … … 505 651 #endif 506 652 507 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret);653 __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() ); 508 654 } 509 655 } 510 656 511 657 // #define PARTIAL_SUBMIT 32 658 659 // go through the list of submissions in the ready array and moved them into 660 // the ring's submit queue 512 661 static unsigned __collect_submitions( struct __io_data & ring ) { 513 662 /* paranoid */ verify( ring.submit_q.ready != 0p ); … … 550 699 } 551 700 701 // Go through the ring's submit queue and release everything that has already been consumed 702 // by io_uring 552 703 static __u32 __release_consumed_submission( struct __io_data & ring ) { 553 704 const __u32 smask = *ring.submit_q.mask; 554 705 706 // We need to get the lock to copy the old head and new head 555 707 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 556 __u32 chead = *ring.submit_q.head; 557 __u32 phead = ring.submit_q.prev_head; 558 ring.submit_q.prev_head = chead; 708 __attribute__((unused)) 709 __u32 ctail = *ring.submit_q.tail; // get the current tail of the queue 710 __u32 chead = *ring.submit_q.head; // get the current head of the queue 711 __u32 phead = ring.submit_q.prev_head; // get the head the last time we were here 712 ring.submit_q.prev_head = chead; // note up to were we processed 559 713 unlock(ring.submit_q.release_lock); 560 714 715 // the 3 fields are organized like this diagram 716 // except it's are ring 717 // ---+--------+--------+---- 718 // ---+--------+--------+---- 719 // ^ ^ ^ 720 // phead chead ctail 721 722 // make sure ctail doesn't wrap around and reach phead 723 /* paranoid */ verify( 724 (ctail >= chead && chead >= phead) 725 || (chead >= phead && phead >= ctail) 726 || (phead >= ctail && ctail >= chead) 727 ); 728 729 // find the range we need to clear 561 730 __u32 count = chead - phead; 731 732 // We acquired an previous-head/current-head range 733 // go through the range and release the sqes 562 734 for( i; count ) { 563 735 __u32 idx = ring.submit_q.array[ (phead + i) & smask ]; 564 ring.submit_q.sqes[ idx ].user_data = 0; 736 737 /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data ); 738 __clean( &ring.submit_q.sqes[ idx ] ); 565 739 } 566 740 return count; 567 741 } 742 743 void __sqe_clean( volatile struct io_uring_sqe * sqe ) { 744 __clean( sqe ); 745 } 746 747 static inline void __clean( volatile struct io_uring_sqe * sqe ) { 748 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors 749 __cfaabi_dbg_debug_do( 750 
memset(sqe, 0xde, sizeof(*sqe)); 751 sqe->opcode = IORING_OP_LAST; 752 ); 753 754 // Mark the entry as unused 755 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST); 756 } 568 757 #endif -
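A minimal sketch, in plain C rather than Cforall, of the sqe-allocation scheme the io.cfa hunks above switch to: a submit-queue entry is considered free when its user_data holds the sentinel value 3, and a thread claims it by CAS-ing user_data from that sentinel to the address of the future that will receive the completion. The names sqe_slot, try_claim and find_free_slot are illustrative, not from the source.

#include <stdint.h>
#include <stdbool.h>

#define SQE_FREE 3ULL   /* sentinel used by the changeset to mark an unused sqe */

struct sqe_slot { volatile uint64_t user_data; /* ...remaining io_uring_sqe fields... */ };

static bool try_claim( struct sqe_slot * sqe, uint64_t future_addr ) {
        uint64_t expected = SQE_FREE;
        return __atomic_compare_exchange_n( &sqe->user_data, &expected, future_addr,
                                            false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED );
}

/* Scan the ring starting from some offset, as __submit_alloc does above. */
static int find_free_slot( struct sqe_slot * sqes, uint32_t num, uint32_t off, uint64_t future_addr ) {
        const uint32_t mask = num - 1;            /* io_uring ring sizes are powers of two */
        for( uint32_t i = 0; i < num; i++ ) {
                uint32_t idx = (i + off) & mask;
                if( try_claim( &sqes[idx], future_addr ) ) return (int)idx;
        }
        return -1;                                /* every entry is currently in use */
}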
libcfa/src/concurrency/io/call.cfa.in
rbace538 r402658b1 74 74 ; 75 75 76 extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );76 extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ); 77 77 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))); 78 78 … … 222 222 __u32 idx; 223 223 struct io_uring_sqe * sqe; 224 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 225 226 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 224 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 225 227 226 sqe->opcode = IORING_OP_{op}; 228 sqe->flags = sflags;{body} 227 sqe->flags = sflags; 228 sqe->ioprio = 0; 229 sqe->fd = 0; 230 sqe->off = 0; 231 sqe->addr = 0; 232 sqe->len = 0; 233 sqe->accept_flags = 0; 234 sqe->__pad2[0] = 0; 235 sqe->__pad2[1] = 0; 236 sqe->__pad2[2] = 0;{body} 237 238 asm volatile("": : :"memory"); 229 239 230 240 verify( sqe->user_data == (__u64)(uintptr_t)&future ); … … 312 322 }), 313 323 # CFA_HAVE_IORING_OP_ACCEPT 314 Call('ACCEPT 4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {324 Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', { 315 325 'fd': 'sockfd', 316 'addr': ' addr',317 'addr2': ' addrlen',326 'addr': '(__u64)addr', 327 'addr2': '(__u64)addrlen', 318 328 'accept_flags': 'flags' 319 329 }), … … 464 474 465 475 print(""" 476 //----------------------------------------------------------------------------- 477 bool cancel(io_cancellation & this) { 478 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL) 479 return false; 480 #else 481 io_future_t future; 482 483 io_context * context = __get_io_context(); 484 485 __u8 sflags = 0; 486 struct __io_data & ring = *context->thrd.ring; 487 488 __u32 idx; 489 volatile struct io_uring_sqe * sqe; 490 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 491 492 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 493 sqe->opcode = IORING_OP_ASYNC_CANCEL; 494 sqe->flags = sflags; 495 sqe->addr = this.target; 496 497 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 498 __submit( context, idx ); 499 500 wait(future); 501 502 if( future.result == 0 ) return true; // Entry found 503 if( future.result == -EALREADY) return true; // Entry found but in progress 504 if( future.result == -ENOENT ) return false; // Entry not found 505 return false; 506 #endif 507 } 508 466 509 //----------------------------------------------------------------------------- 467 510 // Check if a function is has asynchronous -
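A minimal sketch, in plain C, of what the generated cancel(io_cancellation &) above submits: an IORING_OP_ASYNC_CANCEL sqe whose addr field carries the user_data of the request to cancel. submit_and_wait_result is a hypothetical stand-in for the Cforall __submit/wait(future) machinery, not a real API.

#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <linux/io_uring.h>

extern int submit_and_wait_result( struct io_uring_sqe * sqe );   /* hypothetical */

static bool cancel_request( struct io_uring_sqe * sqe, uint64_t target_user_data ) {
        sqe->opcode = IORING_OP_ASYNC_CANCEL;
        sqe->flags  = 0;
        sqe->addr   = target_user_data;        /* matched against the in-flight request's user_data */
        int res = submit_and_wait_result( sqe );
        if( res == 0 )         return true;    /* entry found and cancelled */
        if( res == -EALREADY ) return true;    /* entry found but already executing */
        return false;                          /* -ENOENT (or anything else): nothing to cancel */
}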
libcfa/src/concurrency/io/setup.cfa
rbace538 r402658b1 52 52 #include <pthread.h> 53 53 #include <sys/epoll.h> 54 #include <sys/eventfd.h> 54 55 #include <sys/mman.h> 55 56 #include <sys/syscall.h> … … 169 170 // Main loop 170 171 while( iopoll.run ) { 172 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n"); 173 171 174 // Wait for events 172 175 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 176 177 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds); 173 178 174 179 // Check if an error occured … … 181 186 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 182 187 /* paranoid */ verify( io_ctx ); 183 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);188 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx); 184 189 #if !defined( __CFA_NO_STATISTICS__ ) 185 190 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; 186 191 #endif 192 193 eventfd_t v; 194 eventfd_read(io_ctx->ring->efd, &v); 195 187 196 post( io_ctx->sem ); 188 197 } … … 233 242 $thread & thrd = this.thrd.self; 234 243 if( cluster_context ) { 244 // We are about to do weird things with the threads 245 // we don't need interrupts to complicate everything 246 disable_interrupts(); 247 248 // Get cluster info 235 249 cluster & cltr = *thrd.curr_cluster; 236 250 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); … … 239 253 // We need to adjust the clean-up based on where the thread is 240 254 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 255 // This is the tricky case 256 // The thread was preempted or ready to run and now it is on the ready queue 257 // but the cluster is shutting down, so there aren't any processors to run the ready queue 258 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 241 259 242 260 ready_schedule_lock(); 243 244 // This is the tricky case 245 // The thread was preempted and now it is on the ready queue 261 // The thread should on the list 262 /* paranoid */ verify( thrd.link.next != 0p ); 263 264 // Remove the thread from the ready queue of this cluster 246 265 // The thread should be the last on the list 247 /* paranoid */ verify( thrd.link.next != 0p );248 249 // Remove the thread from the ready queue of this cluster250 266 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 251 267 /* paranoid */ verify( removed ); … … 263 279 } 264 280 // !!! This is not an else if !!! 281 // Ok, now the thread is blocked (whether we cheated to get here or not) 265 282 if( thrd.state == Blocked ) { 266 267 283 // This is the "easy case" 268 284 // The thread is parked and can easily be moved to active cluster … … 274 290 } 275 291 else { 276 277 292 // The thread is in a weird state 278 293 // I don't know what to do here 279 294 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 280 295 } 296 297 // The weird thread kidnapping stuff is over, restore interrupts. 
298 enable_interrupts( __cfaabi_dbg_ctx ); 281 299 } else { 282 300 post( this.thrd.sem ); … … 365 383 } 366 384 385 // Step 3 : Initialize the data structure 367 386 // Get the pointers from the kernel to fill the structure 368 387 // submit queue … … 379 398 const __u32 num = *sq.num; 380 399 for( i; num ) { 381 sq.sqes[i].user_data = 0ul64; 400 sq.sqes[i].opcode = IORING_OP_LAST; 401 sq.sqes[i].user_data = 3ul64; 382 402 } 383 403 } … … 409 429 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes); 410 430 431 // Step 4 : eventfd 432 int efd; 433 for() { 434 efd = eventfd(0, 0); 435 if (efd < 0) { 436 if (errno == EINTR) continue; 437 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno)); 438 } 439 break; 440 } 441 442 int ret; 443 for() { 444 ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1); 445 if (ret < 0) { 446 if (errno == EINTR) continue; 447 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno)); 448 } 449 break; 450 } 451 411 452 // some paranoid checks 412 453 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask ); … … 423 464 this.ring_flags = params.flags; 424 465 this.fd = fd; 466 this.efd = efd; 425 467 this.eager_submits = params_in.eager_submits; 426 468 this.poller_submits = params_in.poller_submits; … … 445 487 // close the file descriptor 446 488 close(this.fd); 489 close(this.efd); 447 490 448 491 free( this.submit_q.ready ); // Maybe null, doesn't matter … … 452 495 // I/O Context Sleep 453 496 //============================================================================================= 454 455 void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) { 456 ev.events = EPOLLIN | EPOLLONESHOT; 497 #define IOEVENTS EPOLLIN | EPOLLONESHOT 498 499 static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) { 500 struct epoll_event ev; 501 ev.events = IOEVENTS; 457 502 ev.data.u64 = (__u64)&ctx; 458 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);503 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev); 459 504 if (ret < 0) { 460 abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) ); 461 } 462 } 463 464 void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) { 465 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev); 466 if (ret < 0) { 467 abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) ); 468 } 505 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 506 } 507 } 508 509 void __ioctx_register($io_ctx_thread & ctx) { 510 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 511 } 512 513 void __ioctx_prepare_block($io_ctx_thread & ctx) { 514 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx); 515 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 469 516 } 470 517 -
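A minimal sketch, in plain C, of the eventfd wiring that setup.cfa adds above: an eventfd is created, registered with the ring through IORING_REGISTER_EVENTFD so the kernel signals it whenever a completion is posted, and that eventfd (rather than the ring fd) is watched by epoll in one-shot mode so the poller must explicitly re-arm before blocking again. ring_fd and epoll_fd are assumed to already exist; attach_eventfd is an illustrative name.

#define _GNU_SOURCE
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/io_uring.h>

static int attach_eventfd( int ring_fd, int epoll_fd ) {
        int efd = eventfd( 0, 0 );
        if( efd < 0 ) return -1;

        /* Ask the kernel to write to this eventfd whenever a cqe is posted on the ring. */
        if( syscall( __NR_io_uring_register, ring_fd, IORING_REGISTER_EVENTFD, &efd, 1 ) < 0 ) {
                close( efd );
                return -1;
        }

        /* One-shot: the poller re-arms with EPOLL_CTL_MOD (as __ioctx_prepare_block does) before parking. */
        struct epoll_event ev = { .events = EPOLLIN | EPOLLONESHOT, .data = { .fd = efd } };
        if( epoll_ctl( epoll_fd, EPOLL_CTL_ADD, efd, &ev ) < 0 ) {
                close( efd );
                return -1;
        }
        return efd;
}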
libcfa/src/concurrency/io/types.hfa
rbace538 r402658b1 65 65 66 66 // A buffer of sqes (not the actual ring) 67 struct io_uring_sqe * sqes;67 volatile struct io_uring_sqe * sqes; 68 68 69 69 // The location and size of the mmaped area … … 85 85 86 86 // the kernel ring 87 struct io_uring_cqe * cqes;87 volatile struct io_uring_cqe * cqes; 88 88 89 89 // The location and size of the mmaped area … … 97 97 __u32 ring_flags; 98 98 int fd; 99 int efd; 99 100 bool eager_submits:1; 100 101 bool poller_submits:1; … … 130 131 #endif 131 132 132 struct epoll_event;133 133 struct $io_ctx_thread; 134 void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev); 135 void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev); 134 void __ioctx_register($io_ctx_thread & ctx); 135 void __ioctx_prepare_block($io_ctx_thread & ctx); 136 void __sqe_clean( volatile struct io_uring_sqe * sqe ); 136 137 #endif 137 138
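A minimal sketch, in plain C, of why types.hfa now marks the sqes and cqes arrays volatile: the completion ring lives in memory the kernel writes to concurrently, so the consumer loads the published tail with acquire semantics, reads the cqes, then publishes the new head, matching the fences added in io.cfa. Names here are illustrative.

#include <stdint.h>
#include <linux/io_uring.h>

static unsigned drain_cqes( volatile struct io_uring_cqe * cqes,
                            volatile uint32_t * khead, volatile uint32_t * ktail,
                            uint32_t mask ) {
        uint32_t head = *khead;
        uint32_t tail = __atomic_load_n( ktail, __ATOMIC_ACQUIRE );   /* see the kernel's cqe writes */
        for( uint32_t i = head; i != tail; i++ ) {
                volatile struct io_uring_cqe * cqe = &cqes[ i & mask ];
                (void)cqe->res;                                       /* hand the result to the waiter */
        }
        __atomic_store_n( khead, tail, __ATOMIC_RELEASE );            /* mark the cqes as consumed */
        return tail - head;
}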