Ignore:
Timestamp:
Mar 4, 2021, 7:40:25 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
77d601f
Parents:
342af53 (diff), a5040fe (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r342af53 r8e4aa05  
    3232        extern "C" {
    3333                #include <sys/syscall.h>
     34                #include <sys/eventfd.h>
    3435
    3536                #include <linux/io_uring.h>
     
    4142        #include "io/types.hfa"
    4243
    43         static const char * opcodes[] = {
     44        __attribute__((unused)) static const char * opcodes[] = {
    4445                "OP_NOP",
    4546                "OP_READV",
     
    7980        };
    8081
    81         // returns true of acquired as leader or second leader
    82         static inline bool try_lock( __leaderlock_t & this ) {
    83                 const uintptr_t thrd = 1z | (uintptr_t)active_thread();
    84                 bool block;
    85                 disable_interrupts();
    86                 for() {
    87                         struct $thread * expected = this.value;
    88                         if( 1p != expected && 0p != expected ) {
    89                                 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
    90                                 enable_interrupts( __cfaabi_dbg_ctx );
    91                                 return false;
    92                         }
    93                         struct $thread * desired;
    94                         if( 0p == expected ) {
    95                                 // If the lock isn't locked acquire it, no need to block
    96                                 desired = 1p;
    97                                 block = false;
    98                         }
    99                         else {
    100                                 // If the lock is already locked try becomming the next leader
    101                                 desired = (struct $thread *)thrd;
    102                                 block = true;
    103                         }
    104                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    105                 }
    106                 if( block ) {
    107                         enable_interrupts( __cfaabi_dbg_ctx );
    108                         park();
    109                         disable_interrupts();
    110                 }
    111                 return true;
    112         }
    113 
    114         static inline bool next( __leaderlock_t & this ) {
     82        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
     83        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy );
     84        static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );
     85        static inline void __ioarbiter_notify( $io_context & ctx );
     86//=============================================================================================
     87// I/O Polling
     88//=============================================================================================
     89        static inline unsigned __flush( struct $io_context & );
     90        static inline __u32 __release_sqes( struct $io_context & );
     91
     92        void __cfa_io_drain( processor * proc ) {
    11593                /* paranoid */ verify( ! __preemption_enabled() );
    116                 struct $thread * nextt;
    117                 for() {
    118                         struct $thread * expected = this.value;
    119                         /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
    120 
    121                         struct $thread * desired;
    122                         if( 1p == expected ) {
    123                                 // No next leader, just unlock
    124                                 desired = 0p;
    125                                 nextt   = 0p;
    126                         }
    127                         else {
    128                                 // There is a next leader, remove but keep locked
    129                                 desired = 1p;
    130                                 nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
    131                         }
    132                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    133                 }
    134 
    135                 if(nextt) {
    136                         unpark( nextt );
    137                         enable_interrupts( __cfaabi_dbg_ctx );
    138                         return true;
    139                 }
    140                 enable_interrupts( __cfaabi_dbg_ctx );
    141                 return false;
    142         }
    143 
    144 //=============================================================================================
    145 // I/O Syscall
    146 //=============================================================================================
    147         static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    148                 bool need_sys_to_submit = false;
    149                 bool need_sys_to_complete = false;
    150                 unsigned flags = 0;
    151 
    152                 TO_SUBMIT:
    153                 if( to_submit > 0 ) {
    154                         if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    155                                 need_sys_to_submit = true;
    156                                 break TO_SUBMIT;
    157                         }
    158                         if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
    159                                 need_sys_to_submit = true;
    160                                 flags |= IORING_ENTER_SQ_WAKEUP;
    161                         }
    162                 }
    163 
    164                 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    165                         flags |= IORING_ENTER_GETEVENTS;
    166                         if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    167                                 need_sys_to_complete = true;
    168                         }
    169                 }
    170 
    171                 int ret = 0;
    172                 if( need_sys_to_submit || need_sys_to_complete ) {
    173                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    175                         if( ret < 0 ) {
    176                                 switch((int)errno) {
    177                                 case EAGAIN:
    178                                 case EINTR:
    179                                         ret = -1;
    180                                         break;
    181                                 default:
    182                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    183                                 }
    184                         }
    185                 }
    186 
    187                 // Memory barrier
    188                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    189                 return ret;
    190         }
    191 
    192 //=============================================================================================
    193 // I/O Polling
    194 //=============================================================================================
    195         static unsigned __collect_submitions( struct __io_data & ring );
    196         static __u32 __release_consumed_submission( struct __io_data & ring );
    197         static inline void __clean( volatile struct io_uring_sqe * sqe );
    198 
    199         // Process a single completion message from the io_uring
    200         // This is NOT thread-safe
    201         static inline void process( volatile struct io_uring_cqe & cqe ) {
    202                 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    203                 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    204 
    205                 fulfil( *future, cqe.res );
    206         }
    207 
    208         static [int, bool] __drain_io( & struct __io_data ring ) {
    209                 /* paranoid */ verify( ! __preemption_enabled() );
    210 
    211                 unsigned to_submit = 0;
    212                 if( ring.poller_submits ) {
    213                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    214                         to_submit = __collect_submitions( ring );
    215                 }
    216 
    217                 int ret = __io_uring_enter(ring, to_submit, true);
    218                 if( ret < 0 ) {
    219                         return [0, true];
    220                 }
    221 
    222                 // update statistics
    223                 if (to_submit > 0) {
    224                         __STATS__( true,
    225                                 if( to_submit > 0 ) {
    226                                         io.submit_q.submit_avg.rdy += to_submit;
    227                                         io.submit_q.submit_avg.csm += ret;
    228                                         io.submit_q.submit_avg.cnt += 1;
    229                                 }
    230                         )
    231                 }
    232 
    233                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    234 
    235                 // Release the consumed SQEs
    236                 __release_consumed_submission( ring );
     94                /* paranoid */ verify( proc );
     95                /* paranoid */ verify( proc->io.ctx );
    23796
    23897                // Drain the queue
    239                 unsigned head = *ring.completion_q.head;
    240                 unsigned tail = *ring.completion_q.tail;
    241                 const __u32 mask = *ring.completion_q.mask;
    242 
    243                 // Nothing was new return 0
    244                 if (head == tail) {
    245                         return [0, to_submit > 0];
    246                 }
     98                $io_context * ctx = proc->io.ctx;
     99                unsigned head = *ctx->cq.head;
     100                unsigned tail = *ctx->cq.tail;
     101                const __u32 mask = *ctx->cq.mask;
    247102
    248103                __u32 count = tail - head;
    249                 /* paranoid */ verify( count != 0 );
     104                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     105
    250106                for(i; count) {
    251107                        unsigned idx = (head + i) & mask;
    252                         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     108                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
    253109
    254110                        /* paranoid */ verify(&cqe);
    255111
    256                         process( cqe );
    257                 }
     112                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     113                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     114
     115                        fulfil( *future, cqe.res );
     116                }
     117
     118                __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    258119
    259120                // Mark to the kernel that the cqe has been seen
    260121                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    261                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
    262 
    263                 return [count, count > 0 || to_submit > 0];
    264         }
    265 
    266         void main( $io_ctx_thread & this ) {
    267                 __ioctx_register( this );
    268 
    269                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
    270 
    271                 const int reset_cnt = 5;
    272                 int reset = reset_cnt;
    273                 // Then loop until we need to start
    274                 LOOP:
    275                 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    276                         // Drain the io
    277                         int count;
    278                         bool again;
    279                         disable_interrupts();
    280                                 [count, again] = __drain_io( *this.ring );
    281 
    282                                 if(!again) reset--;
    283 
     122                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     123
     124                /* paranoid */ verify( ! __preemption_enabled() );
     125
     126                return;
     127        }
     128
     129        void __cfa_io_flush( processor * proc ) {
     130                /* paranoid */ verify( ! __preemption_enabled() );
     131                /* paranoid */ verify( proc );
     132                /* paranoid */ verify( proc->io.ctx );
     133
     134                $io_context & ctx = *proc->io.ctx;
     135
     136                if(!ctx.ext_sq.empty) {
     137                        __ioarbiter_flush( *ctx.arbiter, &ctx );
     138                }
     139
     140                __STATS__( true, io.calls.flush++; )
     141                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8);
     142                if( ret < 0 ) {
     143                        switch((int)errno) {
     144                        case EAGAIN:
     145                        case EINTR:
     146                        case EBUSY:
    284147                                // Update statistics
    285                                 __STATS__( true,
    286                                         io.complete_q.completed_avg.val += count;
    287                                         io.complete_q.completed_avg.cnt += 1;
    288                                 )
    289                         enable_interrupts( __cfaabi_dbg_ctx );
    290 
    291                         // If we got something, just yield and check again
    292                         if(reset > 1) {
    293                                 yield();
    294                                 continue LOOP;
     148                                __STATS__( false, io.calls.errors.busy ++; )
     149                                return;
     150                        default:
     151                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    295152                        }
    296 
    297                         // We alread failed to find completed entries a few time.
    298                         if(reset == 1) {
    299                                 // Rearm the context so it can block
    300                                 // but don't block right away
    301                                 // we need to retry one last time in case
    302                                 // something completed *just now*
    303                                 __ioctx_prepare_block( this );
    304                                 continue LOOP;
    305                         }
    306 
    307                                 __STATS__( false,
    308                                         io.complete_q.blocks += 1;
    309                                 )
    310                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    311 
    312                                 // block this thread
    313                                 wait( this.sem );
    314 
    315                         // restore counter
    316                         reset = reset_cnt;
    317                 }
    318 
    319                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
     153                }
     154
     155                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     156                __STATS__( true, io.calls.submitted += ret; )
     157                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     158                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     159
     160                ctx.sq.to_submit -= ret;
     161
     162                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     163
     164                // Release the consumed SQEs
     165                __release_sqes( ctx );
     166
     167                /* paranoid */ verify( ! __preemption_enabled() );
     168
     169                ctx.proc->io.pending = false;
    320170        }
    321171
     
    339189//         head and tail must be fully filled and shouldn't ever be touched again.
    340190//
     191        //=============================================================================================
     192        // Allocation
     193        // for user's convenience fill the sqes from the indexes
     194        static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx)  {
     195                struct io_uring_sqe * sqes = ctx->sq.sqes;
     196                for(i; want) {
     197                        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
     198                        out_sqes[i] = &sqes[idxs[i]];
     199                }
     200        }
     201
     202        // Try to directly allocate from the a given context
     203        // Not thread-safe
     204        static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {
     205                __sub_ring_t & sq = ctx->sq;
     206                const __u32 mask  = *sq.mask;
     207                __u32 fhead = sq.free_ring.head;    // get the current head of the queue
     208                __u32 ftail = sq.free_ring.tail;    // get the current tail of the queue
     209
     210                // If we don't have enough sqes, fail
     211                if((ftail - fhead) < want) { return false; }
     212
     213                // copy all the indexes we want from the available list
     214                for(i; want) {
     215                        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
     216                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
     217                }
     218
     219                // Advance the head to mark the indexes as consumed
     220                __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);
     221
     222                // return success
     223                return true;
     224        }
    341225
    342226        // Allocate an submit queue entry.
     
    345229        // for convenience, return both the index and the pointer to the sqe
    346230        // sqe == &sqes[idx]
    347         [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    348                 /* paranoid */ verify( data != 0 );
    349 
    350                 // Prepare the data we need
    351                 __attribute((unused)) int len   = 0;
    352                 __attribute((unused)) int block = 0;
    353                 __u32 cnt = *ring.submit_q.num;
    354                 __u32 mask = *ring.submit_q.mask;
    355 
    356                 __u32 off = thread_rand();
    357 
    358                 // Loop around looking for an available spot
    359                 for() {
    360                         // Look through the list starting at some offset
    361                         for(i; cnt) {
    362                                 __u64 expected = 3;
    363                                 __u32 idx = (i + off) & mask; // Get an index from a random
    364                                 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    365                                 volatile __u64 * udata = &sqe->user_data;
    366 
    367                                 // Allocate the entry by CASing the user_data field from 0 to the future address
    368                                 if( *udata == expected &&
    369                                         __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
    370                                 {
    371                                         // update statistics
    372                                         __STATS__( false,
    373                                                 io.submit_q.alloc_avg.val   += len;
    374                                                 io.submit_q.alloc_avg.block += block;
    375                                                 io.submit_q.alloc_avg.cnt   += 1;
    376                                         )
    377 
    378                                         // debug log
    379                                         __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    380 
    381                                         // Success return the data
    382                                         return [sqe, idx];
    383                                 }
    384                                 verify(expected != data);
    385 
    386                                 // This one was used
    387                                 len ++;
    388                         }
    389 
    390                         block++;
    391 
    392                         abort( "Kernel I/O : all submit queue entries used, yielding\n" );
    393 
    394                         yield();
    395                 }
    396         }
    397 
    398         static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
    399                 /* paranoid */ verify( idx <= mask   );
    400                 /* paranoid */ verify( idx != -1ul32 );
    401 
    402                 // We need to find a spot in the ready array
    403                 __attribute((unused)) int len   = 0;
    404                 __attribute((unused)) int block = 0;
    405                 __u32 ready_mask = ring.submit_q.ready_cnt - 1;
    406 
    407                 __u32 off = thread_rand();
    408 
    409                 __u32 picked;
    410                 LOOKING: for() {
    411                         for(i; ring.submit_q.ready_cnt) {
    412                                 picked = (i + off) & ready_mask;
    413                                 __u32 expected = -1ul32;
    414                                 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    415                                         break LOOKING;
    416                                 }
    417                                 verify(expected != idx);
    418 
    419                                 len ++;
    420                         }
    421 
    422                         block++;
    423 
    424                         __u32 released = __release_consumed_submission( ring );
    425                         if( released == 0 ) {
    426                                 yield();
    427                         }
    428                 }
    429 
    430                 // update statistics
    431                 __STATS__( false,
    432                         io.submit_q.look_avg.val   += len;
    433                         io.submit_q.look_avg.block += block;
    434                         io.submit_q.look_avg.cnt   += 1;
    435                 )
    436 
    437                 return picked;
    438         }
    439 
    440         void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    441                 __io_data & ring = *ctx->thrd.ring;
    442 
     231        struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
     232                __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
     233
     234                disable_interrupts();
     235                processor * proc = __cfaabi_tls.this_processor;
     236                $io_context * ctx = proc->io.ctx;
     237                /* paranoid */ verify( __cfaabi_tls.this_processor );
     238                /* paranoid */ verify( ctx );
     239
     240                __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     241
     242                // We can proceed to the fast path
     243                if( __alloc(ctx, idxs, want) ) {
     244                        // Allocation was successful
     245                        __STATS__( true, io.alloc.fast += 1; )
     246                        enable_interrupts( __cfaabi_dbg_ctx );
     247
     248                        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     249
     250                        __fill( sqes, want, idxs, ctx );
     251                        return ctx;
     252                }
     253                // The fast path failed, fallback
     254                __STATS__( true, io.alloc.fail += 1; )
     255
     256                // Fast path failed, fallback on arbitration
     257                __STATS__( true, io.alloc.slow += 1; )
     258                enable_interrupts( __cfaabi_dbg_ctx );
     259
     260                $io_arbiter * ioarb = proc->cltr->io.arbiter;
     261                /* paranoid */ verify( ioarb );
     262
     263                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
     264
     265                struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
     266
     267                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     268
     269                __fill( sqes, want, idxs,ret );
     270                return ret;
     271        }
     272
     273
     274        //=============================================================================================
     275        // submission
     276        static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
     277                // We can proceed to the fast path
     278                // Get the right objects
     279                __sub_ring_t & sq = ctx->sq;
     280                const __u32 mask  = *sq.mask;
     281                __u32 tail = *sq.kring.tail;
     282
     283                // Add the sqes to the array
     284                for( i; have ) {
     285                        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
     286                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
     287                }
     288
     289                // Make the sqes visible to the submitter
     290                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
     291                sq.to_submit++;
     292
     293                ctx->proc->io.pending = true;
     294                ctx->proc->io.dirty   = true;
     295                if(sq.to_submit > 30 || !lazy) {
     296                        __cfa_io_flush( ctx->proc );
     297                }
     298        }
     299
     300        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
     301                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     302
     303                disable_interrupts();
     304                processor * proc = __cfaabi_tls.this_processor;
     305                $io_context * ctx = proc->io.ctx;
     306                /* paranoid */ verify( __cfaabi_tls.this_processor );
     307                /* paranoid */ verify( ctx );
     308
     309                // Can we proceed to the fast path
     310                if( ctx == inctx )              // We have the right instance?
    443311                {
    444                         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    445                         __cfadbg_print_safe( io,
    446                                 "Kernel I/O : submitting %u (%p) for %p\n"
    447                                 "    data: %p\n"
    448                                 "    opcode: %s\n"
    449                                 "    fd: %d\n"
    450                                 "    flags: %d\n"
    451                                 "    prio: %d\n"
    452                                 "    off: %p\n"
    453                                 "    addr: %p\n"
    454                                 "    len: %d\n"
    455                                 "    other flags: %d\n"
    456                                 "    splice fd: %d\n"
    457                                 "    pad[0]: %llu\n"
    458                                 "    pad[1]: %llu\n"
    459                                 "    pad[2]: %llu\n",
    460                                 idx, sqe,
    461                                 active_thread(),
    462                                 (void*)sqe->user_data,
    463                                 opcodes[sqe->opcode],
    464                                 sqe->fd,
    465                                 sqe->flags,
    466                                 sqe->ioprio,
    467                                 sqe->off,
    468                                 sqe->addr,
    469                                 sqe->len,
    470                                 sqe->accept_flags,
    471                                 sqe->splice_fd_in,
    472                                 sqe->__pad2[0],
    473                                 sqe->__pad2[1],
    474                                 sqe->__pad2[2]
    475                         );
    476                 }
    477 
    478 
    479                 // Get now the data we definetely need
    480                 volatile __u32 * const tail = ring.submit_q.tail;
    481                 const __u32 mask  = *ring.submit_q.mask;
    482 
    483                 // There are 2 submission schemes, check which one we are using
    484                 if( ring.poller_submits ) {
    485                         // If the poller thread submits, then we just need to add this to the ready array
    486                         __submit_to_ready_array( ring, idx, mask );
    487 
    488                         post( ctx->thrd.sem );
    489 
    490                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    491                 }
    492                 else if( ring.eager_submits ) {
    493                         __u32 picked = __submit_to_ready_array( ring, idx, mask );
    494 
    495                         #if defined(LEADER_LOCK)
    496                                 if( !try_lock(ring.submit_q.submit_lock) ) {
    497                                         __STATS__( false,
    498                                                 io.submit_q.helped += 1;
    499                                         )
    500                                         return;
    501                                 }
    502                                 /* paranoid */ verify( ! __preemption_enabled() );
    503                                 __STATS__( true,
    504                                         io.submit_q.leader += 1;
    505                                 )
    506                         #else
    507                                 for() {
    508                                         yield();
    509 
    510                                         if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
    511                                                 __STATS__( false,
    512                                                         io.submit_q.leader += 1;
    513                                                 )
    514                                                 break;
    515                                         }
    516 
    517                                         // If some one else collected our index, we are done
    518                                         #warning ABA problem
    519                                         if( ring.submit_q.ready[picked] != idx ) {
    520                                                 __STATS__( false,
    521                                                         io.submit_q.helped += 1;
    522                                                 )
    523                                                 return;
    524                                         }
    525 
    526                                         __STATS__( false,
    527                                                 io.submit_q.busy += 1;
    528                                         )
    529                                 }
    530                         #endif
    531 
    532                         // We got the lock
    533                         // Collect the submissions
    534                         unsigned to_submit = __collect_submitions( ring );
    535 
    536                         // Actually submit
    537                         int ret = __io_uring_enter( ring, to_submit, false );
    538 
    539                         #if defined(LEADER_LOCK)
    540                                 /* paranoid */ verify( ! __preemption_enabled() );
    541                                 next(ring.submit_q.submit_lock);
    542                         #else
    543                                 unlock(ring.submit_q.submit_lock);
    544                         #endif
    545                         if( ret < 0 ) {
    546                                 return;
    547                         }
    548 
    549                         // Release the consumed SQEs
    550                         __release_consumed_submission( ring );
    551 
    552                         // update statistics
    553                         __STATS__( false,
    554                                 io.submit_q.submit_avg.rdy += to_submit;
    555                                 io.submit_q.submit_avg.csm += ret;
    556                                 io.submit_q.submit_avg.cnt += 1;
    557                         )
    558 
    559                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    560                 }
    561                 else
    562                 {
    563                         // get mutual exclusion
    564                         #if defined(LEADER_LOCK)
    565                                 while(!try_lock(ring.submit_q.submit_lock));
    566                         #else
    567                                 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
    568                         #endif
    569 
    570                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    571                         /* paranoid */  "index %u already reclaimed\n"
    572                         /* paranoid */  "head %u, prev %u, tail %u\n"
    573                         /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
    574                         /* paranoid */  idx,
    575                         /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
    576                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
    577                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
    578                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
    579                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
    580                         /* paranoid */ );
    581 
    582                         // Append to the list of ready entries
    583 
    584                         /* paranoid */ verify( idx <= mask );
    585                         ring.submit_q.array[ (*tail) & mask ] = idx;
    586                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    587 
    588                         // Submit however, many entries need to be submitted
    589                         int ret = __io_uring_enter( ring, 1, false );
    590                         if( ret < 0 ) {
    591                                 switch((int)errno) {
    592                                 default:
    593                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    594                                 }
    595                         }
    596 
    597                         /* paranoid */ verify(ret == 1);
    598 
    599                         // update statistics
    600                         __STATS__( false,
    601                                 io.submit_q.submit_avg.csm += 1;
    602                                 io.submit_q.submit_avg.cnt += 1;
    603                         )
    604 
    605                         {
    606                                 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
    607                                 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
    608                                 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
    609 
    610                                 __cfadbg_print_safe( io,
    611                                         "Kernel I/O : last submitted is %u (%p)\n"
    612                                         "    data: %p\n"
    613                                         "    opcode: %s\n"
    614                                         "    fd: %d\n"
    615                                         "    flags: %d\n"
    616                                         "    prio: %d\n"
    617                                         "    off: %p\n"
    618                                         "    addr: %p\n"
    619                                         "    len: %d\n"
    620                                         "    other flags: %d\n"
    621                                         "    splice fd: %d\n"
    622                                         "    pad[0]: %llu\n"
    623                                         "    pad[1]: %llu\n"
    624                                         "    pad[2]: %llu\n",
    625                                         last_idx, sqe,
    626                                         (void*)sqe->user_data,
    627                                         opcodes[sqe->opcode],
    628                                         sqe->fd,
    629                                         sqe->flags,
    630                                         sqe->ioprio,
    631                                         sqe->off,
    632                                         sqe->addr,
    633                                         sqe->len,
    634                                         sqe->accept_flags,
    635                                         sqe->splice_fd_in,
    636                                         sqe->__pad2[0],
    637                                         sqe->__pad2[1],
    638                                         sqe->__pad2[2]
    639                                 );
    640                         }
    641 
    642                         __atomic_thread_fence( __ATOMIC_SEQ_CST );
    643                         // Release the consumed SQEs
    644                         __release_consumed_submission( ring );
    645                         // ring.submit_q.sqes[idx].user_data = 3ul64;
    646 
    647                         #if defined(LEADER_LOCK)
    648                                 next(ring.submit_q.submit_lock);
    649                         #else
    650                                 unlock(ring.submit_q.submit_lock);
    651                         #endif
    652 
    653                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    654                 }
    655         }
    656 
    657         // #define PARTIAL_SUBMIT 32
    658 
    659         // go through the list of submissions in the ready array and moved them into
    660         // the ring's submit queue
    661         static unsigned __collect_submitions( struct __io_data & ring ) {
    662                 /* paranoid */ verify( ring.submit_q.ready != 0p );
    663                 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
    664 
    665                 unsigned to_submit = 0;
    666                 __u32 tail = *ring.submit_q.tail;
    667                 const __u32 mask = *ring.submit_q.mask;
    668                 #if defined(PARTIAL_SUBMIT)
    669                         #if defined(LEADER_LOCK)
    670                                 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
    671                         #endif
    672                         const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
    673                         const __u32 offset = ring.submit_q.prev_ready;
    674                         ring.submit_q.prev_ready += cnt;
    675                 #else
    676                         const __u32 cnt = ring.submit_q.ready_cnt;
    677                         const __u32 offset = 0;
    678                 #endif
    679 
    680                 // Go through the list of ready submissions
    681                 for( c; cnt ) {
    682                         __u32 i = (offset + c) % ring.submit_q.ready_cnt;
    683 
    684                         // replace any submission with the sentinel, to consume it.
    685                         __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    686 
    687                         // If it was already the sentinel, then we are done
    688                         if( idx == -1ul32 ) continue;
    689 
    690                         // If we got a real submission, append it to the list
    691                         ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    692                         to_submit++;
    693                 }
    694 
    695                 // Increment the tail based on how many we are ready to submit
    696                 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
    697 
    698                 return to_submit;
    699         }
    700 
     312                        __submit(ctx, idxs, have, lazy);
     313
     314                        // Mark the instance as no longer in-use, re-enable interrupts and return
     315                        __STATS__( true, io.submit.fast += 1; )
     316                        enable_interrupts( __cfaabi_dbg_ctx );
     317
     318                        __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     319                        return;
     320                }
     321
     322                // Fast path failed, fallback on arbitration
     323                __STATS__( true, io.submit.slow += 1; )
     324                enable_interrupts( __cfaabi_dbg_ctx );
     325
     326                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
     327
     328                __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy);
     329        }
     330
     331        //=============================================================================================
     332        // Flushing
    701333        // Go through the ring's submit queue and release everything that has already been consumed
    702334        // by io_uring
    703         static __u32 __release_consumed_submission( struct __io_data & ring ) {
    704                 const __u32 smask = *ring.submit_q.mask;
    705 
    706                 // We need to get the lock to copy the old head and new head
    707                 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     335        // This cannot be done by multiple threads
     336        static __u32 __release_sqes( struct $io_context & ctx ) {
     337                const __u32 mask = *ctx.sq.mask;
     338
    708339                __attribute__((unused))
    709                 __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
    710                 __u32 chead = *ring.submit_q.head;              // get the current head of the queue
    711                 __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
    712                 ring.submit_q.prev_head = chead;                // note up to were we processed
    713                 unlock(ring.submit_q.release_lock);
     340                __u32 ctail = *ctx.sq.kring.tail;    // get the current tail of the queue
     341                __u32 chead = *ctx.sq.kring.head;        // get the current head of the queue
     342                __u32 phead = ctx.sq.kring.released; // get the head the last time we were here
     343
     344                __u32 ftail = ctx.sq.free_ring.tail;  // get the current tail of the queue
    714345
    715346                // the 3 fields are organized like this diagram
     
    730361                __u32 count = chead - phead;
    731362
     363                if(count == 0) {
     364                        return 0;
     365                }
     366
    732367                // We acquired an previous-head/current-head range
    733368                // go through the range and release the sqes
    734369                for( i; count ) {
    735                         __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    736 
    737                         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
    738                         __clean( &ring.submit_q.sqes[ idx ] );
    739                 }
     370                        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
     371                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
     372                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     373                }
     374
     375                ctx.sq.kring.released = chead;          // note up to were we processed
     376                __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
     377
     378                __ioarbiter_notify(ctx);
     379
    740380                return count;
    741381        }
    742382
    743         void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
    744                 __clean( sqe );
    745         }
    746 
    747         static inline void __clean( volatile struct io_uring_sqe * sqe ) {
    748                 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
    749                 __cfaabi_dbg_debug_do(
    750                         memset(sqe, 0xde, sizeof(*sqe));
    751                         sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
    752                 );
    753 
    754                 // Mark the entry as unused
    755                 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
     383//=============================================================================================
     384// I/O Arbiter
     385//=============================================================================================
     386        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
     387                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
     388
     389                __STATS__( false, io.alloc.block += 1; )
     390
     391                // No one has any resources left, wait for something to finish
     392                // Mark as pending
     393                __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST );
     394
     395                // Wait for our turn to submit
     396                wait( this.pending.blocked, want );
     397
     398                __attribute((unused)) bool ret =
     399                __alloc( this.pending.ctx, idxs, want);
     400                /* paranoid */ verify( ret );
     401
     402                return this.pending.ctx;
     403
     404        }
     405
     406        static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) {
     407                /* paranoid */ verify( !is_empty(this.pending.blocked) );
     408                this.pending.ctx = ctx;
     409
     410                while( !is_empty(this.pending.blocked) ) {
     411                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     412                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
     413                        __u32 want = front( this.pending.blocked );
     414
     415                        if( have > want ) return;
     416
     417                        signal_block( this.pending.blocked );
     418                }
     419
     420                this.pending.flag = false;
     421        }
     422
     423        static void __ioarbiter_notify( $io_context & ctx ) {
     424                if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) {
     425                        __ioarbiter_notify( *ctx.arbiter, &ctx );
     426                }
     427        }
     428
     429        // Simply append to the pending
     430        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
     431                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
     432
     433                /* paranoid */ verify( &this == ctx->arbiter );
     434
     435                // Mark as pending
     436                __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
     437
     438                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
     439
     440                // Wait for our turn to submit
     441                wait( ctx->ext_sq.blocked );
     442
     443                // Submit our indexes
     444                __submit(ctx, idxs, have, lazy);
     445
     446                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
     447        }
     448
     449        static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) {
     450                /* paranoid */ verify( &this == ctx->arbiter );
     451
     452                __STATS__( false, io.flush.external += 1; )
     453
     454                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     455
     456                condition & blcked = ctx->ext_sq.blocked;
     457                /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) );
     458                while(!is_empty( blcked )) {
     459                        signal_block( blcked );
     460                }
     461
     462                ctx->ext_sq.empty = true;
    756463        }
    757464#endif
Note: See TracChangeset for help on using the changeset viewer.