Changeset 884f3f67 for libcfa/src


Ignore:
Timestamp:
Mar 14, 2022, 2:24:51 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
Children:
bfb9bf5
Parents:
c42b8a1
Message:

Change how the ready queue is initialized to make it common with I/O

Location:
libcfa/src/concurrency
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/kernel.hfa

    rc42b8a1 r884f3f67  
    155155void ^?{}(__intrusive_lane_t & this);
    156156
    157 // Aligned timestamps which are used by the relaxed ready queue
     157// Aligned timestamps which are used by the ready queue and io subsystem
    158158struct __attribute__((aligned(128))) __timestamp_t {
    159159        volatile unsigned long long tv;
     
    161161};
    162162
     163static inline void  ?{}(__timestamp_t & this) { this.tv = 0; this.ma = 0; }
     164static inline void ^?{}(__timestamp_t &) {}
     165
     166
    163167struct __attribute__((aligned(16))) __cache_id_t {
    164168        volatile unsigned id;
    165169};
    166170
    167 // Aligned timestamps which are used by the relaxed ready queue
    168 struct __attribute__((aligned(128))) __help_cnts_t {
    169         volatile unsigned long long src;
    170         volatile unsigned long long dst;
    171         volatile unsigned long long tri;
    172 };
    173 
    174 static inline void  ?{}(__timestamp_t & this) { this.tv = 0; this.ma = 0; }
    175 static inline void ^?{}(__timestamp_t &) {}
    176 
    177 struct __attribute__((aligned(128))) __ready_queue_caches_t;
    178 void  ?{}(__ready_queue_caches_t & this);
    179 void ^?{}(__ready_queue_caches_t & this);
    180 
    181 //TODO adjust cache size to ARCHITECTURE
    182 // Structure holding the ready queue
    183 struct __ready_queue_t {
    184         // Data tracking the actual lanes
    185         // On a seperate cacheline from the used struct since
    186         // used can change on each push/pop but this data
    187         // only changes on shrink/grow
    188         struct {
    189                 // Arary of lanes
    190                 __intrusive_lane_t * volatile data;
    191 
    192                 // Array of times
    193                 __timestamp_t * volatile tscs;
    194 
    195                 __cache_id_t * volatile caches;
    196 
    197                 // Array of stats
    198                 __help_cnts_t * volatile help;
    199 
    200                 // Number of lanes (empty or not)
    201                 volatile size_t count;
    202         } lanes;
    203 };
    204 
    205 void  ?{}(__ready_queue_t & this);
    206 void ^?{}(__ready_queue_t & this);
     171// //TODO adjust cache size to ARCHITECTURE
     172// // Structure holding the ready queue
     173// struct __ready_queue_t {
     174//      // Data tracking the actual lanes
     175//      // On a seperate cacheline from the used struct since
     176//      // used can change on each push/pop but this data
     177//      // only changes on shrink/grow
     178//      struct {
     179//              // Arary of lanes
     180//              __intrusive_lane_t * volatile data;
     181
     182//              __cache_id_t * volatile caches;
     183
     184//              // Number of lanes (empty or not)
     185//              volatile size_t count;
     186//      } lanes;
     187// };
     188
     189// void  ?{}(__ready_queue_t & this);
     190// void ^?{}(__ready_queue_t & this);
    207191
    208192// Idle Sleep
     
    230214// Cluster
    231215struct __attribute__((aligned(128))) cluster {
    232         // Ready queue for threads
    233         __ready_queue_t ready_queue;
     216        struct {
     217                struct {
     218                        // Arary of subqueues
     219                        __intrusive_lane_t * volatile data;
     220
     221                        // Time since subqueues were processed
     222                        __timestamp_t * volatile tscs;
     223
     224                        // Number of subqueue / timestamps
     225                        size_t count;
     226                } readyQ;
     227
     228                struct {
     229                        // Number of I/O subqueues
     230                        volatile size_t count;
     231
     232                        // Time since subqueues were processed
     233                        __timestamp_t * volatile tscs;
     234                } io;
     235
     236                // Cache each kernel thread belongs to
     237                __cache_id_t * volatile caches;
     238        } sched;
     239
     240        // // Ready queue for threads
     241        // __ready_queue_t ready_queue;
    234242
    235243        // Name of the cluster
  • libcfa/src/concurrency/kernel/cluster.cfa

    rc42b8a1 r884f3f67  
    221221//-----------------------------------------------------------------------
    222222// Check that all the intrusive queues in the data structure are still consistent
    223 static void check( __ready_queue_t & q ) with (q) {
     223static void check_readyQ( cluster * cltr ) with (cltr->sched) {
    224224        #if defined(__CFA_WITH_VERIFY__)
    225225                {
    226                         for( idx ; lanes.count ) {
    227                                 __intrusive_lane_t & sl = lanes.data[idx];
    228                                 assert(!lanes.data[idx].lock);
     226                        const unsigned lanes_count = readyQ.count;
     227                        for( idx ; lanes_count ) {
     228                                __intrusive_lane_t & sl = readyQ.data[idx];
     229                                assert(!readyQ.data[idx].lock);
    229230
    230231                                        if(is_empty(sl)) {
     
    257258                it->rdq.id = value;
    258259                it->rdq.target = MAX;
    259                 value += __readyq_shard_factor;
     260                value += __shard_factor.readyq;
    260261                it = &(*it)`next;
    261262        }
     
    268269}
    269270
    270 static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
    271         lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    272         for(i; lanes.count) {
    273                 lanes.tscs[i].tv = rdtscl();
    274                 lanes.tscs[i].ma = 0;
     271static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
     272        tscs = alloc(count, tscs`realloc);
     273        for(i; count) {
     274                tscs[i].tv = rdtscl();
     275                tscs[i].ma = 0;
    275276        }
    276277}
     
    278279// Grow the ready queue
    279280void ready_queue_grow(struct cluster * cltr) {
    280         size_t ncount;
    281281        int target = cltr->procs.total;
    282282
     
    285285
    286286        // Make sure that everything is consistent
    287         /* paranoid */ check( cltr->ready_queue );
    288 
    289         // grow the ready queue
    290         with( cltr->ready_queue ) {
    291                 // Find new count
    292                 // Make sure we always have atleast 1 list
    293                 if(target >= 2) {
    294                         ncount = target * __readyq_shard_factor;
    295                 } else {
    296                         ncount = __readyq_single_shard;
    297                 }
    298 
    299                 // Allocate new array (uses realloc and memcpies the data)
    300                 lanes.data = alloc( ncount, lanes.data`realloc );
    301 
    302                 // Fix the moved data
    303                 for( idx; (size_t)lanes.count ) {
    304                         fix(lanes.data[idx]);
    305                 }
    306 
    307                 // Construct new data
    308                 for( idx; (size_t)lanes.count ~ ncount) {
    309                         (lanes.data[idx]){};
    310                 }
    311 
    312                 // Update original
    313                 lanes.count = ncount;
    314 
    315                 lanes.caches = alloc( target, lanes.caches`realloc );
    316         }
    317 
    318         fix_times(cltr);
    319 
     287        /* paranoid */ check_readyQ( cltr );
     288
     289
     290        // Find new count
     291        // Make sure we always have atleast 1 list
     292        size_t ocount = cltr->sched.readyQ.count;
     293        size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
     294
     295        // Do we have to do anything?
     296        if( ocount != ncount ) {
     297
     298                // grow the ready queue
     299                with( cltr->sched ) {
     300
     301                        // Allocate new array (uses realloc and memcpies the data)
     302                        readyQ.data = alloc( ncount, readyQ.data`realloc );
     303
     304                        // Fix the moved data
     305                        for( idx; ocount ) {
     306                                fix(readyQ.data[idx]);
     307                        }
     308
     309                        // Construct new data
     310                        for( idx; ocount ~ ncount) {
     311                                (readyQ.data[idx]){};
     312                        }
     313
     314                        // Update original count
     315                        readyQ.count = ncount;
     316                }
     317
     318
     319                fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
     320        }
     321
     322        // realloc the caches
     323        cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );
     324
     325        // reassign the clusters.
    320326        reassign_cltr_id(cltr);
    321327
    322328        // Make sure that everything is consistent
    323         /* paranoid */ check( cltr->ready_queue );
     329        /* paranoid */ check_readyQ( cltr );
     330        /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
    324331
    325332        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
     
    334341
    335342        // Make sure that everything is consistent
    336         /* paranoid */ check( cltr->ready_queue );
     343        /* paranoid */ check_readyQ( cltr );
    337344
    338345        int target = cltr->procs.total;
    339346
    340         with( cltr->ready_queue ) {
     347        with( cltr->sched ) {
    341348                // Remember old count
    342                 size_t ocount = lanes.count;
     349                size_t ocount = readyQ.count;
    343350
    344351                // Find new count
    345352                // Make sure we always have atleast 1 list
    346                 lanes.count = target >= 2 ? target * __readyq_shard_factor: __readyq_single_shard;
    347                 /* paranoid */ verify( ocount >= lanes.count );
    348                 /* paranoid */ verify( lanes.count == target * __readyq_shard_factor || target < 2 );
     353                size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
     354                /* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
     355                /* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
     356                /* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, ncount );
     357
     358                readyQ.count = ncount;
    349359
    350360                // for printing count the number of displaced threads
     
    354364
    355365                // redistribute old data
    356                 for( idx; (size_t)lanes.count ~ ocount) {
     366                for( idx; ncount ~ ocount) {
    357367                        // Lock is not strictly needed but makes checking invariants much easier
    358                         __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
     368                        __attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].lock);
    359369                        verify(locked);
    360370
    361371                        // As long as we can pop from this lane to push the threads somewhere else in the queue
    362                         while(!is_empty(lanes.data[idx])) {
     372                        while(!is_empty(readyQ.data[idx])) {
    363373                                struct thread$ * thrd;
    364374                                unsigned long long _;
    365                                 [thrd, _] = pop(lanes.data[idx]);
     375                                [thrd, _] = pop(readyQ.data[idx]);
    366376
    367377                                push(cltr, thrd, true);
     
    374384
    375385                        // Unlock the lane
    376                         __atomic_unlock(&lanes.data[idx].lock);
     386                        __atomic_unlock(&readyQ.data[idx].lock);
    377387
    378388                        // TODO print the queue statistics here
    379389
    380                         ^(lanes.data[idx]){};
     390                        ^(readyQ.data[idx]){};
    381391                }
    382392
     
    384394
    385395                // Allocate new array (uses realloc and memcpies the data)
    386                 lanes.data = alloc( lanes.count, lanes.data`realloc );
     396                readyQ.data = alloc( ncount, readyQ.data`realloc );
    387397
    388398                // Fix the moved data
    389                 for( idx; (size_t)lanes.count ) {
    390                         fix(lanes.data[idx]);
    391                 }
    392 
    393                 lanes.caches = alloc( target, lanes.caches`realloc );
    394         }
    395 
    396         fix_times(cltr);
     399                for( idx; ncount ) {
     400                        fix(readyQ.data[idx]);
     401                }
     402
     403                fix_times(readyQ.tscs, ncount);
     404        }
     405        cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );
     406
    397407
    398408
     
    400410
    401411        // Make sure that everything is consistent
    402         /* paranoid */ check( cltr->ready_queue );
     412        /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
     413        /* paranoid */ check_readyQ( cltr );
    403414
    404415        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
    405416        /* paranoid */ verify( ready_mutate_islocked() );
     417}
     418
     419void ready_queue_close(struct cluster * cltr) {
     420        free( cltr->sched.readyQ.data );
     421        free( cltr->sched.readyQ.tscs );
     422        cltr->sched.readyQ.data = 0p;
     423        cltr->sched.readyQ.tscs = 0p;
     424        cltr->sched.readyQ.count = 0;
     425
     426        free( cltr->sched.io.tscs );
     427        free( cltr->sched.caches );
    406428}
    407429
  • libcfa/src/concurrency/kernel/startup.cfa

    rc42b8a1 r884f3f67  
    515515        this.rdq.its = 0;
    516516        this.rdq.itr = 0;
    517         this.rdq.id  = MAX;
     517        this.rdq.id  = 0;
    518518        this.rdq.target = MAX;
    519519        this.rdq.last = MAX;
     
    605605        this.name = name;
    606606        this.preemption_rate = preemption_rate;
    607         ready_queue{};
     607        this.sched.readyQ.data = 0p;
     608        this.sched.readyQ.tscs = 0p;
     609        this.sched.readyQ.count = 0;
     610        this.sched.io.tscs = 0p;
     611        this.sched.caches = 0p;
    608612
    609613        #if !defined(__CFA_NO_STATISTICS__)
     
    644648        // Unlock the RWlock
    645649        ready_mutate_unlock( last_size );
     650
     651        ready_queue_close( &this );
     652        /* paranoid */ verify( this.sched.readyQ.data == 0p );
     653        /* paranoid */ verify( this.sched.readyQ.tscs == 0p );
     654        /* paranoid */ verify( this.sched.readyQ.count == 0 );
     655        /* paranoid */ verify( this.sched.io.tscs == 0p );
     656        /* paranoid */ verify( this.sched.caches == 0p );
     657
    646658        enable_interrupts( false ); // Don't poll, could be in main cluster
     659
    647660
    648661        #if !defined(__CFA_NO_STATISTICS__)
  • libcfa/src/concurrency/kernel_private.hfa

    rc42b8a1 r884f3f67  
    366366
    367367//-----------------------------------------------------------------------
     368// Decrease the width of the ready queue (number of lanes) by 4
     369void ready_queue_close(struct cluster * cltr);
     370
     371//-----------------------------------------------------------------------
    368372// Calc moving average based on existing average, before and current time.
    369373static inline unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
     
    380384}
    381385
    382 static const unsigned __readyq_shard_factor = 2;
     386//-----------------------------------------------------------------------
     387// Calc age a timestamp should be before needing help.
     388forall(Data_t * | { unsigned long long ts(Data_t & this); })
     389static inline unsigned long long calc_cutoff(
     390        const unsigned long long ctsc,
     391        const processor * proc,
     392        size_t count,
     393        Data_t * data,
     394        __timestamp_t * tscs,
     395        const unsigned shard_factor
     396) {
     397        unsigned start = proc->rdq.id;
     398        unsigned long long max = 0;
     399        for(i; shard_factor) {
     400                unsigned long long ptsc = ts(data[start + i]);
     401                if(ptsc != -1ull) {
     402                        /* paranoid */ verify( start + i < count );
     403                        unsigned long long tsc = moving_average(ctsc, ptsc, tscs[start + i].ma);
     404                        if(tsc > max) max = tsc;
     405                }
     406        }
     407        return (max + 2 * max) / 2;
     408}
     409
     410static struct {
     411        const unsigned readyq;
     412} __shard_factor = { 2 };
    383413
    384414// Local Variables: //
  • libcfa/src/concurrency/ready_queue.cfa

    rc42b8a1 r884f3f67  
    4848// Cforall Ready Queue used for scheduling
    4949//=======================================================================
    50 void ?{}(__ready_queue_t & this) with (this) {
    51         lanes.data   = 0p;
    52         lanes.tscs   = 0p;
    53         lanes.caches = 0p;
    54         lanes.help   = 0p;
    55         lanes.count  = 0;
    56 }
    57 
    58 void ^?{}(__ready_queue_t & this) with (this) {
    59         free(lanes.data);
    60         free(lanes.tscs);
    61         free(lanes.caches);
    62         free(lanes.help);
    63 }
    64 
    65 //-----------------------------------------------------------------------
    66 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     50// void ?{}(__ready_queue_t & this) with (this) {
     51//      lanes.data   = 0p;
     52//      lanes.tscs   = 0p;
     53//      lanes.caches = 0p;
     54//      lanes.count  = 0;
     55// }
     56
     57// void ^?{}(__ready_queue_t & this) with (this) {
     58//      free(lanes.data);
     59//      free(lanes.tscs);
     60//      free(lanes.caches);
     61// }
     62
     63//-----------------------------------------------------------------------
     64__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->sched) {
    6765        processor * const proc = kernelTLS().this_processor;
    6866        const bool external = (!proc) || (cltr != proc->cltr);
    6967        const bool remote   = hint == UNPARK_REMOTE;
     68        const size_t lanes_count = readyQ.count;
     69
     70        /* paranoid */ verify( __shard_factor.readyq > 0 );
     71        /* paranoid */ verify( lanes_count > 0 );
    7072
    7173        unsigned i;
     
    7375                // Figure out where thread was last time and make sure it's valid
    7476                /* paranoid */ verify(thrd->preferred >= 0);
    75                 if(thrd->preferred * __readyq_shard_factor < lanes.count) {
    76                         /* paranoid */ verify(thrd->preferred * __readyq_shard_factor < lanes.count);
    77                         unsigned start = thrd->preferred * __readyq_shard_factor;
     77                unsigned start = thrd->preferred * __shard_factor.readyq;
     78                if(start < lanes_count) {
    7879                        do {
    7980                                unsigned r = __tls_rand();
    80                                 i = start + (r % __readyq_shard_factor);
    81                                 /* paranoid */ verify( i < lanes.count );
     81                                i = start + (r % __shard_factor.readyq);
     82                                /* paranoid */ verify( i < lanes_count );
    8283                                // If we can't lock it retry
    83                         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     84                        } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
    8485                } else {
    8586                        do {
    86                                 i = __tls_rand() % lanes.count;
    87                         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     87                                i = __tls_rand() % lanes_count;
     88                        } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
    8889                }
    8990        } else {
    9091                do {
    9192                        unsigned r = proc->rdq.its++;
    92                         i = proc->rdq.id + (r % __readyq_shard_factor);
    93                         /* paranoid */ verify( i < lanes.count );
     93                        i = proc->rdq.id + (r % __shard_factor.readyq);
     94                        /* paranoid */ verify( i < lanes_count );
    9495                        // If we can't lock it retry
    95                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     96                } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
    9697        }
    9798
    9899        // Actually push it
    99         push(lanes.data[i], thrd);
     100        push(readyQ.data[i], thrd);
    100101
    101102        // Unlock and return
    102         __atomic_unlock( &lanes.data[i].lock );
     103        __atomic_unlock( &readyQ.data[i].lock );
    103104
    104105        #if !defined(__CFA_NO_STATISTICS__)
     
    108109}
    109110
    110 static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
    111         unsigned start = proc->rdq.id;
    112         unsigned long long max = 0;
    113         for(i; __readyq_shard_factor) {
    114                 unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
    115                 if(ptsc != -1ull) {
    116                         /* paranoid */ verify( start + i < rdq.lanes.count );
    117                         unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
    118                         if(tsc > max) max = tsc;
    119                 }
    120         }
    121         return (max + 2 * max) / 2;
    122 }
    123 
    124 __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    125         /* paranoid */ verify( lanes.count > 0 );
     111__attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->sched) {
     112        const size_t lanes_count = readyQ.count;
     113
     114        /* paranoid */ verify( __shard_factor.readyq > 0 );
     115        /* paranoid */ verify( lanes_count > 0 );
    126116        /* paranoid */ verify( kernelTLS().this_processor );
    127         /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     117        /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes_count );
    128118
    129119        processor * const proc = kernelTLS().this_processor;
    130120        unsigned this = proc->rdq.id;
    131         /* paranoid */ verify( this < lanes.count );
     121        /* paranoid */ verify( this < lanes_count );
    132122        __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
    133123
     
    140130        // Super important: don't write the same value over and over again
    141131        // We want to maximise our chances that his particular values stays in cache
    142         if(lanes.caches[this / __readyq_shard_factor].id != this_cache)
    143                 __atomic_store_n(&lanes.caches[this / __readyq_shard_factor].id, this_cache, __ATOMIC_RELAXED);
     132        if(caches[this / __shard_factor.readyq].id != this_cache)
     133                __atomic_store_n(&caches[this / __shard_factor.readyq].id, this_cache, __ATOMIC_RELAXED);
    144134
    145135        const unsigned long long ctsc = rdtscl();
     
    148138                uint64_t chaos = __tls_rand();
    149139                unsigned ext = chaos & 0xff;
    150                 unsigned other  = (chaos >> 8) % (lanes.count);
    151 
    152                 if(ext < 3 || __atomic_load_n(&lanes.caches[other / __readyq_shard_factor].id, __ATOMIC_RELAXED) == this_cache) {
     140                unsigned other  = (chaos >> 8) % (lanes_count);
     141
     142                if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.readyq].id, __ATOMIC_RELAXED) == this_cache) {
    153143                        proc->rdq.target = other;
    154144                }
     
    156146        else {
    157147                const unsigned target = proc->rdq.target;
    158                 __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
    159                 /* paranoid */ verify( lanes.tscs[target].tv != MAX );
    160                 if(target < lanes.count) {
    161                         const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
    162                         const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
     148                __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, readyQ.tscs[target].tv);
     149                /* paranoid */ verify( readyQ.tscs[target].tv != MAX );
     150                if(target < lanes_count) {
     151                        const unsigned long long cutoff = calc_cutoff(ctsc, proc, lanes_count, cltr->sched.readyQ.data, cltr->sched.readyQ.tscs, __shard_factor.readyq);
     152                        const unsigned long long age = moving_average(ctsc, readyQ.tscs[target].tv, readyQ.tscs[target].ma);
    163153                        __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
    164154                        if(age > cutoff) {
     
    170160        }
    171161
    172         for(__readyq_shard_factor) {
    173                 unsigned i = this + (proc->rdq.itr++ % __readyq_shard_factor);
     162        for(__shard_factor.readyq) {
     163                unsigned i = this + (proc->rdq.itr++ % __shard_factor.readyq);
    174164                if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    175165        }
     
    179169
    180170}
    181 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    182         unsigned i = __tls_rand() % lanes.count;
     171__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) {
     172        unsigned i = __tls_rand() % (cltr->sched.readyQ.count);
    183173        return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    184174}
     
    195185//-----------------------------------------------------------------------
    196186// try to pop from a lane given by index w
    197 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    198         /* paranoid */ verify( w < lanes.count );
     187static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
     188        const size_t lanes_count = readyQ.count;
     189        /* paranoid */ verify( w < lanes_count );
    199190        __STATS( stats.attempt++; )
    200191
    201192        // Get relevant elements locally
    202         __intrusive_lane_t & lane = lanes.data[w];
     193        __intrusive_lane_t & lane = readyQ.data[w];
    203194
    204195        // If list looks empty retry
     
    236227        if (tsv != MAX) {
    237228                unsigned long long now = rdtscl();
    238                 unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
    239                 __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
    240                 __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
    241         }
    242 
    243         thrd->preferred = w / __readyq_shard_factor;
     229                unsigned long long pma = __atomic_load_n(&readyQ.tscs[w].ma, __ATOMIC_RELAXED);
     230                __atomic_store_n(&readyQ.tscs[w].tv, tsv, __ATOMIC_RELAXED);
     231                __atomic_store_n(&readyQ.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
     232        }
     233
     234        thrd->preferred = w / __shard_factor.readyq;
    244235
    245236        // return the popped thread
     
    250241// try to pop from any lanes making sure you don't miss any threads push
    251242// before the start of the function
    252 static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) {
    253         /* paranoid */ verify( lanes.count > 0 );
    254         unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     243static inline struct thread$ * search(struct cluster * cltr) {
     244        const size_t lanes_count = cltr->sched.readyQ.count;
     245        /* paranoid */ verify( lanes_count > 0 );
     246        unsigned count = __atomic_load_n( &lanes_count, __ATOMIC_RELAXED );
    255247        unsigned offset = __tls_rand();
    256248        for(i; count) {
     
    279271//-----------------------------------------------------------------------
    280272// Given 2 indexes, pick the list with the oldest push an try to pop from it
    281 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     273static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
    282274        // Pick the bet list
    283275        int w = i;
    284         if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
    285                 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
     276        if( __builtin_expect(!is_empty(readyQ.data[j]), true) ) {
     277                w = (ts(readyQ.data[i]) < ts(readyQ.data[j])) ? i : j;
    286278        }
    287279
Note: See TracChangeset for help on using the changeset viewer.