Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r8fc652e0 rfc59df78  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
    19 // #define USE_SNZI
     19// #define USE_MPSC
     20
     21#define USE_RELAXED_FIFO
     22// #define USE_WORK_STEALING
    2023
    2124#include "bits/defs.hfa"
     
    2831#include <unistd.h>
    2932
    30 #include "snzi.hfa"
    3133#include "ready_subqueue.hfa"
    3234
    3335static const size_t cache_line_size = 64;
     36
     37#if !defined(__CFA_NO_STATISTICS__)
     38        #define __STATS(...) __VA_ARGS__
     39#else
     40        #define __STATS(...)
     41#endif
    3442
    3543// No overriden function, no environment variable, no define
     
    3947#endif
    4048
    41 #define BIAS 16
     49#if   defined(USE_RELAXED_FIFO)
     50        #define BIAS 4
     51        #define READYQ_SHARD_FACTOR 4
     52        #define SEQUENTIAL_SHARD 1
     53#elif defined(USE_WORK_STEALING)
     54        #define READYQ_SHARD_FACTOR 2
     55        #define SEQUENTIAL_SHARD 2
     56#else
     57        #error no scheduling strategy selected
     58#endif
     59
     60static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
     61static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
     62static inline struct $thread * search(struct cluster * cltr);
     63static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
     64
    4265
    4366// returns the maximum number of processors the RWLock support
     
    93116//=======================================================================
    94117// Lock-Free registering/unregistering of threads
    95 unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     118void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    96119        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    97120
     
    107130                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
    108131                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
    109                         return i;
     132                        proc->id = i;
    110133                }
    111134        }
     
    134157        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
    135158        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
    136         return n;
    137 }
    138 
    139 void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     159        proc->id = n;
     160}
     161
     162void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    140163        unsigned id = proc->id;
    141164        /*paranoid*/ verify(id < ready);
     
    192215
    193216//=======================================================================
    194 // Cforall Reqdy Queue used for scheduling
     217// Cforall Ready Queue used for scheduling
    195218//=======================================================================
    196219void ?{}(__ready_queue_t & this) with (this) {
    197220        lanes.data  = 0p;
     221        lanes.tscs  = 0p;
    198222        lanes.count = 0;
    199223}
    200224
    201225void ^?{}(__ready_queue_t & this) with (this) {
    202         verify( 1 == lanes.count );
    203         #ifdef USE_SNZI
    204                 verify( !query( snzi ) );
    205         #endif
     226        verify( SEQUENTIAL_SHARD == lanes.count );
    206227        free(lanes.data);
     228        free(lanes.tscs);
    207229}
    208230
    209231//-----------------------------------------------------------------------
    210 __attribute__((hot)) bool query(struct cluster * cltr) {
    211         #ifdef USE_SNZI
    212                 return query(cltr->ready_queue.snzi);
    213         #endif
    214         return true;
    215 }
    216 
    217 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
    218         unsigned i;
    219         bool local;
    220         #if defined(BIAS)
     232#if defined(USE_RELAXED_FIFO)
     233        //-----------------------------------------------------------------------
     234        // get index from random number with or without bias towards queues
     235        static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
     236                unsigned i;
     237                bool local;
    221238                unsigned rlow  = r % BIAS;
    222239                unsigned rhigh = r / BIAS;
     
    224241                        // (BIAS - 1) out of BIAS chances
    225242                        // Use perferred queues
    226                         i = preferred + (rhigh % 4);
     243                        i = preferred + (rhigh % READYQ_SHARD_FACTOR);
    227244                        local = true;
    228245                }
     
    233250                        local = false;
    234251                }
    235         #else
    236                 i = r;
    237                 local = false;
     252                return [i, local];
     253        }
     254
     255        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     256                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     257
     258                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     259                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     260
     261                // write timestamp
     262                thrd->link.ts = rdtscl();
     263
     264                bool local;
     265                int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
     266
     267                // Try to pick a lane and lock it
     268                unsigned i;
     269                do {
     270                        // Pick the index of a lane
     271                        unsigned r = __tls_rand_fwd();
     272                        [i, local] = idx_from_r(r, preferred);
     273
     274                        i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     275
     276                        #if !defined(__CFA_NO_STATISTICS__)
     277                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
     278                                else if(local) __tls_stats()->ready.push.local.attempt++;
     279                                else __tls_stats()->ready.push.share.attempt++;
     280                        #endif
     281
     282                #if defined(USE_MPSC)
     283                        // mpsc always succeeds
     284                } while( false );
     285                #else
     286                        // If we can't lock it retry
     287                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     288                #endif
     289
     290                // Actually push it
     291                push(lanes.data[i], thrd);
     292
     293                #if !defined(USE_MPSC)
     294                        // Unlock and return
     295                        __atomic_unlock( &lanes.data[i].lock );
     296                #endif
     297
     298                // Mark the current index in the tls rng instance as having an item
     299                __tls_rand_advance_bck();
     300
     301                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     302
     303                // Update statistics
     304                #if !defined(__CFA_NO_STATISTICS__)
     305                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     306                        else if(local) __tls_stats()->ready.push.local.success++;
     307                        else __tls_stats()->ready.push.share.success++;
     308                #endif
     309        }
     310
     311        // Pop from the ready queue from a given cluster
     312        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     313                /* paranoid */ verify( lanes.count > 0 );
     314                /* paranoid */ verify( kernelTLS().this_processor );
     315                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     316
     317                unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     318                int preferred = kernelTLS().this_processor->rdq.id;
     319
     320
     321                // As long as the list is not empty, try finding a lane that isn't empty and pop from it
     322                for(25) {
     323                        // Pick two lists at random
     324                        unsigned ri = __tls_rand_bck();
     325                        unsigned rj = __tls_rand_bck();
     326
     327                        unsigned i, j;
     328                        __attribute__((unused)) bool locali, localj;
     329                        [i, locali] = idx_from_r(ri, preferred);
     330                        [j, localj] = idx_from_r(rj, preferred);
     331
     332                        i %= count;
     333                        j %= count;
     334
     335                        // try popping from the 2 picked lists
     336                        struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
     337                        if(thrd) {
     338                                return thrd;
     339                        }
     340                }
     341
     342                // All lanes where empty return 0p
     343                return 0p;
     344        }
     345
     346        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
     347        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
     348                return search(cltr);
     349        }
     350#endif
     351#if defined(USE_WORK_STEALING)
     352        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     353                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     354
     355                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     356                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     357
     358                // write timestamp
     359                thrd->link.ts = rdtscl();
     360
     361                // Try to pick a lane and lock it
     362                unsigned i;
     363                do {
     364                        #if !defined(__CFA_NO_STATISTICS__)
     365                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
     366                                else __tls_stats()->ready.push.local.attempt++;
     367                        #endif
     368
     369                        if(unlikely(external)) {
     370                                i = __tls_rand() % lanes.count;
     371                        }
     372                        else {
     373                                processor * proc = kernelTLS().this_processor;
     374                                unsigned r = proc->rdq.its++;
     375                                i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     376                        }
     377
     378
     379                #if defined(USE_MPSC)
     380                        // mpsc always succeeds
     381                } while( false );
     382                #else
     383                        // If we can't lock it retry
     384                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     385                #endif
     386
     387                // Actually push it
     388                push(lanes.data[i], thrd);
     389
     390                #if !defined(USE_MPSC)
     391                        // Unlock and return
     392                        __atomic_unlock( &lanes.data[i].lock );
     393                #endif
     394
     395                #if !defined(__CFA_NO_STATISTICS__)
     396                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     397                        else __tls_stats()->ready.push.local.success++;
     398                #endif
     399
     400                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     401        }
     402
     403        // Pop from the ready queue from a given cluster
     404        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     405                /* paranoid */ verify( lanes.count > 0 );
     406                /* paranoid */ verify( kernelTLS().this_processor );
     407                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     408
     409                processor * proc = kernelTLS().this_processor;
     410
     411                if(proc->rdq.target == -1u) {
     412                        proc->rdq.target = __tls_rand() % lanes.count;
     413                        unsigned it1  = proc->rdq.itr;
     414                        unsigned it2  = proc->rdq.itr + 1;
     415                        unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
     416                        unsigned idx2 = proc->rdq.id + (it2 % READYQ_SHARD_FACTOR);
     417                        unsigned long long tsc1 = ts(lanes.data[idx1]);
     418                        unsigned long long tsc2 = ts(lanes.data[idx2]);
     419                        proc->rdq.cutoff = min(tsc1, tsc2);
     420                        if(proc->rdq.cutoff == 0) proc->rdq.cutoff = -1ull;
     421                }
     422                else {
     423                        unsigned target = proc->rdq.target;
     424                        proc->rdq.target = -1u;
     425                        if(lanes.tscs[target].tv < proc->rdq.cutoff) {
     426                                $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     427                                if(t) return t;
     428                        }
     429                }
     430
     431                for(READYQ_SHARD_FACTOR) {
     432                        unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
     433                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     434                }
     435                return 0p;
     436        }
     437
     438        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     439                unsigned i = __tls_rand() % lanes.count;
     440                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     441        }
     442
     443        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
     444                return search(cltr);
     445        }
     446#endif
     447
     448//=======================================================================
     449// Various Ready Queue utilities
     450//=======================================================================
     451// these function work the same or almost the same
     452// whether they are using work-stealing or relaxed fifo scheduling
     453
     454//-----------------------------------------------------------------------
     455// try to pop from a lane given by index w
     456static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     457        __STATS( stats.attempt++; )
     458
     459        // Get relevant elements locally
     460        __intrusive_lane_t & lane = lanes.data[w];
     461
     462        // If list looks empty retry
     463        if( is_empty(lane) ) {
     464                __STATS( stats.espec++; )
     465                return 0p;
     466        }
     467
     468        // If we can't get the lock retry
     469        if( !__atomic_try_acquire(&lane.lock) ) {
     470                __STATS( stats.elock++; )
     471                return 0p;
     472        }
     473
     474        // If list is empty, unlock and retry
     475        if( is_empty(lane) ) {
     476                __atomic_unlock(&lane.lock);
     477                __STATS( stats.eempty++; )
     478                return 0p;
     479        }
     480
     481        // Actually pop the list
     482        struct $thread * thrd;
     483        thrd = pop(lane);
     484
     485        /* paranoid */ verify(thrd);
     486        /* paranoid */ verify(lane.lock);
     487
     488        // Unlock and return
     489        __atomic_unlock(&lane.lock);
     490
     491        // Update statistics
     492        __STATS( stats.success++; )
     493
     494        #if defined(USE_WORK_STEALING)
     495                lanes.tscs[w].tv = thrd->link.ts;
    238496        #endif
    239         return [i, local];
     497
     498        // return the popped thread
     499        return thrd;
    240500}
    241501
    242502//-----------------------------------------------------------------------
    243 __attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    244         __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    245 
    246         // write timestamp
    247         thrd->link.ts = rdtscl();
    248 
    249         __attribute__((unused)) bool local;
    250         __attribute__((unused)) int preferred;
    251         #if defined(BIAS)
    252                 preferred =
    253                         //*
    254                         kernelTLS().this_processor ? kernelTLS().this_processor->id * 4 : -1;
    255                         /*/
    256                         thrd->link.preferred * 4;
    257                         //*/
    258         #endif
    259 
    260         // Try to pick a lane and lock it
    261         unsigned i;
    262         do {
    263                 // Pick the index of a lane
    264                 // unsigned r = __tls_rand();
    265                 unsigned r = __tls_rand_fwd();
    266                 [i, local] = idx_from_r(r, preferred);
    267 
    268                 #if !defined(__CFA_NO_STATISTICS__)
    269                         if(local) {
    270                                 __tls_stats()->ready.pick.push.local++;
    271                         }
    272                 #endif
    273 
    274                 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    275 
    276                 #if !defined(__CFA_NO_STATISTICS__)
    277                         __tls_stats()->ready.pick.push.attempt++;
    278                 #endif
    279 
    280                 // If we can't lock it retry
    281         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    282 
    283         bool first = false;
    284 
    285         // Actually push it
    286         #ifdef USE_SNZI
    287                 bool lane_first =
    288         #endif
    289 
    290         push(lanes.data[i], thrd);
    291 
    292         #ifdef USE_SNZI
    293                 // If this lane used to be empty we need to do more
    294                 if(lane_first) {
    295                         // Check if the entire queue used to be empty
    296                         first = !query(snzi);
    297 
    298                         // Update the snzi
    299                         arrive( snzi, i );
    300                 }
    301         #endif
    302 
    303         __tls_rand_advance_bck();
    304 
    305         // Unlock and return
    306         __atomic_unlock( &lanes.data[i].lock );
    307 
    308         __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
    309 
    310         // Update statistics
    311         #if !defined(__CFA_NO_STATISTICS__)
    312                 #if defined(BIAS)
    313                         if( local ) __tls_stats()->ready.pick.push.lsuccess++;
    314                 #endif
    315                 __tls_stats()->ready.pick.push.success++;
    316         #endif
    317 
    318         // return whether or not the list was empty before this push
    319         return first;
    320 }
    321 
    322 static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
    323 static struct $thread * try_pop(struct cluster * cltr, unsigned i);
    324 
    325 // Pop from the ready queue from a given cluster
    326 __attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
    327         /* paranoid */ verify( lanes.count > 0 );
    328         unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    329         int preferred;
    330         #if defined(BIAS)
    331                 // Don't bother trying locally too much
    332                 int local_tries = 8;
    333                 preferred = kernelTLS().this_processor->id * 4;
    334         #endif
    335 
    336 
    337         // As long as the list is not empty, try finding a lane that isn't empty and pop from it
    338         #ifdef USE_SNZI
    339                 while( query(snzi) ) {
    340         #else
    341                 for(25) {
    342         #endif
    343                 // Pick two lists at random
    344                 // unsigned ri = __tls_rand();
    345                 // unsigned rj = __tls_rand();
    346                 unsigned ri = __tls_rand_bck();
    347                 unsigned rj = __tls_rand_bck();
    348 
    349                 unsigned i, j;
    350                 __attribute__((unused)) bool locali, localj;
    351                 [i, locali] = idx_from_r(ri, preferred);
    352                 [j, localj] = idx_from_r(rj, preferred);
    353 
    354                 #if !defined(__CFA_NO_STATISTICS__)
    355                         if(locali) {
    356                                 __tls_stats()->ready.pick.pop.local++;
    357                         }
    358                         if(localj) {
    359                                 __tls_stats()->ready.pick.pop.local++;
    360                         }
    361                 #endif
    362 
    363                 i %= count;
    364                 j %= count;
    365 
    366                 // try popping from the 2 picked lists
    367                 struct $thread * thrd = try_pop(cltr, i, j);
    368                 if(thrd) {
    369                         #if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
    370                                 if( locali || localj ) __tls_stats()->ready.pick.pop.lsuccess++;
    371                         #endif
    372                         return thrd;
    373                 }
    374         }
    375 
    376         // All lanes where empty return 0p
    377         return 0p;
    378 }
    379 
    380 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     503// try to pop from any lanes making sure you don't miss any threads push
     504// before the start of the function
     505static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
    381506        /* paranoid */ verify( lanes.count > 0 );
    382507        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     
    384509        for(i; count) {
    385510                unsigned idx = (offset + i) % count;
    386                 struct $thread * thrd = try_pop(cltr, idx);
     511                struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
    387512                if(thrd) {
    388513                        return thrd;
     
    394519}
    395520
    396 
    397521//-----------------------------------------------------------------------
    398 // Given 2 indexes, pick the list with the oldest push an try to pop from it
    399 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
    400         #if !defined(__CFA_NO_STATISTICS__)
    401                 __tls_stats()->ready.pick.pop.attempt++;
    402         #endif
    403 
    404         // Pick the bet list
    405         int w = i;
    406         if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
    407                 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
    408         }
    409 
    410         return try_pop(cltr, w);
    411 }
    412 
    413 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
    414         // Get relevant elements locally
    415         __intrusive_lane_t & lane = lanes.data[w];
    416 
    417         // If list looks empty retry
    418         if( is_empty(lane) ) return 0p;
    419 
    420         // If we can't get the lock retry
    421         if( !__atomic_try_acquire(&lane.lock) ) return 0p;
    422 
    423 
    424         // If list is empty, unlock and retry
    425         if( is_empty(lane) ) {
    426                 __atomic_unlock(&lane.lock);
    427                 return 0p;
    428         }
    429 
    430         // Actually pop the list
    431         struct $thread * thrd;
    432         thrd = pop(lane);
    433 
    434         /* paranoid */ verify(thrd);
    435         /* paranoid */ verify(lane.lock);
    436 
    437         #ifdef USE_SNZI
    438                 // If this was the last element in the lane
    439                 if(emptied) {
    440                         depart( snzi, w );
    441                 }
    442         #endif
    443 
    444         // Unlock and return
    445         __atomic_unlock(&lane.lock);
    446 
    447         // Update statistics
    448         #if !defined(__CFA_NO_STATISTICS__)
    449                 __tls_stats()->ready.pick.pop.success++;
    450         #endif
    451 
    452         // Update the thread bias
    453         thrd->link.preferred = w / 4;
    454 
    455         // return the popped thread
    456         return thrd;
    457 }
    458 //-----------------------------------------------------------------------
    459 
    460 bool remove_head(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    461         for(i; lanes.count) {
    462                 __intrusive_lane_t & lane = lanes.data[i];
    463 
    464                 bool removed = false;
    465 
    466                 __atomic_acquire(&lane.lock);
    467                         if(head(lane)->link.next == thrd) {
    468                                 $thread * pthrd;
    469                                 pthrd = pop(lane);
    470 
    471                                 /* paranoid */ verify( pthrd == thrd );
    472 
    473                                 removed = true;
    474                                 #ifdef USE_SNZI
    475                                         if(emptied) {
    476                                                 depart( snzi, i );
    477                                         }
    478                                 #endif
    479                         }
    480                 __atomic_unlock(&lane.lock);
    481 
    482                 if( removed ) return true;
    483         }
    484         return false;
    485 }
    486 
    487 //-----------------------------------------------------------------------
    488 
     522// Check that all the intrusive queues in the data structure are still consistent
    489523static void check( __ready_queue_t & q ) with (q) {
    490         #if defined(__CFA_WITH_VERIFY__)
     524        #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
    491525                {
    492526                        for( idx ; lanes.count ) {
     
    499533                                assert(tail(sl)->link.prev->link.next == tail(sl) );
    500534
    501                                 if(sl.before.link.ts == 0l) {
     535                                if(is_empty(sl)) {
    502536                                        assert(tail(sl)->link.prev == head(sl));
    503537                                        assert(head(sl)->link.next == tail(sl));
     
    511545}
    512546
     547//-----------------------------------------------------------------------
     548// Given 2 indexes, pick the list with the oldest push an try to pop from it
     549static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     550        // Pick the bet list
     551        int w = i;
     552        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
     553                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
     554        }
     555
     556        return try_pop(cltr, w __STATS(, stats));
     557}
     558
    513559// Call this function of the intrusive list was moved using memcpy
    514560// fixes the list so that the pointers back to anchors aren't left dangling
    515561static inline void fix(__intrusive_lane_t & ll) {
    516         // if the list is not empty then follow he pointer and fix its reverse
    517         if(!is_empty(ll)) {
    518                 head(ll)->link.next->link.prev = head(ll);
    519                 tail(ll)->link.prev->link.next = tail(ll);
    520         }
    521         // Otherwise just reset the list
    522         else {
    523                 verify(tail(ll)->link.next == 0p);
    524                 tail(ll)->link.prev = head(ll);
    525                 head(ll)->link.next = tail(ll);
    526                 verify(head(ll)->link.prev == 0p);
    527         }
     562        #if !defined(USE_MPSC)
     563                // if the list is not empty then follow he pointer and fix its reverse
     564                if(!is_empty(ll)) {
     565                        head(ll)->link.next->link.prev = head(ll);
     566                        tail(ll)->link.prev->link.next = tail(ll);
     567                }
     568                // Otherwise just reset the list
     569                else {
     570                        verify(tail(ll)->link.next == 0p);
     571                        tail(ll)->link.prev = head(ll);
     572                        head(ll)->link.next = tail(ll);
     573                        verify(head(ll)->link.prev == 0p);
     574                }
     575        #endif
     576}
     577
     578static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
     579        processor * it = &list`first;
     580        for(unsigned i = 0; i < count; i++) {
     581                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
     582                it->rdq.id = value;
     583                it->rdq.target = -1u;
     584                value += READYQ_SHARD_FACTOR;
     585                it = &(*it)`next;
     586        }
     587}
     588
     589static void reassign_cltr_id(struct cluster * cltr) {
     590        unsigned preferred = 0;
     591        assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
     592        assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
     593}
     594
     595static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
     596        #if defined(USE_WORK_STEALING)
     597                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
     598                for(i; lanes.count) {
     599                        lanes.tscs[i].tv = ts(lanes.data[i]);
     600                }
     601        #endif
    528602}
    529603
    530604// Grow the ready queue
    531 void ready_queue_grow  (struct cluster * cltr, int target) {
     605void ready_queue_grow(struct cluster * cltr) {
     606        size_t ncount;
     607        int target = cltr->procs.total;
     608
    532609        /* paranoid */ verify( ready_mutate_islocked() );
    533610        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
     
    538615        // grow the ready queue
    539616        with( cltr->ready_queue ) {
    540                 #ifdef USE_SNZI
    541                         ^(snzi){};
    542                 #endif
    543 
    544617                // Find new count
    545618                // Make sure we always have atleast 1 list
    546                 size_t ncount = target >= 2 ? target * 4: 1;
     619                if(target >= 2) {
     620                        ncount = target * READYQ_SHARD_FACTOR;
     621                } else {
     622                        ncount = SEQUENTIAL_SHARD;
     623                }
    547624
    548625                // Allocate new array (uses realloc and memcpies the data)
     
    561638                // Update original
    562639                lanes.count = ncount;
    563 
    564                 #ifdef USE_SNZI
    565                         // Re-create the snzi
    566                         snzi{ log2( lanes.count / 8 ) };
    567                         for( idx; (size_t)lanes.count ) {
    568                                 if( !is_empty(lanes.data[idx]) ) {
    569                                         arrive(snzi, idx);
    570                                 }
    571                         }
    572                 #endif
    573         }
     640        }
     641
     642        fix_times(cltr);
     643
     644        reassign_cltr_id(cltr);
    574645
    575646        // Make sure that everything is consistent
     
    582653
    583654// Shrink the ready queue
    584 void ready_queue_shrink(struct cluster * cltr, int target) {
     655void ready_queue_shrink(struct cluster * cltr) {
    585656        /* paranoid */ verify( ready_mutate_islocked() );
    586657        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     
    589660        /* paranoid */ check( cltr->ready_queue );
    590661
     662        int target = cltr->procs.total;
     663
    591664        with( cltr->ready_queue ) {
    592                 #ifdef USE_SNZI
    593                         ^(snzi){};
    594                 #endif
    595 
    596665                // Remember old count
    597666                size_t ocount = lanes.count;
     
    599668                // Find new count
    600669                // Make sure we always have atleast 1 list
    601                 lanes.count = target >= 2 ? target * 4: 1;
     670                lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
    602671                /* paranoid */ verify( ocount >= lanes.count );
    603                 /* paranoid */ verify( lanes.count == target * 4 || target < 2 );
     672                /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
    604673
    605674                // for printing count the number of displaced threads
     
    644713                        fix(lanes.data[idx]);
    645714                }
    646 
    647                 #ifdef USE_SNZI
    648                         // Re-create the snzi
    649                         snzi{ log2( lanes.count / 8 ) };
    650                         for( idx; (size_t)lanes.count ) {
    651                                 if( !is_empty(lanes.data[idx]) ) {
    652                                         arrive(snzi, idx);
    653                                 }
    654                         }
    655                 #endif
    656         }
     715        }
     716
     717        fix_times(cltr);
     718
     719        reassign_cltr_id(cltr);
    657720
    658721        // Make sure that everything is consistent
Note: See TracChangeset for help on using the changeset viewer.