Ignore:
Timestamp:
Apr 19, 2022, 3:00:04 PM (3 years ago)
Author:
m3zulfiq <m3zulfiq@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
5b84a321
Parents:
ba897d21 (diff), bb7c77d (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

added benchmark and evaluations chapter to thesis

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rba897d21 r2e9b59b  
    4141        #include "kernel.hfa"
    4242        #include "kernel/fwd.hfa"
    43         #include "kernel_private.hfa"
     43        #include "kernel/private.hfa"
     44        #include "kernel/cluster.hfa"
    4445        #include "io/types.hfa"
    4546
     
    9394        extern void __kernel_unpark( thread$ * thrd, unpark_hint );
    9495
    95         bool __cfa_io_drain( processor * proc ) {
    96                 /* paranoid */ verify( ! __preemption_enabled() );
    97                 /* paranoid */ verify( ready_schedule_islocked() );
    98                 /* paranoid */ verify( proc );
    99                 /* paranoid */ verify( proc->io.ctx );
    100 
    101                 // Drain the queue
    102                 $io_context * ctx = proc->io.ctx;
    103                 unsigned head = *ctx->cq.head;
    104                 unsigned tail = *ctx->cq.tail;
    105                 const __u32 mask = *ctx->cq.mask;
    106 
    107                 __u32 count = tail - head;
    108                 __STATS__( false, io.calls.drain++; io.calls.completed += count; )
    109 
    110                 if(count == 0) return false;
    111 
    112                 for(i; count) {
    113                         unsigned idx = (head + i) & mask;
    114                         volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
    115 
    116                         /* paranoid */ verify(&cqe);
    117 
    118                         struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    119                         __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    120 
    121                         __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
    122                 }
    123 
    124                 __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    125 
    126                 // Mark to the kernel that the cqe has been seen
    127                 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    128                 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
    129 
    130                 /* paranoid */ verify( ready_schedule_islocked() );
    131                 /* paranoid */ verify( ! __preemption_enabled() );
    132 
    133                 return true;
    134         }
    135 
    136         bool __cfa_io_flush( processor * proc, int min_comp ) {
    137                 /* paranoid */ verify( ! __preemption_enabled() );
    138                 /* paranoid */ verify( proc );
    139                 /* paranoid */ verify( proc->io.ctx );
    140 
    141                 __attribute__((unused)) cluster * cltr = proc->cltr;
    142                 $io_context & ctx = *proc->io.ctx;
    143 
    144                 __ioarbiter_flush( ctx );
    145 
    146                 if(ctx.sq.to_submit != 0 || min_comp > 0) {
    147 
    148                         __STATS__( true, io.calls.flush++; )
    149                         int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
     96        static void ioring_syscsll( struct $io_context & ctx, unsigned int min_comp, unsigned int flags ) {
     97                __STATS__( true, io.calls.flush++; )
     98                int ret;
     99                for() {
     100                        ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
    150101                        if( ret < 0 ) {
    151102                                switch((int)errno) {
     103                                case EINTR:
     104                                        continue;
    152105                                case EAGAIN:
    153                                 case EINTR:
    154106                                case EBUSY:
    155107                                        // Update statistics
     
    160112                                }
    161113                        }
    162 
    163                         __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
    164                         __STATS__( true, io.calls.submitted += ret; )
    165                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    166                         /* paranoid */ verify( ctx.sq.to_submit >= ret );
    167 
    168                         ctx.sq.to_submit -= ret;
    169 
    170                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    171 
    172                         // Release the consumed SQEs
    173                         __release_sqes( ctx );
    174 
     114                        break;
     115                }
     116
     117                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     118                __STATS__( true, io.calls.submitted += ret; )
     119                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     120                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     121
     122                ctx.sq.to_submit -= ret;
     123
     124                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     125
     126                // Release the consumed SQEs
     127                __release_sqes( ctx );
     128
     129                /* paranoid */ verify( ! __preemption_enabled() );
     130
     131                __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
     132        }
     133
     134        static bool try_acquire( $io_context * ctx ) __attribute__((nonnull(1))) {
     135                /* paranoid */ verify( ! __preemption_enabled() );
     136                /* paranoid */ verify( ready_schedule_islocked() );
     137
     138
     139                {
     140                        const __u32 head = *ctx->cq.head;
     141                        const __u32 tail = *ctx->cq.tail;
     142
     143                        if(head == tail) return false;
     144                }
     145
     146                // Drain the queue
     147                if(!__atomic_try_acquire(&ctx->cq.lock)) {
     148                        __STATS__( false, io.calls.locked++; )
     149                        return false;
     150                }
     151
     152                return true;
     153        }
     154
     155        static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
     156                /* paranoid */ verify( ! __preemption_enabled() );
     157                /* paranoid */ verify( ready_schedule_islocked() );
     158                /* paranoid */ verify( ctx->cq.lock == true );
     159
     160                const __u32 mask = *ctx->cq.mask;
     161                unsigned long long ts_prev = ctx->cq.ts;
     162
     163                // re-read the head and tail in case it already changed.
     164                const __u32 head = *ctx->cq.head;
     165                const __u32 tail = *ctx->cq.tail;
     166                const __u32 count = tail - head;
     167                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     168
     169                for(i; count) {
     170                        unsigned idx = (head + i) & mask;
     171                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
     172
     173                        /* paranoid */ verify(&cqe);
     174
     175                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     176                        // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     177
     178                        __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
     179                }
     180
     181                unsigned long long ts_next = ctx->cq.ts = rdtscl();
     182
     183                // Mark to the kernel that the cqe has been seen
     184                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     185                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     186                ctx->proc->idle_wctx.drain_time = ts_next;
     187
     188                __cfadbg_print_safe(io, "Kernel I/O : %u completed age %llu\n", count, ts_next);
     189                /* paranoid */ verify( ready_schedule_islocked() );
     190                /* paranoid */ verify( ! __preemption_enabled() );
     191
     192                __atomic_unlock(&ctx->cq.lock);
     193
     194                touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next );
     195
     196                return true;
     197        }
     198
     199        bool __cfa_io_drain( processor * proc ) {
     200                bool local = false;
     201                bool remote = false;
     202
     203                ready_schedule_lock();
     204
     205                cluster * const cltr = proc->cltr;
     206                $io_context * const ctx = proc->io.ctx;
     207                /* paranoid */ verify( cltr );
     208                /* paranoid */ verify( ctx );
     209
     210                with(cltr->sched) {
     211                        const size_t ctxs_count = io.count;
     212
     213                        /* paranoid */ verify( ready_schedule_islocked() );
    175214                        /* paranoid */ verify( ! __preemption_enabled() );
    176 
    177                         ctx.proc->io.pending = false;
    178                 }
    179 
    180                 ready_schedule_lock();
    181                 bool ret = __cfa_io_drain( proc );
     215                        /* paranoid */ verify( active_processor() == proc );
     216                        /* paranoid */ verify( __shard_factor.io > 0 );
     217                        /* paranoid */ verify( ctxs_count > 0 );
     218                        /* paranoid */ verify( ctx->cq.id < ctxs_count );
     219
     220                        const unsigned this_cache = cache_id(cltr, ctx->cq.id / __shard_factor.io);
     221                        const unsigned long long ctsc = rdtscl();
     222
     223                        if(proc->io.target == MAX) {
     224                                uint64_t chaos = __tls_rand();
     225                                unsigned ext = chaos & 0xff;
     226                                unsigned other  = (chaos >> 8) % (ctxs_count);
     227
     228                                if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
     229                                        proc->io.target = other;
     230                                }
     231                        }
     232                        else {
     233                                const unsigned target = proc->io.target;
     234                                /* paranoid */ verify( io.tscs[target].tv != MAX );
     235                                HELP: if(target < ctxs_count) {
     236                                        const unsigned long long cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io);
     237                                        const unsigned long long age = moving_average(ctsc, io.tscs[target].tv, io.tscs[target].ma);
     238                                        __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
     239                                        if(age <= cutoff) break HELP;
     240
     241                                        if(!try_acquire(io.data[target])) break HELP;
     242
     243                                        if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
     244
     245                                        remote = true;
     246                                        __STATS__( false, io.calls.helped++; )
     247                                }
     248                                proc->io.target = MAX;
     249                        }
     250                }
     251
     252
     253                // Drain the local queue
     254                if(try_acquire( proc->io.ctx )) {
     255                        local = __cfa_do_drain( proc->io.ctx, cltr );
     256                }
     257
     258                /* paranoid */ verify( ready_schedule_islocked() );
     259                /* paranoid */ verify( ! __preemption_enabled() );
     260                /* paranoid */ verify( active_processor() == proc );
     261
    182262                ready_schedule_unlock();
    183                 return ret;
     263                return local || remote;
     264        }
     265
     266        bool __cfa_io_flush( processor * proc ) {
     267                /* paranoid */ verify( ! __preemption_enabled() );
     268                /* paranoid */ verify( proc );
     269                /* paranoid */ verify( proc->io.ctx );
     270
     271                $io_context & ctx = *proc->io.ctx;
     272
     273                __ioarbiter_flush( ctx );
     274
     275                if(ctx.sq.to_submit != 0) {
     276                        ioring_syscsll(ctx, 0, 0);
     277
     278                }
     279
     280                return __cfa_io_drain( proc );
    184281        }
    185282
     
    209306                struct io_uring_sqe * sqes = ctx->sq.sqes;
    210307                for(i; want) {
    211                         __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
     308                        // __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
    212309                        out_sqes[i] = &sqes[idxs[i]];
    213310                }
     
    227324                // copy all the indexes we want from the available list
    228325                for(i; want) {
    229                         __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
     326                        // __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
    230327                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
    231328                }
     
    244341        // sqe == &sqes[idx]
    245342        struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
    246                 __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
     343                // __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
    247344
    248345                disable_interrupts();
     
    252349                /* paranoid */ verify( ctx );
    253350
    254                 __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     351                // __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
    255352
    256353                // We can proceed to the fast path
     
    260357                        enable_interrupts();
    261358
    262                         __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     359                        // __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
    263360
    264361                        __fill( sqes, want, idxs, ctx );
     
    275372                /* paranoid */ verify( ioarb );
    276373
    277                 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
     374                // __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
    278375
    279376                struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);
    280377
    281                 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     378                // __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
    282379
    283380                __fill( sqes, want, idxs,ret );
     
    296393                // Add the sqes to the array
    297394                for( i; have ) {
    298                         __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
     395                        // __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
    299396                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
    300397                }
     
    304401                sq.to_submit += have;
    305402
    306                 ctx->proc->io.pending = true;
    307                 ctx->proc->io.dirty   = true;
     403                __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
     404                __atomic_store_n(&ctx->proc->io.dirty  , true, __ATOMIC_RELAXED);
    308405        }
    309406
     
    314411                if(sq.to_submit > 30) {
    315412                        __tls_stats()->io.flush.full++;
    316                         __cfa_io_flush( ctx->proc, 0 );
     413                        __cfa_io_flush( ctx->proc );
    317414                }
    318415                if(!lazy) {
    319416                        __tls_stats()->io.flush.eager++;
    320                         __cfa_io_flush( ctx->proc, 0 );
     417                        __cfa_io_flush( ctx->proc );
    321418                }
    322419        }
    323420
    324421        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
    325                 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     422                // __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
    326423
    327424                disable_interrupts();
     
    340437                        enable_interrupts();
    341438
    342                         __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     439                        // __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
    343440                        return;
    344441                }
     
    348445                enable_interrupts();
    349446
    350                 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
     447                // __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
    351448
    352449                __ioarbiter_submit(inctx, idxs, have, lazy);
     
    392489                // go through the range and release the sqes
    393490                for( i; count ) {
    394                         __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
     491                        // __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
    395492                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
    396493                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     
    432529
    433530        static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) {
    434                 __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
     531                // __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
    435532
    436533                __STATS__( false, io.alloc.block += 1; )
     
    499596                bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);
    500597
    501                 ctx->proc->io.pending = true;
     598                __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);
    502599
    503600                if( we ) {
     
    544641
    545642                        // We can proceed to the fast path
    546                         if( !__alloc(ctx, &idx, 1) ) return false;
     643                        if( !__alloc(ctx, &idx, 1) ) {
     644                                /* paranoid */ verify( false ); // for now check if this happens, next time just abort the sleep.
     645                                return false;
     646                        }
    547647
    548648                        // Allocation was successful
     
    574674
    575675                        /* paranoid */ verify( sqe->user_data == (uintptr_t)&future );
    576                         __submit( ctx, &idx, 1, true );
     676                        __submit_only( ctx, &idx, 1 );
    577677
    578678                        /* paranoid */ verify( proc == __cfaabi_tls.this_processor );
     
    581681                        return true;
    582682                }
     683
     684                void __cfa_io_idle( processor * proc ) {
     685                        iovec iov;
     686                        __atomic_acquire( &proc->io.ctx->cq.lock );
     687
     688                        __attribute__((used)) volatile bool was_reset = false;
     689
     690                        with( proc->idle_wctx) {
     691
     692                                // Do we already have a pending read
     693                                if(available(*ftr)) {
     694                                        // There is no pending read, we need to add one
     695                                        reset(*ftr);
     696
     697                                        iov.iov_base = rdbuf;
     698                                        iov.iov_len  = sizeof(eventfd_t);
     699                                        __kernel_read(proc, *ftr, iov, evfd );
     700                                        ftr->result = 0xDEADDEAD;
     701                                        *((eventfd_t *)rdbuf) = 0xDEADDEADDEADDEAD;
     702                                        was_reset = true;
     703                                }
     704                        }
     705
     706                        if( !__atomic_load_n( &proc->do_terminate, __ATOMIC_SEQ_CST ) ) {
     707                                __ioarbiter_flush( *proc->io.ctx );
     708                                proc->idle_wctx.sleep_time = rdtscl();
     709                                ioring_syscsll( *proc->io.ctx, 1, IORING_ENTER_GETEVENTS);
     710                        }
     711
     712                        ready_schedule_lock();
     713                        __cfa_do_drain( proc->io.ctx, proc->cltr );
     714                        ready_schedule_unlock();
     715
     716                        asm volatile ("" :: "m" (was_reset));
     717                }
    583718        #endif
    584719#endif
Note: See TracChangeset for help on using the changeset viewer.