Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/kernel.cfa

    rae7adbc4 rc9c1c1cb  
    4242
    4343#if !defined(__CFA_NO_STATISTICS__)
    44         #define __STATS( ...) __VA_ARGS__
     44        #define __STATS_DEF( ...) __VA_ARGS__
    4545#else
    46         #define __STATS( ...)
     46        #define __STATS_DEF( ...)
    4747#endif
    4848
     
    122122static thread$ * __next_thread(cluster * this);
    123123static thread$ * __next_thread_slow(cluster * this);
     124static thread$ * __next_thread_search(cluster * this);
    124125static inline bool __must_unpark( thread$ * thrd ) __attribute((nonnull(1)));
    125126static void __run_thread(processor * this, thread$ * dst);
     
    187188                MAIN_LOOP:
    188189                for() {
    189                         #define OLD_MAIN 1
    190                         #if OLD_MAIN
    191190                        // Check if there is pending io
    192191                        __maybe_io_drain( this );
     
    196195
    197196                        if( !readyThread ) {
     197                                __IO_STATS__(true, io.flush.idle++; )
    198198                                __cfa_io_flush( this, 0 );
    199199
     200                                readyThread = __next_thread( this->cltr );
     201                        }
     202
     203                        if( !readyThread ) for(5) {
     204                                __IO_STATS__(true, io.flush.idle++; )
     205
    200206                                readyThread = __next_thread_slow( this->cltr );
     207
     208                                if( readyThread ) break;
     209
     210                                __cfa_io_flush( this, 0 );
    201211                        }
    202212
     
    206216                                if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    207217
    208                                 #if !defined(__CFA_NO_STATISTICS__)
    209                                         __tls_stats()->ready.sleep.halts++;
    210                                 #endif
    211 
    212218                                // Push self to idle stack
    213219                                if(!mark_idle(this->cltr->procs, * this)) continue MAIN_LOOP;
    214220
    215221                                // Confirm the ready-queue is empty
    216                                 readyThread = __next_thread_slow( this->cltr );
     222                                readyThread = __next_thread_search( this->cltr );
    217223                                if( readyThread ) {
    218224                                        // A thread was found, cancel the halt
    219225                                        mark_awake(this->cltr->procs, * this);
    220226
    221                                         #if !defined(__CFA_NO_STATISTICS__)
    222                                                 __tls_stats()->ready.sleep.cancels++;
    223                                         #endif
     227                                        __STATS__(true, ready.sleep.cancels++; )
    224228
    225229                                        // continue the mai loop
     
    248252
    249253                        if(this->io.pending && !this->io.dirty) {
     254                                __IO_STATS__(true, io.flush.dirty++; )
    250255                                __cfa_io_flush( this, 0 );
    251256                        }
    252 
    253                         #else
    254                                 #warning new kernel loop
    255                         SEARCH: {
    256                                 /* paranoid */ verify( ! __preemption_enabled() );
    257 
    258                                 // First, lock the scheduler since we are searching for a thread
    259                                 ready_schedule_lock();
    260 
    261                                 // Try to get the next thread
    262                                 readyThread = pop_fast( this->cltr );
    263                                 if(readyThread) { ready_schedule_unlock(); break SEARCH; }
    264 
    265                                 // If we can't find a thread, might as well flush any outstanding I/O
    266                                 if(this->io.pending) { __cfa_io_flush( this, 0 ); }
    267 
    268                                 // Spin a little on I/O, just in case
    269                                 for(5) {
    270                                         __maybe_io_drain( this );
    271                                         readyThread = pop_fast( this->cltr );
    272                                         if(readyThread) { ready_schedule_unlock(); break SEARCH; }
    273                                 }
    274 
    275                                 // no luck, try stealing a few times
    276                                 for(5) {
    277                                         if( __maybe_io_drain( this ) ) {
    278                                                 readyThread = pop_fast( this->cltr );
    279                                         } else {
    280                                                 readyThread = pop_slow( this->cltr );
    281                                         }
    282                                         if(readyThread) { ready_schedule_unlock(); break SEARCH; }
    283                                 }
    284 
    285                                 // still no luck, search for a thread
    286                                 readyThread = pop_search( this->cltr );
    287                                 if(readyThread) { ready_schedule_unlock(); break SEARCH; }
    288 
    289                                 // Don't block if we are done
    290                                 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) {
    291                                         ready_schedule_unlock();
    292                                         break MAIN_LOOP;
    293                                 }
    294 
    295                                 __STATS( __tls_stats()->ready.sleep.halts++; )
    296 
    297                                 // Push self to idle stack
    298                                 ready_schedule_unlock();
    299                                 if(!mark_idle(this->cltr->procs, * this)) goto SEARCH;
    300                                 ready_schedule_lock();
    301 
    302                                 // Confirm the ready-queue is empty
    303                                 __maybe_io_drain( this );
    304                                 readyThread = pop_search( this->cltr );
    305                                 ready_schedule_unlock();
    306 
    307                                 if( readyThread ) {
    308                                         // A thread was found, cancel the halt
    309                                         mark_awake(this->cltr->procs, * this);
    310 
    311                                         __STATS( __tls_stats()->ready.sleep.cancels++; )
    312 
    313                                         // continue the main loop
    314                                         break SEARCH;
    315                                 }
    316 
    317                                 __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->unique_id, rdtscl()); )
    318                                 __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle_fd);
    319 
    320                                 {
    321                                         eventfd_t val;
    322                                         ssize_t ret = read( this->idle_fd, &val, sizeof(val) );
    323                                         if(ret < 0) {
    324                                                 switch((int)errno) {
    325                                                 case EAGAIN:
    326                                                 #if EAGAIN != EWOULDBLOCK
    327                                                         case EWOULDBLOCK:
    328                                                 #endif
    329                                                 case EINTR:
    330                                                         // No need to do anything special here, just assume it's a legitimate wake-up
    331                                                         break;
    332                                                 default:
    333                                                         abort( "KERNEL : internal error, read failure on idle eventfd, error(%d) %s.", (int)errno, strerror( (int)errno ) );
    334                                                 }
    335                                         }
    336                                 }
    337 
    338                                         __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->unique_id, rdtscl()); )
    339 
    340                                 // We were woken up, remove self from idle
    341                                 mark_awake(this->cltr->procs, * this);
    342 
    343                                 // DON'T just proceed, start looking again
    344                                 continue MAIN_LOOP;
    345                         }
    346 
    347                 RUN_THREAD:
    348                         /* paranoid */ verify( ! __preemption_enabled() );
    349                         /* paranoid */ verify( readyThread );
    350 
    351                         // Reset io dirty bit
    352                         this->io.dirty = false;
    353 
    354                         // We found a thread run it
    355                         __run_thread(this, readyThread);
    356 
    357                         // Are we done?
    358                         if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    359 
    360                         if(this->io.pending && !this->io.dirty) {
    361                                 __cfa_io_flush( this, 0 );
    362                         }
    363 
    364                         ready_schedule_lock();
    365                         __maybe_io_drain( this );
    366                         ready_schedule_unlock();
    367                         #endif
    368257                }
    369258
     
    476365                                break RUNNING;
    477366                        case TICKET_UNBLOCK:
    478                                 #if !defined(__CFA_NO_STATISTICS__)
    479                                         __tls_stats()->ready.threads.threads++;
    480                                 #endif
     367                                __STATS__(true, ready.threads.threads++; )
    481368                                // This is case 2, the racy case, someone tried to run this thread before it finished blocking
    482369                                // In this case, just run it again.
     
    493380        __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);
    494381
    495         #if !defined(__CFA_NO_STATISTICS__)
    496                 __tls_stats()->ready.threads.threads--;
    497         #endif
     382        __STATS__(true, ready.threads.threads--; )
    498383
    499384        /* paranoid */ verify( ! __preemption_enabled() );
     
    506391        thread$ * thrd_src = kernelTLS().this_thread;
    507392
    508         __STATS( thrd_src->last_proc = kernelTLS().this_processor; )
     393        __STATS_DEF( thrd_src->last_proc = kernelTLS().this_processor; )
    509394
    510395        // Run the thread on this processor
     
    554439        /* paranoid */ verify( 0x0D15EA5E0D15EA5Ep == thrd->canary );
    555440
    556         const bool local = thrd->state != Start;
    557441        if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready;
    558442
    559443        // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
    560444        struct cluster * cl = thrd->curr_cluster;
    561         __STATS(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
     445        __STATS_DEF(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
    562446
    563447        // push the thread to the cluster ready-queue
     
    610494
    611495        ready_schedule_lock();
    612                 thread$ * thrd;
    613                 for(25) {
    614                         thrd = pop_slow( this );
    615                         if(thrd) goto RET;
    616                 }
    617                 thrd = pop_search( this );
    618 
    619                 RET:
     496                thread$ * thrd = pop_slow( this );
     497        ready_schedule_unlock();
     498
     499        /* paranoid */ verify( ! __preemption_enabled() );
     500        return thrd;
     501}
     502
     503// KERNEL ONLY
     504static inline thread$ * __next_thread_search(cluster * this) with( *this ) {
     505        /* paranoid */ verify( ! __preemption_enabled() );
     506
     507        ready_schedule_lock();
     508                thread$ * thrd = pop_search( this );
    620509        ready_schedule_unlock();
    621510
     
    733622// Wake a thread from the front if there are any
    734623static void __wake_one(cluster * this) {
     624        eventfd_t val;
     625
    735626        /* paranoid */ verify( ! __preemption_enabled() );
    736627        /* paranoid */ verify( ready_schedule_islocked() );
    737628
    738629        // Check if there is a sleeping processor
    739         // int fd = __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST);
    740         int fd = 0;
    741         if( __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST) != 0 ) {
    742                 fd = __atomic_exchange_n(&this->procs.fd, 0, __ATOMIC_RELAXED);
    743         }
    744 
    745         // If no one is sleeping, we are done
    746         if( fd == 0 ) return;
    747 
    748         // We found a processor, wake it up
    749         eventfd_t val;
    750         val = 1;
    751         eventfd_write( fd, val );
    752 
    753         #if !defined(__CFA_NO_STATISTICS__)
    754                 if( kernelTLS().this_stats ) {
    755                         __tls_stats()->ready.sleep.wakes++;
    756                 }
    757                 else {
    758                         __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED);
    759                 }
    760         #endif
     630        struct __fd_waitctx * fdp = __atomic_load_n(&this->procs.fdw, __ATOMIC_SEQ_CST);
     631
     632        // If no one is sleeping: we are done
     633        if( fdp == 0p ) return;
     634
     635        int fd = 1;
     636        if( __atomic_load_n(&fdp->fd, __ATOMIC_SEQ_CST) != 1 ) {
     637                fd = __atomic_exchange_n(&fdp->fd, 1, __ATOMIC_RELAXED);
     638        }
     639
     640        switch(fd) {
     641        case 0:
     642                // If the processor isn't ready to sleep then the exchange will already wake it up
     643                #if !defined(__CFA_NO_STATISTICS__)
     644                        if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.early++;
     645                        } else { __atomic_fetch_add(&this->stats->ready.sleep.early, 1, __ATOMIC_RELAXED); }
     646                #endif
     647                break;
     648        case 1:
     649                // If someone else already said they will wake them: we are done
     650                #if !defined(__CFA_NO_STATISTICS__)
     651                        if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.seen++;
     652                        } else { __atomic_fetch_add(&this->stats->ready.sleep.seen, 1, __ATOMIC_RELAXED); }
     653                #endif
     654                break;
     655        default:
     656                // If the processor was ready to sleep, we need to wake it up with an actual write
     657                val = 1;
     658                eventfd_write( fd, val );
     659
     660                #if !defined(__CFA_NO_STATISTICS__)
     661                        if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.wakes++;
     662                        } else { __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED); }
     663                #endif
     664                break;
     665        }
    761666
    762667        /* paranoid */ verify( ready_schedule_islocked() );
     
    771676
    772677        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
     678
     679        this->idle_wctx.fd = 1;
    773680
    774681        eventfd_t val;
     
    780687
    781688static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
     689        // Tell everyone we are ready to go do sleep
     690        for() {
     691                int expected = this->idle_wctx.fd;
     692
     693                // Someone already told us to wake-up! No time for a nap.
     694                if(expected == 1) { return; }
     695
     696                // Try to mark that we are going to sleep
     697                if(__atomic_compare_exchange_n(&this->idle_wctx.fd, &expected, this->idle_fd, false,  __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {
     698                        // Every one agreed, taking a nap
     699                        break;
     700                }
     701        }
     702
     703
    782704        #if !defined(CFA_WITH_IO_URING_IDLE)
    783705                #if !defined(__CFA_NO_STATISTICS__)
     
    826748
    827749static bool mark_idle(__cluster_proc_list & this, processor & proc) {
     750        __STATS__(true, ready.sleep.halts++; )
     751
     752        proc.idle_wctx.fd = 0;
     753
    828754        /* paranoid */ verify( ! __preemption_enabled() );
    829755        if(!try_lock( this )) return false;
     
    833759                insert_first(this.idles, proc);
    834760
    835                 __atomic_store_n(&this.fd, proc.idle_fd, __ATOMIC_SEQ_CST);
     761                __atomic_store_n(&this.fdw, &proc.idle_wctx, __ATOMIC_SEQ_CST);
    836762        unlock( this );
    837763        /* paranoid */ verify( ! __preemption_enabled() );
     
    849775
    850776                {
    851                         int fd = 0;
    852                         if(!this.idles`isEmpty) fd = this.idles`first.idle_fd;
    853                         __atomic_store_n(&this.fd, fd, __ATOMIC_SEQ_CST);
     777                        struct __fd_waitctx * wctx = 0;
     778                        if(!this.idles`isEmpty) wctx = &this.idles`first.idle_wctx;
     779                        __atomic_store_n(&this.fdw, wctx, __ATOMIC_SEQ_CST);
    854780                }
    855781
     
    915841                unsigned tail = *ctx->cq.tail;
    916842                if(head == tail) return false;
    917                 #if OLD_MAIN
    918                         ready_schedule_lock();
    919                         ret = __cfa_io_drain( proc );
    920                         ready_schedule_unlock();
    921                 #else
    922                         ret = __cfa_io_drain( proc );
    923                 #endif
     843                ready_schedule_lock();
     844                ret = __cfa_io_drain( proc );
     845                ready_schedule_unlock();
    924846        #endif
    925847        return ret;
Note: See TracChangeset for help on using the changeset viewer.