Changes in libcfa/src/concurrency/io.cfa [ec19b21:fe9468e2]
- File:
-
- 1 edited
-
libcfa/src/concurrency/io.cfa (modified) (22 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
rec19b21 rfe9468e2 31 31 32 32 extern "C" { 33 #include <sys/epoll.h> 33 34 #include <sys/syscall.h> 34 35 … … 40 41 #include "kernel/fwd.hfa" 41 42 #include "io/types.hfa" 42 43 static const char * opcodes[] = {44 "OP_NOP",45 "OP_READV",46 "OP_WRITEV",47 "OP_FSYNC",48 "OP_READ_FIXED",49 "OP_WRITE_FIXED",50 "OP_POLL_ADD",51 "OP_POLL_REMOVE",52 "OP_SYNC_FILE_RANGE",53 "OP_SENDMSG",54 "OP_RECVMSG",55 "OP_TIMEOUT",56 "OP_TIMEOUT_REMOVE",57 "OP_ACCEPT",58 "OP_ASYNC_CANCEL",59 "OP_LINK_TIMEOUT",60 "OP_CONNECT",61 "OP_FALLOCATE",62 "OP_OPENAT",63 "OP_CLOSE",64 "OP_FILES_UPDATE",65 "OP_STATX",66 "OP_READ",67 "OP_WRITE",68 "OP_FADVISE",69 "OP_MADVISE",70 "OP_SEND",71 "OP_RECV",72 "OP_OPENAT2",73 "OP_EPOLL_CTL",74 "OP_SPLICE",75 "OP_PROVIDE_BUFFERS",76 "OP_REMOVE_BUFFERS",77 "OP_TEE",78 "INVALID_OP"79 };80 43 81 44 // returns true of acquired as leader or second leader … … 171 134 int ret = 0; 172 135 if( need_sys_to_submit || need_sys_to_complete ) { 173 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);174 136 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 175 137 if( ret < 0 ) { … … 195 157 static unsigned __collect_submitions( struct __io_data & ring ); 196 158 static __u32 __release_consumed_submission( struct __io_data & ring ); 197 static inline void __clean( volatile struct io_uring_sqe * sqe ); 159 160 static inline void process(struct io_uring_cqe & cqe ) { 161 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 162 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 163 164 fulfil( *future, cqe.res ); 165 } 198 166 199 167 // Process a single completion message from the io_uring 200 168 // This is NOT thread-safe 201 static inline void process( volatile struct io_uring_cqe & cqe ) {202 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;203 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );204 205 fulfil( *future, cqe.res );206 }207 208 169 static [int, bool] __drain_io( & struct __io_data ring ) { 209 170 /* paranoid */ verify( ! __preemption_enabled() ); … … 231 192 } 232 193 233 __atomic_thread_fence( __ATOMIC_SEQ_CST );234 235 194 // Release the consumed SQEs 236 195 __release_consumed_submission( ring ); … … 250 209 for(i; count) { 251 210 unsigned idx = (head + i) & mask; 252 volatilestruct io_uring_cqe & cqe = ring.completion_q.cqes[idx];211 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 253 212 254 213 /* paranoid */ verify(&cqe); … … 259 218 // Mark to the kernel that the cqe has been seen 260 219 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 261 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST ); 220 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 221 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 262 222 263 223 return [count, count > 0 || to_submit > 0]; … … 265 225 266 226 void main( $io_ctx_thread & this ) { 267 __ioctx_register( this );268 269 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this); 270 271 const int reset_cnt = 5; 272 int reset = reset_cnt;227 epoll_event ev; 228 __ioctx_register( this, ev ); 229 230 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring); 231 232 int reset = 0; 273 233 // Then loop until we need to start 274 LOOP:275 234 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 276 235 // Drain the io … … 280 239 [count, again] = __drain_io( *this.ring ); 281 240 282 if(!again) reset --;241 if(!again) reset++; 283 242 284 243 // Update statistics … … 290 249 291 250 // If we got something, just yield and check again 292 if(reset > 1) {251 if(reset < 5) { 293 252 yield(); 294 continue LOOP; 295 } 296 297 // We alread failed to find completed entries a few time. 298 if(reset == 1) { 299 // Rearm the context so it can block 300 // but don't block right away 301 // we need to retry one last time in case 302 // something completed *just now* 303 __ioctx_prepare_block( this ); 304 continue LOOP; 305 } 306 253 } 254 // We didn't get anything baton pass to the slow poller 255 else { 307 256 __STATS__( false, 308 257 io.complete_q.blocks += 1; 309 258 ) 310 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this); 259 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self); 260 reset = 0; 311 261 312 262 // block this thread 263 __ioctx_prepare_block( this, ev ); 313 264 wait( this.sem ); 314 315 // restore counter 316 reset = reset_cnt; 317 } 318 319 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this); 265 } 266 } 267 268 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring); 320 269 } 321 270 … … 340 289 // 341 290 342 // Allocate an submit queue entry. 343 // The kernel cannot see these entries until they are submitted, but other threads must be 344 // able to see which entries can be used and which are already un used by an other thread 345 // for convenience, return both the index and the pointer to the sqe 346 // sqe == &sqes[idx] 347 [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 291 [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 348 292 /* paranoid */ verify( data != 0 ); 349 293 … … 360 304 // Look through the list starting at some offset 361 305 for(i; cnt) { 362 __u64 expected = 3;363 __u32 idx = (i + off) & mask; // Get an index from a random364 volatilestruct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];306 __u64 expected = 0; 307 __u32 idx = (i + off) & mask; 308 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 365 309 volatile __u64 * udata = &sqe->user_data; 366 310 367 // Allocate the entry by CASing the user_data field from 0 to the future address368 311 if( *udata == expected && 369 312 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) … … 376 319 ) 377 320 378 // debug log379 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );380 321 381 322 // Success return the data … … 384 325 verify(expected != data); 385 326 386 // This one was used387 327 len ++; 388 328 } 389 329 390 330 block++; 391 392 abort( "Kernel I/O : all submit queue entries used, yielding\n" );393 394 331 yield(); 395 332 } … … 440 377 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 441 378 __io_data & ring = *ctx->thrd.ring; 442 443 {444 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];445 __cfadbg_print_safe( io,446 "Kernel I/O : submitting %u (%p) for %p\n"447 " data: %p\n"448 " opcode: %s\n"449 " fd: %d\n"450 " flags: %d\n"451 " prio: %d\n"452 " off: %p\n"453 " addr: %p\n"454 " len: %d\n"455 " other flags: %d\n"456 " splice fd: %d\n"457 " pad[0]: %llu\n"458 " pad[1]: %llu\n"459 " pad[2]: %llu\n",460 idx, sqe,461 active_thread(),462 (void*)sqe->user_data,463 opcodes[sqe->opcode],464 sqe->fd,465 sqe->flags,466 sqe->ioprio,467 sqe->off,468 sqe->addr,469 sqe->len,470 sqe->accept_flags,471 sqe->splice_fd_in,472 sqe->__pad2[0],473 sqe->__pad2[1],474 sqe->__pad2[2]475 );476 }477 478 479 379 // Get now the data we definetely need 480 380 volatile __u32 * const tail = ring.submit_q.tail; … … 543 443 unlock(ring.submit_q.submit_lock); 544 444 #endif 545 if( ret < 0 ) { 546 return; 547 } 445 if( ret < 0 ) return; 548 446 549 447 // Release the consumed SQEs … … 556 454 io.submit_q.submit_avg.cnt += 1; 557 455 ) 558 559 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 560 } 561 else 562 { 456 } 457 else { 563 458 // get mutual exclusion 564 459 #if defined(LEADER_LOCK) … … 568 463 #endif 569 464 570 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,465 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0, 571 466 /* paranoid */ "index %u already reclaimed\n" 572 467 /* paranoid */ "head %u, prev %u, tail %u\n" … … 595 490 } 596 491 597 /* paranoid */ verify(ret == 1);598 599 492 // update statistics 600 493 __STATS__( false, … … 603 496 ) 604 497 605 {606 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;607 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];608 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];609 610 __cfadbg_print_safe( io,611 "Kernel I/O : last submitted is %u (%p)\n"612 " data: %p\n"613 " opcode: %s\n"614 " fd: %d\n"615 " flags: %d\n"616 " prio: %d\n"617 " off: %p\n"618 " addr: %p\n"619 " len: %d\n"620 " other flags: %d\n"621 " splice fd: %d\n"622 " pad[0]: %llu\n"623 " pad[1]: %llu\n"624 " pad[2]: %llu\n",625 last_idx, sqe,626 (void*)sqe->user_data,627 opcodes[sqe->opcode],628 sqe->fd,629 sqe->flags,630 sqe->ioprio,631 sqe->off,632 sqe->addr,633 sqe->len,634 sqe->accept_flags,635 sqe->splice_fd_in,636 sqe->__pad2[0],637 sqe->__pad2[1],638 sqe->__pad2[2]639 );640 }641 642 __atomic_thread_fence( __ATOMIC_SEQ_CST );643 498 // Release the consumed SQEs 644 499 __release_consumed_submission( ring ); 645 // ring.submit_q.sqes[idx].user_data = 3ul64;646 500 647 501 #if defined(LEADER_LOCK) … … 651 505 #endif 652 506 653 __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread());507 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret ); 654 508 } 655 509 } 656 510 657 511 // #define PARTIAL_SUBMIT 32 658 659 // go through the list of submissions in the ready array and moved them into660 // the ring's submit queue661 512 static unsigned __collect_submitions( struct __io_data & ring ) { 662 513 /* paranoid */ verify( ring.submit_q.ready != 0p ); … … 699 550 } 700 551 701 // Go through the ring's submit queue and release everything that has already been consumed702 // by io_uring703 552 static __u32 __release_consumed_submission( struct __io_data & ring ) { 704 553 const __u32 smask = *ring.submit_q.mask; 705 554 706 // We need to get the lock to copy the old head and new head707 555 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 708 __attribute__((unused)) 709 __u32 ctail = *ring.submit_q.tail; // get the current tail of the queue 710 __u32 chead = *ring.submit_q.head; // get the current head of the queue 711 __u32 phead = ring.submit_q.prev_head; // get the head the last time we were here 712 ring.submit_q.prev_head = chead; // note up to were we processed 556 __u32 chead = *ring.submit_q.head; 557 __u32 phead = ring.submit_q.prev_head; 558 ring.submit_q.prev_head = chead; 713 559 unlock(ring.submit_q.release_lock); 714 560 715 // the 3 fields are organized like this diagram716 // except it's are ring717 // ---+--------+--------+----718 // ---+--------+--------+----719 // ^ ^ ^720 // phead chead ctail721 722 // make sure ctail doesn't wrap around and reach phead723 /* paranoid */ verify(724 (ctail >= chead && chead >= phead)725 || (chead >= phead && phead >= ctail)726 || (phead >= ctail && ctail >= chead)727 );728 729 // find the range we need to clear730 561 __u32 count = chead - phead; 731 732 // We acquired an previous-head/current-head range733 // go through the range and release the sqes734 562 for( i; count ) { 735 563 __u32 idx = ring.submit_q.array[ (phead + i) & smask ]; 736 737 /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data ); 738 __clean( &ring.submit_q.sqes[ idx ] ); 564 ring.submit_q.sqes[ idx ].user_data = 0; 739 565 } 740 566 return count; 741 567 } 742 743 void __sqe_clean( volatile struct io_uring_sqe * sqe ) {744 __clean( sqe );745 }746 747 static inline void __clean( volatile struct io_uring_sqe * sqe ) {748 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors749 __cfaabi_dbg_debug_do(750 memset(sqe, 0xde, sizeof(*sqe));751 sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;752 );753 754 // Mark the entry as unused755 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);756 }757 568 #endif
Note:
See TracChangeset
for help on using the changeset viewer.