Ignore:
Timestamp:
Jun 12, 2023, 2:45:32 PM (2 years ago)
Author:
Fangren Yu <f37yu@…>
Branches:
ast-experimental, master
Children:
62d62db
Parents:
34b4268 (diff), 251ce80 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' into ast-experimental

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r34b4268 r24d6572  
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#if defined(__CFA_DEBUG__)
     
    8584        static io_context$ * __ioarbiter_allocate( io_arbiter$ & this, __u32 idxs[], __u32 want );
    8685        static void __ioarbiter_submit( io_context$ * , __u32 idxs[], __u32 have, bool lazy );
    87         static void __ioarbiter_flush ( io_context$ & );
     86        static void __ioarbiter_flush ( io_context$ &, bool kernel );
    8887        static inline void __ioarbiter_notify( io_context$ & ctx );
    8988//=============================================================================================
     
    9493        extern void __kernel_unpark( thread$ * thrd, unpark_hint );
    9594
     95        static inline void __post(oneshot & this, bool kernel, unpark_hint hint) {
     96                thread$ * t = post( this, false );
     97                if(kernel) __kernel_unpark( t, hint );
     98                else unpark( t, hint );
     99        }
     100
     101        // actual system call of io uring
     102        // wrap so everything that needs to happen around it is always done
     103        //   i.e., stats, book keeping, sqe reclamation, etc.
    96104        static void ioring_syscsll( struct io_context$ & ctx, unsigned int min_comp, unsigned int flags ) {
    97105                __STATS__( true, io.calls.flush++; )
    98106                int ret;
    99107                for() {
     108                        // do the system call in a loop, repeat on interrupts
    100109                        ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
    101110                        if( ret < 0 ) {
     
    120129                /* paranoid */ verify( ctx.sq.to_submit >= ret );
    121130
    122                 ctx.sq.to_submit -= ret;
     131                // keep track of how many still need submitting
     132                __atomic_fetch_sub(&ctx.sq.to_submit, ret, __ATOMIC_SEQ_CST);
    123133
    124134                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     
    129139                /* paranoid */ verify( ! __preemption_enabled() );
    130140
     141                // mark that there is no pending io left
    131142                __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
    132143        }
    133144
     145        // try to acquire an io context for draining, helping means we never *need* to drain, we can always do it later
    134146        static bool try_acquire( io_context$ * ctx ) __attribute__((nonnull(1))) {
    135147                /* paranoid */ verify( ! __preemption_enabled() );
     
    138150
    139151                {
     152                        // if there is nothing to drain there is no point in acquiring anything
    140153                        const __u32 head = *ctx->cq.head;
    141154                        const __u32 tail = *ctx->cq.tail;
     
    144157                }
    145158
    146                 // Drain the queue
    147                 if(!__atomic_try_acquire(&ctx->cq.lock)) {
     159                // try a simple spinlock acquire, it's likely there are completions to drain
     160                if(!__atomic_try_acquire(&ctx->cq.try_lock)) {
     161                        // some other processor already has it
    148162                        __STATS__( false, io.calls.locked++; )
    149163                        return false;
    150164                }
    151165
     166                // acquired!!
    152167                return true;
    153168        }
    154169
     170        // actually drain the completion
    155171        static bool __cfa_do_drain( io_context$ * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
    156172                /* paranoid */ verify( ! __preemption_enabled() );
    157173                /* paranoid */ verify( ready_schedule_islocked() );
    158                 /* paranoid */ verify( ctx->cq.lock == true );
    159 
     174                /* paranoid */ verify( ctx->cq.try_lock == true );
     175
     176                // get all the invariants and initial state
    160177                const __u32 mask = *ctx->cq.mask;
    161178                const __u32 num  = *ctx->cq.num;
     
    166183                for() {
    167184                        // re-read the head and tail in case it already changed.
     185                        // count the difference between the two
    168186                        const __u32 head = *ctx->cq.head;
    169187                        const __u32 tail = *ctx->cq.tail;
     
    171189                        __STATS__( false, io.calls.drain++; io.calls.completed += count; )
    172190
     191                        // for everything between head and tail, drain it
    173192                        for(i; count) {
    174193                                unsigned idx = (head + i) & mask;
     
    177196                                /* paranoid */ verify(&cqe);
    178197
     198                                // find the future in the completion
    179199                                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    180200                                // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    181201
     202                                // don't directly fulfill the future, preemption is disabled so we need to use kernel_unpark
    182203                                __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
    183204                        }
    184205
     206                        // update the timestamps accordingly
     207                        // keep a local copy so we can update the relaxed copy
    185208                        ts_next = ctx->cq.ts = rdtscl();
    186209
     
    190213                        ctx->proc->idle_wctx.drain_time = ts_next;
    191214
     215                        // we finished draining the completions... unless the ring buffer was full and there are more secret completions in the kernel.
    192216                        if(likely(count < num)) break;
    193217
     218                        // the ring buffer was full, there could be more stuff in the kernel.
    194219                        ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS);
    195220                }
     
    199224                /* paranoid */ verify( ! __preemption_enabled() );
    200225
    201                 __atomic_unlock(&ctx->cq.lock);
    202 
     226                // everything is drained, we can release the lock
     227                __atomic_unlock(&ctx->cq.try_lock);
     228
     229                // update the relaxed timestamp
    203230                touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next, false );
    204231
     
    206233        }
    207234
     235        // call from a processor to flush
     236        // contains all the bookkeeping a proc must do, not just the barebones flushing logic
     237        void __cfa_do_flush( io_context$ & ctx, bool kernel ) {
     238                /* paranoid */ verify( ! __preemption_enabled() );
     239
     240                // flush any external requests
     241                ctx.sq.last_external = false; // clear the external bit, the arbiter will reset it if needed
     242                __ioarbiter_flush( ctx, kernel );
     243
     244                // if submitting must be submitted, do the system call
     245                if(ctx.sq.to_submit != 0) {
     246                        ioring_syscsll(ctx, 0, 0);
     247                }
     248        }
     249
     250        // call from a processor to drain
     251        // contains all the bookkeeping a proc must do, not just the barebones draining logic
    208252        bool __cfa_io_drain( struct processor * proc ) {
    209253                bool local = false;
    210254                bool remote = false;
    211255
     256                // make sure no ones creates/destroys io contexts
    212257                ready_schedule_lock();
    213258
     
    217262                /* paranoid */ verify( ctx );
    218263
     264                // Help if needed
    219265                with(cltr->sched) {
    220266                        const size_t ctxs_count = io.count;
     
    230276                        const unsigned long long ctsc = rdtscl();
    231277
     278                        // only help once every other time
     279                        // pick a target when not helping
    232280                        if(proc->io.target == UINT_MAX) {
    233281                                uint64_t chaos = __tls_rand();
     282                                // choose who to help and whether to accept helping far processors
    234283                                unsigned ext = chaos & 0xff;
    235284                                unsigned other  = (chaos >> 8) % (ctxs_count);
    236285
     286                                // if the processor is on the same cache line or is lucky ( 3 out of 256 odds ) help it
    237287                                if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
    238288                                        proc->io.target = other;
     
    240290                        }
    241291                        else {
     292                                // a target was picked last time, help it
    242293                                const unsigned target = proc->io.target;
    243294                                /* paranoid */ verify( io.tscs[target].t.tv != ULLONG_MAX );
     295                                // make sure the target hasn't stopped existing since last time
    244296                                HELP: if(target < ctxs_count) {
     297                                        // calculate it's age and how young it could be before we give up on helping
    245298                                        const __readyQ_avg_t cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io, false);
    246299                                        const __readyQ_avg_t age = moving_average(ctsc, io.tscs[target].t.tv, io.tscs[target].t.ma, false);
    247300                                        __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
     301                                        // is the target older than the cutoff, recall 0 is oldest and bigger ints are younger
    248302                                        if(age <= cutoff) break HELP;
    249303
    250                                         if(!try_acquire(io.data[target])) break HELP;
    251 
     304                                        // attempt to help the submission side
     305                                        __cfa_do_flush( *io.data[target], true );
     306
     307                                        // attempt to help the completion side
     308                                        if(!try_acquire(io.data[target])) break HELP; // already acquire no help needed
     309
     310                                        // actually help
    252311                                        if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
    253312
     313                                        // track we did help someone
    254314                                        remote = true;
    255315                                        __STATS__( true, io.calls.helped++; )
    256316                                }
     317
     318                                // reset the target
    257319                                proc->io.target = UINT_MAX;
    258320                        }
    259321                }
    260 
    261322
    262323                // Drain the local queue
     
    270331
    271332                ready_schedule_unlock();
     333
     334                // return true if some completion entry, local or remote, was drained
    272335                return local || remote;
    273336        }
    274337
     338
     339
     340        // call from a processor to flush
     341        // contains all the bookkeeping a proc must do, not just the barebones flushing logic
    275342        bool __cfa_io_flush( struct processor * proc ) {
    276343                /* paranoid */ verify( ! __preemption_enabled() );
     
    278345                /* paranoid */ verify( proc->io.ctx );
    279346
    280                 io_context$ & ctx = *proc->io.ctx;
    281 
    282                 __ioarbiter_flush( ctx );
    283 
    284                 if(ctx.sq.to_submit != 0) {
    285                         ioring_syscsll(ctx, 0, 0);
    286 
    287                 }
    288 
     347                __cfa_do_flush( *proc->io.ctx, false );
     348
     349                // also drain since some stuff will immediately complete
    289350                return __cfa_io_drain( proc );
    290351        }
     
    393454        //=============================================================================================
    394455        // submission
    395         static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have) {
     456        // barebones logic to submit a group of sqes
     457        static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lock) {
     458                if(!lock)
     459                        lock( ctx->ext_sq.lock __cfaabi_dbg_ctx2 );
    396460                // We can proceed to the fast path
    397461                // Get the right objects
     
    408472                // Make the sqes visible to the submitter
    409473                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
    410                 sq.to_submit += have;
    411 
     474                __atomic_fetch_add(&sq.to_submit, have, __ATOMIC_SEQ_CST);
     475
     476                // set the bit to mark things need to be flushed
    412477                __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
    413478                __atomic_store_n(&ctx->proc->io.dirty  , true, __ATOMIC_RELAXED);
    414         }
    415 
     479
     480                if(!lock)
     481                        unlock( ctx->ext_sq.lock );
     482        }
     483
     484        // submission logic + maybe flushing
    416485        static inline void __submit( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy) {
    417486                __sub_ring_t & sq = ctx->sq;
    418                 __submit_only(ctx, idxs, have);
     487                __submit_only(ctx, idxs, have, false);
    419488
    420489                if(sq.to_submit > 30) {
     
    428497        }
    429498
     499        // call from a processor to flush
     500        // might require arbitration if the thread was migrated after the allocation
    430501        void cfa_io_submit( struct io_context$ * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) libcfa_public {
    431502                // __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     
    441512                if( ctx == inctx )              // We have the right instance?
    442513                {
     514                        // yes! fast submit
    443515                        __submit(ctx, idxs, have, lazy);
    444516
     
    507579                __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
    508580
     581                // notify the allocator that new allocations can be made
    509582                __ioarbiter_notify(ctx);
    510583
     
    557630        }
    558631
     632        // notify the arbiter that new allocations are available
    559633        static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) {
    560634                /* paranoid */ verify( !empty(this.pending.queue) );
    561 
     635                /* paranoid */ verify( __preemption_enabled() );
     636
     637                // mutual exclusion is needed
    562638                lock( this.pending.lock __cfaabi_dbg_ctx2 );
    563639                {
     640                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     641
     642                        // as long as there are pending allocations try to satisfy them
     643                        // for simplicity do it in FIFO order
    564644                        while( !empty(this.pending.queue) ) {
    565                                 __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     645                                // get first pending allocs
    566646                                __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
    567647                                __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );
    568648
     649                                // check if we have enough to satisfy the request
    569650                                if( have > pa.want ) goto DONE;
     651
     652                                // if there are enough allocations it means we can drop the request
    570653                                drop( this.pending.queue );
    571654
    572655                                /* paranoid */__attribute__((unused)) bool ret =
    573656
     657                                // actually do the alloc
    574658                                __alloc(ctx, pa.idxs, pa.want);
    575659
    576660                                /* paranoid */ verify( ret );
    577661
     662                                // write out which context statisfied the request and post
     663                                // this
    578664                                pa.ctx = ctx;
    579 
    580665                                post( pa.waitctx );
    581666                        }
     
    585670                }
    586671                unlock( this.pending.lock );
    587         }
    588 
     672
     673                /* paranoid */ verify( __preemption_enabled() );
     674        }
     675
     676        // short hand to avoid the mutual exclusion of the pending is empty regardless
    589677        static void __ioarbiter_notify( io_context$ & ctx ) {
    590                 if(!empty( ctx.arbiter->pending )) {
    591                         __ioarbiter_notify( *ctx.arbiter, &ctx );
    592                 }
    593         }
    594 
    595         // Simply append to the pending
     678                if(empty( ctx.arbiter->pending )) return;
     679                __ioarbiter_notify( *ctx.arbiter, &ctx );
     680        }
     681
     682        // Submit from outside the local processor: append to the outstanding list
    596683        static void __ioarbiter_submit( io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy ) {
    597684                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
     
    599686                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
    600687
     688                // create the intrusive object to append
    601689                __external_io ei;
    602690                ei.idxs = idxs;
     
    604692                ei.lazy = lazy;
    605693
     694                // enqueue the io
    606695                bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);
    607696
     697                // mark pending
    608698                __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);
    609699
     700                // if this is the first to be enqueued, signal the processor in an attempt to speed up flushing
     701                // if it's not the first enqueue, a signal is already in transit
    610702                if( we ) {
    611703                        sigval_t value = { PREEMPT_IO };
    612704                        __cfaabi_pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
    613                 }
    614 
     705                        __STATS__( false, io.flush.signal += 1; )
     706                }
     707                __STATS__( false, io.submit.extr += 1; )
     708
     709                // to avoid dynamic allocation/memory reclamation headaches, wait for it to have been submitted
    615710                wait( ei.waitctx );
    616711
     
    618713        }
    619714
    620         static void __ioarbiter_flush( io_context$ & ctx ) {
    621                 if(!empty( ctx.ext_sq )) {
    622                         __STATS__( false, io.flush.external += 1; )
    623 
    624                         __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
    625 
    626                         lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
    627                         {
    628                                 while( !empty(ctx.ext_sq.queue) ) {
    629                                         __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
    630 
    631                                         __submit_only(&ctx, ei.idxs, ei.have);
    632 
    633                                         post( ei.waitctx );
    634                                 }
    635 
    636                                 ctx.ext_sq.empty = true;
     715        // flush the io arbiter: move all external io operations to the submission ring
     716        static void __ioarbiter_flush( io_context$ & ctx, bool kernel ) {
     717                // if there are no external operations just return
     718                if(empty( ctx.ext_sq )) return;
     719
     720                // stats and logs
     721                __STATS__( false, io.flush.external += 1; )
     722                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     723
     724                // this can happen from multiple processors, mutual exclusion is needed
     725                lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
     726                {
     727                        // pop each operation one at a time.
     728                        // There is no wait morphing because of the io sq ring
     729                        while( !empty(ctx.ext_sq.queue) ) {
     730                                // drop the element from the queue
     731                                __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
     732
     733                                // submit it
     734                                __submit_only(&ctx, ei.idxs, ei.have, true);
     735
     736                                // wake the thread that was waiting on it
     737                                // since this can both be called from kernel and user, check the flag before posting
     738                                __post( ei.waitctx, kernel, UNPARK_LOCAL );
    637739                        }
    638                         unlock(ctx.ext_sq.lock );
     740
     741                        // mark the queue as empty
     742                        ctx.ext_sq.empty = true;
     743                        ctx.sq.last_external = true;
     744                }
     745                unlock(ctx.ext_sq.lock );
     746        }
     747
     748        extern "C" {
     749                // debug functions used for gdb
     750                // io_uring doesn't yet support gdb soe the kernel-shared data structures aren't viewable in gdb
     751                // these functions read the data that gdb can't and should be removed once the support is added
     752                static __u32 __cfagdb_cq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.head; }
     753                static __u32 __cfagdb_cq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.tail; }
     754                static __u32 __cfagdb_cq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.mask; }
     755                static __u32 __cfagdb_sq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.head; }
     756                static __u32 __cfagdb_sq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.tail; }
     757                static __u32 __cfagdb_sq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.mask; }
     758
     759                // fancier version that reads an sqe and copies it out.
     760                static struct io_uring_sqe __cfagdb_sq_at( io_context$ * ctx, __u32 at ) __attribute__((nonnull(1),used,noinline)) {
     761                        __u32 ax = at & *ctx->sq.mask;
     762                        __u32 ix = ctx->sq.kring.array[ax];
     763                        return ctx->sq.sqes[ix];
    639764                }
    640765        }
Note: See TracChangeset for help on using the changeset viewer.