Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/kernel.cfa

    rc9c1c1cb rae7adbc4  
    4242
    4343#if !defined(__CFA_NO_STATISTICS__)
    44         #define __STATS_DEF( ...) __VA_ARGS__
     44        #define __STATS( ...) __VA_ARGS__
    4545#else
    46         #define __STATS_DEF( ...)
     46        #define __STATS( ...)
    4747#endif
    4848
     
    122122static thread$ * __next_thread(cluster * this);
    123123static thread$ * __next_thread_slow(cluster * this);
    124 static thread$ * __next_thread_search(cluster * this);
    125124static inline bool __must_unpark( thread$ * thrd ) __attribute((nonnull(1)));
    126125static void __run_thread(processor * this, thread$ * dst);
     
    188187                MAIN_LOOP:
    189188                for() {
     189                        #define OLD_MAIN 1
     190                        #if OLD_MAIN
    190191                        // Check if there is pending io
    191192                        __maybe_io_drain( this );
     
    195196
    196197                        if( !readyThread ) {
    197                                 __IO_STATS__(true, io.flush.idle++; )
    198198                                __cfa_io_flush( this, 0 );
    199199
    200                                 readyThread = __next_thread( this->cltr );
    201                         }
    202 
    203                         if( !readyThread ) for(5) {
    204                                 __IO_STATS__(true, io.flush.idle++; )
    205 
    206200                                readyThread = __next_thread_slow( this->cltr );
    207 
    208                                 if( readyThread ) break;
    209 
    210                                 __cfa_io_flush( this, 0 );
    211201                        }
    212202
     
    216206                                if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    217207
     208                                #if !defined(__CFA_NO_STATISTICS__)
     209                                        __tls_stats()->ready.sleep.halts++;
     210                                #endif
     211
    218212                                // Push self to idle stack
    219213                                if(!mark_idle(this->cltr->procs, * this)) continue MAIN_LOOP;
    220214
    221215                                // Confirm the ready-queue is empty
    222                                 readyThread = __next_thread_search( this->cltr );
     216                                readyThread = __next_thread_slow( this->cltr );
    223217                                if( readyThread ) {
    224218                                        // A thread was found, cancel the halt
    225219                                        mark_awake(this->cltr->procs, * this);
    226220
    227                                         __STATS__(true, ready.sleep.cancels++; )
     221                                        #if !defined(__CFA_NO_STATISTICS__)
     222                                                __tls_stats()->ready.sleep.cancels++;
     223                                        #endif
    228224
    229225                                        // continue the mai loop
     
    252248
    253249                        if(this->io.pending && !this->io.dirty) {
    254                                 __IO_STATS__(true, io.flush.dirty++; )
    255250                                __cfa_io_flush( this, 0 );
    256251                        }
     252
     253                        #else
     254                                #warning new kernel loop
     255                        SEARCH: {
     256                                /* paranoid */ verify( ! __preemption_enabled() );
     257
     258                                // First, lock the scheduler since we are searching for a thread
     259                                ready_schedule_lock();
     260
     261                                // Try to get the next thread
     262                                readyThread = pop_fast( this->cltr );
     263                                if(readyThread) { ready_schedule_unlock(); break SEARCH; }
     264
     265                                // If we can't find a thread, might as well flush any outstanding I/O
     266                                if(this->io.pending) { __cfa_io_flush( this, 0 ); }
     267
     268                                // Spin a little on I/O, just in case
     269                                for(5) {
     270                                        __maybe_io_drain( this );
     271                                        readyThread = pop_fast( this->cltr );
     272                                        if(readyThread) { ready_schedule_unlock(); break SEARCH; }
     273                                }
     274
     275                                // no luck, try stealing a few times
     276                                for(5) {
     277                                        if( __maybe_io_drain( this ) ) {
     278                                                readyThread = pop_fast( this->cltr );
     279                                        } else {
     280                                                readyThread = pop_slow( this->cltr );
     281                                        }
     282                                        if(readyThread) { ready_schedule_unlock(); break SEARCH; }
     283                                }
     284
     285                                // still no luck, search for a thread
     286                                readyThread = pop_search( this->cltr );
     287                                if(readyThread) { ready_schedule_unlock(); break SEARCH; }
     288
     289                                // Don't block if we are done
     290                                if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) {
     291                                        ready_schedule_unlock();
     292                                        break MAIN_LOOP;
     293                                }
     294
     295                                __STATS( __tls_stats()->ready.sleep.halts++; )
     296
     297                                // Push self to idle stack
     298                                ready_schedule_unlock();
     299                                if(!mark_idle(this->cltr->procs, * this)) goto SEARCH;
     300                                ready_schedule_lock();
     301
     302                                // Confirm the ready-queue is empty
     303                                __maybe_io_drain( this );
     304                                readyThread = pop_search( this->cltr );
     305                                ready_schedule_unlock();
     306
     307                                if( readyThread ) {
     308                                        // A thread was found, cancel the halt
     309                                        mark_awake(this->cltr->procs, * this);
     310
     311                                        __STATS( __tls_stats()->ready.sleep.cancels++; )
     312
     313                                        // continue the main loop
     314                                        break SEARCH;
     315                                }
     316
     317                                __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->unique_id, rdtscl()); )
     318                                __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle_fd);
     319
     320                                {
     321                                        eventfd_t val;
     322                                        ssize_t ret = read( this->idle_fd, &val, sizeof(val) );
     323                                        if(ret < 0) {
     324                                                switch((int)errno) {
     325                                                case EAGAIN:
     326                                                #if EAGAIN != EWOULDBLOCK
     327                                                        case EWOULDBLOCK:
     328                                                #endif
     329                                                case EINTR:
     330                                                        // No need to do anything special here, just assume it's a legitimate wake-up
     331                                                        break;
     332                                                default:
     333                                                        abort( "KERNEL : internal error, read failure on idle eventfd, error(%d) %s.", (int)errno, strerror( (int)errno ) );
     334                                                }
     335                                        }
     336                                }
     337
     338                                        __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->unique_id, rdtscl()); )
     339
     340                                // We were woken up, remove self from idle
     341                                mark_awake(this->cltr->procs, * this);
     342
     343                                // DON'T just proceed, start looking again
     344                                continue MAIN_LOOP;
     345                        }
     346
     347                RUN_THREAD:
     348                        /* paranoid */ verify( ! __preemption_enabled() );
     349                        /* paranoid */ verify( readyThread );
     350
     351                        // Reset io dirty bit
     352                        this->io.dirty = false;
     353
     354                        // We found a thread run it
     355                        __run_thread(this, readyThread);
     356
     357                        // Are we done?
     358                        if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
     359
     360                        if(this->io.pending && !this->io.dirty) {
     361                                __cfa_io_flush( this, 0 );
     362                        }
     363
     364                        ready_schedule_lock();
     365                        __maybe_io_drain( this );
     366                        ready_schedule_unlock();
     367                        #endif
    257368                }
    258369
     
    365476                                break RUNNING;
    366477                        case TICKET_UNBLOCK:
    367                                 __STATS__(true, ready.threads.threads++; )
     478                                #if !defined(__CFA_NO_STATISTICS__)
     479                                        __tls_stats()->ready.threads.threads++;
     480                                #endif
    368481                                // This is case 2, the racy case, someone tried to run this thread before it finished blocking
    369482                                // In this case, just run it again.
     
    380493        __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);
    381494
    382         __STATS__(true, ready.threads.threads--; )
     495        #if !defined(__CFA_NO_STATISTICS__)
     496                __tls_stats()->ready.threads.threads--;
     497        #endif
    383498
    384499        /* paranoid */ verify( ! __preemption_enabled() );
     
    391506        thread$ * thrd_src = kernelTLS().this_thread;
    392507
    393         __STATS_DEF( thrd_src->last_proc = kernelTLS().this_processor; )
     508        __STATS( thrd_src->last_proc = kernelTLS().this_processor; )
    394509
    395510        // Run the thread on this processor
     
    439554        /* paranoid */ verify( 0x0D15EA5E0D15EA5Ep == thrd->canary );
    440555
     556        const bool local = thrd->state != Start;
    441557        if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready;
    442558
    443559        // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
    444560        struct cluster * cl = thrd->curr_cluster;
    445         __STATS_DEF(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
     561        __STATS(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
    446562
    447563        // push the thread to the cluster ready-queue
     
    494610
    495611        ready_schedule_lock();
    496                 thread$ * thrd = pop_slow( this );
    497         ready_schedule_unlock();
    498 
    499         /* paranoid */ verify( ! __preemption_enabled() );
    500         return thrd;
    501 }
    502 
    503 // KERNEL ONLY
    504 static inline thread$ * __next_thread_search(cluster * this) with( *this ) {
    505         /* paranoid */ verify( ! __preemption_enabled() );
    506 
    507         ready_schedule_lock();
    508                 thread$ * thrd = pop_search( this );
     612                thread$ * thrd;
     613                for(25) {
     614                        thrd = pop_slow( this );
     615                        if(thrd) goto RET;
     616                }
     617                thrd = pop_search( this );
     618
     619                RET:
    509620        ready_schedule_unlock();
    510621
     
    622733// Wake a thread from the front if there are any
    623734static void __wake_one(cluster * this) {
     735        /* paranoid */ verify( ! __preemption_enabled() );
     736        /* paranoid */ verify( ready_schedule_islocked() );
     737
     738        // Check if there is a sleeping processor
     739        // int fd = __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST);
     740        int fd = 0;
     741        if( __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST) != 0 ) {
     742                fd = __atomic_exchange_n(&this->procs.fd, 0, __ATOMIC_RELAXED);
     743        }
     744
     745        // If no one is sleeping, we are done
     746        if( fd == 0 ) return;
     747
     748        // We found a processor, wake it up
    624749        eventfd_t val;
    625 
    626         /* paranoid */ verify( ! __preemption_enabled() );
    627         /* paranoid */ verify( ready_schedule_islocked() );
    628 
    629         // Check if there is a sleeping processor
    630         struct __fd_waitctx * fdp = __atomic_load_n(&this->procs.fdw, __ATOMIC_SEQ_CST);
    631 
    632         // If no one is sleeping: we are done
    633         if( fdp == 0p ) return;
    634 
    635         int fd = 1;
    636         if( __atomic_load_n(&fdp->fd, __ATOMIC_SEQ_CST) != 1 ) {
    637                 fd = __atomic_exchange_n(&fdp->fd, 1, __ATOMIC_RELAXED);
    638         }
    639 
    640         switch(fd) {
    641         case 0:
    642                 // If the processor isn't ready to sleep then the exchange will already wake it up
    643                 #if !defined(__CFA_NO_STATISTICS__)
    644                         if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.early++;
    645                         } else { __atomic_fetch_add(&this->stats->ready.sleep.early, 1, __ATOMIC_RELAXED); }
    646                 #endif
    647                 break;
    648         case 1:
    649                 // If someone else already said they will wake them: we are done
    650                 #if !defined(__CFA_NO_STATISTICS__)
    651                         if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.seen++;
    652                         } else { __atomic_fetch_add(&this->stats->ready.sleep.seen, 1, __ATOMIC_RELAXED); }
    653                 #endif
    654                 break;
    655         default:
    656                 // If the processor was ready to sleep, we need to wake it up with an actual write
    657                 val = 1;
    658                 eventfd_write( fd, val );
    659 
    660                 #if !defined(__CFA_NO_STATISTICS__)
    661                         if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.wakes++;
    662                         } else { __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED); }
    663                 #endif
    664                 break;
    665         }
     750        val = 1;
     751        eventfd_write( fd, val );
     752
     753        #if !defined(__CFA_NO_STATISTICS__)
     754                if( kernelTLS().this_stats ) {
     755                        __tls_stats()->ready.sleep.wakes++;
     756                }
     757                else {
     758                        __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED);
     759                }
     760        #endif
    666761
    667762        /* paranoid */ verify( ready_schedule_islocked() );
     
    676771
    677772        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
    678 
    679         this->idle_wctx.fd = 1;
    680773
    681774        eventfd_t val;
     
    687780
    688781static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
    689         // Tell everyone we are ready to go do sleep
    690         for() {
    691                 int expected = this->idle_wctx.fd;
    692 
    693                 // Someone already told us to wake-up! No time for a nap.
    694                 if(expected == 1) { return; }
    695 
    696                 // Try to mark that we are going to sleep
    697                 if(__atomic_compare_exchange_n(&this->idle_wctx.fd, &expected, this->idle_fd, false,  __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {
    698                         // Every one agreed, taking a nap
    699                         break;
    700                 }
    701         }
    702 
    703 
    704782        #if !defined(CFA_WITH_IO_URING_IDLE)
    705783                #if !defined(__CFA_NO_STATISTICS__)
     
    748826
    749827static bool mark_idle(__cluster_proc_list & this, processor & proc) {
    750         __STATS__(true, ready.sleep.halts++; )
    751 
    752         proc.idle_wctx.fd = 0;
    753 
    754828        /* paranoid */ verify( ! __preemption_enabled() );
    755829        if(!try_lock( this )) return false;
     
    759833                insert_first(this.idles, proc);
    760834
    761                 __atomic_store_n(&this.fdw, &proc.idle_wctx, __ATOMIC_SEQ_CST);
     835                __atomic_store_n(&this.fd, proc.idle_fd, __ATOMIC_SEQ_CST);
    762836        unlock( this );
    763837        /* paranoid */ verify( ! __preemption_enabled() );
     
    775849
    776850                {
    777                         struct __fd_waitctx * wctx = 0;
    778                         if(!this.idles`isEmpty) wctx = &this.idles`first.idle_wctx;
    779                         __atomic_store_n(&this.fdw, wctx, __ATOMIC_SEQ_CST);
     851                        int fd = 0;
     852                        if(!this.idles`isEmpty) fd = this.idles`first.idle_fd;
     853                        __atomic_store_n(&this.fd, fd, __ATOMIC_SEQ_CST);
    780854                }
    781855
     
    841915                unsigned tail = *ctx->cq.tail;
    842916                if(head == tail) return false;
    843                 ready_schedule_lock();
    844                 ret = __cfa_io_drain( proc );
    845                 ready_schedule_unlock();
     917                #if OLD_MAIN
     918                        ready_schedule_lock();
     919                        ret = __cfa_io_drain( proc );
     920                        ready_schedule_unlock();
     921                #else
     922                        ret = __cfa_io_drain( proc );
     923                #endif
    846924        #endif
    847925        return ret;
Note: See TracChangeset for help on using the changeset viewer.