Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rbdfd0bd r9f5a71eb  
    4141        #include "kernel.hfa"
    4242        #include "kernel/fwd.hfa"
    43         #include "kernel/private.hfa"
    44         #include "kernel/cluster.hfa"
     43        #include "kernel_private.hfa"
    4544        #include "io/types.hfa"
    4645
     
    9493        extern void __kernel_unpark( thread$ * thrd, unpark_hint );
    9594
    96         static void ioring_syscsll( struct $io_context & ctx, unsigned int min_comp, unsigned int flags ) {
    97                 __STATS__( true, io.calls.flush++; )
    98                 int ret;
    99                 for() {
    100                         ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
     95        bool __cfa_io_drain( processor * proc ) {
     96                /* paranoid */ verify( ! __preemption_enabled() );
     97                /* paranoid */ verify( ready_schedule_islocked() );
     98                /* paranoid */ verify( proc );
     99                /* paranoid */ verify( proc->io.ctx );
     100
     101                // Drain the queue
     102                $io_context * ctx = proc->io.ctx;
     103                unsigned head = *ctx->cq.head;
     104                unsigned tail = *ctx->cq.tail;
     105                const __u32 mask = *ctx->cq.mask;
     106
     107                __u32 count = tail - head;
     108                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     109
     110                if(count == 0) return false;
     111
     112                for(i; count) {
     113                        unsigned idx = (head + i) & mask;
     114                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
     115
     116                        /* paranoid */ verify(&cqe);
     117
     118                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     119                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     120
     121                        __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
     122                }
     123
     124                __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
     125
     126                // Mark to the kernel that the cqe has been seen
     127                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     128                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     129
     130                /* paranoid */ verify( ready_schedule_islocked() );
     131                /* paranoid */ verify( ! __preemption_enabled() );
     132
     133                return true;
     134        }
     135
     136        bool __cfa_io_flush( processor * proc, int min_comp ) {
     137                /* paranoid */ verify( ! __preemption_enabled() );
     138                /* paranoid */ verify( proc );
     139                /* paranoid */ verify( proc->io.ctx );
     140
     141                __attribute__((unused)) cluster * cltr = proc->cltr;
     142                $io_context & ctx = *proc->io.ctx;
     143
     144                __ioarbiter_flush( ctx );
     145
     146                if(ctx.sq.to_submit != 0 || min_comp > 0) {
     147
     148                        __STATS__( true, io.calls.flush++; )
     149                        int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
    101150                        if( ret < 0 ) {
    102151                                switch((int)errno) {
     152                                case EAGAIN:
    103153                                case EINTR:
    104                                         continue;
    105                                 case EAGAIN:
    106154                                case EBUSY:
    107155                                        // Update statistics
     
    112160                                }
    113161                        }
    114                         break;
    115                 }
    116 
    117                 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
    118                 __STATS__( true, io.calls.submitted += ret; )
    119                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    120                 /* paranoid */ verify( ctx.sq.to_submit >= ret );
    121 
    122                 ctx.sq.to_submit -= ret;
    123 
    124                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    125 
    126                 // Release the consumed SQEs
    127                 __release_sqes( ctx );
    128 
    129                 /* paranoid */ verify( ! __preemption_enabled() );
    130 
    131                 __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
    132         }
    133 
    134         static bool try_acquire( $io_context * ctx ) __attribute__((nonnull(1))) {
    135                 /* paranoid */ verify( ! __preemption_enabled() );
    136                 /* paranoid */ verify( ready_schedule_islocked() );
    137 
    138 
    139                 {
    140                         const __u32 head = *ctx->cq.head;
    141                         const __u32 tail = *ctx->cq.tail;
    142 
    143                         if(head == tail) return false;
    144                 }
    145 
    146                 // Drain the queue
    147                 if(!__atomic_try_acquire(&ctx->cq.lock)) {
    148                         __STATS__( false, io.calls.locked++; )
    149                         return false;
    150                 }
    151 
    152                 return true;
    153         }
    154 
    155         static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
    156                 /* paranoid */ verify( ! __preemption_enabled() );
    157                 /* paranoid */ verify( ready_schedule_islocked() );
    158                 /* paranoid */ verify( ctx->cq.lock == true );
    159 
    160                 const __u32 mask = *ctx->cq.mask;
    161                 unsigned long long ts_prev = ctx->cq.ts;
    162 
    163                 // re-read the head and tail in case it already changed.
    164                 const __u32 head = *ctx->cq.head;
    165                 const __u32 tail = *ctx->cq.tail;
    166                 const __u32 count = tail - head;
    167                 __STATS__( false, io.calls.drain++; io.calls.completed += count; )
    168 
    169                 for(i; count) {
    170                         unsigned idx = (head + i) & mask;
    171                         volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
    172 
    173                         /* paranoid */ verify(&cqe);
    174 
    175                         struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    176                         // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    177 
    178                         __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
    179                 }
    180 
    181                 unsigned long long ts_next = ctx->cq.ts = rdtscl();
    182 
    183                 // Mark to the kernel that the cqe has been seen
    184                 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    185                 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
    186                 ctx->proc->idle_wctx.drain_time = ts_next;
    187 
    188                 __cfadbg_print_safe(io, "Kernel I/O : %u completed age %llu\n", count, ts_next);
    189                 /* paranoid */ verify( ready_schedule_islocked() );
    190                 /* paranoid */ verify( ! __preemption_enabled() );
    191 
    192                 __atomic_unlock(&ctx->cq.lock);
    193 
    194                 touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next );
    195 
    196                 return true;
    197         }
    198 
    199         bool __cfa_io_drain( processor * proc ) {
    200                 bool local = false;
    201                 bool remote = false;
     162
     163                        __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     164                        __STATS__( true, io.calls.submitted += ret; )
     165                        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     166                        /* paranoid */ verify( ctx.sq.to_submit >= ret );
     167
     168                        ctx.sq.to_submit -= ret;
     169
     170                        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     171
     172                        // Release the consumed SQEs
     173                        __release_sqes( ctx );
     174
     175                        /* paranoid */ verify( ! __preemption_enabled() );
     176
     177                        ctx.proc->io.pending = false;
     178                }
    202179
    203180                ready_schedule_lock();
    204 
    205                 cluster * const cltr = proc->cltr;
    206                 $io_context * const ctx = proc->io.ctx;
    207                 /* paranoid */ verify( cltr );
    208                 /* paranoid */ verify( ctx );
    209 
    210                 with(cltr->sched) {
    211                         const size_t ctxs_count = io.count;
    212 
    213                         /* paranoid */ verify( ready_schedule_islocked() );
    214                         /* paranoid */ verify( ! __preemption_enabled() );
    215                         /* paranoid */ verify( active_processor() == proc );
    216                         /* paranoid */ verify( __shard_factor.io > 0 );
    217                         /* paranoid */ verify( ctxs_count > 0 );
    218                         /* paranoid */ verify( ctx->cq.id < ctxs_count );
    219 
    220                         const unsigned this_cache = cache_id(cltr, ctx->cq.id / __shard_factor.io);
    221                         const unsigned long long ctsc = rdtscl();
    222 
    223                         if(proc->io.target == MAX) {
    224                                 uint64_t chaos = __tls_rand();
    225                                 unsigned ext = chaos & 0xff;
    226                                 unsigned other  = (chaos >> 8) % (ctxs_count);
    227 
    228                                 if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
    229                                         proc->io.target = other;
    230                                 }
    231                         }
    232                         else {
    233                                 const unsigned target = proc->io.target;
    234                                 /* paranoid */ verify( io.tscs[target].tv != MAX );
    235                                 HELP: if(target < ctxs_count) {
    236                                         const unsigned long long cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io);
    237                                         const unsigned long long age = moving_average(ctsc, io.tscs[target].tv, io.tscs[target].ma);
    238                                         __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
    239                                         if(age <= cutoff) break HELP;
    240 
    241                                         if(!try_acquire(io.data[target])) break HELP;
    242 
    243                                         if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
    244 
    245                                         remote = true;
    246                                         __STATS__( false, io.calls.helped++; )
    247                                 }
    248                                 proc->io.target = MAX;
    249                         }
    250                 }
    251 
    252 
    253                 // Drain the local queue
    254                 if(try_acquire( proc->io.ctx )) {
    255                         local = __cfa_do_drain( proc->io.ctx, cltr );
    256                 }
    257 
    258                 /* paranoid */ verify( ready_schedule_islocked() );
    259                 /* paranoid */ verify( ! __preemption_enabled() );
    260                 /* paranoid */ verify( active_processor() == proc );
    261 
     181                bool ret = __cfa_io_drain( proc );
    262182                ready_schedule_unlock();
    263                 return local || remote;
    264         }
    265 
    266         bool __cfa_io_flush( processor * proc ) {
    267                 /* paranoid */ verify( ! __preemption_enabled() );
    268                 /* paranoid */ verify( proc );
    269                 /* paranoid */ verify( proc->io.ctx );
    270 
    271                 $io_context & ctx = *proc->io.ctx;
    272 
    273                 __ioarbiter_flush( ctx );
    274 
    275                 if(ctx.sq.to_submit != 0) {
    276                         ioring_syscsll(ctx, 0, 0);
    277 
    278                 }
    279 
    280                 return __cfa_io_drain( proc );
     183                return ret;
    281184        }
    282185
     
    306209                struct io_uring_sqe * sqes = ctx->sq.sqes;
    307210                for(i; want) {
    308                         // __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
     211                        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
    309212                        out_sqes[i] = &sqes[idxs[i]];
    310213                }
     
    324227                // copy all the indexes we want from the available list
    325228                for(i; want) {
    326                         // __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
     229                        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
    327230                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
    328231                }
     
    341244        // sqe == &sqes[idx]
    342245        struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
    343                 // __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
     246                __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
    344247
    345248                disable_interrupts();
     
    349252                /* paranoid */ verify( ctx );
    350253
    351                 // __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     254                __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
    352255
    353256                // We can proceed to the fast path
     
    357260                        enable_interrupts();
    358261
    359                         // __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     262                        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
    360263
    361264                        __fill( sqes, want, idxs, ctx );
     
    372275                /* paranoid */ verify( ioarb );
    373276
    374                 // __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
     277                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
    375278
    376279                struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);
    377280
    378                 // __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     281                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
    379282
    380283                __fill( sqes, want, idxs,ret );
     
    393296                // Add the sqes to the array
    394297                for( i; have ) {
    395                         // __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
     298                        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
    396299                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
    397300                }
     
    401304                sq.to_submit += have;
    402305
    403                 __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
    404                 __atomic_store_n(&ctx->proc->io.dirty  , true, __ATOMIC_RELAXED);
     306                ctx->proc->io.pending = true;
     307                ctx->proc->io.dirty   = true;
    405308        }
    406309
     
    411314                if(sq.to_submit > 30) {
    412315                        __tls_stats()->io.flush.full++;
    413                         __cfa_io_flush( ctx->proc );
     316                        __cfa_io_flush( ctx->proc, 0 );
    414317                }
    415318                if(!lazy) {
    416319                        __tls_stats()->io.flush.eager++;
    417                         __cfa_io_flush( ctx->proc );
     320                        __cfa_io_flush( ctx->proc, 0 );
    418321                }
    419322        }
    420323
    421324        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
    422                 // __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     325                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
    423326
    424327                disable_interrupts();
     
    437340                        enable_interrupts();
    438341
    439                         // __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     342                        __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
    440343                        return;
    441344                }
     
    445348                enable_interrupts();
    446349
    447                 // __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
     350                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
    448351
    449352                __ioarbiter_submit(inctx, idxs, have, lazy);
     
    489392                // go through the range and release the sqes
    490393                for( i; count ) {
    491                         // __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
     394                        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
    492395                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
    493396                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     
    529432
    530433        static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) {
    531                 // __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
     434                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
    532435
    533436                __STATS__( false, io.alloc.block += 1; )
     
    596499                bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);
    597500
    598                 __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);
     501                ctx->proc->io.pending = true;
    599502
    600503                if( we ) {
     
    641544
    642545                        // We can proceed to the fast path
    643                         if( !__alloc(ctx, &idx, 1) ) {
    644                                 /* paranoid */ verify( false ); // for now check if this happens, next time just abort the sleep.
    645                                 return false;
    646                         }
     546                        if( !__alloc(ctx, &idx, 1) ) return false;
    647547
    648548                        // Allocation was successful
     
    674574
    675575                        /* paranoid */ verify( sqe->user_data == (uintptr_t)&future );
    676                         __submit_only( ctx, &idx, 1 );
     576                        __submit( ctx, &idx, 1, true );
    677577
    678578                        /* paranoid */ verify( proc == __cfaabi_tls.this_processor );
     
    681581                        return true;
    682582                }
    683 
    684                 void __cfa_io_idle( processor * proc ) {
    685                         iovec iov;
    686                         __atomic_acquire( &proc->io.ctx->cq.lock );
    687 
    688                         __attribute__((used)) volatile bool was_reset = false;
    689 
    690                         with( proc->idle_wctx) {
    691 
    692                                 // Do we already have a pending read
    693                                 if(available(*ftr)) {
    694                                         // There is no pending read, we need to add one
    695                                         reset(*ftr);
    696 
    697                                         iov.iov_base = rdbuf;
    698                                         iov.iov_len  = sizeof(eventfd_t);
    699                                         __kernel_read(proc, *ftr, iov, evfd );
    700                                         ftr->result = 0xDEADDEAD;
    701                                         *((eventfd_t *)rdbuf) = 0xDEADDEADDEADDEAD;
    702                                         was_reset = true;
    703                                 }
    704                         }
    705 
    706                         if( !__atomic_load_n( &proc->do_terminate, __ATOMIC_SEQ_CST ) ) {
    707                                 __ioarbiter_flush( *proc->io.ctx );
    708                                 proc->idle_wctx.sleep_time = rdtscl();
    709                                 ioring_syscsll( *proc->io.ctx, 1, IORING_ENTER_GETEVENTS);
    710                         }
    711 
    712                         ready_schedule_lock();
    713                         __cfa_do_drain( proc->io.ctx, proc->cltr );
    714                         ready_schedule_unlock();
    715 
    716                         asm volatile ("" :: "m" (was_reset));
    717                 }
    718583        #endif
    719584#endif
Note: See TracChangeset for help on using the changeset viewer.