Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    rfc59df78 r8fc652e0  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
    19 // #define USE_MPSC
    20 
    21 #define USE_RELAXED_FIFO
    22 // #define USE_WORK_STEALING
     19// #define USE_SNZI
    2320
    2421#include "bits/defs.hfa"
     
    3128#include <unistd.h>
    3229
     30#include "snzi.hfa"
    3331#include "ready_subqueue.hfa"
    3432
    3533static const size_t cache_line_size = 64;
    36 
    37 #if !defined(__CFA_NO_STATISTICS__)
    38         #define __STATS(...) __VA_ARGS__
    39 #else
    40         #define __STATS(...)
    41 #endif
    4234
    4335// No overriden function, no environment variable, no define
     
    4739#endif
    4840
    49 #if   defined(USE_RELAXED_FIFO)
    50         #define BIAS 4
    51         #define READYQ_SHARD_FACTOR 4
    52         #define SEQUENTIAL_SHARD 1
    53 #elif defined(USE_WORK_STEALING)
    54         #define READYQ_SHARD_FACTOR 2
    55         #define SEQUENTIAL_SHARD 2
    56 #else
    57         #error no scheduling strategy selected
    58 #endif
    59 
    60 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
    61 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
    62 static inline struct $thread * search(struct cluster * cltr);
    63 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
    64 
     41#define BIAS 16
    6542
    6643// returns the maximum number of processors the RWLock support
     
    11693//=======================================================================
    11794// Lock-Free registering/unregistering of threads
    118 void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     95unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    11996        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    12097
     
    130107                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
    131108                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
    132                         proc->id = i;
     109                        return i;
    133110                }
    134111        }
     
    157134        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
    158135        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
    159         proc->id = n;
    160 }
    161 
    162 void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     136        return n;
     137}
     138
     139void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    163140        unsigned id = proc->id;
    164141        /*paranoid*/ verify(id < ready);
     
    215192
    216193//=======================================================================
    217 // Cforall Ready Queue used for scheduling
     194// Cforall Reqdy Queue used for scheduling
    218195//=======================================================================
    219196void ?{}(__ready_queue_t & this) with (this) {
    220197        lanes.data  = 0p;
    221         lanes.tscs  = 0p;
    222198        lanes.count = 0;
    223199}
    224200
    225201void ^?{}(__ready_queue_t & this) with (this) {
    226         verify( SEQUENTIAL_SHARD == lanes.count );
     202        verify( 1 == lanes.count );
     203        #ifdef USE_SNZI
     204                verify( !query( snzi ) );
     205        #endif
    227206        free(lanes.data);
    228         free(lanes.tscs);
    229207}
    230208
    231209//-----------------------------------------------------------------------
    232 #if defined(USE_RELAXED_FIFO)
    233         //-----------------------------------------------------------------------
    234         // get index from random number with or without bias towards queues
    235         static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
    236                 unsigned i;
    237                 bool local;
     210__attribute__((hot)) bool query(struct cluster * cltr) {
     211        #ifdef USE_SNZI
     212                return query(cltr->ready_queue.snzi);
     213        #endif
     214        return true;
     215}
     216
     217static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
     218        unsigned i;
     219        bool local;
     220        #if defined(BIAS)
    238221                unsigned rlow  = r % BIAS;
    239222                unsigned rhigh = r / BIAS;
     
    241224                        // (BIAS - 1) out of BIAS chances
    242225                        // Use perferred queues
    243                         i = preferred + (rhigh % READYQ_SHARD_FACTOR);
     226                        i = preferred + (rhigh % 4);
    244227                        local = true;
    245228                }
     
    250233                        local = false;
    251234                }
    252                 return [i, local];
    253         }
    254 
    255         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    256                 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    257 
    258                 const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    259                 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    260 
    261                 // write timestamp
    262                 thrd->link.ts = rdtscl();
    263 
    264                 bool local;
    265                 int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
    266 
    267                 // Try to pick a lane and lock it
    268                 unsigned i;
    269                 do {
    270                         // Pick the index of a lane
    271                         unsigned r = __tls_rand_fwd();
    272                         [i, local] = idx_from_r(r, preferred);
    273 
    274                         i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    275 
    276                         #if !defined(__CFA_NO_STATISTICS__)
    277                                 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
    278                                 else if(local) __tls_stats()->ready.push.local.attempt++;
    279                                 else __tls_stats()->ready.push.share.attempt++;
     235        #else
     236                i = r;
     237                local = false;
     238        #endif
     239        return [i, local];
     240}
     241
     242//-----------------------------------------------------------------------
     243__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     244        __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     245
     246        // write timestamp
     247        thrd->link.ts = rdtscl();
     248
     249        __attribute__((unused)) bool local;
     250        __attribute__((unused)) int preferred;
     251        #if defined(BIAS)
     252                preferred =
     253                        //*
     254                        kernelTLS().this_processor ? kernelTLS().this_processor->id * 4 : -1;
     255                        /*/
     256                        thrd->link.preferred * 4;
     257                        //*/
     258        #endif
     259
     260        // Try to pick a lane and lock it
     261        unsigned i;
     262        do {
     263                // Pick the index of a lane
     264                // unsigned r = __tls_rand();
     265                unsigned r = __tls_rand_fwd();
     266                [i, local] = idx_from_r(r, preferred);
     267
     268                #if !defined(__CFA_NO_STATISTICS__)
     269                        if(local) {
     270                                __tls_stats()->ready.pick.push.local++;
     271                        }
     272                #endif
     273
     274                i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     275
     276                #if !defined(__CFA_NO_STATISTICS__)
     277                        __tls_stats()->ready.pick.push.attempt++;
     278                #endif
     279
     280                // If we can't lock it retry
     281        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     282
     283        bool first = false;
     284
     285        // Actually push it
     286        #ifdef USE_SNZI
     287                bool lane_first =
     288        #endif
     289
     290        push(lanes.data[i], thrd);
     291
     292        #ifdef USE_SNZI
     293                // If this lane used to be empty we need to do more
     294                if(lane_first) {
     295                        // Check if the entire queue used to be empty
     296                        first = !query(snzi);
     297
     298                        // Update the snzi
     299                        arrive( snzi, i );
     300                }
     301        #endif
     302
     303        __tls_rand_advance_bck();
     304
     305        // Unlock and return
     306        __atomic_unlock( &lanes.data[i].lock );
     307
     308        __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     309
     310        // Update statistics
     311        #if !defined(__CFA_NO_STATISTICS__)
     312                #if defined(BIAS)
     313                        if( local ) __tls_stats()->ready.pick.push.lsuccess++;
     314                #endif
     315                __tls_stats()->ready.pick.push.success++;
     316        #endif
     317
     318        // return whether or not the list was empty before this push
     319        return first;
     320}
     321
     322static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
     323static struct $thread * try_pop(struct cluster * cltr, unsigned i);
     324
     325// Pop from the ready queue from a given cluster
     326__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
     327        /* paranoid */ verify( lanes.count > 0 );
     328        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     329        int preferred;
     330        #if defined(BIAS)
     331                // Don't bother trying locally too much
     332                int local_tries = 8;
     333                preferred = kernelTLS().this_processor->id * 4;
     334        #endif
     335
     336
     337        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
     338        #ifdef USE_SNZI
     339                while( query(snzi) ) {
     340        #else
     341                for(25) {
     342        #endif
     343                // Pick two lists at random
     344                // unsigned ri = __tls_rand();
     345                // unsigned rj = __tls_rand();
     346                unsigned ri = __tls_rand_bck();
     347                unsigned rj = __tls_rand_bck();
     348
     349                unsigned i, j;
     350                __attribute__((unused)) bool locali, localj;
     351                [i, locali] = idx_from_r(ri, preferred);
     352                [j, localj] = idx_from_r(rj, preferred);
     353
     354                #if !defined(__CFA_NO_STATISTICS__)
     355                        if(locali) {
     356                                __tls_stats()->ready.pick.pop.local++;
     357                        }
     358                        if(localj) {
     359                                __tls_stats()->ready.pick.pop.local++;
     360                        }
     361                #endif
     362
     363                i %= count;
     364                j %= count;
     365
     366                // try popping from the 2 picked lists
     367                struct $thread * thrd = try_pop(cltr, i, j);
     368                if(thrd) {
     369                        #if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
     370                                if( locali || localj ) __tls_stats()->ready.pick.pop.lsuccess++;
    280371                        #endif
    281 
    282                 #if defined(USE_MPSC)
    283                         // mpsc always succeeds
    284                 } while( false );
    285                 #else
    286                         // If we can't lock it retry
    287                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    288                 #endif
    289 
    290                 // Actually push it
    291                 push(lanes.data[i], thrd);
    292 
    293                 #if !defined(USE_MPSC)
    294                         // Unlock and return
    295                         __atomic_unlock( &lanes.data[i].lock );
    296                 #endif
    297 
    298                 // Mark the current index in the tls rng instance as having an item
    299                 __tls_rand_advance_bck();
    300 
    301                 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
    302 
    303                 // Update statistics
    304                 #if !defined(__CFA_NO_STATISTICS__)
    305                         if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
    306                         else if(local) __tls_stats()->ready.push.local.success++;
    307                         else __tls_stats()->ready.push.share.success++;
    308                 #endif
    309         }
    310 
    311         // Pop from the ready queue from a given cluster
    312         __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    313                 /* paranoid */ verify( lanes.count > 0 );
    314                 /* paranoid */ verify( kernelTLS().this_processor );
    315                 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
    316 
    317                 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    318                 int preferred = kernelTLS().this_processor->rdq.id;
    319 
    320 
    321                 // As long as the list is not empty, try finding a lane that isn't empty and pop from it
    322                 for(25) {
    323                         // Pick two lists at random
    324                         unsigned ri = __tls_rand_bck();
    325                         unsigned rj = __tls_rand_bck();
    326 
    327                         unsigned i, j;
    328                         __attribute__((unused)) bool locali, localj;
    329                         [i, locali] = idx_from_r(ri, preferred);
    330                         [j, localj] = idx_from_r(rj, preferred);
    331 
    332                         i %= count;
    333                         j %= count;
    334 
    335                         // try popping from the 2 picked lists
    336                         struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
    337                         if(thrd) {
    338                                 return thrd;
    339                         }
    340                 }
    341 
    342                 // All lanes where empty return 0p
    343                 return 0p;
    344         }
    345 
    346         __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
    347         __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
    348                 return search(cltr);
    349         }
    350 #endif
    351 #if defined(USE_WORK_STEALING)
    352         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    353                 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    354 
    355                 const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    356                 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    357 
    358                 // write timestamp
    359                 thrd->link.ts = rdtscl();
    360 
    361                 // Try to pick a lane and lock it
    362                 unsigned i;
    363                 do {
    364                         #if !defined(__CFA_NO_STATISTICS__)
    365                                 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
    366                                 else __tls_stats()->ready.push.local.attempt++;
    367                         #endif
    368 
    369                         if(unlikely(external)) {
    370                                 i = __tls_rand() % lanes.count;
    371                         }
    372                         else {
    373                                 processor * proc = kernelTLS().this_processor;
    374                                 unsigned r = proc->rdq.its++;
    375                                 i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
    376                         }
    377 
    378 
    379                 #if defined(USE_MPSC)
    380                         // mpsc always succeeds
    381                 } while( false );
    382                 #else
    383                         // If we can't lock it retry
    384                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    385                 #endif
    386 
    387                 // Actually push it
    388                 push(lanes.data[i], thrd);
    389 
    390                 #if !defined(USE_MPSC)
    391                         // Unlock and return
    392                         __atomic_unlock( &lanes.data[i].lock );
    393                 #endif
    394 
    395                 #if !defined(__CFA_NO_STATISTICS__)
    396                         if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
    397                         else __tls_stats()->ready.push.local.success++;
    398                 #endif
    399 
    400                 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
    401         }
    402 
    403         // Pop from the ready queue from a given cluster
    404         __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    405                 /* paranoid */ verify( lanes.count > 0 );
    406                 /* paranoid */ verify( kernelTLS().this_processor );
    407                 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
    408 
    409                 processor * proc = kernelTLS().this_processor;
    410 
    411                 if(proc->rdq.target == -1u) {
    412                         proc->rdq.target = __tls_rand() % lanes.count;
    413                         unsigned it1  = proc->rdq.itr;
    414                         unsigned it2  = proc->rdq.itr + 1;
    415                         unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
    416                         unsigned idx2 = proc->rdq.id + (it2 % READYQ_SHARD_FACTOR);
    417                         unsigned long long tsc1 = ts(lanes.data[idx1]);
    418                         unsigned long long tsc2 = ts(lanes.data[idx2]);
    419                         proc->rdq.cutoff = min(tsc1, tsc2);
    420                         if(proc->rdq.cutoff == 0) proc->rdq.cutoff = -1ull;
    421                 }
    422                 else {
    423                         unsigned target = proc->rdq.target;
    424                         proc->rdq.target = -1u;
    425                         if(lanes.tscs[target].tv < proc->rdq.cutoff) {
    426                                 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    427                                 if(t) return t;
    428                         }
    429                 }
    430 
    431                 for(READYQ_SHARD_FACTOR) {
    432                         unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
    433                         if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    434                 }
    435                 return 0p;
    436         }
    437 
    438         __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    439                 unsigned i = __tls_rand() % lanes.count;
    440                 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    441         }
    442 
    443         __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
    444                 return search(cltr);
    445         }
    446 #endif
    447 
    448 //=======================================================================
    449 // Various Ready Queue utilities
    450 //=======================================================================
    451 // these function work the same or almost the same
    452 // whether they are using work-stealing or relaxed fifo scheduling
    453 
    454 //-----------------------------------------------------------------------
    455 // try to pop from a lane given by index w
    456 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    457         __STATS( stats.attempt++; )
    458 
    459         // Get relevant elements locally
    460         __intrusive_lane_t & lane = lanes.data[w];
    461 
    462         // If list looks empty retry
    463         if( is_empty(lane) ) {
    464                 __STATS( stats.espec++; )
    465                 return 0p;
    466         }
    467 
    468         // If we can't get the lock retry
    469         if( !__atomic_try_acquire(&lane.lock) ) {
    470                 __STATS( stats.elock++; )
    471                 return 0p;
    472         }
    473 
    474         // If list is empty, unlock and retry
    475         if( is_empty(lane) ) {
    476                 __atomic_unlock(&lane.lock);
    477                 __STATS( stats.eempty++; )
    478                 return 0p;
    479         }
    480 
    481         // Actually pop the list
    482         struct $thread * thrd;
    483         thrd = pop(lane);
    484 
    485         /* paranoid */ verify(thrd);
    486         /* paranoid */ verify(lane.lock);
    487 
    488         // Unlock and return
    489         __atomic_unlock(&lane.lock);
    490 
    491         // Update statistics
    492         __STATS( stats.success++; )
    493 
    494         #if defined(USE_WORK_STEALING)
    495                 lanes.tscs[w].tv = thrd->link.ts;
    496         #endif
    497 
    498         // return the popped thread
    499         return thrd;
    500 }
    501 
    502 //-----------------------------------------------------------------------
    503 // try to pop from any lanes making sure you don't miss any threads push
    504 // before the start of the function
    505 static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
     372                        return thrd;
     373                }
     374        }
     375
     376        // All lanes where empty return 0p
     377        return 0p;
     378}
     379
     380__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    506381        /* paranoid */ verify( lanes.count > 0 );
    507382        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     
    509384        for(i; count) {
    510385                unsigned idx = (offset + i) % count;
    511                 struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
     386                struct $thread * thrd = try_pop(cltr, idx);
    512387                if(thrd) {
    513388                        return thrd;
     
    519394}
    520395
     396
    521397//-----------------------------------------------------------------------
    522 // Check that all the intrusive queues in the data structure are still consistent
     398// Given 2 indexes, pick the list with the oldest push an try to pop from it
     399static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
     400        #if !defined(__CFA_NO_STATISTICS__)
     401                __tls_stats()->ready.pick.pop.attempt++;
     402        #endif
     403
     404        // Pick the bet list
     405        int w = i;
     406        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
     407                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
     408        }
     409
     410        return try_pop(cltr, w);
     411}
     412
     413static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
     414        // Get relevant elements locally
     415        __intrusive_lane_t & lane = lanes.data[w];
     416
     417        // If list looks empty retry
     418        if( is_empty(lane) ) return 0p;
     419
     420        // If we can't get the lock retry
     421        if( !__atomic_try_acquire(&lane.lock) ) return 0p;
     422
     423
     424        // If list is empty, unlock and retry
     425        if( is_empty(lane) ) {
     426                __atomic_unlock(&lane.lock);
     427                return 0p;
     428        }
     429
     430        // Actually pop the list
     431        struct $thread * thrd;
     432        thrd = pop(lane);
     433
     434        /* paranoid */ verify(thrd);
     435        /* paranoid */ verify(lane.lock);
     436
     437        #ifdef USE_SNZI
     438                // If this was the last element in the lane
     439                if(emptied) {
     440                        depart( snzi, w );
     441                }
     442        #endif
     443
     444        // Unlock and return
     445        __atomic_unlock(&lane.lock);
     446
     447        // Update statistics
     448        #if !defined(__CFA_NO_STATISTICS__)
     449                __tls_stats()->ready.pick.pop.success++;
     450        #endif
     451
     452        // Update the thread bias
     453        thrd->link.preferred = w / 4;
     454
     455        // return the popped thread
     456        return thrd;
     457}
     458//-----------------------------------------------------------------------
     459
     460bool remove_head(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     461        for(i; lanes.count) {
     462                __intrusive_lane_t & lane = lanes.data[i];
     463
     464                bool removed = false;
     465
     466                __atomic_acquire(&lane.lock);
     467                        if(head(lane)->link.next == thrd) {
     468                                $thread * pthrd;
     469                                pthrd = pop(lane);
     470
     471                                /* paranoid */ verify( pthrd == thrd );
     472
     473                                removed = true;
     474                                #ifdef USE_SNZI
     475                                        if(emptied) {
     476                                                depart( snzi, i );
     477                                        }
     478                                #endif
     479                        }
     480                __atomic_unlock(&lane.lock);
     481
     482                if( removed ) return true;
     483        }
     484        return false;
     485}
     486
     487//-----------------------------------------------------------------------
     488
    523489static void check( __ready_queue_t & q ) with (q) {
    524         #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
     490        #if defined(__CFA_WITH_VERIFY__)
    525491                {
    526492                        for( idx ; lanes.count ) {
     
    533499                                assert(tail(sl)->link.prev->link.next == tail(sl) );
    534500
    535                                 if(is_empty(sl)) {
     501                                if(sl.before.link.ts == 0l) {
    536502                                        assert(tail(sl)->link.prev == head(sl));
    537503                                        assert(head(sl)->link.next == tail(sl));
     
    545511}
    546512
    547 //-----------------------------------------------------------------------
    548 // Given 2 indexes, pick the list with the oldest push an try to pop from it
    549 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    550         // Pick the bet list
    551         int w = i;
    552         if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
    553                 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
    554         }
    555 
    556         return try_pop(cltr, w __STATS(, stats));
    557 }
    558 
    559513// Call this function of the intrusive list was moved using memcpy
    560514// fixes the list so that the pointers back to anchors aren't left dangling
    561515static inline void fix(__intrusive_lane_t & ll) {
    562         #if !defined(USE_MPSC)
    563                 // if the list is not empty then follow he pointer and fix its reverse
    564                 if(!is_empty(ll)) {
    565                         head(ll)->link.next->link.prev = head(ll);
    566                         tail(ll)->link.prev->link.next = tail(ll);
    567                 }
    568                 // Otherwise just reset the list
    569                 else {
    570                         verify(tail(ll)->link.next == 0p);
    571                         tail(ll)->link.prev = head(ll);
    572                         head(ll)->link.next = tail(ll);
    573                         verify(head(ll)->link.prev == 0p);
    574                 }
    575         #endif
    576 }
    577 
    578 static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
    579         processor * it = &list`first;
    580         for(unsigned i = 0; i < count; i++) {
    581                 /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
    582                 it->rdq.id = value;
    583                 it->rdq.target = -1u;
    584                 value += READYQ_SHARD_FACTOR;
    585                 it = &(*it)`next;
    586         }
    587 }
    588 
    589 static void reassign_cltr_id(struct cluster * cltr) {
    590         unsigned preferred = 0;
    591         assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
    592         assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
    593 }
    594 
    595 static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
    596         #if defined(USE_WORK_STEALING)
    597                 lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    598                 for(i; lanes.count) {
    599                         lanes.tscs[i].tv = ts(lanes.data[i]);
    600                 }
    601         #endif
     516        // if the list is not empty then follow he pointer and fix its reverse
     517        if(!is_empty(ll)) {
     518                head(ll)->link.next->link.prev = head(ll);
     519                tail(ll)->link.prev->link.next = tail(ll);
     520        }
     521        // Otherwise just reset the list
     522        else {
     523                verify(tail(ll)->link.next == 0p);
     524                tail(ll)->link.prev = head(ll);
     525                head(ll)->link.next = tail(ll);
     526                verify(head(ll)->link.prev == 0p);
     527        }
    602528}
    603529
    604530// Grow the ready queue
    605 void ready_queue_grow(struct cluster * cltr) {
    606         size_t ncount;
    607         int target = cltr->procs.total;
    608 
     531void ready_queue_grow  (struct cluster * cltr, int target) {
    609532        /* paranoid */ verify( ready_mutate_islocked() );
    610533        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
     
    615538        // grow the ready queue
    616539        with( cltr->ready_queue ) {
     540                #ifdef USE_SNZI
     541                        ^(snzi){};
     542                #endif
     543
    617544                // Find new count
    618545                // Make sure we always have atleast 1 list
    619                 if(target >= 2) {
    620                         ncount = target * READYQ_SHARD_FACTOR;
    621                 } else {
    622                         ncount = SEQUENTIAL_SHARD;
    623                 }
     546                size_t ncount = target >= 2 ? target * 4: 1;
    624547
    625548                // Allocate new array (uses realloc and memcpies the data)
     
    638561                // Update original
    639562                lanes.count = ncount;
    640         }
    641 
    642         fix_times(cltr);
    643 
    644         reassign_cltr_id(cltr);
     563
     564                #ifdef USE_SNZI
     565                        // Re-create the snzi
     566                        snzi{ log2( lanes.count / 8 ) };
     567                        for( idx; (size_t)lanes.count ) {
     568                                if( !is_empty(lanes.data[idx]) ) {
     569                                        arrive(snzi, idx);
     570                                }
     571                        }
     572                #endif
     573        }
    645574
    646575        // Make sure that everything is consistent
     
    653582
    654583// Shrink the ready queue
    655 void ready_queue_shrink(struct cluster * cltr) {
     584void ready_queue_shrink(struct cluster * cltr, int target) {
    656585        /* paranoid */ verify( ready_mutate_islocked() );
    657586        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     
    660589        /* paranoid */ check( cltr->ready_queue );
    661590
    662         int target = cltr->procs.total;
    663 
    664591        with( cltr->ready_queue ) {
     592                #ifdef USE_SNZI
     593                        ^(snzi){};
     594                #endif
     595
    665596                // Remember old count
    666597                size_t ocount = lanes.count;
     
    668599                // Find new count
    669600                // Make sure we always have atleast 1 list
    670                 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
     601                lanes.count = target >= 2 ? target * 4: 1;
    671602                /* paranoid */ verify( ocount >= lanes.count );
    672                 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
     603                /* paranoid */ verify( lanes.count == target * 4 || target < 2 );
    673604
    674605                // for printing count the number of displaced threads
     
    713644                        fix(lanes.data[idx]);
    714645                }
    715         }
    716 
    717         fix_times(cltr);
    718 
    719         reassign_cltr_id(cltr);
     646
     647                #ifdef USE_SNZI
     648                        // Re-create the snzi
     649                        snzi{ log2( lanes.count / 8 ) };
     650                        for( idx; (size_t)lanes.count ) {
     651                                if( !is_empty(lanes.data[idx]) ) {
     652                                        arrive(snzi, idx);
     653                                }
     654                        }
     655                #endif
     656        }
    720657
    721658        // Make sure that everything is consistent
Note: See TracChangeset for help on using the changeset viewer.