Changeset 18f7858


Ignore:
Timestamp:
Mar 28, 2022, 4:00:32 PM (20 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
Children:
37a3aa23
Parents:
2377ca2
Message:

Refactored io to allow holding the lock duirng idle sleep

Location:
libcfa/src/concurrency
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r2377ca2 r18f7858  
    9494        extern void __kernel_unpark( thread$ * thrd, unpark_hint );
    9595
    96         static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) {
     96        static void ioring_syscsll( struct $io_context & ctx, unsigned int min_comp, unsigned int flags ) {
     97                __STATS__( true, io.calls.flush++; )
     98                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
     99                if( ret < 0 ) {
     100                        switch((int)errno) {
     101                        case EAGAIN:
     102                        case EINTR:
     103                        case EBUSY:
     104                                // Update statistics
     105                                __STATS__( false, io.calls.errors.busy ++; )
     106                                return false;
     107                        default:
     108                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     109                        }
     110                }
     111
     112                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     113                __STATS__( true, io.calls.submitted += ret; )
     114                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     115                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     116
     117                ctx.sq.to_submit -= ret;
     118
     119                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     120
     121                // Release the consumed SQEs
     122                __release_sqes( ctx );
     123
     124                /* paranoid */ verify( ! __preemption_enabled() );
     125
     126                __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
     127        }
     128
     129        static bool try_acquire( $io_context * ctx ) __attribute__((nonnull(1))) {
    97130                /* paranoid */ verify( ! __preemption_enabled() );
    98131                /* paranoid */ verify( ready_schedule_islocked() );
    99                 /* paranoid */ verify( ctx );
    100 
    101                 const __u32 mask = *ctx->cq.mask;
    102132
    103133
     
    115145                }
    116146
     147                return true;
     148        }
     149
     150        static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
     151                /* paranoid */ verify( ! __preemption_enabled() );
     152                /* paranoid */ verify( ready_schedule_islocked() );
     153                /* paranoid */ verify( ctx->cq.lock == true );
     154
     155                const __u32 mask = *ctx->cq.mask;
    117156                unsigned long long ts_prev = ctx->cq.ts;
    118157
     
    155194                bool local = false;
    156195                bool remote = false;
     196
     197                ready_schedule_lock();
    157198
    158199                cluster * const cltr = proc->cltr;
     
    186227                                const unsigned target = proc->io.target;
    187228                                /* paranoid */ verify( io.tscs[target].tv != MAX );
    188                                 if(target < ctxs_count) {
     229                                HELP: if(target < ctxs_count) {
    189230                                        const unsigned long long cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io);
    190231                                        const unsigned long long age = moving_average(ctsc, io.tscs[target].tv, io.tscs[target].ma);
    191232                                        // __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
    192                                         if(age > cutoff) {
    193                                                 remote = __cfa_do_drain( io.data[target], cltr );
    194                                                 if(remote) __STATS__( false, io.calls.helped++; )
    195                                         }
     233                                        if(age <= cutoff) break HELP;
     234
     235                                        if(!try_acquire(io.data[target])) break HELP;
     236
     237                                        if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
     238
     239                                        remote = true;
     240                                        __STATS__( false, io.calls.helped++; )
    196241                                }
    197242                                proc->io.target = MAX;
     
    201246
    202247                // Drain the local queue
    203                 local = __cfa_do_drain( proc->io.ctx, cltr );
     248                if(try_acquire( proc->io.ctx )) {
     249                        local = __cfa_do_drain( proc->io.ctx, cltr );
     250                }
    204251
    205252                /* paranoid */ verify( ready_schedule_islocked() );
    206253                /* paranoid */ verify( ! __preemption_enabled() );
    207254                /* paranoid */ verify( active_processor() == proc );
     255
     256                ready_schedule_unlock();
    208257                return local || remote;
    209258        }
    210259
    211         bool __cfa_io_flush( processor * proc, int min_comp ) {
     260        bool __cfa_io_flush( processor * proc ) {
    212261                /* paranoid */ verify( ! __preemption_enabled() );
    213262                /* paranoid */ verify( proc );
     
    219268                __ioarbiter_flush( ctx );
    220269
    221                 if(ctx.sq.to_submit != 0 || min_comp > 0) {
    222 
    223                         __STATS__( true, io.calls.flush++; )
    224                         int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
    225                         if( ret < 0 ) {
    226                                 switch((int)errno) {
    227                                 case EAGAIN:
    228                                 case EINTR:
    229                                 case EBUSY:
    230                                         // Update statistics
    231                                         __STATS__( false, io.calls.errors.busy ++; )
    232                                         return false;
    233                                 default:
    234                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    235                                 }
    236                         }
    237 
    238                         __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
    239                         __STATS__( true, io.calls.submitted += ret; )
    240                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    241                         /* paranoid */ verify( ctx.sq.to_submit >= ret );
    242 
    243                         ctx.sq.to_submit -= ret;
    244 
    245                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    246 
    247                         // Release the consumed SQEs
    248                         __release_sqes( ctx );
    249 
    250                         /* paranoid */ verify( ! __preemption_enabled() );
    251 
    252                         __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
    253                 }
    254 
    255                 ready_schedule_lock();
    256                 bool ret = __cfa_io_drain( proc );
    257                 ready_schedule_unlock();
    258                 return ret;
     270                if(ctx.sq.to_submit != 0) {
     271                        ioring_syscsll(ctx, 0, 0);
     272
     273                }
     274
     275                return __cfa_io_drain( proc );
    259276        }
    260277
     
    389406                if(sq.to_submit > 30) {
    390407                        __tls_stats()->io.flush.full++;
    391                         __cfa_io_flush( ctx->proc, 0 );
     408                        __cfa_io_flush( ctx->proc );
    392409                }
    393410                if(!lazy) {
    394411                        __tls_stats()->io.flush.eager++;
    395                         __cfa_io_flush( ctx->proc, 0 );
     412                        __cfa_io_flush( ctx->proc );
    396413                }
    397414        }
     
    656673                        return true;
    657674                }
     675
     676                void __cfa_io_idle( processor * proc ) {
     677                        iovec iov;
     678                        __atomic_acquire( &proc->io.ctx->cq.lock );
     679
     680                        with( this->idle_wctx) {
     681
     682                        // Do we already have a pending read
     683                        if(available(*ftr)) {
     684                                // There is no pending read, we need to add one
     685                                reset(*ftr);
     686
     687                                iov.iov_base = rdbuf;
     688                                iov.iov_len  = sizeof(eventfd_t);
     689                                __kernel_read(proc, *ftr, iov, evfd );
     690                        }
     691
     692                        __ioarbiter_flush( *proc->io.ctx );
     693                        ioring_syscsll(ctx, 1, IORING_ENTER_GETEVENTS);
     694
     695                        __cfa_do_drain( proc->io.ctx, proc->cltr );
     696                }
    658697        #endif
    659698#endif
  • libcfa/src/concurrency/io/setup.cfa

    r2377ca2 r18f7858  
    3232
    3333        void __cfa_io_start( processor * proc ) {}
    34         bool __cfa_io_flush( processor * proc, int ) { return false; }
     34        bool __cfa_io_flush( processor * proc ) { return false; }
     35        bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1)));
     36        void __cfa_io_idle ( processor * ) __attribute__((nonnull (1)));
    3537        void __cfa_io_stop ( processor * proc ) {}
    3638
     
    215217
    216218                // completion queue
    217                 cq.lock      = 0;
     219                cq.lock      = false;
    218220                cq.id        = MAX;
    219221                cq.ts        = rdtscl();
  • libcfa/src/concurrency/kernel.cfa

    r2377ca2 r18f7858  
    132132static void __wake_one(cluster * cltr);
    133133
    134 static void idle_sleep(processor * proc, io_future_t & future, iovec & iov);
     134static void idle_sleep(processor * proc);
    135135static bool mark_idle (__cluster_proc_list & idles, processor & proc);
    136136static void mark_awake(__cluster_proc_list & idles, processor & proc);
    137137
    138138extern bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1)));
    139 extern bool __cfa_io_flush( processor *, int min_comp );
    140 static inline bool __maybe_io_drain( processor * );
     139extern bool __cfa_io_flush( processor * ) __attribute__((nonnull (1)));
     140extern void __cfa_io_idle( processor * ) __attribute__((nonnull (1)));
    141141
    142142#if defined(CFA_WITH_IO_URING_IDLE)
     
    168168        // mark it as already fulfilled so we know if there is a pending request or not
    169169        this->idle_wctx.ftr->self.ptr = 1p;
    170         iovec idle_iovec = { this->idle_wctx.rdbuf, sizeof(eventfd_t) };
    171170
    172171        __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this);
     
    193192                for() {
    194193                        // Check if there is pending io
    195                         __maybe_io_drain( this );
     194                        __cfa_io_drain( this );
    196195
    197196                        // Try to get the next thread
     
    199198
    200199                        if( !readyThread ) {
     200                                // there is no point in holding submissions if we are idle
    201201                                __IO_STATS__(true, io.flush.idle++; )
    202                                 __cfa_io_flush( this, 0 );
     202                                __cfa_io_flush( this );
     203
     204                                // drain again in case something showed up
     205                                __cfa_io_drain( this );
    203206
    204207                                readyThread = __next_thread( this->cltr );
     
    206209
    207210                        if( !readyThread ) for(5) {
     211                                readyThread = __next_thread_slow( this->cltr );
     212
     213                                if( readyThread ) break;
     214
     215                                // It's unlikely we still I/O to submit, but the arbiter could
    208216                                __IO_STATS__(true, io.flush.idle++; )
    209 
    210                                 readyThread = __next_thread_slow( this->cltr );
    211 
    212                                 if( readyThread ) break;
    213 
    214                                 __cfa_io_flush( this, 0 );
     217                                __cfa_io_flush( this );
     218
     219                                // drain again in case something showed up
     220                                __cfa_io_drain( this );
    215221                        }
    216222
     
    235241                                }
    236242
    237                                 idle_sleep( this, *this->idle_wctx.ftr, idle_iovec );
     243                                idle_sleep( this );
    238244
    239245                                // We were woken up, remove self from idle
     
    257263                        if(__atomic_load_n(&this->io.pending, __ATOMIC_RELAXED) && !__atomic_load_n(&this->io.dirty, __ATOMIC_RELAXED)) {
    258264                                __IO_STATS__(true, io.flush.dirty++; )
    259                                 __cfa_io_flush( this, 0 );
     265                                __cfa_io_flush( this );
    260266                        }
    261267                }
     
    683689}
    684690
    685 static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
     691static void idle_sleep(processor * this) {
    686692        /* paranoid */ verify( this->idle_wctx.evfd != 1 );
    687693        /* paranoid */ verify( this->idle_wctx.evfd != 2 );
     
    735741                #endif
    736742        #else
    737                 // Do we already have a pending read
    738                 if(available(future)) {
    739                         // There is no pending read, we need to add one
    740                         reset(future);
    741 
    742                         __kernel_read(this, future, iov, this->idle_wctx.evfd );
    743                 }
    744 
    745                 __cfa_io_flush( this, 1 );
     743                __cfa_io_idle( this );
    746744        #endif
    747745}
     
    831829#endif
    832830
    833 static inline bool __maybe_io_drain( processor * proc ) {
    834         /* paranoid */ verify( proc );
    835         bool ret = false;
    836         #if defined(CFA_HAVE_LINUX_IO_URING_H)
    837                 __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);
    838 
    839                 // Check if we should drain the queue
    840                 $io_context * ctx = proc->io.ctx;
    841                 unsigned head = *ctx->cq.head;
    842                 unsigned tail = *ctx->cq.tail;
    843                 if(head == tail) return false;
    844                 ready_schedule_lock();
    845                 ret = __cfa_io_drain( proc );
    846                 ready_schedule_unlock();
    847         #endif
    848         return ret;
    849 }
     831
    850832
    851833//-----------------------------------------------------------------------------
Note: See TracChangeset for help on using the changeset viewer.