Ignore:
Timestamp:
Mar 28, 2022, 4:00:32 PM (2 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
Children:
37a3aa23
Parents:
2377ca2
Message:

Refactored io to allow holding the lock duirng idle sleep

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r2377ca2 r18f7858  
    9494        extern void __kernel_unpark( thread$ * thrd, unpark_hint );
    9595
    96         static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) {
     96        static void ioring_syscsll( struct $io_context & ctx, unsigned int min_comp, unsigned int flags ) {
     97                __STATS__( true, io.calls.flush++; )
     98                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
     99                if( ret < 0 ) {
     100                        switch((int)errno) {
     101                        case EAGAIN:
     102                        case EINTR:
     103                        case EBUSY:
     104                                // Update statistics
     105                                __STATS__( false, io.calls.errors.busy ++; )
     106                                return false;
     107                        default:
     108                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     109                        }
     110                }
     111
     112                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     113                __STATS__( true, io.calls.submitted += ret; )
     114                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     115                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     116
     117                ctx.sq.to_submit -= ret;
     118
     119                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     120
     121                // Release the consumed SQEs
     122                __release_sqes( ctx );
     123
     124                /* paranoid */ verify( ! __preemption_enabled() );
     125
     126                __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
     127        }
     128
     129        static bool try_acquire( $io_context * ctx ) __attribute__((nonnull(1))) {
    97130                /* paranoid */ verify( ! __preemption_enabled() );
    98131                /* paranoid */ verify( ready_schedule_islocked() );
    99                 /* paranoid */ verify( ctx );
    100 
    101                 const __u32 mask = *ctx->cq.mask;
    102132
    103133
     
    115145                }
    116146
     147                return true;
     148        }
     149
     150        static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
     151                /* paranoid */ verify( ! __preemption_enabled() );
     152                /* paranoid */ verify( ready_schedule_islocked() );
     153                /* paranoid */ verify( ctx->cq.lock == true );
     154
     155                const __u32 mask = *ctx->cq.mask;
    117156                unsigned long long ts_prev = ctx->cq.ts;
    118157
     
    155194                bool local = false;
    156195                bool remote = false;
     196
     197                ready_schedule_lock();
    157198
    158199                cluster * const cltr = proc->cltr;
     
    186227                                const unsigned target = proc->io.target;
    187228                                /* paranoid */ verify( io.tscs[target].tv != MAX );
    188                                 if(target < ctxs_count) {
     229                                HELP: if(target < ctxs_count) {
    189230                                        const unsigned long long cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io);
    190231                                        const unsigned long long age = moving_average(ctsc, io.tscs[target].tv, io.tscs[target].ma);
    191232                                        // __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
    192                                         if(age > cutoff) {
    193                                                 remote = __cfa_do_drain( io.data[target], cltr );
    194                                                 if(remote) __STATS__( false, io.calls.helped++; )
    195                                         }
     233                                        if(age <= cutoff) break HELP;
     234
     235                                        if(!try_acquire(io.data[target])) break HELP;
     236
     237                                        if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
     238
     239                                        remote = true;
     240                                        __STATS__( false, io.calls.helped++; )
    196241                                }
    197242                                proc->io.target = MAX;
     
    201246
    202247                // Drain the local queue
    203                 local = __cfa_do_drain( proc->io.ctx, cltr );
     248                if(try_acquire( proc->io.ctx )) {
     249                        local = __cfa_do_drain( proc->io.ctx, cltr );
     250                }
    204251
    205252                /* paranoid */ verify( ready_schedule_islocked() );
    206253                /* paranoid */ verify( ! __preemption_enabled() );
    207254                /* paranoid */ verify( active_processor() == proc );
     255
     256                ready_schedule_unlock();
    208257                return local || remote;
    209258        }
    210259
    211         bool __cfa_io_flush( processor * proc, int min_comp ) {
     260        bool __cfa_io_flush( processor * proc ) {
    212261                /* paranoid */ verify( ! __preemption_enabled() );
    213262                /* paranoid */ verify( proc );
     
    219268                __ioarbiter_flush( ctx );
    220269
    221                 if(ctx.sq.to_submit != 0 || min_comp > 0) {
    222 
    223                         __STATS__( true, io.calls.flush++; )
    224                         int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
    225                         if( ret < 0 ) {
    226                                 switch((int)errno) {
    227                                 case EAGAIN:
    228                                 case EINTR:
    229                                 case EBUSY:
    230                                         // Update statistics
    231                                         __STATS__( false, io.calls.errors.busy ++; )
    232                                         return false;
    233                                 default:
    234                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    235                                 }
    236                         }
    237 
    238                         __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
    239                         __STATS__( true, io.calls.submitted += ret; )
    240                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    241                         /* paranoid */ verify( ctx.sq.to_submit >= ret );
    242 
    243                         ctx.sq.to_submit -= ret;
    244 
    245                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    246 
    247                         // Release the consumed SQEs
    248                         __release_sqes( ctx );
    249 
    250                         /* paranoid */ verify( ! __preemption_enabled() );
    251 
    252                         __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
    253                 }
    254 
    255                 ready_schedule_lock();
    256                 bool ret = __cfa_io_drain( proc );
    257                 ready_schedule_unlock();
    258                 return ret;
     270                if(ctx.sq.to_submit != 0) {
     271                        ioring_syscsll(ctx, 0, 0);
     272
     273                }
     274
     275                return __cfa_io_drain( proc );
    259276        }
    260277
     
    389406                if(sq.to_submit > 30) {
    390407                        __tls_stats()->io.flush.full++;
    391                         __cfa_io_flush( ctx->proc, 0 );
     408                        __cfa_io_flush( ctx->proc );
    392409                }
    393410                if(!lazy) {
    394411                        __tls_stats()->io.flush.eager++;
    395                         __cfa_io_flush( ctx->proc, 0 );
     412                        __cfa_io_flush( ctx->proc );
    396413                }
    397414        }
     
    656673                        return true;
    657674                }
     675
     676                void __cfa_io_idle( processor * proc ) {
     677                        iovec iov;
     678                        __atomic_acquire( &proc->io.ctx->cq.lock );
     679
     680                        with( this->idle_wctx) {
     681
     682                        // Do we already have a pending read
     683                        if(available(*ftr)) {
     684                                // There is no pending read, we need to add one
     685                                reset(*ftr);
     686
     687                                iov.iov_base = rdbuf;
     688                                iov.iov_len  = sizeof(eventfd_t);
     689                                __kernel_read(proc, *ftr, iov, evfd );
     690                        }
     691
     692                        __ioarbiter_flush( *proc->io.ctx );
     693                        ioring_syscsll(ctx, 1, IORING_ENTER_GETEVENTS);
     694
     695                        __cfa_do_drain( proc->io.ctx, proc->cltr );
     696                }
    658697        #endif
    659698#endif
Note: See TracChangeset for help on using the changeset viewer.