Changes in libcfa/src/concurrency/io.cfa [1afd9ccb:a757ba1]
Files (1 edited):
- libcfa/src/concurrency/io.cfa (modified) (28 diffs)
Legend:
- Unmodified lines have no marker
- Added lines are prefixed with "+"
- Removed lines are prefixed with "-"
- "…" marks an omitted unchanged region between hunks
libcfa/src/concurrency/io.cfa
--- r1afd9ccb
+++ ra757ba1

…
  #define __cforall_thread__
+ #define _GNU_SOURCE

  #if defined(__CFA_DEBUG__)
…
  static io_context$ * __ioarbiter_allocate( io_arbiter$ & this, __u32 idxs[], __u32 want );
  static void __ioarbiter_submit( io_context$ * , __u32 idxs[], __u32 have, bool lazy );
- static void __ioarbiter_flush ( io_context$ & , bool kernel);
+ static void __ioarbiter_flush ( io_context$ & );
  static inline void __ioarbiter_notify( io_context$ & ctx );
  //=============================================================================================
…
  extern void __kernel_unpark( thread$ * thrd, unpark_hint );

- static inline void __post(oneshot & this, bool kernel, unpark_hint hint) {
- 	thread$ * t = post( this, false );
- 	if(kernel) __kernel_unpark( t, hint );
- 	else unpark( t, hint );
- }
-
- // actual system call of io uring
- // wrap so everything that needs to happen around it is always done
- // i.e., stats, book keeping, sqe reclamation, etc.
  static void ioring_syscsll( struct io_context$ & ctx, unsigned int min_comp, unsigned int flags ) {
  	__STATS__( true, io.calls.flush++; )
  	int ret;
  	for() {
- 		// do the system call in a loop, repeat on interrupts
  		ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
  		if( ret < 0 ) {
…
  	/* paranoid */ verify( ctx.sq.to_submit >= ret );

- 	// keep track of how many still need submitting
- 	__atomic_fetch_sub(&ctx.sq.to_submit, ret, __ATOMIC_SEQ_CST);
+ 	ctx.sq.to_submit -= ret;

  	/* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
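The hunks above touch ioring_syscsll, the wrapper around the raw io_uring_enter system call. As a point of reference only, a minimal sketch of that call pattern in plain C follows; the name enter_ring and its error handling are illustrative, not part of the changeset, and it assumes the kernel headers define __NR_io_uring_enter.

    #include <errno.h>
    #include <signal.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    // Submit 'to_submit' sqes and/or wait for 'min_complete' cqes, retrying on signal interruption.
    static int enter_ring(int ring_fd, unsigned to_submit, unsigned min_complete, unsigned flags) {
    	for (;;) {
    		int ret = syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete,
    		                  flags, (sigset_t *)0, _NSIG / 8);
    		if (ret >= 0) return ret;      // number of sqes the kernel consumed
    		if (errno == EINTR) continue;  // interrupted by a signal: retry
    		return -errno;                 // genuine error: report it
    	}
    }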
…
  	/* paranoid */ verify( ! __preemption_enabled() );

- 	// mark that there is no pending io left
  	__atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
  }

- // try to acquire an io context for draining, helping means we never *need* to drain, we can always do it later
  static bool try_acquire( io_context$ * ctx ) __attribute__((nonnull(1))) {
  	/* paranoid */ verify( ! __preemption_enabled() );
…
  	{
- 		// if there is nothing to drain there is no point in acquiring anything
  		const __u32 head = *ctx->cq.head;
  		const __u32 tail = *ctx->cq.tail;
…
  	}

- 	// try a simple spinlock acquire, it's likely there are completions to drain
- 	if(!__atomic_try_acquire(&ctx->cq.try_lock)) {
- 		// some other processor already has it
+ 	// Drain the queue
+ 	if(!__atomic_try_acquire(&ctx->cq.lock)) {
  		__STATS__( false, io.calls.locked++; )
  		return false;
  	}

- 	// acquired!!
  	return true;
  }

- // actually drain the completion
  static bool __cfa_do_drain( io_context$ * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
  	/* paranoid */ verify( ! __preemption_enabled() );
  	/* paranoid */ verify( ready_schedule_islocked() );
- 	/* paranoid */ verify( ctx->cq.try_lock == true );
-
- 	// get all the invariants and initial state
+ 	/* paranoid */ verify( ctx->cq.lock == true );
+
  	const __u32 mask = *ctx->cq.mask;
  	const __u32 num = *ctx->cq.num;
…
  	for() {
  		// re-read the head and tail in case it already changed.
- 		// count the difference between the two
  		const __u32 head = *ctx->cq.head;
  		const __u32 tail = *ctx->cq.tail;
…
  		__STATS__( false, io.calls.drain++; io.calls.completed += count; )

- 		// for everything between head and tail, drain it
  		for(i; count) {
  			unsigned idx = (head + i) & mask;
…
  			/* paranoid */ verify(&cqe);

- 			// find the future in the completion
  			struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
  			// __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );

- 			// don't directly fulfill the future, preemption is disabled so we need to use kernel_unpark
  			__kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
  		}

- 		// update the timestamps accordingly
- 		// keep a local copy so we can update the relaxed copy
  		ts_next = ctx->cq.ts = rdtscl();
…
  		ctx->proc->idle_wctx.drain_time = ts_next;

- 		// we finished draining the completions... unless the ring buffer was full and there are more secret completions in the kernel.
  		if(likely(count < num)) break;

- 		// the ring buffer was full, there could be more stuff in the kernel.
  		ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS);
  	}
…
  	/* paranoid */ verify( ! __preemption_enabled() );

- 	// everything is drained, we can release the lock
- 	__atomic_unlock(&ctx->cq.try_lock);
-
- 	// update the relaxed timestamp
+ 	__atomic_unlock(&ctx->cq.lock);
+
  	touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next, false );
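The hunks above sit in __cfa_do_drain, which walks the completion ring between head and tail. A minimal sketch of that ring-consumption pattern in plain C follows; the struct and function names are illustrative and not part of the changeset, and it assumes <linux/io_uring.h> provides struct io_uring_cqe.

    #include <linux/io_uring.h>

    // One consumer view of an io_uring completion ring: head is ours, tail is the kernel's.
    struct cq_view {
    	unsigned *head;             // consumer index, advanced by user space
    	unsigned *tail;             // producer index, advanced by the kernel
    	unsigned  mask;             // ring size - 1 (ring sizes are powers of two)
    	struct io_uring_cqe *cqes;  // shared array of completion entries
    };

    static unsigned drain_cq(struct cq_view *cq) {
    	const unsigned head  = *cq->head;
    	const unsigned tail  = __atomic_load_n(cq->tail, __ATOMIC_ACQUIRE);
    	const unsigned count = tail - head;             // unsigned math handles wrap-around
    	for (unsigned i = 0; i < count; i++) {
    		struct io_uring_cqe *cqe = &cq->cqes[(head + i) & cq->mask];
    		(void)cqe;                                  // hand cqe->res back via cqe->user_data here
    	}
    	__atomic_store_n(cq->head, head + count, __ATOMIC_RELEASE); // publish what was consumed
    	return count;
    }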
…
  }

- // call from a processor to flush
- // contains all the bookkeeping a proc must do, not just the barebones flushing logic
- void __cfa_do_flush( io_context$ & ctx, bool kernel ) {
- 	/* paranoid */ verify( ! __preemption_enabled() );
-
- 	// flush any external requests
- 	ctx.sq.last_external = false; // clear the external bit, the arbiter will reset it if needed
- 	__ioarbiter_flush( ctx, kernel );
-
- 	// if submitting must be submitted, do the system call
- 	if(ctx.sq.to_submit != 0) {
- 		ioring_syscsll(ctx, 0, 0);
- 	}
- }
-
- // call from a processor to drain
- // contains all the bookkeeping a proc must do, not just the barebones draining logic
  bool __cfa_io_drain( struct processor * proc ) {
  	bool local = false;
  	bool remote = false;

- 	// make sure no ones creates/destroys io contexts
  	ready_schedule_lock();
…
  	/* paranoid */ verify( ctx );

- 	// Help if needed
  	with(cltr->sched) {
  		const size_t ctxs_count = io.count;
…
  		const unsigned long long ctsc = rdtscl();

- 		// only help once every other time
- 		// pick a target when not helping
  		if(proc->io.target == UINT_MAX) {
  			uint64_t chaos = __tls_rand();
- 			// choose who to help and whether to accept helping far processors
  			unsigned ext = chaos & 0xff;
  			unsigned other = (chaos >> 8) % (ctxs_count);

- 			// if the processor is on the same cache line or is lucky ( 3 out of 256 odds ) help it
  			if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
  				proc->io.target = other;
…
  		}
  		else {
- 			// a target was picked last time, help it
  			const unsigned target = proc->io.target;
  			/* paranoid */ verify( io.tscs[target].t.tv != ULLONG_MAX );
- 			// make sure the target hasn't stopped existing since last time
  			HELP: if(target < ctxs_count) {
- 				// calculate it's age and how young it could be before we give up on helping
  				const __readyQ_avg_t cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io, false);
  				const __readyQ_avg_t age = moving_average(ctsc, io.tscs[target].t.tv, io.tscs[target].t.ma, false);
  				__cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
- 				// is the target older than the cutoff, recall 0 is oldest and bigger ints are younger
  				if(age <= cutoff) break HELP;

- 				// attempt to help the submission side
- 				__cfa_do_flush( *io.data[target], true );
-
- 				// attempt to help the completion side
- 				if(!try_acquire(io.data[target])) break HELP; // already acquire no help needed
-
- 				// actually help
+ 				if(!try_acquire(io.data[target])) break HELP;
+
  				if(!__cfa_do_drain( io.data[target], cltr )) break HELP;

- 				// track we did help someone
  				remote = true;
  				__STATS__( true, io.calls.helped++; )
  			}
-
- 			// reset the target
  			proc->io.target = UINT_MAX;
  		}
  	}
+

  	// Drain the local queue
…

  	ready_schedule_unlock();
-
- 	// return true if some completion entry, local or remote, was drained
  	return local || remote;
  }

-
-
- // call from a processor to flush
- // contains all the bookkeeping a proc must do, not just the barebones flushing logic
  bool __cfa_io_flush( struct processor * proc ) {
  	/* paranoid */ verify( ! __preemption_enabled() );
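In __cfa_io_drain above, a processor occasionally helps another I/O context: one random draw decides both which context is the candidate and whether a context outside the local cache domain gets helped, at roughly 3-in-256 odds. A small sketch of that decision in plain C follows; the names are illustrative and not part of the changeset.

    #include <limits.h>
    #include <stdint.h>

    #define NO_TARGET UINT_MAX   // sentinel: do not help this round (mirrors proc->io.target)

    // One 64-bit random draw drives the whole policy: the low 8 bits are the "luck" roll,
    // the remaining bits select the candidate context.
    static unsigned pick_help_target(uint64_t chaos, unsigned ctx_count, int same_cache_domain) {
    	unsigned luck      = chaos & 0xff;               // 0..255
    	unsigned candidate = (chaos >> 8) % ctx_count;   // which context we might help
    	if (luck < 3 || same_cache_domain)               // ~3/256 chance, or a nearby context
    		return candidate;
    	return NO_TARGET;
    }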
…
  	/* paranoid */ verify( proc->io.ctx );

- 	__cfa_do_flush( *proc->io.ctx, false );
-
- 	// also drain since some stuff will immediately complete
+ 	io_context$ & ctx = *proc->io.ctx;
+
+ 	__ioarbiter_flush( ctx );
+
+ 	if(ctx.sq.to_submit != 0) {
+ 		ioring_syscsll(ctx, 0, 0);
+
+ 	}
+
  	return __cfa_io_drain( proc );
  }
…
  //=============================================================================================
  // submission
- // barebones logic to submit a group of sqes
- static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lock) {
- 	if(!lock)
- 		lock( ctx->ext_sq.lock __cfaabi_dbg_ctx2 );
+ static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have) {
  	// We can proceed to the fast path
  	// Get the right objects
…
  	// Make the sqes visible to the submitter
  	__atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
- 	__atomic_fetch_add(&sq.to_submit, have, __ATOMIC_SEQ_CST);
-
- 	// set the bit to mark things need to be flushed
+ 	sq.to_submit += have;
+
  	__atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
  	__atomic_store_n(&ctx->proc->io.dirty , true, __ATOMIC_RELAXED);
-
- 	if(!lock)
- 		unlock( ctx->ext_sq.lock );
- }
-
- // submission logic + maybe flushing
+ }
+
  static inline void __submit( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy) {
  	__sub_ring_t & sq = ctx->sq;
- 	__submit_only(ctx, idxs, have , false);
+ 	__submit_only(ctx, idxs, have);

  	if(sq.to_submit > 30) {
…
  	}

- // call from a processor to flush
- // might require arbitration if the thread was migrated after the allocation
  void cfa_io_submit( struct io_context$ * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) libcfa_public {
  	// __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
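The __submit_only hunks above end with the usual submission-ring publish: write the sqe indices into the ring array, then release-store the new tail so readers that acquire-load the tail also see the filled slots. A plain-C sketch of that step, with illustrative names and not part of the changeset, is:

    // Fill consecutive ring slots with sqe indices, then publish the new tail with
    // release semantics; only the submitter advances the tail.
    static void publish_sqes(unsigned *array, unsigned *tail_ptr, unsigned mask,
                             const unsigned idxs[], unsigned have) {
    	const unsigned tail = *tail_ptr;
    	for (unsigned i = 0; i < have; i++)
    		array[(tail + i) & mask] = idxs[i];
    	__atomic_store_n(tail_ptr, tail + have, __ATOMIC_RELEASE);
    }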
…
  	if( ctx == inctx ) // We have the right instance?
  	{
- 		// yes! fast submit
  		__submit(ctx, idxs, have, lazy);
…
  	__atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);

- 	// notify the allocator that new allocations can be made
  	__ioarbiter_notify(ctx);
…
  }

- // notify the arbiter that new allocations are available
  static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) {
  	/* paranoid */ verify( !empty(this.pending.queue) );
- 	/* paranoid */ verify( __preemption_enabled() );
-
- 	// mutual exclusion is needed
+
  	lock( this.pending.lock __cfaabi_dbg_ctx2 );
  	{
- 		__cfadbg_print_safe(io, "Kernel I/O : notifying\n");
-
- 		// as long as there are pending allocations try to satisfy them
- 		// for simplicity do it in FIFO order
  		while( !empty(this.pending.queue) ) {
- 			// get first pending allocs
+ 			__cfadbg_print_safe(io, "Kernel I/O : notifying\n");
  			__u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
  			__pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );

- 			// check if we have enough to satisfy the request
  			if( have > pa.want ) goto DONE;
-
- 			// if there are enough allocations it means we can drop the request
  			drop( this.pending.queue );

  			/* paranoid */__attribute__((unused)) bool ret =

- 			// actually do the alloc
  			__alloc(ctx, pa.idxs, pa.want);

  			/* paranoid */ verify( ret );

- 			// write out which context statisfied the request and post
- 			// this
  			pa.ctx = ctx;

  			post( pa.waitctx );
  		}
…
  	}
  	unlock( this.pending.lock );
-
- 	/* paranoid */ verify( __preemption_enabled() );
- }
-
- // short hand to avoid the mutual exclusion of the pending is empty regardless
+ }
+
  static void __ioarbiter_notify( io_context$ & ctx ) {
- 	if(empty( ctx.arbiter->pending )) return;
- 	__ioarbiter_notify( *ctx.arbiter, &ctx );
- }
-
- // Submit from outside the local processor: append to the outstanding list
+ 	if(!empty( ctx.arbiter->pending )) {
+ 		__ioarbiter_notify( *ctx.arbiter, &ctx );
+ 	}
+ }
+
+ // Simply append to the pending
  static void __ioarbiter_submit( io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy ) {
  	__cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
…
  	__cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);

- 	// create the intrusive object to append
  	__external_io ei;
  	ei.idxs = idxs;
…
  	ei.lazy = lazy;

- 	// enqueue the io
  	bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);

- 	// mark pending
  	__atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);

- 	// if this is the first to be enqueued, signal the processor in an attempt to speed up flushing
- 	// if it's not the first enqueue, a signal is already in transit
  	if( we ) {
  		sigval_t value = { PREEMPT_IO };
  		__cfaabi_pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
- 		__STATS__( false, io.flush.signal += 1; )
- 	}
- 	__STATS__( false, io.submit.extr += 1; )
-
- 	// to avoid dynamic allocation/memory reclamation headaches, wait for it to have been submitted
+ 	}
+
  	wait( ei.waitctx );
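In __ioarbiter_submit above, enqueue() reports whether this was the first outstanding entry, and only that first enqueuer signals the processor; later enqueuers can rely on a wake-up already being in flight. A minimal sketch of that pattern in plain C follows; the queue type and names are illustrative and not part of the changeset.

    #include <pthread.h>
    #include <stdbool.h>
    #include <stddef.h>

    struct node { struct node *next; };

    struct ext_queue {
    	pthread_mutex_t lock;
    	struct node *head;   // NULL when empty
    	struct node **tail;  // initialized to &head; points at the last node's next link
    };

    // Append a node; return true only when the queue was empty, meaning this caller
    // is responsible for signalling the consumer (the processor, in the changeset).
    static bool enqueue_first(struct ext_queue *q, struct node *n) {
    	n->next = NULL;
    	pthread_mutex_lock(&q->lock);
    	bool was_empty = (q->head == NULL);
    	*q->tail = n;
    	q->tail = &n->next;
    	pthread_mutex_unlock(&q->lock);
    	return was_empty;
    }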
…
  }

- // flush the io arbiter: move all external io operations to the submission ring
- static void __ioarbiter_flush( io_context$ & ctx, bool kernel ) {
- 	// if there are no external operations just return
- 	if(empty( ctx.ext_sq )) return;
-
- 	// stats and logs
- 	__STATS__( false, io.flush.external += 1; )
- 	__cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
-
- 	// this can happen from multiple processors, mutual exclusion is needed
- 	lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
- 	{
- 		// pop each operation one at a time.
- 		// There is no wait morphing because of the io sq ring
- 		while( !empty(ctx.ext_sq.queue) ) {
- 			// drop the element from the queue
- 			__external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
-
- 			// submit it
- 			__submit_only(&ctx, ei.idxs, ei.have, true);
-
- 			// wake the thread that was waiting on it
- 			// since this can both be called from kernel and user, check the flag before posting
- 			__post( ei.waitctx, kernel, UNPARK_LOCAL );
+ static void __ioarbiter_flush( io_context$ & ctx ) {
+ 	if(!empty( ctx.ext_sq )) {
+ 		__STATS__( false, io.flush.external += 1; )
+
+ 		__cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
+
+ 		lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
+ 		{
+ 			while( !empty(ctx.ext_sq.queue) ) {
+ 				__external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
+
+ 				__submit_only(&ctx, ei.idxs, ei.have);
+
+ 				post( ei.waitctx );
+ 			}
+
+ 			ctx.ext_sq.empty = true;
  		}
-
- 		// mark the queue as empty
- 		ctx.ext_sq.empty = true;
- 		ctx.sq.last_external = true;
- 	}
- 	unlock(ctx.ext_sq.lock );
- }
-
- extern "C" {
- 	// debug functions used for gdb
- 	// io_uring doesn't yet support gdb soe the kernel-shared data structures aren't viewable in gdb
- 	// these functions read the data that gdb can't and should be removed once the support is added
- 	static __u32 __cfagdb_cq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.head; }
- 	static __u32 __cfagdb_cq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.tail; }
- 	static __u32 __cfagdb_cq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.mask; }
- 	static __u32 __cfagdb_sq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.head; }
- 	static __u32 __cfagdb_sq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.tail; }
- 	static __u32 __cfagdb_sq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.mask; }
-
- 	// fancier version that reads an sqe and copies it out.
- 	static struct io_uring_sqe __cfagdb_sq_at( io_context$ * ctx, __u32 at ) __attribute__((nonnull(1),used,noinline)) {
- 		__u32 ax = at & *ctx->sq.mask;
- 		__u32 ix = ctx->sq.kring.array[ax];
- 		return ctx->sq.sqes[ix];
+ 		unlock(ctx.ext_sq.lock );
  	}
  }