Changes in libcfa/src/concurrency/io.cfa [26544f9:a757ba1]
- File: 1 edited
Legend:
- Unmodified (' ' prefix)
- Added ('+' prefix, present only in a757ba1)
- Removed ('-' prefix, present only in 26544f9)
libcfa/src/concurrency/io.cfa
r26544f9 ra757ba1 85 85 static io_context$ * __ioarbiter_allocate( io_arbiter$ & this, __u32 idxs[], __u32 want ); 86 86 static void __ioarbiter_submit( io_context$ * , __u32 idxs[], __u32 have, bool lazy ); 87 static void __ioarbiter_flush ( io_context$ & , bool kernel);87 static void __ioarbiter_flush ( io_context$ & ); 88 88 static inline void __ioarbiter_notify( io_context$ & ctx ); 89 89 //============================================================================================= … … 94 94 extern void __kernel_unpark( thread$ * thrd, unpark_hint ); 95 95 96 static inline void __post(oneshot & this, bool kernel, unpark_hint hint) {97 thread$ * t = post( this, false );98 if(kernel) __kernel_unpark( t, hint );99 else unpark( t, hint );100 }101 102 // actual system call of io uring103 // wrap so everything that needs to happen around it is always done104 // i.e., stats, book keeping, sqe reclamation, etc.105 96 static void ioring_syscsll( struct io_context$ & ctx, unsigned int min_comp, unsigned int flags ) { 106 97 __STATS__( true, io.calls.flush++; ) 107 98 int ret; 108 99 for() { 109 // do the system call in a loop, repeat on interrupts110 100 ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8); 111 101 if( ret < 0 ) { … … 130 120 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 131 121 132 // keep track of how many still need submitting 133 __atomic_fetch_sub(&ctx.sq.to_submit, ret, __ATOMIC_SEQ_CST); 122 ctx.sq.to_submit -= ret; 134 123 135 124 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); … … 140 129 /* paranoid */ verify( ! __preemption_enabled() ); 141 130 142 // mark that there is no pending io left143 131 __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED); 144 132 } 145 133 146 // try to acquire an io context for draining, helping means we never *need* to drain, we can always do it later147 134 static bool try_acquire( io_context$ * ctx ) __attribute__((nonnull(1))) { 148 135 /* paranoid */ verify( ! __preemption_enabled() ); … … 151 138 152 139 { 153 // if there is nothing to drain there is no point in acquiring anything154 140 const __u32 head = *ctx->cq.head; 155 141 const __u32 tail = *ctx->cq.tail; … … 158 144 } 159 145 160 // try a simple spinlock acquire, it's likely there are completions to drain 161 if(!__atomic_try_acquire(&ctx->cq.try_lock)) { 162 // some other processor already has it 146 // Drain the queue 147 if(!__atomic_try_acquire(&ctx->cq.lock)) { 163 148 __STATS__( false, io.calls.locked++; ) 164 149 return false; 165 150 } 166 151 167 // acquired!!168 152 return true; 169 153 } 170 154 171 // actually drain the completion172 155 static bool __cfa_do_drain( io_context$ * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) { 173 156 /* paranoid */ verify( ! __preemption_enabled() ); 174 157 /* paranoid */ verify( ready_schedule_islocked() ); 175 /* paranoid */ verify( ctx->cq.try_lock == true ); 176 177 // get all the invariants and initial state 158 /* paranoid */ verify( ctx->cq.lock == true ); 159 178 160 const __u32 mask = *ctx->cq.mask; 179 161 const __u32 num = *ctx->cq.num; … … 184 166 for() { 185 167 // re-read the head and tail in case it already changed. 
186 // count the difference between the two187 168 const __u32 head = *ctx->cq.head; 188 169 const __u32 tail = *ctx->cq.tail; … … 190 171 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 191 172 192 // for everything between head and tail, drain it193 173 for(i; count) { 194 174 unsigned idx = (head + i) & mask; … … 197 177 /* paranoid */ verify(&cqe); 198 178 199 // find the future in the completion200 179 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 201 180 // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 202 181 203 // don't directly fulfill the future, preemption is disabled so we need to use kernel_unpark204 182 __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL ); 205 183 } 206 184 207 // update the timestamps accordingly208 // keep a local copy so we can update the relaxed copy209 185 ts_next = ctx->cq.ts = rdtscl(); 210 186 … … 214 190 ctx->proc->idle_wctx.drain_time = ts_next; 215 191 216 // we finished draining the completions... unless the ring buffer was full and there are more secret completions in the kernel.217 192 if(likely(count < num)) break; 218 193 219 // the ring buffer was full, there could be more stuff in the kernel.220 194 ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS); 221 195 } … … 225 199 /* paranoid */ verify( ! __preemption_enabled() ); 226 200 227 // everything is drained, we can release the lock 228 __atomic_unlock(&ctx->cq.try_lock); 229 230 // update the relaxed timestamp 201 __atomic_unlock(&ctx->cq.lock); 202 231 203 touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next, false ); 232 204 … … 234 206 } 235 207 236 // call from a processor to flush237 // contains all the bookkeeping a proc must do, not just the barebones flushing logic238 void __cfa_do_flush( io_context$ & ctx, bool kernel ) {239 /* paranoid */ verify( ! 
__preemption_enabled() );240 241 // flush any external requests242 ctx.sq.last_external = false; // clear the external bit, the arbiter will reset it if needed243 __ioarbiter_flush( ctx, kernel );244 245 // if submitting must be submitted, do the system call246 if(ctx.sq.to_submit != 0) {247 ioring_syscsll(ctx, 0, 0);248 }249 }250 251 // call from a processor to drain252 // contains all the bookkeeping a proc must do, not just the barebones draining logic253 208 bool __cfa_io_drain( struct processor * proc ) { 254 209 bool local = false; 255 210 bool remote = false; 256 211 257 // make sure no ones creates/destroys io contexts258 212 ready_schedule_lock(); 259 213 … … 263 217 /* paranoid */ verify( ctx ); 264 218 265 // Help if needed266 219 with(cltr->sched) { 267 220 const size_t ctxs_count = io.count; … … 277 230 const unsigned long long ctsc = rdtscl(); 278 231 279 // only help once every other time280 // pick a target when not helping281 232 if(proc->io.target == UINT_MAX) { 282 233 uint64_t chaos = __tls_rand(); 283 // choose who to help and whether to accept helping far processors284 234 unsigned ext = chaos & 0xff; 285 235 unsigned other = (chaos >> 8) % (ctxs_count); 286 236 287 // if the processor is on the same cache line or is lucky ( 3 out of 256 odds ) help it288 237 if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) { 289 238 proc->io.target = other; … … 291 240 } 292 241 else { 293 // a target was picked last time, help it294 242 const unsigned target = proc->io.target; 295 243 /* paranoid */ verify( io.tscs[target].t.tv != ULLONG_MAX ); 296 // make sure the target hasn't stopped existing since last time297 244 HELP: if(target < ctxs_count) { 298 // calculate it's age and how young it could be before we give ip on helping299 245 const __readyQ_avg_t cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io, false); 300 246 const __readyQ_avg_t age = moving_average(ctsc, io.tscs[target].t.tv, io.tscs[target].t.ma, false); 301 247 __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no"); 302 // is the target older than the cutoff, recall 0 is oldest and bigger ints are younger303 248 if(age <= cutoff) break HELP; 304 249 305 // attempt to help the submission side 306 __cfa_do_flush( *io.data[target], true ); 307 308 // attempt to help the completion side 309 if(!try_acquire(io.data[target])) break HELP; // already acquire no help needed 310 311 // actually help 250 if(!try_acquire(io.data[target])) break HELP; 251 312 252 if(!__cfa_do_drain( io.data[target], cltr )) break HELP; 313 253 314 // track we did help someone315 254 remote = true; 316 255 __STATS__( true, io.calls.helped++; ) 317 256 } 318 319 // reset the target320 257 proc->io.target = UINT_MAX; 321 258 } 322 259 } 260 323 261 324 262 // Drain the local queue … … 332 270 333 271 ready_schedule_unlock(); 334 335 // return true if some completion entry, local or remote, was drained336 272 return local || remote; 337 273 } 338 274 339 340 341 // call from a processor to flush342 // contains all the bookkeeping a proc must do, not just the barebones flushing logic343 275 bool __cfa_io_flush( struct processor * proc ) { 344 276 /* paranoid */ verify( ! 
__preemption_enabled() ); … … 346 278 /* paranoid */ verify( proc->io.ctx ); 347 279 348 __cfa_do_flush( *proc->io.ctx, false ); 349 350 // also drain since some stuff will immediately complete 280 io_context$ & ctx = *proc->io.ctx; 281 282 __ioarbiter_flush( ctx ); 283 284 if(ctx.sq.to_submit != 0) { 285 ioring_syscsll(ctx, 0, 0); 286 287 } 288 351 289 return __cfa_io_drain( proc ); 352 290 } … … 455 393 //============================================================================================= 456 394 // submission 457 // barebones logic to submit a group of sqes 458 static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lock) { 459 if(!lock) 460 lock( ctx->ext_sq.lock __cfaabi_dbg_ctx2 ); 395 static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have) { 461 396 // We can proceed to the fast path 462 397 // Get the right objects … … 473 408 // Make the sqes visible to the submitter 474 409 __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE); 475 __atomic_fetch_add(&sq.to_submit, have, __ATOMIC_SEQ_CST); 476 477 // set the bit to mark things need to be flushed 410 sq.to_submit += have; 411 478 412 __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED); 479 413 __atomic_store_n(&ctx->proc->io.dirty , true, __ATOMIC_RELAXED); 480 481 if(!lock) 482 unlock( ctx->ext_sq.lock ); 483 } 484 485 // submission logic + maybe flushing 414 } 415 486 416 static inline void __submit( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy) { 487 417 __sub_ring_t & sq = ctx->sq; 488 __submit_only(ctx, idxs, have , false);418 __submit_only(ctx, idxs, have); 489 419 490 420 if(sq.to_submit > 30) { … … 498 428 } 499 429 500 // call from a processor to flush501 // might require arbitration if the thread was migrated after the allocation502 430 void cfa_io_submit( struct io_context$ * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) libcfa_public { 503 431 // __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager"); … … 513 441 if( ctx == inctx ) // We have the right instance? 514 442 { 515 // yes! 
fast submit516 443 __submit(ctx, idxs, have, lazy); 517 444 … … 580 507 __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST); 581 508 582 // notify the allocator that new allocations can be made583 509 __ioarbiter_notify(ctx); 584 510 … … 631 557 } 632 558 633 // notify the arbiter that new allocations are available634 559 static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) { 635 560 /* paranoid */ verify( !empty(this.pending.queue) ); 636 /* paranoid */ verify( __preemption_enabled() ); 637 638 // mutual exclusion is needed 561 639 562 lock( this.pending.lock __cfaabi_dbg_ctx2 ); 640 563 { 641 __cfadbg_print_safe(io, "Kernel I/O : notifying\n");642 643 // as long as there are pending allocations try to satisfy them644 // for simplicity do it in FIFO order645 564 while( !empty(this.pending.queue) ) { 646 // get first pending allocs565 __cfadbg_print_safe(io, "Kernel I/O : notifying\n"); 647 566 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 648 567 __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue ); 649 568 650 // check if we have enough to satisfy the request651 569 if( have > pa.want ) goto DONE; 652 653 // if there are enough allocations it means we can drop the request654 570 drop( this.pending.queue ); 655 571 656 572 /* paranoid */__attribute__((unused)) bool ret = 657 573 658 // actually do the alloc659 574 __alloc(ctx, pa.idxs, pa.want); 660 575 661 576 /* paranoid */ verify( ret ); 662 577 663 // write out which context statisfied the request and post664 // this665 578 pa.ctx = ctx; 579 666 580 post( pa.waitctx ); 667 581 } … … 671 585 } 672 586 unlock( this.pending.lock ); 673 674 /* paranoid */ verify( __preemption_enabled() ); 675 } 676 677 // short hand to avoid the mutual exclusion of the pending is empty regardless 587 } 588 678 589 static void __ioarbiter_notify( io_context$ & ctx ) { 679 if(empty( ctx.arbiter->pending )) return; 680 __ioarbiter_notify( *ctx.arbiter, &ctx ); 681 } 682 683 // Submit from outside the local processor: append to the outstanding list 590 if(!empty( ctx.arbiter->pending )) { 591 __ioarbiter_notify( *ctx.arbiter, &ctx ); 592 } 593 } 594 595 // Simply append to the pending 684 596 static void __ioarbiter_submit( io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy ) { 685 597 __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd); … … 687 599 __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have); 688 600 689 // create the intrusive object to append690 601 __external_io ei; 691 602 ei.idxs = idxs; … … 693 604 ei.lazy = lazy; 694 605 695 // enqueue the io696 606 bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei); 697 607 698 // mark pending699 608 __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST); 700 609 701 // if this is the first to be enqueued, signal the processor in an attempt to speed up flushing702 // if it's not the first enqueue, a signal is already in transit703 610 if( we ) { 704 611 sigval_t value = { PREEMPT_IO }; 705 612 __cfaabi_pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value); 706 __STATS__( false, io.flush.signal += 1; ) 707 } 708 __STATS__( false, io.submit.extr += 1; ) 709 710 // to avoid dynamic allocation/memory reclamation headaches, wait for it to have been submitted 613 } 614 711 615 wait( ei.waitctx ); 712 616 … … 714 618 } 715 619 716 // flush the io arbiter: move all external io operations to the submission ring 717 static void __ioarbiter_flush( io_context$ & 
ctx, bool kernel ) { 718 // if there are no external operations just return 719 if(empty( ctx.ext_sq )) return; 720 721 // stats and logs 722 __STATS__( false, io.flush.external += 1; ) 723 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 724 725 // this can happen from multiple processors, mutual exclusion is needed 726 lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 ); 727 { 728 // pop each operation one at a time. 729 // There is no wait morphing because of the io sq ring 730 while( !empty(ctx.ext_sq.queue) ) { 731 // drop the element from the queue 732 __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue ); 733 734 // submit it 735 __submit_only(&ctx, ei.idxs, ei.have, true); 736 737 // wake the thread that was waiting on it 738 // since this can both be called from kernel and user, check the flag before posting 739 __post( ei.waitctx, kernel, UNPARK_LOCAL ); 620 static void __ioarbiter_flush( io_context$ & ctx ) { 621 if(!empty( ctx.ext_sq )) { 622 __STATS__( false, io.flush.external += 1; ) 623 624 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 625 626 lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 ); 627 { 628 while( !empty(ctx.ext_sq.queue) ) { 629 __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue ); 630 631 __submit_only(&ctx, ei.idxs, ei.have); 632 633 post( ei.waitctx ); 634 } 635 636 ctx.ext_sq.empty = true; 740 637 } 741 742 // mark the queue as empty 743 ctx.ext_sq.empty = true; 744 ctx.sq.last_external = true; 745 } 746 unlock(ctx.ext_sq.lock ); 747 } 748 749 extern "C" { 750 // debug functions used for gdb 751 // io_uring doesn't yet support gdb soe the kernel-shared data structures aren't viewable in gdb 752 // these functions read the data that gdb can't and should be removed once the support is added 753 static __u32 __cfagdb_cq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.head; } 754 static __u32 __cfagdb_cq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.tail; } 755 static __u32 __cfagdb_cq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.mask; } 756 static __u32 __cfagdb_sq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.head; } 757 static __u32 __cfagdb_sq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.tail; } 758 static __u32 __cfagdb_sq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.mask; } 759 760 // fancier version that reads an sqe and copies it out. 761 static struct io_uring_sqe __cfagdb_sq_at( io_context$ * ctx, __u32 at ) __attribute__((nonnull(1),used,noinline)) { 762 __u32 ax = at & *ctx->sq.mask; 763 __u32 ix = ctx->sq.kring.array[ax]; 764 return ctx->sq.sqes[ix]; 638 unlock(ctx.ext_sq.lock ); 765 639 } 766 640 }
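The completion-side fast path in this diff hinges on the try-acquire/drain/release sequence around ctx->cq.lock: try_acquire() bails out when the ring is empty, otherwise takes a non-blocking lock, and __cfa_do_drain() releases it with __atomic_unlock(). As a rough, hedged illustration only (not the libcfa implementation), the C sketch below assumes __atomic_try_acquire and __atomic_unlock behave like a byte-wide test-and-set spinlock built on the GCC __atomic_test_and_set/__atomic_clear builtins; the cq_t type and the function names here are hypothetical stand-ins.

#include <stdbool.h>

/* Hypothetical stand-ins for the fields used by try_acquire()/__cfa_do_drain() above. */
struct cq_t {
	volatile unsigned char lock;           /* assumed: byte spinlock, like ctx->cq.lock          */
	volatile unsigned int * head, * tail;  /* assumed: shared io_uring completion-ring indices   */
};

/* Sketch of try_acquire(): skip the lock entirely when there is nothing to drain,
 * otherwise attempt a non-blocking acquire and report whether we got it. */
static bool try_acquire_cq( struct cq_t * cq ) {
	if( *cq->head == *cq->tail ) return false;                          /* nothing to drain */
	return !__atomic_test_and_set( (void *)&cq->lock, __ATOMIC_ACQUIRE );
}

/* Sketch of the __atomic_unlock() call at the end of __cfa_do_drain(). */
static void release_cq( struct cq_t * cq ) {
	__atomic_clear( (void *)&cq->lock, __ATOMIC_RELEASE );
}

The non-blocking acquire matters for the helping logic visible in the diff: a processor that loses the race just records an io.calls.locked statistic and moves on, since whichever processor holds the lock is already draining that completion queue.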