Changes in / [8465b4d:e2702fd]


Ignore:
Location:
libcfa/src/concurrency
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/alarm.hfa

    r8465b4d re2702fd  
    2323#include "time.hfa"
    2424
    25 #include "containers/list.hfa"
     25#include <containers/list.hfa>
    2626
    2727struct $thread;
  • libcfa/src/concurrency/io/setup.cfa

    r8465b4d re2702fd  
    228228                if( cluster_context ) {
    229229                        cluster & cltr = *thrd.curr_cluster;
    230                         /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
     230                        /* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster );
    231231                        /* paranoid */ verify( !ready_mutate_islocked() );
    232232
  • libcfa/src/concurrency/kernel.cfa

    r8465b4d re2702fd  
    8686// Kernel Scheduling logic
    8787static $thread * __next_thread(cluster * this);
    88 static $thread * __next_thread_slow(cluster * this);
     88static bool __has_next_thread(cluster * this);
    8989static void __run_thread(processor * this, $thread * dst);
    90 static void __wake_one(struct __processor_id_t * id, cluster * cltr);
    91 
    92 static void push  (__cluster_idles & idles, processor & proc);
    93 static void remove(__cluster_idles & idles, processor & proc);
    94 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
    95 
     90static bool __wake_one(struct __processor_id_t * id, cluster * cltr);
     91static void __halt(processor * this);
     92bool __wake_proc(processor *);
    9693
    9794//=============================================================================================
     
    121118
    122119                $thread * readyThread = 0p;
    123                 MAIN_LOOP:
    124                 for() {
     120                for( unsigned int spin_count = 0;; spin_count++ ) {
    125121                        // Try to get the next thread
    126122                        readyThread = __next_thread( this->cltr );
    127123
     124                        // Check if we actually found a thread
     125                        if( readyThread ) {
     126                                /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     127                                /* paranoid */ verifyf( readyThread->state == Ready || readyThread->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", readyThread->state, readyThread->preempted);
     128                                /* paranoid */ verifyf( readyThread->link.next == 0p, "Expected null got %p", readyThread->link.next );
     129                                __builtin_prefetch( readyThread->context.SP );
     130
     131                                // We found a thread run it
     132                                __run_thread(this, readyThread);
     133
     134                                /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     135                        }
     136
     137                        if(__atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST)) break;
     138
    128139                        if( !readyThread ) {
    129                                 readyThread = __next_thread_slow( this->cltr );
     140                                // Block until a thread is ready
     141                                __halt(this);
    130142                        }
    131 
    132                         HALT:
    133                         if( !readyThread ) {
    134                                 // Don't block if we are done
    135                                 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    136 
    137                                 #if !defined(__CFA_NO_STATISTICS__)
    138                                         __tls_stats()->ready.sleep.halts++;
    139                                 #endif
    140 
    141                                 // Push self to idle stack
    142                                 push(this->cltr->idles, * this);
    143 
    144                                 // Confirm the ready-queue is empty
    145                                 readyThread = __next_thread_slow( this->cltr );
    146                                 if( readyThread ) {
    147                                         // A thread was found, cancel the halt
    148                                         remove(this->cltr->idles, * this);
    149 
    150                                         #if !defined(__CFA_NO_STATISTICS__)
    151                                                 __tls_stats()->ready.sleep.cancels++;
    152                                         #endif
    153 
    154                                         // continue the mai loop
    155                                         break HALT;
    156                                 }
    157 
    158                                 #if !defined(__CFA_NO_STATISTICS__)
    159                                         if(this->print_halts) {
    160                                                 __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl());
    161                                         }
    162                                 #endif
    163 
    164                                 wait( this->idle );
    165 
    166                                 #if !defined(__CFA_NO_STATISTICS__)
    167                                         if(this->print_halts) {
    168                                                 __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl());
    169                                         }
    170                                 #endif
    171 
    172                                 // We were woken up, remove self from idle
    173                                 remove(this->cltr->idles, * this);
    174 
    175                                 // DON'T just proceed, start looking again
    176                                 continue MAIN_LOOP;
    177                         }
    178 
    179                         /* paranoid */ verify( readyThread );
    180 
    181                         // We found a thread run it
    182                         __run_thread(this, readyThread);
    183 
    184                         // Are we done?
    185                         if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    186143                }
    187144
     
    208165// from the processor coroutine to the target thread
    209166static void __run_thread(processor * this, $thread * thrd_dst) {
    210         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    211         /* paranoid */ verifyf( thrd_dst->state == Ready || thrd_dst->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", thrd_dst->state, thrd_dst->preempted);
    212         /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );
    213         __builtin_prefetch( thrd_dst->context.SP );
    214 
    215167        $coroutine * proc_cor = get_coroutine(this->runner);
    216168
     
    292244        proc_cor->state = Active;
    293245        kernelTLS.this_thread = 0p;
    294 
    295         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    296246}
    297247
     
    350300        ready_schedule_lock  ( id );
    351301                push( thrd->curr_cluster, thrd );
    352                 __wake_one(id, thrd->curr_cluster);
     302
     303                #if !defined(__CFA_NO_STATISTICS__)
     304                        bool woke =
     305                #endif
     306                        __wake_one(id, thrd->curr_cluster);
     307
     308                #if !defined(__CFA_NO_STATISTICS__)
     309                        if(woke) __tls_stats()->ready.sleep.wakes++;
     310                #endif
    353311        ready_schedule_unlock( id );
    354312
     
    357315
    358316// KERNEL ONLY
    359 static inline $thread * __next_thread(cluster * this) with( *this ) {
     317static $thread * __next_thread(cluster * this) with( *this ) {
    360318        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    361319
    362320        ready_schedule_lock  ( (__processor_id_t*)kernelTLS.this_processor );
    363                 $thread * thrd = pop( this );
     321                $thread * head = pop( this );
    364322        ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor );
    365323
    366324        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    367         return thrd;
     325        return head;
    368326}
    369327
    370328// KERNEL ONLY
    371 static inline $thread * __next_thread_slow(cluster * this) with( *this ) {
     329static bool __has_next_thread(cluster * this) with( *this ) {
    372330        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    373331
    374332        ready_schedule_lock  ( (__processor_id_t*)kernelTLS.this_processor );
    375                 $thread * thrd = pop_slow( this );
     333                bool not_empty = query( this );
    376334        ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor );
    377335
    378336        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    379         return thrd;
     337        return not_empty;
    380338}
    381339
     
    467425//=============================================================================================
    468426// Wake a thread from the front if there are any
    469 static void __wake_one(struct __processor_id_t * id, cluster * this) {
    470         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     427static bool __wake_one(struct __processor_id_t * id, cluster * this) {
    471428        /* paranoid */ verify( ready_schedule_islocked( id ) );
    472429
    473430        // Check if there is a sleeping processor
    474         processor * p;
    475         unsigned idle;
    476         unsigned total;
    477         [idle, total, p] = query(this->idles);
     431        processor * p = pop(this->idles);
    478432
    479433        // If no one is sleeping, we are done
    480         if( idle == 0 ) return;
     434        if( 0p == p ) return false;
    481435
    482436        // We found a processor, wake it up
    483437        post( p->idle );
    484438
    485         #if !defined(__CFA_NO_STATISTICS__)
    486                 __tls_stats()->ready.sleep.wakes++;
    487         #endif
    488 
    489         /* paranoid */ verify( ready_schedule_islocked( id ) );
    490         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    491 
    492         return;
     439        return true;
    493440}
    494441
    495442// Unconditionnaly wake a thread
    496 void __wake_proc(processor * this) {
     443bool __wake_proc(processor * this) {
    497444        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
    498445
     
    501448                bool ret = post( this->idle );
    502449        enable_interrupts( __cfaabi_dbg_ctx );
    503 }
    504 
    505 static void push  (__cluster_idles & this, processor & proc) {
    506         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    507         lock( this );
    508                 this.idle++;
    509                 /* paranoid */ verify( this.idle <= this.total );
    510 
    511                 insert_first(this.list, proc);
    512         unlock( this );
    513         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    514 }
    515 
    516 static void remove(__cluster_idles & this, processor & proc) {
    517         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    518         lock( this );
    519                 this.idle--;
    520                 /* paranoid */ verify( this.idle >= 0 );
    521 
    522                 remove(proc);
    523         unlock( this );
    524         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    525 }
    526 
    527 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) {
    528         for() {
    529                 uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST);
    530                 if( 1 == (l % 2) ) { Pause(); continue; }
    531                 unsigned idle    = this.idle;
    532                 unsigned total   = this.total;
    533                 processor * proc = &this.list`first;
    534                 if(l != __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST)) { Pause(); continue; }
    535                 return [idle, total, proc];
    536         }
     450
     451        return ret;
     452}
     453
     454static void __halt(processor * this) with( *this ) {
     455        if( do_terminate ) return;
     456
     457        #if !defined(__CFA_NO_STATISTICS__)
     458                __tls_stats()->ready.sleep.halts++;
     459        #endif
     460        // Push self to queue
     461        push(cltr->idles, *this);
     462
     463        // Makre sure we don't miss a thread
     464        if( __has_next_thread(cltr) ) {
     465                // A thread was posted, make sure a processor is woken up
     466                struct __processor_id_t *id = (struct __processor_id_t *) this;
     467                ready_schedule_lock  ( id );
     468                        __wake_one( id, cltr );
     469                ready_schedule_unlock( id );
     470                #if !defined(__CFA_NO_STATISTICS__)
     471                        __tls_stats()->ready.sleep.cancels++;
     472                #endif
     473        }
     474
     475        #if !defined(__CFA_NO_STATISTICS__)
     476                if(this->print_halts) {
     477                        __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl());
     478                }
     479        #endif
     480
     481        wait( idle );
     482
     483        #if !defined(__CFA_NO_STATISTICS__)
     484                if(this->print_halts) {
     485                        __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl());
     486                }
     487        #endif
    537488}
    538489
  • libcfa/src/concurrency/kernel.hfa

    r8465b4d re2702fd  
    2020#include "coroutine.hfa"
    2121
    22 #include "containers/list.hfa"
     22#include "containers/stackLockFree.hfa"
    2323
    2424extern "C" {
     
    9999
    100100        // Link lists fields
    101         DLISTED_MGD_IMPL_IN(processor)
     101        Link(processor) link;
    102102
    103103        #if !defined(__CFA_NO_STATISTICS__)
     
    119119static inline void  ?{}(processor & this, const char name[]) { this{name, *mainCluster }; }
    120120
    121 DLISTED_MGD_IMPL_OUT(processor)
     121static inline Link(processor) * ?`next( processor * this ) { return &this->link; }
    122122
    123123//-----------------------------------------------------------------------------
     
    206206void ^?{}(__ready_queue_t & this);
    207207
    208 // Idle Sleep
    209 struct __cluster_idles {
    210         // Spin lock protecting the queue
    211         volatile uint64_t lock;
    212 
    213         // Total number of processors
    214         unsigned total;
    215 
    216         // Total number of idle processors
    217         unsigned idle;
    218 
    219         // List of idle processors
    220         dlist(processor, processor) list;
    221 };
    222 
    223208//-----------------------------------------------------------------------------
    224209// Cluster
     
    234219
    235220        // List of idle processors
    236         __cluster_idles idles;
     221        StackLF(processor) idles;
     222        volatile unsigned int nprocessors;
    237223
    238224        // List of threads
  • libcfa/src/concurrency/kernel/startup.cfa

    r8465b4d re2702fd  
    8787//-----------------------------------------------------------------------------
    8888// Other Forward Declarations
    89 extern void __wake_proc(processor *);
     89extern bool __wake_proc(processor *);
    9090
    9191//-----------------------------------------------------------------------------
     
    475475        #endif
    476476
    477         lock( this.cltr->idles );
    478                 int target = this.cltr->idles.total += 1u;
    479         unlock( this.cltr->idles );
     477        int target = __atomic_add_fetch( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
    480478
    481479        id = doregister((__processor_id_t*)&this);
     
    495493// Not a ctor, it just preps the destruction but should not destroy members
    496494static void deinit(processor & this) {
    497         lock( this.cltr->idles );
    498                 int target = this.cltr->idles.total -= 1u;
    499         unlock( this.cltr->idles );
     495
     496        int target = __atomic_sub_fetch( &this.cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
    500497
    501498        // Lock the RWlock so no-one pushes/pops while we are changing the queue
     
    504501                // Adjust the ready queue size
    505502                ready_queue_shrink( this.cltr, target );
     503
     504                // Make sure we aren't on the idle queue
     505                unsafe_remove( this.cltr->idles, &this );
    506506
    507507        // Unlock the RWlock
     
    545545//-----------------------------------------------------------------------------
    546546// Cluster
    547 static void ?{}(__cluster_idles & this) {
    548         this.lock  = 0;
    549         this.idle  = 0;
    550         this.total = 0;
    551         (this.list){};
    552 }
    553 
    554547void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params) with( this ) {
    555548        this.name = name;
    556549        this.preemption_rate = preemption_rate;
     550        this.nprocessors = 0;
    557551        ready_queue{};
    558552
  • libcfa/src/concurrency/kernel_private.hfa

    r8465b4d re2702fd  
    121121void     unregister( struct __processor_id_t * proc );
    122122
    123 //-----------------------------------------------------------------------
    124 // Cluster idle lock/unlock
    125 static inline void lock(__cluster_idles & this) {
    126         for() {
    127                 uint64_t l = this.lock;
    128                 if(
    129                         (0 == (l % 2))
    130                         && __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
    131                 ) return;
    132                 Pause();
    133         }
    134 }
    135 
    136 static inline void unlock(__cluster_idles & this) {
    137         /* paranoid */ verify( 1 == (this.lock % 2) );
    138         __atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
    139 }
    140 
    141123//=======================================================================
    142124// Reader-writer lock implementation
     
    266248// pop thread from the ready queue of a cluster
    267249// returns 0p if empty
    268 // May return 0p spuriously
    269250__attribute__((hot)) struct $thread * pop(struct cluster * cltr);
    270 
    271 //-----------------------------------------------------------------------
    272 // pop thread from the ready queue of a cluster
    273 // returns 0p if empty
    274 // guaranteed to find any threads added before this call
    275 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);
    276251
    277252//-----------------------------------------------------------------------
  • libcfa/src/concurrency/ready_queue.cfa

    r8465b4d re2702fd  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
    19 // #define USE_SNZI
    20 
    2119#include "bits/defs.hfa"
    2220#include "kernel_private.hfa"
     
    194192void ^?{}(__ready_queue_t & this) with (this) {
    195193        verify( 1 == lanes.count );
    196         #ifdef USE_SNZI
    197                 verify( !query( snzi ) );
    198         #endif
     194        verify( !query( snzi ) );
    199195        free(lanes.data);
    200196}
     
    202198//-----------------------------------------------------------------------
    203199__attribute__((hot)) bool query(struct cluster * cltr) {
    204         #ifdef USE_SNZI
    205                 return query(cltr->ready_queue.snzi);
    206         #endif
    207         return true;
     200        return query(cltr->ready_queue.snzi);
    208201}
    209202
     
    269262        bool lane_first = push(lanes.data[i], thrd);
    270263
    271         #ifdef USE_SNZI
    272                 // If this lane used to be empty we need to do more
    273                 if(lane_first) {
    274                         // Check if the entire queue used to be empty
    275                         first = !query(snzi);
    276 
    277                         // Update the snzi
    278                         arrive( snzi, i );
    279                 }
    280         #endif
     264        // If this lane used to be empty we need to do more
     265        if(lane_first) {
     266                // Check if the entire queue used to be empty
     267                first = !query(snzi);
     268
     269                // Update the snzi
     270                arrive( snzi, i );
     271        }
    281272
    282273        // Unlock and return
     
    303294__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
    304295        /* paranoid */ verify( lanes.count > 0 );
    305         unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    306296        #if defined(BIAS)
    307297                // Don't bother trying locally too much
     
    310300
    311301        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
    312         #ifdef USE_SNZI
    313                 while( query(snzi) ) {
    314         #else
    315                 for(25) {
    316         #endif
     302        while( query(snzi) ) {
    317303                // Pick two lists at random
    318304                unsigned i,j;
     
    350336                #endif
    351337
    352                 i %= count;
    353                 j %= count;
     338                i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     339                j %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    354340
    355341                // try popping from the 2 picked lists
     
    367353}
    368354
    369 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    370         /* paranoid */ verify( lanes.count > 0 );
    371         unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    372         unsigned offset = __tls_rand();
    373         for(i; count) {
    374                 unsigned idx = (offset + i) % count;
    375                 struct $thread * thrd = try_pop(cltr, idx);
    376                 if(thrd) {
    377                         return thrd;
    378                 }
    379         }
    380 
    381         // All lanes where empty return 0p
    382         return 0p;
    383 }
    384 
    385 
    386355//-----------------------------------------------------------------------
    387356// Given 2 indexes, pick the list with the oldest push an try to pop from it
     
    425394        /* paranoid */ verify(lane.lock);
    426395
    427         #ifdef USE_SNZI
    428                 // If this was the last element in the lane
    429                 if(emptied) {
    430                         depart( snzi, w );
    431                 }
    432         #endif
     396        // If this was the last element in the lane
     397        if(emptied) {
     398                depart( snzi, w );
     399        }
    433400
    434401        // Unlock and return
     
    463430
    464431                                removed = true;
    465                                 #ifdef USE_SNZI
    466                                         if(emptied) {
    467                                                 depart( snzi, i );
    468                                         }
    469                                 #endif
     432                                if(emptied) {
     433                                        depart( snzi, i );
     434                                }
    470435                        }
    471436                __atomic_unlock(&lane.lock);
     
    529494        // grow the ready queue
    530495        with( cltr->ready_queue ) {
    531                 #ifdef USE_SNZI
    532                         ^(snzi){};
    533                 #endif
     496                ^(snzi){};
    534497
    535498                // Find new count
     
    553516                lanes.count = ncount;
    554517
    555                 #ifdef USE_SNZI
    556                         // Re-create the snzi
    557                         snzi{ log2( lanes.count / 8 ) };
    558                         for( idx; (size_t)lanes.count ) {
    559                                 if( !is_empty(lanes.data[idx]) ) {
    560                                         arrive(snzi, idx);
    561                                 }
    562                         }
    563                 #endif
     518                // Re-create the snzi
     519                snzi{ log2( lanes.count / 8 ) };
     520                for( idx; (size_t)lanes.count ) {
     521                        if( !is_empty(lanes.data[idx]) ) {
     522                                arrive(snzi, idx);
     523                        }
     524                }
    564525        }
    565526
     
    581542
    582543        with( cltr->ready_queue ) {
    583                 #ifdef USE_SNZI
    584                         ^(snzi){};
    585                 #endif
     544                ^(snzi){};
    586545
    587546                // Remember old count
     
    637596                }
    638597
    639                 #ifdef USE_SNZI
    640                         // Re-create the snzi
    641                         snzi{ log2( lanes.count / 8 ) };
    642                         for( idx; (size_t)lanes.count ) {
    643                                 if( !is_empty(lanes.data[idx]) ) {
    644                                         arrive(snzi, idx);
    645                                 }
    646                         }
    647                 #endif
     598                // Re-create the snzi
     599                snzi{ log2( lanes.count / 8 ) };
     600                for( idx; (size_t)lanes.count ) {
     601                        if( !is_empty(lanes.data[idx]) ) {
     602                                arrive(snzi, idx);
     603                        }
     604                }
    648605        }
    649606
Note: See TracChangeset for help on using the changeset viewer.