Changeset 6b33e89 for libcfa/src/concurrency/io.cfa
- Timestamp: Apr 25, 2025, 7:39:09 AM
- Branches: master
- Children: 65bd3c2
- Parents: b195498
- Files: 1 edited
Legend:
- Unmodified: line number shown in both the b195498 and 6b33e89 columns
- Removed: only the left (b195498) line number, '-' in the right column
- Added: only the right (6b33e89) line number, '-' in the left column
libcfa/src/concurrency/io.cfa
rb195498        r6b33e89

 95   95        static inline void __post(oneshot & this, bool kernel, unpark_hint hint) {
 96   96                thread$ * t = post( this, false );
 97    -                if (kernel) __kernel_unpark( t, hint );
  -   97                if (kernel) __kernel_unpark( t, hint );
 98   98                else unpark( t, hint );
 99   99        }
… …
108  108                // do the system call in a loop, repeat on interrupts
109  109                ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
110    -                if ( ret < 0 ) {
  -  110                if ( ret < 0 ) {
111  111                        switch((int)errno) {
112  112                        case EINTR:
… …
154  154        const __u32 tail = *ctx->cq.tail;
155  155
156    -        if (head == tail) return false;
  -  156        if (head == tail) return false;
157  157        }
158  158
159  159        // try a simple spinlock acquire, it's likely there are completions to drain
160    -        if (!__atomic_try_acquire(&ctx->cq.try_lock)) {
  -  160        if ( ! __atomic_try_acquire(&ctx->cq.try_lock)) {
161  161                // some other processor already has it
162  162                __STATS__( false, io.calls.locked++; )
… …
214  214
215  215                // we finished draining the completions... unless the ring buffer was full and there are more secret completions in the kernel.
216    -                if (likely(count < num)) break;
  -  216                if (likely(count < num)) break;
217  217
218  218                // the ring buffer was full, there could be more stuff in the kernel.
… …
243  243
244  244        // if submitting must be submitted, do the system call
245    -        if (ctx.sq.to_submit != 0) {
  -  245        if (ctx.sq.to_submit != 0) {
246  246                ioring_syscsll(ctx, 0, 0);
247  247        }
… …
278  278        // only help once every other time
279  279        // pick a target when not helping
280    -        if (proc->io.target == UINT_MAX) {
  -  280        if (proc->io.target == UINT_MAX) {
281  281                uint64_t chaos = __tls_rand();
282  282                // choose who to help and whether to accept helping far processors
… …
285  285
286  286                // if the processor is on the same cache line or is lucky ( 3 out of 256 odds ) help it
287    -                if (ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
  -  287                if (ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
288  288                        proc->io.target = other;
289  289                }
… …
294  294        /* paranoid */ verify( io.tscs[target].t.tv != ULLONG_MAX );
295  295        // make sure the target hasn't stopped existing since last time
296    -        HELP: if (target < ctxs_count) {
  -  296        HELP: if (target < ctxs_count) {
297  297                // calculate it's age and how young it could be before we give up on helping
298  298                const __readyQ_avg_t cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io, false);
… …
300  300                __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
301  301                // is the target older than the cutoff, recall 0 is oldest and bigger ints are younger
302    -                if (age <= cutoff) break HELP;
  -  302                if (age <= cutoff) break HELP;
303  303
304  304                // attempt to help the submission side
…
306  306
307  307                // attempt to help the completion side
308    -                if (!try_acquire(io.data[target])) break HELP; // already acquire no help needed
  -  308                if ( ! try_acquire(io.data[target])) break HELP; // already acquire no help needed
309  309
310  310                // actually help
311    -                if (!__cfa_do_drain( io.data[target], cltr )) break HELP;
  -  311                if ( ! __cfa_do_drain( io.data[target], cltr )) break HELP;
312  312
313  313                // track we did help someone
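The hunk at lines 108-112 is the core of ioring_syscsll: the raw io_uring_enter system call runs in a loop and is simply reissued when a signal interrupts it. Below is a minimal plain-C sketch of that retry pattern; ring_enter and its parameter names are invented for the example, and it assumes Linux headers that define __NR_io_uring_enter (kernel 5.1 or later):

    #define _GNU_SOURCE
    #include <errno.h>
    #include <signal.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    // Reissue io_uring_enter while it fails with EINTR; any other
    // error is reported to the caller, as in the switch on errno above.
    static long ring_enter( int ring_fd, unsigned to_submit, unsigned min_complete, unsigned flags ) {
        for (;;) {
            long ret = syscall( __NR_io_uring_enter, ring_fd, to_submit,
                                min_complete, flags, (sigset_t *)0, _NSIG / 8 );
            if ( ret >= 0 ) return ret;           // entries consumed by the kernel
            if ( errno != EINTR ) return -errno;  // real failure
            // EINTR: interrupted by a signal (e.g. preemption); retry
        }
    }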
… …
322  322
323  323        // Drain the local queue
324    -        if (try_acquire( proc->io.ctx )) {
  -  324        if (try_acquire( proc->io.ctx )) {
325  325                local = __cfa_do_drain( proc->io.ctx, cltr );
326  326        }
… …
390  390
391  391        // If we don't have enough sqes, fail
392    -        if ((ftail - fhead) < want) { return false; }
  -  392        if ((ftail - fhead) < want) { return false; }
393  393
394  394        // copy all the indexes we want from the available list
… …
422  422
423  423        // We can proceed to the fast path
424    -        if ( __alloc(ctx, idxs, want) ) {
  -  424        if ( __alloc(ctx, idxs, want) ) {
425  425                // Allocation was successful
426  426                __STATS__( true, io.alloc.fast += 1; )
… …
456  456        // barebones logic to submit a group of sqes
457  457        static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lock) {
458    -                if (!lock)
  -  458                if ( ! lock)
459  459                        lock( ctx->ext_sq.lock __cfaabi_dbg_ctx2 );
460  460                // We can proceed to the fast path
… …
478  478                __atomic_store_n(&ctx->proc->io.dirty , true, __ATOMIC_RELAXED);
479  479
480    -                if (!lock)
  -  480                if ( ! lock)
481  481                        unlock( ctx->ext_sq.lock );
482  482        }
… …
487  487                __submit_only(ctx, idxs, have, false);
488  488
489    -                if (sq.to_submit > 30) {
  -  489                if (sq.to_submit > 30) {
490  490                        __tls_stats()->io.flush.full++;
491  491                        __cfa_io_flush( ctx->proc );
492  492                }
493    -                if (!lazy) {
  -  493                if ( ! lazy ) {
494  494                        __tls_stats()->io.flush.eager++;
495  495                        __cfa_io_flush( ctx->proc );
… …
503  503
504  504        disable_interrupts();
505    -        __STATS__( true, if (!lazy) io.submit.eagr += 1; )
  -  505        __STATS__( true, if ( ! lazy ) io.submit.eagr += 1; )
506  506        struct processor * proc = __cfaabi_tls.this_processor;
507  507        io_context$ * ctx = proc->io.ctx;
… …
510  510
511  511        // Can we proceed to the fast path
512    -        if ( ctx == inctx ) // We have the right instance?
  -  512        if ( ctx == inctx ) // We have the right instance?
513  513        {
514  514                // yes! fast submit
… …
564  564        __u32 count = chead - phead;
565  565
566    -        if (count == 0) {
  -  566        if (count == 0) {
567  567                return 0;
568  568        }
… …
594  594        lock( queue.lock __cfaabi_dbg_ctx2 );
595  595        {
596    -                was_empty = queue.queue`isEmpty;
  -  596                was_empty = isEmpty( queue.queue );
597  597
598  598                // Add our request to the list
… …
632  632        // notify the arbiter that new allocations are available
633  633        static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) {
634    -                /* paranoid */ verify( ! this.pending.queue`isEmpty);
  -  634                /* paranoid */ verify( ! isEmpty( this.pending.queue ) );
635  635                /* paranoid */ verify( __preemption_enabled() );
… …
642  642                // as long as there are pending allocations try to satisfy them
643  643                // for simplicity do it in FIFO order
644    -                while( ! this.pending.queue`isEmpty) {
  -  644                while( ! isEmpty( this.pending.queue ) ) {
645  645                        // get first pending allocs
646  646                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
647    -                        __pending_alloc & pa = (__pending_alloc&)( this.pending.queue`first);
  -  647                        __pending_alloc & pa = (__pending_alloc&)( first( this.pending.queue ));
648  648
649  649                        // check if we have enough to satisfy the request
650    -                        if ( have > pa.want ) goto DONE;
  -  650                        if ( have > pa.want ) goto DONE;
651  651
652  652                        // if there are enough allocations it means we can drop the request
653    -                        try_pop_front( this.pending.queue );
  -  653                        remove_first( this.pending.queue );
654  654
655  655                        /* paranoid */__attribute__((unused)) bool ret =
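Two flush policies are visible at lines 489-495: a "full" flush once more than 30 sqes are queued, and an "eager" flush whenever the caller did not request lazy submission. Here is a hedged plain-C sketch of that decision; io_ctx, io_flush, and the counters are stand-ins invented for the example, not the libcfa API:

    #include <stdbool.h>

    // Invented stand-ins for the real io context and flush routine.
    struct io_ctx { unsigned to_submit; unsigned full_flushes, eager_flushes; };
    static void io_flush( struct io_ctx * ctx ) { ctx->to_submit = 0; }

    // After queueing sqes: flush when the backlog is large, and flush
    // again when the caller wants the work on the ring immediately.
    static void maybe_flush( struct io_ctx * ctx, bool lazy ) {
        if ( ctx->to_submit > 30 ) {   // same threshold as the diff
            ctx->full_flushes++;       // mirrors io.flush.full++
            io_flush( ctx );
        }
        if ( ! lazy ) {
            ctx->eager_flushes++;      // mirrors io.flush.eager++
            io_flush( ctx );
        }
    }

Keeping the two tests separate, as the original does, means an eager submit still goes through even when the backlog test has already flushed, and the per-policy counters record which condition fired.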
… …
676  676        // short hand to avoid the mutual exclusion of the pending is empty regardless
677  677        static void __ioarbiter_notify( io_context$ & ctx ) {
678    -                if (empty( ctx.arbiter->pending )) return;
  -  678                if (empty( ctx.arbiter->pending )) return;
679  679                __ioarbiter_notify( *ctx.arbiter, &ctx );
680  680        }
… …
700  700                // if this is the first to be enqueued, signal the processor in an attempt to speed up flushing
701  701                // if it's not the first enqueue, a signal is already in transit
702    -                if ( we ) {
  -  702                if ( we ) {
703  703                        sigval_t value = { PREEMPT_IO };
704  704                        __cfaabi_pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
… …
716  716        static void __ioarbiter_flush( io_context$ & ctx, bool kernel ) {
717  717                // if there are no external operations just return
718    -                if (empty( ctx.ext_sq )) return;
  -  718                if ( empty( ctx.ext_sq ) ) return;
719  719
720  720                // stats and logs
… …
727  727                // pop each operation one at a time.
728  728                // There is no wait morphing because of the io sq ring
729    -                while( ! ctx.ext_sq.queue`isEmpty) {
  -  729                while( ! isEmpty( ctx.ext_sq.queue ) ) {
730  730                        // drop the element from the queue
731    -                        __external_io & ei = (__external_io&) try_pop_front( ctx.ext_sq.queue );
  -  731                        __external_io & ei = (__external_io&)remove_first( ctx.ext_sq.queue );
732  732
733  733                        // submit it
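Beyond the spacing around !, the recurring change in this file replaces Cforall's postfix backtick calls (queue`isEmpty, queue`first) and try_pop_front with plain call syntax: isEmpty( queue ), first( queue ), remove_first( queue ). A rough plain-C analogue of the queue interface those loops rely on follows; every type and name here is invented for illustration:

    #include <stdbool.h>
    #include <stddef.h>

    // Minimal intrusive FIFO offering the three operations used above.
    struct node { struct node * next; };
    struct fifo { struct node * head, * tail; };

    static bool is_empty( struct fifo * q )       { return q->head == NULL; }
    static struct node * first( struct fifo * q ) { return q->head; }
    static struct node * remove_first( struct fifo * q ) {
        struct node * n = q->head;
        q->head = n->next;
        if ( q->head == NULL ) q->tail = NULL;
        return n;
    }

    // The shape of both loops above: pop in FIFO order and process each
    // element, e.g. __ioarbiter_flush submitting each external operation.
    static void drain( struct fifo * q, void (*process)( struct node * ) ) {
        while ( ! is_empty( q ) ) {
            process( remove_first( q ) );
        }
    }

Processing strictly in FIFO order matches the comment at line 643 ("for simplicity do it in FIFO order") and keeps earlier requests from being passed over in favor of later ones.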