Changes in / [b8e7aed:153d0f52]


Ignore:
Location:
libcfa/src/concurrency
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/alarm.hfa

    rb8e7aed r153d0f52  
    2323#include "time.hfa"
    2424
    25 #include <containers/list.hfa>
     25#include "containers/list.hfa"
    2626
    2727struct $thread;
  • libcfa/src/concurrency/io/setup.cfa

    rb8e7aed r153d0f52  
    228228                if( cluster_context ) {
    229229                        cluster & cltr = *thrd.curr_cluster;
    230                         /* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster );
     230                        /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
    231231                        /* paranoid */ verify( !ready_mutate_islocked() );
    232232
  • libcfa/src/concurrency/kernel.cfa

    rb8e7aed r153d0f52  
    8686// Kernel Scheduling logic
    8787static $thread * __next_thread(cluster * this);
    88 static bool __has_next_thread(cluster * this);
     88static $thread * __next_thread_slow(cluster * this);
    8989static void __run_thread(processor * this, $thread * dst);
    90 static bool __wake_one(struct __processor_id_t * id, cluster * cltr);
    91 static void __halt(processor * this);
    92 bool __wake_proc(processor *);
     90static void __wake_one(struct __processor_id_t * id, cluster * cltr);
     91
     92static void push  (__cluster_idles & idles, processor & proc);
     93static void remove(__cluster_idles & idles, processor & proc);
     94static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
     95
    9396
    9497//=============================================================================================
     
    118121
    119122                $thread * readyThread = 0p;
    120                 for( unsigned int spin_count = 0;; spin_count++ ) {
     123                MAIN_LOOP:
     124                for() {
    121125                        // Try to get the next thread
    122126                        readyThread = __next_thread( this->cltr );
    123127
    124                         // Check if we actually found a thread
    125                         if( readyThread ) {
    126                                 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    127                                 /* paranoid */ verifyf( readyThread->state == Ready || readyThread->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", readyThread->state, readyThread->preempted);
    128                                 /* paranoid */ verifyf( readyThread->link.next == 0p, "Expected null got %p", readyThread->link.next );
    129                                 __builtin_prefetch( readyThread->context.SP );
    130 
    131                                 // We found a thread run it
    132                                 __run_thread(this, readyThread);
    133 
    134                                 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     128                        if( !readyThread ) {
     129                                readyThread = __next_thread_slow( this->cltr );
    135130                        }
    136131
    137                         if(__atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST)) break;
    138 
     132                        HALT:
    139133                        if( !readyThread ) {
    140                                 // Block until a thread is ready
    141                                 __halt(this);
     134                                // Don't block if we are done
     135                                if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
     136
     137                                #if !defined(__CFA_NO_STATISTICS__)
     138                                        __tls_stats()->ready.sleep.halts++;
     139                                #endif
     140
     141                                // Push self to idle stack
     142                                push(this->cltr->idles, * this);
     143
     144                                // Confirm the ready-queue is empty
     145                                readyThread = __next_thread_slow( this->cltr );
     146                                if( readyThread ) {
     147                                        // A thread was found, cancel the halt
     148                                        remove(this->cltr->idles, * this);
     149
     150                                        #if !defined(__CFA_NO_STATISTICS__)
     151                                                __tls_stats()->ready.sleep.cancels++;
     152                                        #endif
     153
     154                                        // continue the mai loop
     155                                        break HALT;
     156                                }
     157
     158                                #if !defined(__CFA_NO_STATISTICS__)
     159                                        if(this->print_halts) {
     160                                                __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl());
     161                                        }
     162                                #endif
     163
     164                                wait( this->idle );
     165
     166                                #if !defined(__CFA_NO_STATISTICS__)
     167                                        if(this->print_halts) {
     168                                                __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl());
     169                                        }
     170                                #endif
     171
     172                                // We were woken up, remove self from idle
     173                                remove(this->cltr->idles, * this);
     174
     175                                // DON'T just proceed, start looking again
     176                                continue MAIN_LOOP;
    142177                        }
     178
     179                        /* paranoid */ verify( readyThread );
     180
     181                        // We found a thread run it
     182                        __run_thread(this, readyThread);
     183
     184                        // Are we done?
     185                        if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    143186                }
    144187
     
    165208// from the processor coroutine to the target thread
    166209static void __run_thread(processor * this, $thread * thrd_dst) {
     210        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     211        /* paranoid */ verifyf( thrd_dst->state == Ready || thrd_dst->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", thrd_dst->state, thrd_dst->preempted);
     212        /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );
     213        __builtin_prefetch( thrd_dst->context.SP );
     214
    167215        $coroutine * proc_cor = get_coroutine(this->runner);
    168216
     
    244292        proc_cor->state = Active;
    245293        kernelTLS.this_thread = 0p;
     294
     295        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    246296}
    247297
     
    300350        ready_schedule_lock  ( id );
    301351                push( thrd->curr_cluster, thrd );
    302 
    303                 #if !defined(__CFA_NO_STATISTICS__)
    304                         bool woke =
    305                 #endif
    306                         __wake_one(id, thrd->curr_cluster);
    307 
    308                 #if !defined(__CFA_NO_STATISTICS__)
    309                         if(woke) __tls_stats()->ready.sleep.wakes++;
    310                 #endif
     352                __wake_one(id, thrd->curr_cluster);
    311353        ready_schedule_unlock( id );
    312354
     
    315357
    316358// KERNEL ONLY
    317 static $thread * __next_thread(cluster * this) with( *this ) {
     359static inline $thread * __next_thread(cluster * this) with( *this ) {
    318360        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    319361
    320362        ready_schedule_lock  ( (__processor_id_t*)kernelTLS.this_processor );
    321                 $thread * head = pop( this );
     363                $thread * thrd = pop( this );
    322364        ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor );
    323365
    324366        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    325         return head;
     367        return thrd;
    326368}
    327369
    328370// KERNEL ONLY
    329 static bool __has_next_thread(cluster * this) with( *this ) {
     371static inline $thread * __next_thread_slow(cluster * this) with( *this ) {
    330372        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    331373
    332374        ready_schedule_lock  ( (__processor_id_t*)kernelTLS.this_processor );
    333                 bool not_empty = query( this );
     375                $thread * thrd = pop_slow( this );
    334376        ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor );
    335377
    336378        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    337         return not_empty;
     379        return thrd;
    338380}
    339381
     
    425467//=============================================================================================
    426468// Wake a thread from the front if there are any
    427 static bool __wake_one(struct __processor_id_t * id, cluster * this) {
     469static void __wake_one(struct __processor_id_t * id, cluster * this) {
     470        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    428471        /* paranoid */ verify( ready_schedule_islocked( id ) );
    429472
    430473        // Check if there is a sleeping processor
    431         processor * p = pop(this->idles);
     474        processor * p;
     475        unsigned idle;
     476        unsigned total;
     477        [idle, total, p] = query(this->idles);
    432478
    433479        // If no one is sleeping, we are done
    434         if( 0p == p ) return false;
     480        if( idle == 0 ) return;
    435481
    436482        // We found a processor, wake it up
    437483        post( p->idle );
    438484
    439         return true;
     485        #if !defined(__CFA_NO_STATISTICS__)
     486                __tls_stats()->ready.sleep.wakes++;
     487        #endif
     488
     489        /* paranoid */ verify( ready_schedule_islocked( id ) );
     490        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     491
     492        return;
    440493}
    441494
    442495// Unconditionnaly wake a thread
    443 bool __wake_proc(processor * this) {
     496void __wake_proc(processor * this) {
    444497        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
    445498
     
    448501                bool ret = post( this->idle );
    449502        enable_interrupts( __cfaabi_dbg_ctx );
    450 
    451         return ret;
    452 }
    453 
    454 static void __halt(processor * this) with( *this ) {
    455         if( do_terminate ) return;
    456 
    457         #if !defined(__CFA_NO_STATISTICS__)
    458                 __tls_stats()->ready.sleep.halts++;
    459         #endif
    460         // Push self to queue
    461         push(cltr->idles, *this);
    462 
    463         // Makre sure we don't miss a thread
    464         if( __has_next_thread(cltr) ) {
    465                 // A thread was posted, make sure a processor is woken up
    466                 struct __processor_id_t *id = (struct __processor_id_t *) this;
    467                 ready_schedule_lock  ( id );
    468                         __wake_one( id, cltr );
    469                 ready_schedule_unlock( id );
    470                 #if !defined(__CFA_NO_STATISTICS__)
    471                         __tls_stats()->ready.sleep.cancels++;
    472                 #endif
    473         }
    474 
    475         #if !defined(__CFA_NO_STATISTICS__)
    476                 if(this->print_halts) {
    477                         __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl());
    478                 }
    479         #endif
    480 
    481         wait( idle );
    482 
    483         #if !defined(__CFA_NO_STATISTICS__)
    484                 if(this->print_halts) {
    485                         __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl());
    486                 }
    487         #endif
     503}
     504
     505static void push  (__cluster_idles & this, processor & proc) {
     506        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     507        lock( this );
     508                this.idle++;
     509                /* paranoid */ verify( this.idle <= this.total );
     510
     511                insert_first(this.list, proc);
     512        unlock( this );
     513        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     514}
     515
     516static void remove(__cluster_idles & this, processor & proc) {
     517        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     518        lock( this );
     519                this.idle--;
     520                /* paranoid */ verify( this.idle >= 0 );
     521
     522                remove(proc);
     523        unlock( this );
     524        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     525}
     526
     527static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) {
     528        for() {
     529                uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST);
     530                if( 1 == (l % 2) ) { Pause(); continue; }
     531                unsigned idle    = this.idle;
     532                unsigned total   = this.total;
     533                processor * proc = &this.list`first;
     534                if(l != __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST)) { Pause(); continue; }
     535                return [idle, total, proc];
     536        }
    488537}
    489538
  • libcfa/src/concurrency/kernel.hfa

    rb8e7aed r153d0f52  
    2020#include "coroutine.hfa"
    2121
    22 #include "containers/stackLockFree.hfa"
     22#include "containers/list.hfa"
    2323
    2424extern "C" {
     
    9999
    100100        // Link lists fields
    101         Link(processor) link;
     101        DLISTED_MGD_IMPL_IN(processor)
    102102
    103103        #if !defined(__CFA_NO_STATISTICS__)
     
    119119static inline void  ?{}(processor & this, const char name[]) { this{name, *mainCluster }; }
    120120
    121 static inline Link(processor) * ?`next( processor * this ) { return &this->link; }
     121DLISTED_MGD_IMPL_OUT(processor)
    122122
    123123//-----------------------------------------------------------------------------
     
    206206void ^?{}(__ready_queue_t & this);
    207207
     208// Idle Sleep
     209struct __cluster_idles {
     210        // Spin lock protecting the queue
     211        volatile uint64_t lock;
     212
     213        // Total number of processors
     214        unsigned total;
     215
     216        // Total number of idle processors
     217        unsigned idle;
     218
     219        // List of idle processors
     220        dlist(processor, processor) list;
     221};
     222
    208223//-----------------------------------------------------------------------------
    209224// Cluster
     
    219234
    220235        // List of idle processors
    221         StackLF(processor) idles;
    222         volatile unsigned int nprocessors;
     236        __cluster_idles idles;
    223237
    224238        // List of threads
  • libcfa/src/concurrency/kernel/startup.cfa

    rb8e7aed r153d0f52  
    8787//-----------------------------------------------------------------------------
    8888// Other Forward Declarations
    89 extern bool __wake_proc(processor *);
     89extern void __wake_proc(processor *);
    9090
    9191//-----------------------------------------------------------------------------
     
    475475        #endif
    476476
    477         int target = __atomic_add_fetch( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
     477        lock( this.cltr->idles );
     478                int target = this.cltr->idles.total += 1u;
     479        unlock( this.cltr->idles );
    478480
    479481        id = doregister((__processor_id_t*)&this);
     
    493495// Not a ctor, it just preps the destruction but should not destroy members
    494496static void deinit(processor & this) {
    495 
    496         int target = __atomic_sub_fetch( &this.cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
     497        lock( this.cltr->idles );
     498                int target = this.cltr->idles.total -= 1u;
     499        unlock( this.cltr->idles );
    497500
    498501        // Lock the RWlock so no-one pushes/pops while we are changing the queue
     
    501504                // Adjust the ready queue size
    502505                ready_queue_shrink( this.cltr, target );
    503 
    504                 // Make sure we aren't on the idle queue
    505                 unsafe_remove( this.cltr->idles, &this );
    506506
    507507        // Unlock the RWlock
     
    545545//-----------------------------------------------------------------------------
    546546// Cluster
     547static void ?{}(__cluster_idles & this) {
     548        this.lock  = 0;
     549        this.idle  = 0;
     550        this.total = 0;
     551        (this.list){};
     552}
     553
    547554void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params) with( this ) {
    548555        this.name = name;
    549556        this.preemption_rate = preemption_rate;
    550         this.nprocessors = 0;
    551557        ready_queue{};
    552558
  • libcfa/src/concurrency/kernel_private.hfa

    rb8e7aed r153d0f52  
    121121void     unregister( struct __processor_id_t * proc );
    122122
     123//-----------------------------------------------------------------------
     124// Cluster idle lock/unlock
     125static inline void lock(__cluster_idles & this) {
     126        for() {
     127                uint64_t l = this.lock;
     128                if(
     129                        (0 == (l % 2))
     130                        && __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
     131                ) return;
     132                Pause();
     133        }
     134}
     135
     136static inline void unlock(__cluster_idles & this) {
     137        /* paranoid */ verify( 1 == (this.lock % 2) );
     138        __atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
     139}
     140
    123141//=======================================================================
    124142// Reader-writer lock implementation
     
    248266// pop thread from the ready queue of a cluster
    249267// returns 0p if empty
     268// May return 0p spuriously
    250269__attribute__((hot)) struct $thread * pop(struct cluster * cltr);
     270
     271//-----------------------------------------------------------------------
     272// pop thread from the ready queue of a cluster
     273// returns 0p if empty
     274// guaranteed to find any threads added before this call
     275__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);
    251276
    252277//-----------------------------------------------------------------------
  • libcfa/src/concurrency/ready_queue.cfa

    rb8e7aed r153d0f52  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
     19// #define USE_SNZI
     20
    1921#include "bits/defs.hfa"
    2022#include "kernel_private.hfa"
     
    192194void ^?{}(__ready_queue_t & this) with (this) {
    193195        verify( 1 == lanes.count );
    194         verify( !query( snzi ) );
     196        #ifdef USE_SNZI
     197                verify( !query( snzi ) );
     198        #endif
    195199        free(lanes.data);
    196200}
     
    198202//-----------------------------------------------------------------------
    199203__attribute__((hot)) bool query(struct cluster * cltr) {
    200         return query(cltr->ready_queue.snzi);
     204        #ifdef USE_SNZI
     205                return query(cltr->ready_queue.snzi);
     206        #endif
     207        return true;
    201208}
    202209
     
    262269        bool lane_first = push(lanes.data[i], thrd);
    263270
    264         // If this lane used to be empty we need to do more
    265         if(lane_first) {
    266                 // Check if the entire queue used to be empty
    267                 first = !query(snzi);
    268 
    269                 // Update the snzi
    270                 arrive( snzi, i );
    271         }
     271        #ifdef USE_SNZI
     272                // If this lane used to be empty we need to do more
     273                if(lane_first) {
     274                        // Check if the entire queue used to be empty
     275                        first = !query(snzi);
     276
     277                        // Update the snzi
     278                        arrive( snzi, i );
     279                }
     280        #endif
    272281
    273282        // Unlock and return
     
    294303__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
    295304        /* paranoid */ verify( lanes.count > 0 );
     305        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    296306        #if defined(BIAS)
    297307                // Don't bother trying locally too much
     
    300310
    301311        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
    302         while( query(snzi) ) {
     312        #ifdef USE_SNZI
     313                while( query(snzi) ) {
     314        #else
     315                for(25) {
     316        #endif
    303317                // Pick two lists at random
    304318                unsigned i,j;
     
    336350                #endif
    337351
    338                 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    339                 j %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     352                i %= count;
     353                j %= count;
    340354
    341355                // try popping from the 2 picked lists
     
    353367}
    354368
     369__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     370        /* paranoid */ verify( lanes.count > 0 );
     371        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     372        unsigned offset = __tls_rand();
     373        for(i; count) {
     374                unsigned idx = (offset + i) % count;
     375                struct $thread * thrd = try_pop(cltr, idx);
     376                if(thrd) {
     377                        return thrd;
     378                }
     379        }
     380
     381        // All lanes where empty return 0p
     382        return 0p;
     383}
     384
     385
    355386//-----------------------------------------------------------------------
    356387// Given 2 indexes, pick the list with the oldest push an try to pop from it
     
    394425        /* paranoid */ verify(lane.lock);
    395426
    396         // If this was the last element in the lane
    397         if(emptied) {
    398                 depart( snzi, w );
    399         }
     427        #ifdef USE_SNZI
     428                // If this was the last element in the lane
     429                if(emptied) {
     430                        depart( snzi, w );
     431                }
     432        #endif
    400433
    401434        // Unlock and return
     
    430463
    431464                                removed = true;
    432                                 if(emptied) {
    433                                         depart( snzi, i );
    434                                 }
     465                                #ifdef USE_SNZI
     466                                        if(emptied) {
     467                                                depart( snzi, i );
     468                                        }
     469                                #endif
    435470                        }
    436471                __atomic_unlock(&lane.lock);
     
    494529        // grow the ready queue
    495530        with( cltr->ready_queue ) {
    496                 ^(snzi){};
     531                #ifdef USE_SNZI
     532                        ^(snzi){};
     533                #endif
    497534
    498535                // Find new count
     
    516553                lanes.count = ncount;
    517554
    518                 // Re-create the snzi
    519                 snzi{ log2( lanes.count / 8 ) };
    520                 for( idx; (size_t)lanes.count ) {
    521                         if( !is_empty(lanes.data[idx]) ) {
    522                                 arrive(snzi, idx);
    523                         }
    524                 }
     555                #ifdef USE_SNZI
     556                        // Re-create the snzi
     557                        snzi{ log2( lanes.count / 8 ) };
     558                        for( idx; (size_t)lanes.count ) {
     559                                if( !is_empty(lanes.data[idx]) ) {
     560                                        arrive(snzi, idx);
     561                                }
     562                        }
     563                #endif
    525564        }
    526565
     
    542581
    543582        with( cltr->ready_queue ) {
    544                 ^(snzi){};
     583                #ifdef USE_SNZI
     584                        ^(snzi){};
     585                #endif
    545586
    546587                // Remember old count
     
    596637                }
    597638
    598                 // Re-create the snzi
    599                 snzi{ log2( lanes.count / 8 ) };
    600                 for( idx; (size_t)lanes.count ) {
    601                         if( !is_empty(lanes.data[idx]) ) {
    602                                 arrive(snzi, idx);
    603                         }
    604                 }
     639                #ifdef USE_SNZI
     640                        // Re-create the snzi
     641                        snzi{ log2( lanes.count / 8 ) };
     642                        for( idx; (size_t)lanes.count ) {
     643                                if( !is_empty(lanes.data[idx]) ) {
     644                                        arrive(snzi, idx);
     645                                }
     646                        }
     647                #endif
    605648        }
    606649
Note: See TracChangeset for help on using the changeset viewer.