Ignore:
Timestamp:
Feb 19, 2021, 1:47:09 PM (2 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
4f762d3
Parents:
b44959f
Message:

New implementation of io based on instance burrowing.
Trying to avoid the unbounded growth of the previous flat combining approach.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rb44959f r78da4ab  
    1717
    1818#if defined(__CFA_DEBUG__)
    19         // #define __CFA_DEBUG_PRINT_IO__
    20         // #define __CFA_DEBUG_PRINT_IO_CORE__
     19        #define __CFA_DEBUG_PRINT_IO__
     20        #define __CFA_DEBUG_PRINT_IO_CORE__
    2121#endif
    2222
     
    7979        };
    8080
    81         // returns true of acquired as leader or second leader
    82         static inline bool try_lock( __leaderlock_t & this ) {
    83                 const uintptr_t thrd = 1z | (uintptr_t)active_thread();
    84                 bool block;
    85                 disable_interrupts();
    86                 for() {
    87                         struct $thread * expected = this.value;
    88                         if( 1p != expected && 0p != expected ) {
    89                                 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
    90                                 enable_interrupts( __cfaabi_dbg_ctx );
    91                                 return false;
    92                         }
    93                         struct $thread * desired;
    94                         if( 0p == expected ) {
    95                                 // If the lock isn't locked acquire it, no need to block
    96                                 desired = 1p;
    97                                 block = false;
    98                         }
    99                         else {
    100                                 // If the lock is already locked try becomming the next leader
    101                                 desired = (struct $thread *)thrd;
    102                                 block = true;
    103                         }
    104                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    105                 }
    106                 if( block ) {
    107                         enable_interrupts( __cfaabi_dbg_ctx );
    108                         park();
    109                         disable_interrupts();
    110                 }
    111                 return true;
    112         }
    113 
    114         static inline bool next( __leaderlock_t & this ) {
    115                 /* paranoid */ verify( ! __preemption_enabled() );
    116                 struct $thread * nextt;
    117                 for() {
    118                         struct $thread * expected = this.value;
    119                         /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
    120 
    121                         struct $thread * desired;
    122                         if( 1p == expected ) {
    123                                 // No next leader, just unlock
    124                                 desired = 0p;
    125                                 nextt   = 0p;
    126                         }
    127                         else {
    128                                 // There is a next leader, remove but keep locked
    129                                 desired = 1p;
    130                                 nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
    131                         }
    132                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    133                 }
    134 
    135                 if(nextt) {
    136                         unpark( nextt );
    137                         enable_interrupts( __cfaabi_dbg_ctx );
    138                         return true;
    139                 }
    140                 enable_interrupts( __cfaabi_dbg_ctx );
    141                 return false;
    142         }
    143 
    14481//=============================================================================================
    14582// I/O Syscall
    14683//=============================================================================================
    147         static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
     84        static int __io_uring_enter( struct $io_context & ctx, unsigned to_submit, bool get ) {
    14885                bool need_sys_to_submit = false;
    14986                bool need_sys_to_complete = false;
     
    15289                TO_SUBMIT:
    15390                if( to_submit > 0 ) {
    154                         if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     91                        if( !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
    15592                                need_sys_to_submit = true;
    15693                                break TO_SUBMIT;
    15794                        }
    158                         if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
     95                        if( (*ctx.sq.flags) & IORING_SQ_NEED_WAKEUP ) {
    15996                                need_sys_to_submit = true;
    16097                                flags |= IORING_ENTER_SQ_WAKEUP;
     
    16299                }
    163100
    164                 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     101                if( get && !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
    165102                        flags |= IORING_ENTER_GETEVENTS;
    166                         if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
     103                        if( (ctx.ring_flags & IORING_SETUP_IOPOLL) ) {
    167104                                need_sys_to_complete = true;
    168105                        }
     
    171108                int ret = 0;
    172109                if( need_sys_to_submit || need_sys_to_complete ) {
    173                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    175                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret);
    176 
    177                         if( ret < 0 ) {
    178                                 switch((int)errno) {
    179                                 case EAGAIN:
    180                                 case EINTR:
    181                                 case EBUSY:
    182                                         ret = -1;
    183                                         break;
    184                                 default:
    185                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    186                                 }
    187                         }
     110                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ctx.fd, to_submit, flags);
     111                        ret = syscall( __NR_io_uring_enter, ctx.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
     112                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ctx.fd, ret);
    188113                }
    189114
     
    196121// I/O Polling
    197122//=============================================================================================
    198         static unsigned __collect_submitions( struct __io_data & ring );
    199         static __u32 __release_consumed_submission( struct __io_data & ring );
    200         static inline void __clean( volatile struct io_uring_sqe * sqe );
    201 
    202         // Process a single completion message from the io_uring
    203         // This is NOT thread-safe
    204         static inline void process( volatile struct io_uring_cqe & cqe ) {
    205                 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    206                 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    207 
    208                 fulfil( *future, cqe.res );
    209         }
    210 
    211         static [int, bool] __drain_io( & struct __io_data ring ) {
    212                 /* paranoid */ verify( ! __preemption_enabled() );
    213 
    214                 unsigned to_submit = 0;
    215                 if( ring.poller_submits ) {
    216                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    217                         to_submit = __collect_submitions( ring );
    218                 }
    219 
    220                 int ret = __io_uring_enter(ring, to_submit, true);
     123        static inline unsigned __flush( struct $io_context & );
     124        static inline __u32 __release_sqes( struct $io_context & );
     125
     126        static [int, bool] __drain_io( & struct  $io_context ctx ) {
     127                unsigned to_submit = __flush( ctx );
     128                int ret = __io_uring_enter( ctx, to_submit, true );
    221129                if( ret < 0 ) {
    222                         return [0, true];
     130                        switch((int)errno) {
     131                        case EAGAIN:
     132                        case EINTR:
     133                        case EBUSY:
     134                                return [0, true];
     135                                break;
     136                        default:
     137                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     138                        }
    223139                }
    224140
    225141                // update statistics
    226142                if (to_submit > 0) {
    227                         __STATS__( true,
     143                        __STATS__( false,
    228144                                if( to_submit > 0 ) {
    229145                                        io.submit_q.submit_avg.rdy += to_submit;
     
    232148                                }
    233149                        )
    234                 }
    235 
    236                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
     150                        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     151
     152                        /* paranoid */ verify( ctx.sq.to_submit >= ret );
     153                        ctx.sq.to_submit -= ret;
     154
     155                        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     156
     157                        if(ret) {
     158                                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring\n", ret);
     159                        }
     160                }
    237161
    238162                // Release the consumed SQEs
    239                 __release_consumed_submission( ring );
     163                __release_sqes( ctx );
    240164
    241165                // Drain the queue
    242                 unsigned head = *ring.completion_q.head;
    243                 unsigned tail = *ring.completion_q.tail;
    244                 const __u32 mask = *ring.completion_q.mask;
     166                unsigned head = *ctx.cq.head;
     167                unsigned tail = *ctx.cq.tail;
     168                const __u32 mask = *ctx.cq.mask;
    245169
    246170                // Nothing was new return 0
     
    253177                for(i; count) {
    254178                        unsigned idx = (head + i) & mask;
    255                         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     179                        volatile struct io_uring_cqe & cqe = ctx.cq.cqes[idx];
    256180
    257181                        /* paranoid */ verify(&cqe);
    258182
    259                         process( cqe );
     183                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     184                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     185
     186                        fulfil( *future, cqe.res );
     187                }
     188
     189                if(count) {
     190                        __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    260191                }
    261192
    262193                // Mark to the kernel that the cqe has been seen
    263194                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    264                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
     195                __atomic_store_n( ctx.cq.head, head + count, __ATOMIC_SEQ_CST );
    265196
    266197                return [count, count > 0 || to_submit > 0];
    267198        }
    268199
    269         void main( $io_ctx_thread & this ) {
    270                 __ioctx_register( this );
    271 
    272                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
     200        void main( $io_context & this ) {
     201                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.fd, &this);
    273202
    274203                const int reset_cnt = 5;
     
    276205                // Then loop until we need to start
    277206                LOOP:
    278                 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
     207                while() {
     208                        waitfor( ^?{} : this) {
     209                                break LOOP;
     210                        }
     211                        or else {}
     212
    279213                        // Drain the io
    280214                        int count;
    281215                        bool again;
    282                         disable_interrupts();
    283                                 [count, again] = __drain_io( *this.ring );
    284 
    285                                 if(!again) reset--;
    286 
    287                                 // Update statistics
    288                                 __STATS__( true,
    289                                         io.complete_q.completed_avg.val += count;
    290                                         io.complete_q.completed_avg.cnt += 1;
    291                                 )
    292                         enable_interrupts( __cfaabi_dbg_ctx );
     216                        [count, again] = __drain_io( this );
     217
     218                        if(!again) reset--;
     219
     220                        // Update statistics
     221                        __STATS__( false,
     222                                io.complete_q.completed_avg.val += count;
     223                                io.complete_q.completed_avg.cnt += 1;
     224                        )
    293225
    294226                        // If we got something, just yield and check again
     
    308240                        }
    309241
    310                                 __STATS__( false,
    311                                         io.complete_q.blocks += 1;
    312                                 )
    313                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    314 
    315                                 // block this thread
    316                                 wait( this.sem );
     242                        __STATS__( false,
     243                                io.complete_q.blocks += 1;
     244                        )
     245                        __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.fd, &this);
     246
     247                        // block this thread
     248                        wait( this.sem );
    317249
    318250                        // restore counter
     
    320252                }
    321253
    322                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
    323 
    324                 __ioctx_unregister( this );
     254                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.fd, &this);
    325255        }
    326256
     
    345275//
    346276
     277        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
     278        static void __ioarbiter_submit  ( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have );
     279        static void __ioarbiter_flush   ( $io_arbiter & mutex this, $io_context * );
     280        static inline void __ioarbiter_notify( $io_context & ctx );
     281
     282        //=============================================================================================
     283        // Allocation
     284        // for user's convenience fill the sqes from the indexes
     285        static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx)  {
     286                struct io_uring_sqe * sqes = ctx->sq.sqes;
     287                for(i; want) {
     288                        out_sqes[i] = &sqes[idxs[i]];
     289                }
     290        }
     291
     292        // Try to directly allocate from the a given context
     293        // Not thread-safe
     294        static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {
     295                __sub_ring_t & sq = ctx->sq;
     296                const __u32 mask  = *sq.mask;
     297                __u32 fhead = sq.free_ring.head;    // get the current head of the queue
     298                __u32 ftail = sq.free_ring.tail;    // get the current tail of the queue
     299
     300                // If we don't have enough sqes, fail
     301                if((ftail - fhead) < want) { return false; }
     302
     303                // copy all the indexes we want from the available list
     304                for(i; want) {
     305                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
     306                }
     307
     308                // Advance the head to mark the indexes as consumed
     309                __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);
     310
     311                // return success
     312                return true;
     313        }
     314
    347315        // Allocate an submit queue entry.
    348316        // The kernel cannot see these entries until they are submitted, but other threads must be
     
    350318        // for convenience, return both the index and the pointer to the sqe
    351319        // sqe == &sqes[idx]
    352         [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    353                 /* paranoid */ verify( data != 0 );
    354 
    355                 // Prepare the data we need
    356                 __attribute((unused)) int len   = 0;
    357                 __attribute((unused)) int block = 0;
    358                 __u32 cnt = *ring.submit_q.num;
    359                 __u32 mask = *ring.submit_q.mask;
    360 
    361                 __u32 off = thread_rand();
    362 
    363                 // Loop around looking for an available spot
    364                 for() {
    365                         // Look through the list starting at some offset
    366                         for(i; cnt) {
    367                                 __u64 expected = 3;
    368                                 __u32 idx = (i + off) & mask; // Get an index from a random
    369                                 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    370                                 volatile __u64 * udata = &sqe->user_data;
    371 
    372                                 // Allocate the entry by CASing the user_data field from 0 to the future address
    373                                 if( *udata == expected &&
    374                                         __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
    375                                 {
    376                                         // update statistics
    377                                         __STATS__( false,
    378                                                 io.submit_q.alloc_avg.val   += len;
    379                                                 io.submit_q.alloc_avg.block += block;
    380                                                 io.submit_q.alloc_avg.cnt   += 1;
    381                                         )
    382 
    383                                         // debug log
    384                                         __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    385 
    386                                         // Success return the data
    387                                         return [sqe, idx];
    388                                 }
    389                                 verify(expected != data);
    390 
    391                                 // This one was used
    392                                 len ++;
    393                         }
    394 
    395                         block++;
    396 
    397                         yield();
    398                 }
    399         }
    400 
    401         static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
    402                 /* paranoid */ verify( idx <= mask   );
    403                 /* paranoid */ verify( idx != -1ul32 );
    404 
    405                 // We need to find a spot in the ready array
    406                 __attribute((unused)) int len   = 0;
    407                 __attribute((unused)) int block = 0;
    408                 __u32 ready_mask = ring.submit_q.ready_cnt - 1;
    409 
    410                 __u32 off = thread_rand();
    411 
    412                 __u32 picked;
    413                 LOOKING: for() {
    414                         for(i; ring.submit_q.ready_cnt) {
    415                                 picked = (i + off) & ready_mask;
    416                                 __u32 expected = -1ul32;
    417                                 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    418                                         break LOOKING;
    419                                 }
    420                                 verify(expected != idx);
    421 
    422                                 len ++;
    423                         }
    424 
    425                         block++;
    426 
    427                         __u32 released = __release_consumed_submission( ring );
    428                         if( released == 0 ) {
    429                                 yield();
    430                         }
    431                 }
    432 
    433                 // update statistics
    434                 __STATS__( false,
    435                         io.submit_q.look_avg.val   += len;
    436                         io.submit_q.look_avg.block += block;
    437                         io.submit_q.look_avg.cnt   += 1;
    438                 )
    439 
    440                 return picked;
    441         }
    442 
    443         void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    444                 __io_data & ring = *ctx->thrd.ring;
    445 
     320        struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
     321                __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
     322
     323                disable_interrupts();
     324                processor * proc = __cfaabi_tls.this_processor;
     325                /* paranoid */ verify( __cfaabi_tls.this_processor );
     326                /* paranoid */ verify( proc->io.lock == false );
     327
     328                __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
     329                $io_context * ctx = proc->io.ctx;
     330                $io_arbiter * ioarb = proc->cltr->io.arbiter;
     331                /* paranoid */ verify( ioarb );
     332
     333                // Can we proceed to the fast path
     334                if(  ctx                                // We alreay have an instance?
     335                &&  !ctx->revoked )             // Our instance is still valid?
    446336                {
    447                         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    448                         __cfadbg_print_safe( io,
    449                                 "Kernel I/O : submitting %u (%p) for %p\n"
    450                                 "    data: %p\n"
    451                                 "    opcode: %s\n"
    452                                 "    fd: %d\n"
    453                                 "    flags: %d\n"
    454                                 "    prio: %d\n"
    455                                 "    off: %p\n"
    456                                 "    addr: %p\n"
    457                                 "    len: %d\n"
    458                                 "    other flags: %d\n"
    459                                 "    splice fd: %d\n"
    460                                 "    pad[0]: %llu\n"
    461                                 "    pad[1]: %llu\n"
    462                                 "    pad[2]: %llu\n",
    463                                 idx, sqe,
    464                                 active_thread(),
    465                                 (void*)sqe->user_data,
    466                                 opcodes[sqe->opcode],
    467                                 sqe->fd,
    468                                 sqe->flags,
    469                                 sqe->ioprio,
    470                                 (void*)sqe->off,
    471                                 (void*)sqe->addr,
    472                                 sqe->len,
    473                                 sqe->accept_flags,
    474                                 sqe->splice_fd_in,
    475                                 sqe->__pad2[0],
    476                                 sqe->__pad2[1],
    477                                 sqe->__pad2[2]
    478                         );
    479                 }
    480 
    481 
    482                 // Get now the data we definetely need
    483                 volatile __u32 * const tail = ring.submit_q.tail;
    484                 const __u32 mask  = *ring.submit_q.mask;
    485 
    486                 // There are 2 submission schemes, check which one we are using
    487                 if( ring.poller_submits ) {
    488                         // If the poller thread submits, then we just need to add this to the ready array
    489                         __submit_to_ready_array( ring, idx, mask );
    490 
    491                         post( ctx->thrd.sem );
    492 
    493                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    494                 }
    495                 else if( ring.eager_submits ) {
    496                         __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask );
    497 
    498                         #if defined(LEADER_LOCK)
    499                                 if( !try_lock(ring.submit_q.submit_lock) ) {
    500                                         __STATS__( false,
    501                                                 io.submit_q.helped += 1;
    502                                         )
    503                                         return;
    504                                 }
    505                                 /* paranoid */ verify( ! __preemption_enabled() );
    506                                 __STATS__( true,
    507                                         io.submit_q.leader += 1;
    508                                 )
    509                         #else
    510                                 for() {
    511                                         yield();
    512 
    513                                         if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
    514                                                 __STATS__( false,
    515                                                         io.submit_q.leader += 1;
    516                                                 )
    517                                                 break;
    518                                         }
    519 
    520                                         // If some one else collected our index, we are done
    521                                         #warning ABA problem
    522                                         if( ring.submit_q.ready[picked] != idx ) {
    523                                                 __STATS__( false,
    524                                                         io.submit_q.helped += 1;
    525                                                 )
    526                                                 return;
    527                                         }
    528 
    529                                         __STATS__( false,
    530                                                 io.submit_q.busy += 1;
    531                                         )
    532                                 }
    533                         #endif
    534 
    535                         // We got the lock
    536                         // Collect the submissions
    537                         unsigned to_submit = __collect_submitions( ring );
    538 
    539                         // Actually submit
    540                         int ret = __io_uring_enter( ring, to_submit, false );
    541 
    542                         #if defined(LEADER_LOCK)
    543                                 /* paranoid */ verify( ! __preemption_enabled() );
    544                                 next(ring.submit_q.submit_lock);
    545                         #else
    546                                 unlock(ring.submit_q.submit_lock);
    547                         #endif
    548                         if( ret < 0 ) {
    549                                 return;
    550                         }
    551 
    552                         // Release the consumed SQEs
    553                         __release_consumed_submission( ring );
    554 
    555                         // update statistics
    556                         __STATS__( false,
    557                                 io.submit_q.submit_avg.rdy += to_submit;
    558                                 io.submit_q.submit_avg.csm += ret;
    559                                 io.submit_q.submit_avg.cnt += 1;
    560                         )
    561 
    562                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    563                 }
    564                 else
     337                        __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     338
     339                        // We can proceed to the fast path
     340                        if( __alloc(ctx, idxs, want) ) {
     341                                // Allocation was successful
     342                                // Mark the instance as no longer in-use and re-enable interrupts
     343                                __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
     344                                enable_interrupts( __cfaabi_dbg_ctx );
     345
     346                                __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful\n");
     347
     348                                __fill( sqes, want, idxs, ctx );
     349                                return ctx;
     350                        }
     351                        // The fast path failed, fallback
     352                }
     353
     354                // Fast path failed, fallback on arbitration
     355                __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
     356                enable_interrupts( __cfaabi_dbg_ctx );
     357
     358                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
     359
     360                struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
     361
     362                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed\n");
     363
     364                __fill( sqes, want, idxs,ret );
     365                return ret;
     366        }
     367
     368
     369        //=============================================================================================
     370        // submission
     371        static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have) {
     372                // We can proceed to the fast path
     373                // Get the right objects
     374                __sub_ring_t & sq = ctx->sq;
     375                const __u32 mask  = *sq.mask;
     376                __u32 tail = sq.kring.ready;
     377
     378                // Add the sqes to the array
     379                for( i; have ) {
     380                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
     381                }
     382
     383                // Make the sqes visible to the submitter
     384                __atomic_store_n(&sq.kring.ready, tail + have, __ATOMIC_RELEASE);
     385
     386                // Make sure the poller is awake
     387                __cfadbg_print_safe(io, "Kernel I/O : waking the poller\n");
     388                post( ctx->sem );
     389        }
     390
     391        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have ) __attribute__((nonnull (1))) {
     392                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u\n", have);
     393
     394                disable_interrupts();
     395                processor * proc = __cfaabi_tls.this_processor;
     396                /* paranoid */ verify( __cfaabi_tls.this_processor );
     397                /* paranoid */ verify( proc->io.lock == false );
     398
     399                __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
     400                $io_context * ctx = proc->io.ctx;
     401
     402                // Can we proceed to the fast path
     403                if(  ctx                                // We alreay have an instance?
     404                &&  !ctx->revoked               // Our instance is still valid?
     405                &&   ctx == inctx )             // We have the right instance?
    565406                {
    566                         // get mutual exclusion
    567                         #if defined(LEADER_LOCK)
    568                                 while(!try_lock(ring.submit_q.submit_lock));
    569                         #else
    570                                 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
    571                         #endif
    572 
    573                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    574                         /* paranoid */  "index %u already reclaimed\n"
    575                         /* paranoid */  "head %u, prev %u, tail %u\n"
    576                         /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
    577                         /* paranoid */  idx,
    578                         /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
    579                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
    580                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
    581                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
    582                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
    583                         /* paranoid */ );
    584 
    585                         // Append to the list of ready entries
    586 
    587                         /* paranoid */ verify( idx <= mask );
    588                         ring.submit_q.array[ (*tail) & mask ] = idx;
    589                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    590 
    591                         // Submit however, many entries need to be submitted
    592                         int ret = __io_uring_enter( ring, 1, false );
    593                         if( ret < 0 ) {
    594                                 switch((int)errno) {
    595                                 default:
    596                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    597                                 }
    598                         }
    599 
    600                         /* paranoid */ verify(ret == 1);
    601 
    602                         // update statistics
    603                         __STATS__( false,
    604                                 io.submit_q.submit_avg.csm += 1;
    605                                 io.submit_q.submit_avg.cnt += 1;
    606                         )
    607 
    608                         {
    609                                 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
    610                                 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
    611                                 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
    612 
    613                                 __cfadbg_print_safe( io,
    614                                         "Kernel I/O : last submitted is %u (%p)\n"
    615                                         "    data: %p\n"
    616                                         "    opcode: %s\n"
    617                                         "    fd: %d\n"
    618                                         "    flags: %d\n"
    619                                         "    prio: %d\n"
    620                                         "    off: %p\n"
    621                                         "    addr: %p\n"
    622                                         "    len: %d\n"
    623                                         "    other flags: %d\n"
    624                                         "    splice fd: %d\n"
    625                                         "    pad[0]: %llu\n"
    626                                         "    pad[1]: %llu\n"
    627                                         "    pad[2]: %llu\n",
    628                                         last_idx, sqe,
    629                                         (void*)sqe->user_data,
    630                                         opcodes[sqe->opcode],
    631                                         sqe->fd,
    632                                         sqe->flags,
    633                                         sqe->ioprio,
    634                                         (void*)sqe->off,
    635                                         (void*)sqe->addr,
    636                                         sqe->len,
    637                                         sqe->accept_flags,
    638                                         sqe->splice_fd_in,
    639                                         sqe->__pad2[0],
    640                                         sqe->__pad2[1],
    641                                         sqe->__pad2[2]
    642                                 );
    643                         }
    644 
    645                         __atomic_thread_fence( __ATOMIC_SEQ_CST );
    646                         // Release the consumed SQEs
    647 
    648                         __release_consumed_submission( ring );
    649                         // ring.submit_q.sqes[idx].user_data = 3ul64;
    650 
    651                         #if defined(LEADER_LOCK)
    652                                 next(ring.submit_q.submit_lock);
    653                         #else
    654                                 unlock(ring.submit_q.submit_lock);
    655                         #endif
    656 
    657                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    658                 }
    659         }
    660 
    661         // #define PARTIAL_SUBMIT 32
    662 
    663         // go through the list of submissions in the ready array and moved them into
    664         // the ring's submit queue
    665         static unsigned __collect_submitions( struct __io_data & ring ) {
    666                 /* paranoid */ verify( ring.submit_q.ready != 0p );
    667                 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
    668 
    669                 unsigned to_submit = 0;
    670                 __u32 tail = *ring.submit_q.tail;
    671                 const __u32 mask = *ring.submit_q.mask;
    672                 #if defined(PARTIAL_SUBMIT)
    673                         #if defined(LEADER_LOCK)
    674                                 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
    675                         #endif
    676                         const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
    677                         const __u32 offset = ring.submit_q.prev_ready;
    678                         ring.submit_q.prev_ready += cnt;
    679                 #else
    680                         const __u32 cnt = ring.submit_q.ready_cnt;
    681                         const __u32 offset = 0;
    682                 #endif
    683 
    684                 // Go through the list of ready submissions
    685                 for( c; cnt ) {
    686                         __u32 i = (offset + c) % ring.submit_q.ready_cnt;
    687 
    688                         // replace any submission with the sentinel, to consume it.
    689                         __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    690 
    691                         // If it was already the sentinel, then we are done
    692                         if( idx == -1ul32 ) continue;
    693 
    694                         // If we got a real submission, append it to the list
    695                         ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    696                         to_submit++;
    697                 }
    698 
    699                 // Increment the tail based on how many we are ready to submit
    700                 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
    701 
    702                 return to_submit;
    703         }
     407                        __submit(ctx, idxs, have);
     408
     409                        // Mark the instance as no longer in-use, re-enable interrupts and return
     410                        __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
     411                        enable_interrupts( __cfaabi_dbg_ctx );
     412
     413                        __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     414                        return;
     415                }
     416
     417                // Fast path failed, fallback on arbitration
     418                __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
     419                enable_interrupts( __cfaabi_dbg_ctx );
     420
     421                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
     422
     423                __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have);
     424        }
     425
     426        //=============================================================================================
     427        // Flushing
     428        static unsigned __flush( struct $io_context & ctx ) {
     429                // First check for external
     430                if( !__atomic_load_n(&ctx.ext_sq.empty, __ATOMIC_SEQ_CST) ) {
     431                        // We have external submissions, delegate to the arbiter
     432                        __ioarbiter_flush( *ctx.arbiter, &ctx );
     433                }
     434
     435                __u32 tail  = *ctx.sq.kring.tail;
     436                __u32 ready = ctx.sq.kring.ready;
     437
     438                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     439                ctx.sq.to_submit += (ready - tail);
     440                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     441
     442                if(ctx.sq.to_submit) {
     443                        __cfadbg_print_safe(io, "Kernel I/O : %u ready to submit\n", ctx.sq.to_submit);
     444                }
     445
     446                __atomic_store_n(ctx.sq.kring.tail, ready, __ATOMIC_RELEASE);
     447
     448                return ctx.sq.to_submit;
     449        }
     450
    704451
    705452        // Go through the ring's submit queue and release everything that has already been consumed
    706453        // by io_uring
    707         static __u32 __release_consumed_submission( struct __io_data & ring ) {
    708                 const __u32 smask = *ring.submit_q.mask;
    709 
    710                 // We need to get the lock to copy the old head and new head
    711                 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     454        // This cannot be done by multiple threads
     455        static __u32 __release_sqes( struct $io_context & ctx ) {
     456                const __u32 mask = *ctx.sq.mask;
     457
    712458                __attribute__((unused))
    713                 __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
    714                 __u32 chead = *ring.submit_q.head;              // get the current head of the queue
    715                 __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
    716                 ring.submit_q.prev_head = chead;                // note up to were we processed
    717                 unlock(ring.submit_q.release_lock);
     459                __u32 ctail = *ctx.sq.kring.tail;    // get the current tail of the queue
     460                __u32 chead = *ctx.sq.kring.head;        // get the current head of the queue
     461                __u32 phead = ctx.sq.kring.released; // get the head the last time we were here
     462
     463                __u32 ftail = ctx.sq.free_ring.tail;  // get the current tail of the queue
    718464
    719465                // the 3 fields are organized like this diagram
     
    734480                __u32 count = chead - phead;
    735481
     482                if(count == 0) {
     483                        return 0;
     484                }
     485
    736486                // We acquired an previous-head/current-head range
    737487                // go through the range and release the sqes
    738488                for( i; count ) {
    739                         __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    740 
    741                         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
    742                         __clean( &ring.submit_q.sqes[ idx ] );
    743                 }
     489                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
     490                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     491                }
     492
     493                ctx.sq.kring.released = chead;          // note up to were we processed
     494                __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
     495
     496                __ioarbiter_notify(ctx);
     497
    744498                return count;
    745499        }
    746500
    747         void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
    748                 __clean( sqe );
    749         }
    750 
    751         static inline void __clean( volatile struct io_uring_sqe * sqe ) {
    752                 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
    753                 __cfaabi_dbg_debug_do(
    754                         memset(sqe, 0xde, sizeof(*sqe));
    755                         sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
    756                 );
    757 
    758                 // Mark the entry as unused
    759                 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
     501//=============================================================================================
     502// I/O Arbiter
     503//=============================================================================================
     504        static inline void __revoke( $io_arbiter & this, $io_context * ctx ) {
     505                if(ctx->revoked) return;
     506
     507                remove( this.assigned, *ctx );
     508
     509                // Mark as revoked
     510                __atomic_store_n(&ctx->revoked, true, __ATOMIC_SEQ_CST);
     511
     512                // Wait for the processor to no longer use it
     513                while(ctx->proc->io.lock) Pause();
     514
     515                // Remove the coupling with the processor
     516                ctx->proc->io.ctx = 0p;
     517                ctx->proc = 0p;
     518
     519                // add to available contexts
     520                addHead( this.available, *ctx );
     521        }
     522
     523        static inline void __assign( $io_arbiter & this, $io_context * ctx, processor * proc ) {
     524                remove( this.available, *ctx );
     525
     526                ctx->revoked = false;
     527                ctx->proc = proc;
     528                __atomic_store_n(&proc->io.ctx, ctx, __ATOMIC_SEQ_CST);
     529
     530                // add to assigned contexts
     531                addTail( this.assigned, *ctx );
     532        }
     533
     534        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
     535                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
     536
     537                SeqIter($io_context) iter;
     538                $io_context & ci;
     539                // Do we already have something available?
     540                for( over( iter, this.available ); iter | ci;) {
     541                        __cfadbg_print_safe(io, "Kernel I/O : attempting available context\n");
     542
     543                        $io_context * c = &ci;
     544                        if(__alloc(c, idxs, want)) {
     545                                __assign( this, c, proc);
     546                                return c;
     547                        }
     548                }
     549
     550
     551                // Otherwise, we have no choice but to revoke everyone to check if other instance have available data
     552                for( over( iter, this.assigned ); iter | ci; ) {
     553                        __cfadbg_print_safe(io, "Kernel I/O : revoking context for allocation\n");
     554
     555                        $io_context * c = &ci;
     556                        __revoke( this, c );
     557
     558                        if(__alloc(c, idxs, want)) {
     559                                __assign( this, c, proc);
     560                                return c;
     561                        }
     562                }
     563
     564                __cfadbg_print_safe(io, "Kernel I/O : waiting for available resources\n");
     565
     566                // No one has any resources left, wait for something to finish
     567                // Mark as pending
     568                __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST );
     569
     570                // Wait for our turn to submit
     571                wait( this.pending.blocked, want );
     572
     573                __attribute((unused)) bool ret =
     574                __alloc( this.pending.ctx, idxs, want);
     575                /* paranoid */ verify( ret );
     576
     577                __assign( this, this.pending.ctx, proc);
     578                return this.pending.ctx;
     579        }
     580
     581        static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) {
     582                /* paranoid */ verify( !is_empty(this.pending.blocked) );
     583                this.pending.ctx = ctx;
     584
     585                while( !is_empty(this.pending.blocked) ) {
     586                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
     587                        __u32 want = front( this.pending.blocked );
     588
     589                        if( have > want ) return;
     590
     591                        signal_block( this.pending.blocked );
     592                }
     593
     594                this.pending.flag = false;
     595        }
     596
     597        static void __ioarbiter_notify( $io_context & ctx ) {
     598                if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) {
     599                        __ioarbiter_notify( *ctx.arbiter, &ctx );
     600                }
     601        }
     602
     603        // Simply append to the pending
     604        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have ) {
     605                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
     606
     607                /* paranoid */ verify( &this == ctx->arbiter );
     608
     609                // Mark as pending
     610                __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
     611
     612                // Wake-up the poller
     613                post( ctx->sem );
     614
     615                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
     616
     617                // Wait for our turn to submit
     618                wait( ctx->ext_sq.blocked );
     619
     620                // Submit our indexes
     621                __submit(ctx, idxs, have);
     622
     623                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
     624        }
     625
     626        static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) {
     627                /* paranoid */ verify( &this == ctx->arbiter );
     628
     629                __revoke( this, ctx );
     630
     631                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     632
     633                condition & blcked = ctx->ext_sq.blocked;
     634                /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) );
     635                while(!is_empty( blcked )) {
     636                        signal_block( blcked );
     637                }
     638
     639                ctx->ext_sq.empty = true;
     640        }
     641
     642        void __ioarbiter_register( $io_arbiter & mutex this, $io_context & ctx ) {
     643                __cfadbg_print_safe(io, "Kernel I/O : registering new context\n");
     644
     645                ctx.arbiter = &this;
     646
     647                // add to available contexts
     648                addHead( this.available, ctx );
     649
     650                // Check if this solves pending allocations
     651                if(this.pending.flag) {
     652                        __ioarbiter_notify( ctx );
     653                }
     654        }
     655
     656        void __ioarbiter_unregister( $io_arbiter & mutex this, $io_context & ctx ) {
     657                /* paranoid */ verify( &this == ctx.arbiter );
     658
     659                __revoke( this, &ctx );
     660
     661                remove( this.available, ctx );
    760662        }
    761663#endif
Note: See TracChangeset for help on using the changeset viewer.