Changeset ece0e80 for libcfa


Ignore:
Timestamp:
Jan 9, 2021, 4:27:57 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
561dd26
Parents:
35fd2c4
Message:

Added prints.
Naive implementation of cancel.
Server now shutdown cleanly.

Location:
libcfa/src/concurrency
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r35fd2c4 rece0e80  
    134134                int ret = 0;
    135135                if( need_sys_to_submit || need_sys_to_complete ) {
     136                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    136137                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    137138                        if( ret < 0 ) {
     
    230231                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
    231232
    232                 int reset = 0;
     233                const int reset_cnt = 5;
     234                int reset = reset_cnt;
    233235                // Then loop until we need to start
     236                LOOP:
    234237                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    235238                        // Drain the io
     
    239242                                [count, again] = __drain_io( *this.ring );
    240243
    241                                 if(!again) reset++;
     244                                if(!again) reset--;
    242245
    243246                                // Update statistics
     
    249252
    250253                        // If we got something, just yield and check again
    251                         if(reset < 5) {
     254                        if(reset > 1) {
    252255                                yield();
    253                         }
    254                         // We didn't get anything baton pass to the slow poller
    255                         else {
     256                                continue LOOP;
     257                        }
     258
     259                        // We alread failed to find events a few time.
     260                        if(reset == 1) {
     261                                // Rearm the context so it can block
     262                                // but don't block right away
     263                                // we need to retry one last time in case
     264                                // something completed *just now*
     265                                __ioctx_prepare_block( this, ev );
     266                                continue LOOP;
     267                        }
     268
    256269                                __STATS__( false,
    257270                                        io.complete_q.blocks += 1;
    258271                                )
    259272                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    260                                 reset = 0;
    261273
    262274                                // block this thread
    263                                 __ioctx_prepare_block( this, ev );
    264275                                wait( this.sem );
    265                         }
     276
     277                        // restore counter
     278                        reset = reset_cnt;
    266279                }
    267280
     
    319332                                        )
    320333
     334                                        __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    321335
    322336                                        // Success return the data
     
    376390
    377391        void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
     392                __cfadbg_print_safe( io, "Kernel I/O : submitting %u for %p\n", idx, active_thread() );
     393
    378394                __io_data & ring = *ctx->thrd.ring;
    379395                // Get now the data we definetely need
     
    443459                                unlock(ring.submit_q.submit_lock);
    444460                        #endif
    445                         if( ret < 0 ) return;
     461                        if( ret < 0 ) {
     462                                return;
     463                        }
    446464
    447465                        // Release the consumed SQEs
     
    454472                                io.submit_q.submit_avg.cnt += 1;
    455473                        )
     474
     475                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    456476                }
    457477                else {
  • libcfa/src/concurrency/io/call.cfa.in

    r35fd2c4 rece0e80  
    464464
    465465print("""
     466//-----------------------------------------------------------------------------
     467bool cancel(io_cancellation & this) {
     468        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
     469                return false;
     470        #else
     471                io_future_t future;
     472
     473                io_context * context = __get_io_context();
     474
     475                __u8 sflags = 0;
     476                struct __io_data & ring = *context->thrd.ring;
     477
     478                __u32 idx;
     479                struct io_uring_sqe * sqe;
     480                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     481
     482                sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     483                sqe->opcode = IORING_OP_ASYNC_CANCEL;
     484                sqe->flags = sflags;
     485                sqe->addr = this.target;
     486
     487                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     488                __submit( context, idx );
     489
     490                wait(future);
     491
     492                if( future.result == 0 ) return true; // Entry found
     493                if( future.result == -EALREADY) return true; // Entry found but in progress
     494                if( future.result == -ENOENT ) return false; // Entry not found
     495                return false;
     496        #endif
     497}
     498
    466499//-----------------------------------------------------------------------------
    467500// Check if a function is has asynchronous
  • libcfa/src/concurrency/io/setup.cfa

    r35fd2c4 rece0e80  
    169169                // Main loop
    170170                while( iopoll.run ) {
     171                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
     172
    171173                        // Wait for events
    172174                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
     175
     176                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    173177
    174178                        // Check if an error occured
     
    181185                                $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    182186                                /* paranoid */ verify( io_ctx );
    183                                 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
     187                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %p\n", io_ctx);
    184188                                #if !defined( __CFA_NO_STATISTICS__ )
    185189                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
     
    233237                $thread & thrd = this.thrd.self;
    234238                if( cluster_context ) {
     239                        // We are about to do weird things with the threads
     240                        // we don't need interrupts to complicate everything
     241                        disable_interrupts();
     242
     243                        // Get cluster info
    235244                        cluster & cltr = *thrd.curr_cluster;
    236245                        /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
     
    239248                        // We need to adjust the clean-up based on where the thread is
    240249                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     250                                // This is the tricky case
     251                                // The thread was preempted or ready to run and now it is on the ready queue
     252                                // but the cluster is shutting down, so there aren't any processors to run the ready queue
     253                                // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    241254
    242255                                ready_schedule_lock();
    243 
    244                                         // This is the tricky case
    245                                         // The thread was preempted and now it is on the ready queue
     256                                        // The thread should on the list
     257                                        /* paranoid */ verify( thrd.link.next != 0p );
     258
     259                                        // Remove the thread from the ready queue of this cluster
    246260                                        // The thread should be the last on the list
    247                                         /* paranoid */ verify( thrd.link.next != 0p );
    248 
    249                                         // Remove the thread from the ready queue of this cluster
    250261                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    251262                                        /* paranoid */ verify( removed );
     
    263274                        }
    264275                        // !!! This is not an else if !!!
     276                        // Ok, now the thread is blocked (whether we cheated to get here or not)
    265277                        if( thrd.state == Blocked ) {
    266 
    267278                                // This is the "easy case"
    268279                                // The thread is parked and can easily be moved to active cluster
     
    274285                        }
    275286                        else {
    276 
    277287                                // The thread is in a weird state
    278288                                // I don't know what to do here
    279289                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    280290                        }
     291
     292                        // The weird thread kidnapping stuff is over, restore interrupts.
     293                        enable_interrupts( __cfaabi_dbg_ctx );
    281294                } else {
    282295                        post( this.thrd.sem );
     
    463476
    464477        void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
     478                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %p\n", &ctx);
    465479                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
    466480                if (ret < 0) {
Note: See TracChangeset for help on using the changeset viewer.