Changeset 24d6572 for libcfa/src/concurrency/io.cfa
- Timestamp: Jun 12, 2023, 2:45:32 PM (2 years ago)
- Branches: ast-experimental, master
- Children: 62d62db
- Parents: 34b4268 (diff), 251ce80 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- File: 1 edited
Legend:
- Unmodified (no prefix)
- Added (prefixed with +)
- Removed (prefixed with -)
libcfa/src/concurrency/io.cfa
--- r34b4268
+++ r24d6572

  #define __cforall_thread__
- #define _GNU_SOURCE

  #if defined(__CFA_DEBUG__)
...
  static io_context$ * __ioarbiter_allocate( io_arbiter$ & this, __u32 idxs[], __u32 want );
  static void __ioarbiter_submit( io_context$ * , __u32 idxs[], __u32 have, bool lazy );
- static void __ioarbiter_flush ( io_context$ & );
+ static void __ioarbiter_flush ( io_context$ &, bool kernel );
  static inline void __ioarbiter_notify( io_context$ & ctx );
  //=============================================================================================
...
  extern void __kernel_unpark( thread$ * thrd, unpark_hint );

+ static inline void __post(oneshot & this, bool kernel, unpark_hint hint) {
+     thread$ * t = post( this, false );
+     if(kernel) __kernel_unpark( t, hint );
+     else unpark( t, hint );
+ }
+
+ // actual system call of io uring
+ // wrap so everything that needs to happen around it is always done
+ // i.e., stats, book keeping, sqe reclamation, etc.
  static void ioring_syscsll( struct io_context$ & ctx, unsigned int min_comp, unsigned int flags ) {
      __STATS__( true, io.calls.flush++; )
      int ret;
      for() {
+         // do the system call in a loop, repeat on interrupts
          ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
          if( ret < 0 ) {
...
      /* paranoid */ verify( ctx.sq.to_submit >= ret );

-     ctx.sq.to_submit -= ret;
+     // keep track of how many still need submitting
+     __atomic_fetch_sub(&ctx.sq.to_submit, ret, __ATOMIC_SEQ_CST);

      /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
...
      /* paranoid */ verify( ! __preemption_enabled() );

+     // mark that there is no pending io left
      __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
  }

+ // try to acquire an io context for draining, helping means we never *need* to drain, we can always do it later
  static bool try_acquire( io_context$ * ctx ) __attribute__((nonnull(1))) {
      /* paranoid */ verify( ! __preemption_enabled() );
...

      {
+         // if there is nothing to drain there is no point in acquiring anything
          const __u32 head = *ctx->cq.head;
          const __u32 tail = *ctx->cq.tail;
...
      }

-     // Drain the queue
-     if(!__atomic_try_acquire(&ctx->cq.lock)) {
+     // try a simple spinlock acquire, it's likely there are completions to drain
+     if(!__atomic_try_acquire(&ctx->cq.try_lock)) {
+         // some other processor already has it
          __STATS__( false, io.calls.locked++; )
          return false;
      }

+     // acquired!!
      return true;
  }
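The try_acquire change above pairs an emptiness check with a non-blocking lock attempt on the renamed cq.try_lock, so a processor that loses the race simply skips draining instead of spinning. A minimal plain-C sketch of that pattern, assuming C11 atomics and illustrative field names (not the actual libcfa types):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    struct cq_ring {                        // illustrative stand-in for the completion-queue fields used above
        _Atomic uint32_t head;
        _Atomic uint32_t tail;
        atomic_flag      try_lock;          // plays the role of ctx->cq.try_lock
    };

    // return true only if there is something to drain AND this caller won the lock
    static bool try_acquire_sketch( struct cq_ring * cq ) {
        uint32_t head = atomic_load_explicit( &cq->head, memory_order_acquire );
        uint32_t tail = atomic_load_explicit( &cq->tail, memory_order_acquire );
        if( head == tail ) return false;    // nothing to drain, don't bother locking

        // non-blocking acquire: if another processor holds the lock, give up rather than spin
        if( atomic_flag_test_and_set_explicit( &cq->try_lock, memory_order_acquire ) ) return false;

        return true;                        // acquired; the caller drains and then releases try_lock
    }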
+ // actually drain the completion
  static bool __cfa_do_drain( io_context$ * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
      /* paranoid */ verify( ! __preemption_enabled() );
      /* paranoid */ verify( ready_schedule_islocked() );
-     /* paranoid */ verify( ctx->cq.lock == true );
+     /* paranoid */ verify( ctx->cq.try_lock == true );

+     // get all the invariants and initial state
      const __u32 mask = *ctx->cq.mask;
      const __u32 num  = *ctx->cq.num;
...
      for() {
          // re-read the head and tail in case it already changed.
+         // count the difference between the two
          const __u32 head = *ctx->cq.head;
          const __u32 tail = *ctx->cq.tail;
...
          __STATS__( false, io.calls.drain++; io.calls.completed += count; )

+         // for everything between head and tail, drain it
          for(i; count) {
              unsigned idx = (head + i) & mask;
...
              /* paranoid */ verify(&cqe);

+             // find the future in the completion
              struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
              // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );

+             // don't directly fulfill the future, preemption is disabled so we need to use kernel_unpark
              __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
          }

+         // update the timestamps accordingly
+         // keep a local copy so we can update the relaxed copy
          ts_next = ctx->cq.ts = rdtscl();
...
          ctx->proc->idle_wctx.drain_time = ts_next;

+         // we finished draining the completions... unless the ring buffer was full and there are more secret completions in the kernel.
          if(likely(count < num)) break;

+         // the ring buffer was full, there could be more stuff in the kernel.
          ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS);
      }
...
      /* paranoid */ verify( ! __preemption_enabled() );

-     __atomic_unlock(&ctx->cq.lock);
-
+     // everything is drained, we can release the lock
+     __atomic_unlock(&ctx->cq.try_lock);
+
+     // update the relaxed timestamp
      touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next, false );
...
  }
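The drain loop above walks the completion ring with the usual head/tail/mask arithmetic: the distance from head to tail is the number of published entries, and entry i lives at (head + i) & mask. A small plain-C sketch of that indexing, assuming a power-of-two ring and simplified entry/field names:

    #include <stdint.h>

    struct cqe_sketch { uint64_t user_data; int32_t res; };   // simplified completion entry

    // visit everything currently published between head and tail
    static uint32_t drain_sketch( struct cqe_sketch * ring, uint32_t mask, uint32_t head, uint32_t tail ) {
        uint32_t count = tail - head;                 // unsigned subtraction handles wrap-around
        for( uint32_t i = 0; i < count; i++ ) {
            struct cqe_sketch * e = &ring[ (head + i) & mask ];   // mask == ring size - 1 (power of two)
            (void)e;                                  // io.cfa fulfils the future stored in e->user_data here
        }
        return count;                                 // the caller then publishes head + count back to the kernel
    }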
+ // call from a processor to flush
+ // contains all the bookkeeping a proc must do, not just the barebones flushing logic
+ void __cfa_do_flush( io_context$ & ctx, bool kernel ) {
+     /* paranoid */ verify( ! __preemption_enabled() );
+
+     // flush any external requests
+     ctx.sq.last_external = false; // clear the external bit, the arbiter will reset it if needed
+     __ioarbiter_flush( ctx, kernel );
+
+     // if submitting must be submitted, do the system call
+     if(ctx.sq.to_submit != 0) {
+         ioring_syscsll(ctx, 0, 0);
+     }
+ }
+
+ // call from a processor to drain
+ // contains all the bookkeeping a proc must do, not just the barebones draining logic
  bool __cfa_io_drain( struct processor * proc ) {
      bool local = false;
      bool remote = false;

+     // make sure no ones creates/destroys io contexts
      ready_schedule_lock();
...
      /* paranoid */ verify( ctx );

+     // Help if needed
      with(cltr->sched) {
          const size_t ctxs_count = io.count;
...
          const unsigned long long ctsc = rdtscl();

+         // only help once every other time
+         // pick a target when not helping
          if(proc->io.target == UINT_MAX) {
              uint64_t chaos = __tls_rand();
+             // choose who to help and whether to accept helping far processors
              unsigned ext = chaos & 0xff;
              unsigned other = (chaos >> 8) % (ctxs_count);

+             // if the processor is on the same cache line or is lucky ( 3 out of 256 odds ) help it
              if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
                  proc->io.target = other;
...
          }
          else {
+             // a target was picked last time, help it
              const unsigned target = proc->io.target;
              /* paranoid */ verify( io.tscs[target].t.tv != ULLONG_MAX );
+             // make sure the target hasn't stopped existing since last time
              HELP: if(target < ctxs_count) {
+                 // calculate it's age and how young it could be before we give up on helping
                  const __readyQ_avg_t cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io, false);
                  const __readyQ_avg_t age = moving_average(ctsc, io.tscs[target].t.tv, io.tscs[target].t.ma, false);
                  __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
+                 // is the target older than the cutoff, recall 0 is oldest and bigger ints are younger
                  if(age <= cutoff) break HELP;

-                 if(!try_acquire(io.data[target])) break HELP;
+                 // attempt to help the submission side
+                 __cfa_do_flush( *io.data[target], true );
+
+                 // attempt to help the completion side
+                 if(!try_acquire(io.data[target])) break HELP; // already acquire no help needed
+
+                 // actually help
                  if(!__cfa_do_drain( io.data[target], cltr )) break HELP;

+                 // track we did help someone
                  remote = true;
                  __STATS__( true, io.calls.helped++; )
              }
+
+             // reset the target
              proc->io.target = UINT_MAX;
          }
      }

      // Drain the local queue
...
      ready_schedule_unlock();
+
+     // return true if some completion entry, local or remote, was drained
      return local || remote;
  }
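The helping heuristic in __cfa_io_drain spends the low byte of one random value on a coin flip and the remaining bits on picking a peer: roughly 3-in-256 odds of helping a far context, otherwise only peers sharing the current cache are considered. A rough plain-C sketch of just the selection rule, where the random source and the cache test are stand-ins for the libcfa internals (__tls_rand, the caches array):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdlib.h>

    #define NO_TARGET UINT32_MAX

    // decide which io context (if any) this processor should try to help next
    static uint32_t pick_help_target( uint32_t ctx_count, bool (*same_cache)( uint32_t ) ) {
        uint64_t chaos = ((uint64_t)rand() << 32) ^ (uint64_t)rand();   // stand-in for __tls_rand()
        unsigned ext   = chaos & 0xff;                 // low byte: accept a "far" target?
        unsigned other = (chaos >> 8) % ctx_count;     // remaining bits: candidate peer

        // ~3/256 odds of helping regardless of distance, otherwise only nearby peers
        if( ext < 3 || same_cache( other ) ) return other;
        return NO_TARGET;
    }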
+ // call from a processor to flush
+ // contains all the bookkeeping a proc must do, not just the barebones flushing logic
  bool __cfa_io_flush( struct processor * proc ) {
      /* paranoid */ verify( ! __preemption_enabled() );
...
      /* paranoid */ verify( proc->io.ctx );

-     io_context$ & ctx = *proc->io.ctx;
-
-     __ioarbiter_flush( ctx );
-
-     if(ctx.sq.to_submit != 0) {
-         ioring_syscsll(ctx, 0, 0);
-
-     }
-
+     __cfa_do_flush( *proc->io.ctx, false );
+
+     // also drain since some stuff will immediately complete
      return __cfa_io_drain( proc );
  }
...
  //=============================================================================================
  // submission
- static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have) {
+ // barebones logic to submit a group of sqes
+ static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lock) {
+     if(!lock)
+         lock( ctx->ext_sq.lock __cfaabi_dbg_ctx2 );
      // We can proceed to the fast path
      // Get the right objects
...
      // Make the sqes visible to the submitter
      __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
-     sq.to_submit += have;
-
+     __atomic_fetch_add(&sq.to_submit, have, __ATOMIC_SEQ_CST);
+
+     // set the bit to mark things need to be flushed
      __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
      __atomic_store_n(&ctx->proc->io.dirty , true, __ATOMIC_RELAXED);
- }
-
+
+     if(!lock)
+         unlock( ctx->ext_sq.lock );
+ }
+
+ // submission logic + maybe flushing
  static inline void __submit( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy) {
      __sub_ring_t & sq = ctx->sq;
-     __submit_only(ctx, idxs, have);
+     __submit_only(ctx, idxs, have, false);

      if(sq.to_submit > 30) {
...
  }
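With the new helping path a remote processor can flush a context concurrently with its owner, so the to_submit counter can no longer be updated with plain += / -=. The change above switches both sides to fetch-and-add bookkeeping; a plain-C sketch of the idea, with illustrative names:

    #include <stdatomic.h>
    #include <stdint.h>

    struct sub_ring_sketch { _Atomic uint32_t to_submit; };

    // submit side: `have` sqes were just made visible to the kernel
    static void note_submitted( struct sub_ring_sketch * sq, uint32_t have ) {
        atomic_fetch_add_explicit( &sq->to_submit, have, memory_order_seq_cst );
    }

    // syscall side: the kernel consumed `ret` sqes, keep track of what is still pending
    static void note_consumed( struct sub_ring_sketch * sq, uint32_t ret ) {
        atomic_fetch_sub_explicit( &sq->to_submit, ret, memory_order_seq_cst );
    }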
+ // call from a processor to flush
+ // might require arbitration if the thread was migrated after the allocation
  void cfa_io_submit( struct io_context$ * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) libcfa_public {
      // __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
...
      if( ctx == inctx ) // We have the right instance?
      {
+         // yes! fast submit
          __submit(ctx, idxs, have, lazy);
...
      __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);

+     // notify the allocator that new allocations can be made
      __ioarbiter_notify(ctx);
...
  }

+ // notify the arbiter that new allocations are available
  static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) {
      /* paranoid */ verify( !empty(this.pending.queue) );
      /* paranoid */ verify( __preemption_enabled() );
+
+     // mutual exclusion is needed
      lock( this.pending.lock __cfaabi_dbg_ctx2 );
      {
+         __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
+
+         // as long as there are pending allocations try to satisfy them
+         // for simplicity do it in FIFO order
          while( !empty(this.pending.queue) ) {
-             __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
+             // get first pending allocs
              __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
              __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );

+             // check if we have enough to satisfy the request
              if( have > pa.want ) goto DONE;
+
+             // if there are enough allocations it means we can drop the request
              drop( this.pending.queue );

              /* paranoid */__attribute__((unused)) bool ret =
+                 // actually do the alloc
                  __alloc(ctx, pa.idxs, pa.want);

              /* paranoid */ verify( ret );

+             // write out which context statisfied the request and post
+             // this
              pa.ctx = ctx;
-
              post( pa.waitctx );
          }
...
      }
      unlock( this.pending.lock );
- }
-
+
+     /* paranoid */ verify( __preemption_enabled() );
+ }
+
+ // short hand to avoid the mutual exclusion of the pending is empty regardless
  static void __ioarbiter_notify( io_context$ & ctx ) {
-     if(!empty( ctx.arbiter->pending )) {
-         __ioarbiter_notify( *ctx.arbiter, &ctx );
-     }
- }
-
- // Simply append to the pending
+     if(empty( ctx.arbiter->pending )) return;
+     __ioarbiter_notify( *ctx.arbiter, &ctx );
+ }
+
+ // Submit from outside the local processor: append to the outstanding list
  static void __ioarbiter_submit( io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy ) {
      __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
...
      __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);

+     // create the intrusive object to append
      __external_io ei;
      ei.idxs = idxs;
...
      ei.lazy = lazy;

+     // enqueue the io
      bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);

+     // mark pending
      __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);

+     // if this is the first to be enqueued, signal the processor in an attempt to speed up flushing
+     // if it's not the first enqueue, a signal is already in transit
      if( we ) {
          sigval_t value = { PREEMPT_IO };
          __cfaabi_pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
-     }
-
+         __STATS__( false, io.flush.signal += 1; )
+     }
+     __STATS__( false, io.submit.extr += 1; )
+
+     // to avoid dynamic allocation/memory reclamation headaches, wait for it to have been submitted
      wait( ei.waitctx );
...
  }
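The external-submission path above only signals the target processor when the enqueue was the first one; later enqueuers rely on the signal already in flight. A minimal plain-C sketch of that wake-up coalescing, assuming a mutex-protected queue (the queue body and the signalling call are placeholders for the libcfa ones):

    #include <pthread.h>
    #include <stdbool.h>

    struct ext_queue_sketch {
        pthread_mutex_t lock;
        int             length;    // stand-in for the intrusive list of __external_io
    };

    // returns true iff the queue went from empty to non-empty, i.e. the caller is the first enqueuer
    static bool enqueue_external( struct ext_queue_sketch * q /*, item */ ) {
        pthread_mutex_lock( &q->lock );
        bool was_empty = (q->length == 0);
        q->length++;               // the real code appends the __external_io node here
        pthread_mutex_unlock( &q->lock );
        return was_empty;
    }

    // caller:
    //   if( enqueue_external( q ) ) signal_processor();   // only the first enqueuer wakes the processor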
- static void __ioarbiter_flush( io_context$ & ctx ) {
-     if(!empty( ctx.ext_sq )) {
-         __STATS__( false, io.flush.external += 1; )
-
-         __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
-
-         lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
-         {
-             while( !empty(ctx.ext_sq.queue) ) {
-                 __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
-
-                 __submit_only(&ctx, ei.idxs, ei.have);
-
-                 post( ei.waitctx );
-             }
-
-             ctx.ext_sq.empty = true;
-         }
-         unlock(ctx.ext_sq.lock );
-     }
- }
+ // flush the io arbiter: move all external io operations to the submission ring
+ static void __ioarbiter_flush( io_context$ & ctx, bool kernel ) {
+     // if there are no external operations just return
+     if(empty( ctx.ext_sq )) return;
+
+     // stats and logs
+     __STATS__( false, io.flush.external += 1; )
+     __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
+
+     // this can happen from multiple processors, mutual exclusion is needed
+     lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
+     {
+         // pop each operation one at a time.
+         // There is no wait morphing because of the io sq ring
+         while( !empty(ctx.ext_sq.queue) ) {
+             // drop the element from the queue
+             __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
+
+             // submit it
+             __submit_only(&ctx, ei.idxs, ei.have, true);
+
+             // wake the thread that was waiting on it
+             // since this can both be called from kernel and user, check the flag before posting
+             __post( ei.waitctx, kernel, UNPARK_LOCAL );
+         }
+
+         // mark the queue as empty
+         ctx.ext_sq.empty = true;
+         ctx.sq.last_external = true;
+     }
+     unlock(ctx.ext_sq.lock );
+ }
+
+ extern "C" {
+     // debug functions used for gdb
+     // io_uring doesn't yet support gdb soe the kernel-shared data structures aren't viewable in gdb
+     // these functions read the data that gdb can't and should be removed once the support is added
+     static __u32 __cfagdb_cq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.head; }
+     static __u32 __cfagdb_cq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.tail; }
+     static __u32 __cfagdb_cq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.mask; }
+     static __u32 __cfagdb_sq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.head; }
+     static __u32 __cfagdb_sq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.tail; }
+     static __u32 __cfagdb_sq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.mask; }
+
+     // fancier version that reads an sqe and copies it out.
+     static struct io_uring_sqe __cfagdb_sq_at( io_context$ * ctx, __u32 at ) __attribute__((nonnull(1),used,noinline)) {
+         __u32 ax = at & *ctx->sq.mask;
+         __u32 ix = ctx->sq.kring.array[ax];
+         return ctx->sq.sqes[ix];
+     }
+ }
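For reference, ioring_syscsll wraps the raw io_uring_enter system call used throughout this change. Stripped of the stats and bookkeeping, the retry-on-interrupt core looks roughly like this plain-C sketch (no signal mask is passed, so the size argument is simply 0 here, whereas the real code passes _NSIG / 8):

    #include <errno.h>
    #include <signal.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    // submit `to_submit` sqes and wait for at least `min_complete` completions
    static int io_uring_enter_sketch( int ring_fd, unsigned to_submit, unsigned min_complete, unsigned flags ) {
        for( ;; ) {
            int ret = syscall( __NR_io_uring_enter, ring_fd, to_submit, min_complete, flags,
                               (sigset_t *)NULL, 0 );
            if( ret >= 0 ) return ret;       // number of sqes consumed by the kernel
            if( errno == EINTR ) continue;   // interrupted by a signal, just retry
            return -errno;                   // real error, surface it to the caller
        }
    }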