Ignore:
Timestamp:
Mar 2, 2021, 5:28:32 PM (8 months ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
6083392
Parents:
182256b (diff), 9eb7a53 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r182256b r266ecf1  
    3232        extern "C" {
    3333                #include <sys/syscall.h>
     34                #include <sys/eventfd.h>
    3435
    3536                #include <linux/io_uring.h>
     
    7980        };
    8081
    81         // returns true of acquired as leader or second leader
    82         static inline bool try_lock( __leaderlock_t & this ) {
    83                 const uintptr_t thrd = 1z | (uintptr_t)active_thread();
    84                 bool block;
    85                 disable_interrupts();
    86                 for() {
    87                         struct $thread * expected = this.value;
    88                         if( 1p != expected && 0p != expected ) {
    89                                 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
    90                                 enable_interrupts( __cfaabi_dbg_ctx );
    91                                 return false;
    92                         }
    93                         struct $thread * desired;
    94                         if( 0p == expected ) {
    95                                 // If the lock isn't locked acquire it, no need to block
    96                                 desired = 1p;
    97                                 block = false;
    98                         }
    99                         else {
    100                                 // If the lock is already locked try becomming the next leader
    101                                 desired = (struct $thread *)thrd;
    102                                 block = true;
    103                         }
    104                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    105                 }
    106                 if( block ) {
    107                         enable_interrupts( __cfaabi_dbg_ctx );
    108                         park();
    109                         disable_interrupts();
    110                 }
    111                 return true;
    112         }
    113 
    114         static inline bool next( __leaderlock_t & this ) {
     82        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
     83        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy );
     84        static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );
     85        static inline void __ioarbiter_notify( $io_context & ctx );
     86//=============================================================================================
     87// I/O Polling
     88//=============================================================================================
     89        static inline unsigned __flush( struct $io_context & );
     90        static inline __u32 __release_sqes( struct $io_context & );
     91
     92        void __cfa_io_drain( processor * proc ) {
    11593                /* paranoid */ verify( ! __preemption_enabled() );
    116                 struct $thread * nextt;
    117                 for() {
    118                         struct $thread * expected = this.value;
    119                         /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
    120 
    121                         struct $thread * desired;
    122                         if( 1p == expected ) {
    123                                 // No next leader, just unlock
    124                                 desired = 0p;
    125                                 nextt   = 0p;
    126                         }
    127                         else {
    128                                 // There is a next leader, remove but keep locked
    129                                 desired = 1p;
    130                                 nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
    131                         }
    132                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    133                 }
    134 
    135                 if(nextt) {
    136                         unpark( nextt );
    137                         enable_interrupts( __cfaabi_dbg_ctx );
    138                         return true;
    139                 }
    140                 enable_interrupts( __cfaabi_dbg_ctx );
    141                 return false;
    142         }
    143 
    144 //=============================================================================================
    145 // I/O Syscall
    146 //=============================================================================================
    147         static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    148                 bool need_sys_to_submit = false;
    149                 bool need_sys_to_complete = false;
    150                 unsigned flags = 0;
    151 
    152                 TO_SUBMIT:
    153                 if( to_submit > 0 ) {
    154                         if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    155                                 need_sys_to_submit = true;
    156                                 break TO_SUBMIT;
    157                         }
    158                         if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
    159                                 need_sys_to_submit = true;
    160                                 flags |= IORING_ENTER_SQ_WAKEUP;
    161                         }
    162                 }
    163 
    164                 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    165                         flags |= IORING_ENTER_GETEVENTS;
    166                         if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    167                                 need_sys_to_complete = true;
    168                         }
    169                 }
    170 
    171                 int ret = 0;
    172                 if( need_sys_to_submit || need_sys_to_complete ) {
    173                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    175                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret);
    176 
    177                         if( ret < 0 ) {
    178                                 switch((int)errno) {
    179                                 case EAGAIN:
    180                                 case EINTR:
    181                                 case EBUSY:
    182                                         ret = -1;
    183                                         break;
    184                                 default:
    185                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    186                                 }
    187                         }
    188                 }
    189 
    190                 // Memory barrier
    191                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    192                 return ret;
    193         }
    194 
    195 //=============================================================================================
    196 // I/O Polling
    197 //=============================================================================================
    198         static unsigned __collect_submitions( struct __io_data & ring );
    199         static __u32 __release_consumed_submission( struct __io_data & ring );
    200         static inline void __clean( volatile struct io_uring_sqe * sqe );
    201 
    202         // Process a single completion message from the io_uring
    203         // This is NOT thread-safe
    204         static inline void process( volatile struct io_uring_cqe & cqe ) {
    205                 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    206                 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    207 
    208                 fulfil( *future, cqe.res );
    209         }
    210 
    211         static [int, bool] __drain_io( & struct __io_data ring ) {
    212                 /* paranoid */ verify( ! __preemption_enabled() );
    213 
    214                 unsigned to_submit = 0;
    215                 if( ring.poller_submits ) {
    216                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    217                         to_submit = __collect_submitions( ring );
    218                 }
    219 
    220                 int ret = __io_uring_enter(ring, to_submit, true);
    221                 if( ret < 0 ) {
    222                         return [0, true];
    223                 }
    224 
    225                 // update statistics
    226                 if (to_submit > 0) {
    227                         __STATS__( true,
    228                                 if( to_submit > 0 ) {
    229                                         io.submit_q.submit_avg.rdy += to_submit;
    230                                         io.submit_q.submit_avg.csm += ret;
    231                                         io.submit_q.submit_avg.cnt += 1;
    232                                 }
    233                         )
    234                 }
    235 
    236                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    237 
    238                 // Release the consumed SQEs
    239                 __release_consumed_submission( ring );
     94                /* paranoid */ verify( proc );
     95                /* paranoid */ verify( proc->io.ctx );
    24096
    24197                // Drain the queue
    242                 unsigned head = *ring.completion_q.head;
    243                 unsigned tail = *ring.completion_q.tail;
    244                 const __u32 mask = *ring.completion_q.mask;
    245 
    246                 // Nothing was new return 0
    247                 if (head == tail) {
    248                         return [0, to_submit > 0];
    249                 }
     98                $io_context * ctx = proc->io.ctx;
     99                unsigned head = *ctx->cq.head;
     100                unsigned tail = *ctx->cq.tail;
     101                const __u32 mask = *ctx->cq.mask;
    250102
    251103                __u32 count = tail - head;
    252                 /* paranoid */ verify( count != 0 );
     104                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     105
    253106                for(i; count) {
    254107                        unsigned idx = (head + i) & mask;
    255                         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     108                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
    256109
    257110                        /* paranoid */ verify(&cqe);
    258111
    259                         process( cqe );
    260                 }
     112                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     113                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     114
     115                        fulfil( *future, cqe.res );
     116                }
     117
     118                __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    261119
    262120                // Mark to the kernel that the cqe has been seen
    263121                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    264                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
    265 
    266                 return [count, count > 0 || to_submit > 0];
    267         }
    268 
    269         void main( $io_ctx_thread & this ) {
    270                 __ioctx_register( this );
    271 
    272                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
    273 
    274                 const int reset_cnt = 5;
    275                 int reset = reset_cnt;
    276                 // Then loop until we need to start
    277                 LOOP:
    278                 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    279                         // Drain the io
    280                         int count;
    281                         bool again;
    282                         disable_interrupts();
    283                                 [count, again] = __drain_io( *this.ring );
    284 
    285                                 if(!again) reset--;
    286 
     122                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     123
     124                /* paranoid */ verify( ! __preemption_enabled() );
     125
     126                return;
     127        }
     128
     129        void __cfa_io_flush( processor * proc ) {
     130                /* paranoid */ verify( ! __preemption_enabled() );
     131                /* paranoid */ verify( proc );
     132                /* paranoid */ verify( proc->io.ctx );
     133
     134                $io_context & ctx = *proc->io.ctx;
     135
     136                if(!ctx.ext_sq.empty) {
     137                        __ioarbiter_flush( *ctx.arbiter, &ctx );
     138                }
     139
     140                __STATS__( true, io.calls.flush++; )
     141                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8);
     142                if( ret < 0 ) {
     143                        switch((int)errno) {
     144                        case EAGAIN:
     145                        case EINTR:
     146                        case EBUSY:
    287147                                // Update statistics
    288                                 __STATS__( true,
    289                                         io.complete_q.completed_avg.val += count;
    290                                         io.complete_q.completed_avg.cnt += 1;
    291                                 )
    292                         enable_interrupts( __cfaabi_dbg_ctx );
    293 
    294                         // If we got something, just yield and check again
    295                         if(reset > 1) {
    296                                 yield();
    297                                 continue LOOP;
     148                                __STATS__( false, io.calls.errors.busy ++; )
     149                                return;
     150                        default:
     151                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    298152                        }
    299 
    300                         // We alread failed to find completed entries a few time.
    301                         if(reset == 1) {
    302                                 // Rearm the context so it can block
    303                                 // but don't block right away
    304                                 // we need to retry one last time in case
    305                                 // something completed *just now*
    306                                 __ioctx_prepare_block( this );
    307                                 continue LOOP;
    308                         }
    309 
    310                                 __STATS__( false,
    311                                         io.complete_q.blocks += 1;
    312                                 )
    313                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    314 
    315                                 // block this thread
    316                                 wait( this.sem );
    317 
    318                         // restore counter
    319                         reset = reset_cnt;
    320                 }
    321 
    322                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
    323 
    324                 __ioctx_unregister( this );
     153                }
     154
     155                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     156                __STATS__( true, io.calls.submitted += ret; )
     157                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     158                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     159
     160                ctx.sq.to_submit -= ret;
     161
     162                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     163
     164                // Release the consumed SQEs
     165                __release_sqes( ctx );
     166
     167                /* paranoid */ verify( ! __preemption_enabled() );
     168
     169                ctx.proc->io.pending = false;
    325170        }
    326171
     
    344189//         head and tail must be fully filled and shouldn't ever be touched again.
    345190//
     191        //=============================================================================================
     192        // Allocation
     193        // for user's convenience fill the sqes from the indexes
     194        static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx)  {
     195                struct io_uring_sqe * sqes = ctx->sq.sqes;
     196                for(i; want) {
     197                        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
     198                        out_sqes[i] = &sqes[idxs[i]];
     199                }
     200        }
     201
     202        // Try to directly allocate from the a given context
     203        // Not thread-safe
     204        static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {
     205                __sub_ring_t & sq = ctx->sq;
     206                const __u32 mask  = *sq.mask;
     207                __u32 fhead = sq.free_ring.head;    // get the current head of the queue
     208                __u32 ftail = sq.free_ring.tail;    // get the current tail of the queue
     209
     210                // If we don't have enough sqes, fail
     211                if((ftail - fhead) < want) { return false; }
     212
     213                // copy all the indexes we want from the available list
     214                for(i; want) {
     215                        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
     216                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
     217                }
     218
     219                // Advance the head to mark the indexes as consumed
     220                __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);
     221
     222                // return success
     223                return true;
     224        }
    346225
    347226        // Allocate an submit queue entry.
     
    350229        // for convenience, return both the index and the pointer to the sqe
    351230        // sqe == &sqes[idx]
    352         [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    353                 /* paranoid */ verify( data != 0 );
    354 
    355                 // Prepare the data we need
    356                 __attribute((unused)) int len   = 0;
    357                 __attribute((unused)) int block = 0;
    358                 __u32 cnt = *ring.submit_q.num;
    359                 __u32 mask = *ring.submit_q.mask;
    360 
    361                 __u32 off = thread_rand();
    362 
    363                 // Loop around looking for an available spot
    364                 for() {
    365                         // Look through the list starting at some offset
    366                         for(i; cnt) {
    367                                 __u64 expected = 3;
    368                                 __u32 idx = (i + off) & mask; // Get an index from a random
    369                                 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    370                                 volatile __u64 * udata = &sqe->user_data;
    371 
    372                                 // Allocate the entry by CASing the user_data field from 0 to the future address
    373                                 if( *udata == expected &&
    374                                         __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
    375                                 {
    376                                         // update statistics
    377                                         __STATS__( false,
    378                                                 io.submit_q.alloc_avg.val   += len;
    379                                                 io.submit_q.alloc_avg.block += block;
    380                                                 io.submit_q.alloc_avg.cnt   += 1;
    381                                         )
    382 
    383                                         // debug log
    384                                         __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    385 
    386                                         // Success return the data
    387                                         return [sqe, idx];
    388                                 }
    389                                 verify(expected != data);
    390 
    391                                 // This one was used
    392                                 len ++;
    393                         }
    394 
    395                         block++;
    396 
    397                         yield();
    398                 }
    399         }
    400 
    401         static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
    402                 /* paranoid */ verify( idx <= mask   );
    403                 /* paranoid */ verify( idx != -1ul32 );
    404 
    405                 // We need to find a spot in the ready array
    406                 __attribute((unused)) int len   = 0;
    407                 __attribute((unused)) int block = 0;
    408                 __u32 ready_mask = ring.submit_q.ready_cnt - 1;
    409 
    410                 __u32 off = thread_rand();
    411 
    412                 __u32 picked;
    413                 LOOKING: for() {
    414                         for(i; ring.submit_q.ready_cnt) {
    415                                 picked = (i + off) & ready_mask;
    416                                 __u32 expected = -1ul32;
    417                                 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    418                                         break LOOKING;
    419                                 }
    420                                 verify(expected != idx);
    421 
    422                                 len ++;
    423                         }
    424 
    425                         block++;
    426 
    427                         __u32 released = __release_consumed_submission( ring );
    428                         if( released == 0 ) {
    429                                 yield();
    430                         }
    431                 }
    432 
    433                 // update statistics
    434                 __STATS__( false,
    435                         io.submit_q.look_avg.val   += len;
    436                         io.submit_q.look_avg.block += block;
    437                         io.submit_q.look_avg.cnt   += 1;
    438                 )
    439 
    440                 return picked;
    441         }
    442 
    443         void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    444                 __io_data & ring = *ctx->thrd.ring;
    445 
     231        struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
     232                __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
     233
     234                disable_interrupts();
     235                processor * proc = __cfaabi_tls.this_processor;
     236                $io_context * ctx = proc->io.ctx;
     237                /* paranoid */ verify( __cfaabi_tls.this_processor );
     238                /* paranoid */ verify( ctx );
     239
     240                __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     241
     242                // We can proceed to the fast path
     243                if( __alloc(ctx, idxs, want) ) {
     244                        // Allocation was successful
     245                        __STATS__( true, io.alloc.fast += 1; )
     246                        enable_interrupts( __cfaabi_dbg_ctx );
     247
     248                        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     249
     250                        __fill( sqes, want, idxs, ctx );
     251                        return ctx;
     252                }
     253                // The fast path failed, fallback
     254                __STATS__( true, io.alloc.fail += 1; )
     255
     256                // Fast path failed, fallback on arbitration
     257                __STATS__( true, io.alloc.slow += 1; )
     258                enable_interrupts( __cfaabi_dbg_ctx );
     259
     260                $io_arbiter * ioarb = proc->cltr->io.arbiter;
     261                /* paranoid */ verify( ioarb );
     262
     263                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
     264
     265                struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
     266
     267                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     268
     269                __fill( sqes, want, idxs,ret );
     270                return ret;
     271        }
     272
     273
     274        //=============================================================================================
     275        // submission
     276        static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
     277                // We can proceed to the fast path
     278                // Get the right objects
     279                __sub_ring_t & sq = ctx->sq;
     280                const __u32 mask  = *sq.mask;
     281                __u32 tail = *sq.kring.tail;
     282
     283                // Add the sqes to the array
     284                for( i; have ) {
     285                        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
     286                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
     287                }
     288
     289                // Make the sqes visible to the submitter
     290                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
     291                sq.to_submit++;
     292
     293                ctx->proc->io.pending = true;
     294                ctx->proc->io.dirty   = true;
     295                if(sq.to_submit > 30 || !lazy) {
     296                        __cfa_io_flush( ctx->proc );
     297                }
     298        }
     299
     300        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
     301                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     302
     303                disable_interrupts();
     304                processor * proc = __cfaabi_tls.this_processor;
     305                $io_context * ctx = proc->io.ctx;
     306                /* paranoid */ verify( __cfaabi_tls.this_processor );
     307                /* paranoid */ verify( ctx );
     308
     309                // Can we proceed to the fast path
     310                if( ctx == inctx )              // We have the right instance?
    446311                {
    447                         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    448                         __cfadbg_print_safe( io,
    449                                 "Kernel I/O : submitting %u (%p) for %p\n"
    450                                 "    data: %p\n"
    451                                 "    opcode: %s\n"
    452                                 "    fd: %d\n"
    453                                 "    flags: %d\n"
    454                                 "    prio: %d\n"
    455                                 "    off: %p\n"
    456                                 "    addr: %p\n"
    457                                 "    len: %d\n"
    458                                 "    other flags: %d\n"
    459                                 "    splice fd: %d\n"
    460                                 "    pad[0]: %llu\n"
    461                                 "    pad[1]: %llu\n"
    462                                 "    pad[2]: %llu\n",
    463                                 idx, sqe,
    464                                 active_thread(),
    465                                 (void*)sqe->user_data,
    466                                 opcodes[sqe->opcode],
    467                                 sqe->fd,
    468                                 sqe->flags,
    469                                 sqe->ioprio,
    470                                 (void*)sqe->off,
    471                                 (void*)sqe->addr,
    472                                 sqe->len,
    473                                 sqe->accept_flags,
    474                                 sqe->splice_fd_in,
    475                                 sqe->__pad2[0],
    476                                 sqe->__pad2[1],
    477                                 sqe->__pad2[2]
    478                         );
    479                 }
    480 
    481 
    482                 // Get now the data we definetely need
    483                 volatile __u32 * const tail = ring.submit_q.tail;
    484                 const __u32 mask  = *ring.submit_q.mask;
    485 
    486                 // There are 2 submission schemes, check which one we are using
    487                 if( ring.poller_submits ) {
    488                         // If the poller thread submits, then we just need to add this to the ready array
    489                         __submit_to_ready_array( ring, idx, mask );
    490 
    491                         post( ctx->thrd.sem );
    492 
    493                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    494                 }
    495                 else if( ring.eager_submits ) {
    496                         __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask );
    497 
    498                         #if defined(LEADER_LOCK)
    499                                 if( !try_lock(ring.submit_q.submit_lock) ) {
    500                                         __STATS__( false,
    501                                                 io.submit_q.helped += 1;
    502                                         )
    503                                         return;
    504                                 }
    505                                 /* paranoid */ verify( ! __preemption_enabled() );
    506                                 __STATS__( true,
    507                                         io.submit_q.leader += 1;
    508                                 )
    509                         #else
    510                                 for() {
    511                                         yield();
    512 
    513                                         if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
    514                                                 __STATS__( false,
    515                                                         io.submit_q.leader += 1;
    516                                                 )
    517                                                 break;
    518                                         }
    519 
    520                                         // If some one else collected our index, we are done
    521                                         #warning ABA problem
    522                                         if( ring.submit_q.ready[picked] != idx ) {
    523                                                 __STATS__( false,
    524                                                         io.submit_q.helped += 1;
    525                                                 )
    526                                                 return;
    527                                         }
    528 
    529                                         __STATS__( false,
    530                                                 io.submit_q.busy += 1;
    531                                         )
    532                                 }
    533                         #endif
    534 
    535                         // We got the lock
    536                         // Collect the submissions
    537                         unsigned to_submit = __collect_submitions( ring );
    538 
    539                         // Actually submit
    540                         int ret = __io_uring_enter( ring, to_submit, false );
    541 
    542                         #if defined(LEADER_LOCK)
    543                                 /* paranoid */ verify( ! __preemption_enabled() );
    544                                 next(ring.submit_q.submit_lock);
    545                         #else
    546                                 unlock(ring.submit_q.submit_lock);
    547                         #endif
    548                         if( ret < 0 ) {
    549                                 return;
    550                         }
    551 
    552                         // Release the consumed SQEs
    553                         __release_consumed_submission( ring );
    554 
    555                         // update statistics
    556                         __STATS__( false,
    557                                 io.submit_q.submit_avg.rdy += to_submit;
    558                                 io.submit_q.submit_avg.csm += ret;
    559                                 io.submit_q.submit_avg.cnt += 1;
    560                         )
    561 
    562                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    563                 }
    564                 else
    565                 {
    566                         // get mutual exclusion
    567                         #if defined(LEADER_LOCK)
    568                                 while(!try_lock(ring.submit_q.submit_lock));
    569                         #else
    570                                 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
    571                         #endif
    572 
    573                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    574                         /* paranoid */  "index %u already reclaimed\n"
    575                         /* paranoid */  "head %u, prev %u, tail %u\n"
    576                         /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
    577                         /* paranoid */  idx,
    578                         /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
    579                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
    580                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
    581                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
    582                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
    583                         /* paranoid */ );
    584 
    585                         // Append to the list of ready entries
    586 
    587                         /* paranoid */ verify( idx <= mask );
    588                         ring.submit_q.array[ (*tail) & mask ] = idx;
    589                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    590 
    591                         // Submit however, many entries need to be submitted
    592                         int ret = __io_uring_enter( ring, 1, false );
    593                         if( ret < 0 ) {
    594                                 switch((int)errno) {
    595                                 default:
    596                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    597                                 }
    598                         }
    599 
    600                         /* paranoid */ verify(ret == 1);
    601 
    602                         // update statistics
    603                         __STATS__( false,
    604                                 io.submit_q.submit_avg.csm += 1;
    605                                 io.submit_q.submit_avg.cnt += 1;
    606                         )
    607 
    608                         {
    609                                 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
    610                                 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
    611                                 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
    612 
    613                                 __cfadbg_print_safe( io,
    614                                         "Kernel I/O : last submitted is %u (%p)\n"
    615                                         "    data: %p\n"
    616                                         "    opcode: %s\n"
    617                                         "    fd: %d\n"
    618                                         "    flags: %d\n"
    619                                         "    prio: %d\n"
    620                                         "    off: %p\n"
    621                                         "    addr: %p\n"
    622                                         "    len: %d\n"
    623                                         "    other flags: %d\n"
    624                                         "    splice fd: %d\n"
    625                                         "    pad[0]: %llu\n"
    626                                         "    pad[1]: %llu\n"
    627                                         "    pad[2]: %llu\n",
    628                                         last_idx, sqe,
    629                                         (void*)sqe->user_data,
    630                                         opcodes[sqe->opcode],
    631                                         sqe->fd,
    632                                         sqe->flags,
    633                                         sqe->ioprio,
    634                                         (void*)sqe->off,
    635                                         (void*)sqe->addr,
    636                                         sqe->len,
    637                                         sqe->accept_flags,
    638                                         sqe->splice_fd_in,
    639                                         sqe->__pad2[0],
    640                                         sqe->__pad2[1],
    641                                         sqe->__pad2[2]
    642                                 );
    643                         }
    644 
    645                         __atomic_thread_fence( __ATOMIC_SEQ_CST );
    646                         // Release the consumed SQEs
    647 
    648                         __release_consumed_submission( ring );
    649                         // ring.submit_q.sqes[idx].user_data = 3ul64;
    650 
    651                         #if defined(LEADER_LOCK)
    652                                 next(ring.submit_q.submit_lock);
    653                         #else
    654                                 unlock(ring.submit_q.submit_lock);
    655                         #endif
    656 
    657                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    658                 }
    659         }
    660 
    661         // #define PARTIAL_SUBMIT 32
    662 
    663         // go through the list of submissions in the ready array and moved them into
    664         // the ring's submit queue
    665         static unsigned __collect_submitions( struct __io_data & ring ) {
    666                 /* paranoid */ verify( ring.submit_q.ready != 0p );
    667                 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
    668 
    669                 unsigned to_submit = 0;
    670                 __u32 tail = *ring.submit_q.tail;
    671                 const __u32 mask = *ring.submit_q.mask;
    672                 #if defined(PARTIAL_SUBMIT)
    673                         #if defined(LEADER_LOCK)
    674                                 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
    675                         #endif
    676                         const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
    677                         const __u32 offset = ring.submit_q.prev_ready;
    678                         ring.submit_q.prev_ready += cnt;
    679                 #else
    680                         const __u32 cnt = ring.submit_q.ready_cnt;
    681                         const __u32 offset = 0;
    682                 #endif
    683 
    684                 // Go through the list of ready submissions
    685                 for( c; cnt ) {
    686                         __u32 i = (offset + c) % ring.submit_q.ready_cnt;
    687 
    688                         // replace any submission with the sentinel, to consume it.
    689                         __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    690 
    691                         // If it was already the sentinel, then we are done
    692                         if( idx == -1ul32 ) continue;
    693 
    694                         // If we got a real submission, append it to the list
    695                         ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    696                         to_submit++;
    697                 }
    698 
    699                 // Increment the tail based on how many we are ready to submit
    700                 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
    701 
    702                 return to_submit;
    703         }
    704 
     312                        __submit(ctx, idxs, have, lazy);
     313
     314                        // Mark the instance as no longer in-use, re-enable interrupts and return
     315                        __STATS__( true, io.submit.fast += 1; )
     316                        enable_interrupts( __cfaabi_dbg_ctx );
     317
     318                        __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     319                        return;
     320                }
     321
     322                // Fast path failed, fallback on arbitration
     323                __STATS__( true, io.submit.slow += 1; )
     324                enable_interrupts( __cfaabi_dbg_ctx );
     325
     326                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
     327
     328                __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy);
     329        }
     330
     331        //=============================================================================================
     332        // Flushing
    705333        // Go through the ring's submit queue and release everything that has already been consumed
    706334        // by io_uring
    707         static __u32 __release_consumed_submission( struct __io_data & ring ) {
    708                 const __u32 smask = *ring.submit_q.mask;
    709 
    710                 // We need to get the lock to copy the old head and new head
    711                 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     335        // This cannot be done by multiple threads
     336        static __u32 __release_sqes( struct $io_context & ctx ) {
     337                const __u32 mask = *ctx.sq.mask;
     338
    712339                __attribute__((unused))
    713                 __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
    714                 __u32 chead = *ring.submit_q.head;              // get the current head of the queue
    715                 __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
    716                 ring.submit_q.prev_head = chead;                // note up to were we processed
    717                 unlock(ring.submit_q.release_lock);
     340                __u32 ctail = *ctx.sq.kring.tail;    // get the current tail of the queue
     341                __u32 chead = *ctx.sq.kring.head;        // get the current head of the queue
     342                __u32 phead = ctx.sq.kring.released; // get the head the last time we were here
     343
     344                __u32 ftail = ctx.sq.free_ring.tail;  // get the current tail of the queue
    718345
    719346                // the 3 fields are organized like this diagram
     
    734361                __u32 count = chead - phead;
    735362
     363                if(count == 0) {
     364                        return 0;
     365                }
     366
    736367                // We acquired an previous-head/current-head range
    737368                // go through the range and release the sqes
    738369                for( i; count ) {
    739                         __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    740 
    741                         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
    742                         __clean( &ring.submit_q.sqes[ idx ] );
    743                 }
     370                        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
     371                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
     372                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     373                }
     374
     375                ctx.sq.kring.released = chead;          // note up to were we processed
     376                __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
     377
     378                __ioarbiter_notify(ctx);
     379
    744380                return count;
    745381        }
    746382
    747         void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
    748                 __clean( sqe );
    749         }
    750 
    751         static inline void __clean( volatile struct io_uring_sqe * sqe ) {
    752                 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
    753                 __cfaabi_dbg_debug_do(
    754                         memset(sqe, 0xde, sizeof(*sqe));
    755                         sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
    756                 );
    757 
    758                 // Mark the entry as unused
    759                 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
     383//=============================================================================================
     384// I/O Arbiter
     385//=============================================================================================
     386        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
     387                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
     388
     389                __STATS__( false, io.alloc.block += 1; )
     390
     391                // No one has any resources left, wait for something to finish
     392                // Mark as pending
     393                __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST );
     394
     395                // Wait for our turn to submit
     396                wait( this.pending.blocked, want );
     397
     398                __attribute((unused)) bool ret =
     399                __alloc( this.pending.ctx, idxs, want);
     400                /* paranoid */ verify( ret );
     401
     402                return this.pending.ctx;
     403
     404        }
     405
     406        static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) {
     407                /* paranoid */ verify( !is_empty(this.pending.blocked) );
     408                this.pending.ctx = ctx;
     409
     410                while( !is_empty(this.pending.blocked) ) {
     411                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     412                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
     413                        __u32 want = front( this.pending.blocked );
     414
     415                        if( have > want ) return;
     416
     417                        signal_block( this.pending.blocked );
     418                }
     419
     420                this.pending.flag = false;
     421        }
     422
     423        static void __ioarbiter_notify( $io_context & ctx ) {
     424                if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) {
     425                        __ioarbiter_notify( *ctx.arbiter, &ctx );
     426                }
     427        }
     428
     429        // Simply append to the pending
     430        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
     431                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
     432
     433                /* paranoid */ verify( &this == ctx->arbiter );
     434
     435                // Mark as pending
     436                __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
     437
     438                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
     439
     440                // Wait for our turn to submit
     441                wait( ctx->ext_sq.blocked );
     442
     443                // Submit our indexes
     444                __submit(ctx, idxs, have, lazy);
     445
     446                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
     447        }
     448
     449        static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) {
     450                /* paranoid */ verify( &this == ctx->arbiter );
     451
     452                __STATS__( false, io.flush.external += 1; )
     453
     454                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     455
     456                condition & blcked = ctx->ext_sq.blocked;
     457                /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) );
     458                while(!is_empty( blcked )) {
     459                        signal_block( blcked );
     460                }
     461
     462                ctx->ext_sq.empty = true;
    760463        }
    761464#endif
Note: See TracChangeset for help on using the changeset viewer.