Timestamp:
Mar 14, 2022, 2:24:51 PM
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
Children:
bfb9bf5
Parents:
c42b8a1
Message:

Change how the ready queue is initialized to make it common with I/O

File:
1 edited

  • libcfa/src/concurrency/ready_queue.cfa
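The diff below renames every access from cltr->ready_queue and its lanes.* fields to cltr->sched and a readyQ sub-structure, with the cache hints (caches) now sitting beside the ready queue and the shard factor coming from a __shard_factor aggregate, presumably so the same bookkeeping can be shared with the I/O queues. A minimal sketch of the assumed new layout, reconstructed only from the field accesses in this diff; the type name __scheduler_t, the comment about an I/O counterpart, and the element type names are guesses, not the actual declarations:

// Sketch only (assumed layout; the real declarations live outside this file):
struct __scheduler_t {                     // hypothetical name for the type behind cltr->sched
        struct {
                __intrusive_lane_t * data;     // sharded lanes, formerly cltr->ready_queue.lanes.data
                __timestamp_t      * tscs;     // per-lane timestamp + moving average, formerly lanes.tscs
                size_t               count;    // number of lanes, formerly lanes.count
        } readyQ;
        // presumably a matching sub-structure for the I/O queues ("common with I/O")
        __cache_id_t * caches;                 // per-shard cache-id hints, formerly lanes.caches
};
// __shard_factor appears to be a constant aggregate with one factor per queue kind,
// e.g. __shard_factor.readyq replacing the old __readyq_shard_factor.

The sharding itself is unchanged: each processor owns __shard_factor.readyq consecutive lanes starting at proc->rdq.id, and a thread's preferred value records the shard (w / __shard_factor.readyq) it was last popped from.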

--- libcfa/src/concurrency/ready_queue.cfa  (rc42b8a1)
+++ libcfa/src/concurrency/ready_queue.cfa  (r884f3f67)
 // Cforall Ready Queue used for scheduling
 //=======================================================================
-void ?{}(__ready_queue_t & this) with (this) {
-	lanes.data   = 0p;
-	lanes.tscs   = 0p;
-	lanes.caches = 0p;
-	lanes.help   = 0p;
-	lanes.count  = 0;
-}
-
-void ^?{}(__ready_queue_t & this) with (this) {
-	free(lanes.data);
-	free(lanes.tscs);
-	free(lanes.caches);
-	free(lanes.help);
-}
-
-//-----------------------------------------------------------------------
-__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
+// void ?{}(__ready_queue_t & this) with (this) {
+// 	lanes.data   = 0p;
+// 	lanes.tscs   = 0p;
+// 	lanes.caches = 0p;
+// 	lanes.count  = 0;
+// }
+
+// void ^?{}(__ready_queue_t & this) with (this) {
+// 	free(lanes.data);
+// 	free(lanes.tscs);
+// 	free(lanes.caches);
+// }
+
+//-----------------------------------------------------------------------
+__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->sched) {
 	processor * const proc = kernelTLS().this_processor;
 	const bool external = (!proc) || (cltr != proc->cltr);
 	const bool remote   = hint == UNPARK_REMOTE;
+	const size_t lanes_count = readyQ.count;
+
+	/* paranoid */ verify( __shard_factor.readyq > 0 );
+	/* paranoid */ verify( lanes_count > 0 );
 
 	unsigned i;
…
 		// Figure out where thread was last time and make sure it's valid
 		/* paranoid */ verify(thrd->preferred >= 0);
-		if(thrd->preferred * __readyq_shard_factor < lanes.count) {
-			/* paranoid */ verify(thrd->preferred * __readyq_shard_factor < lanes.count);
-			unsigned start = thrd->preferred * __readyq_shard_factor;
+		unsigned start = thrd->preferred * __shard_factor.readyq;
+		if(start < lanes_count) {
 			do {
 				unsigned r = __tls_rand();
-				i = start + (r % __readyq_shard_factor);
-				/* paranoid */ verify( i < lanes.count );
+				i = start + (r % __shard_factor.readyq);
+				/* paranoid */ verify( i < lanes_count );
 				// If we can't lock it retry
-			} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+			} while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
 		} else {
 			do {
-				i = __tls_rand() % lanes.count;
-			} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+				i = __tls_rand() % lanes_count;
+			} while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
 		}
 	} else {
 		do {
 			unsigned r = proc->rdq.its++;
-			i = proc->rdq.id + (r % __readyq_shard_factor);
-			/* paranoid */ verify( i < lanes.count );
+			i = proc->rdq.id + (r % __shard_factor.readyq);
+			/* paranoid */ verify( i < lanes_count );
 			// If we can't lock it retry
-		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+		} while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
 	}
 
 	// Actually push it
-	push(lanes.data[i], thrd);
+	push(readyQ.data[i], thrd);
 
 	// Unlock and return
-	__atomic_unlock( &lanes.data[i].lock );
+	__atomic_unlock( &readyQ.data[i].lock );
 
 	#if !defined(__CFA_NO_STATISTICS__)
…
 }
 
-static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
-	unsigned start = proc->rdq.id;
-	unsigned long long max = 0;
-	for(i; __readyq_shard_factor) {
-		unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
-		if(ptsc != -1ull) {
-			/* paranoid */ verify( start + i < rdq.lanes.count );
-			unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
-			if(tsc > max) max = tsc;
-		}
-	}
-	return (max + 2 * max) / 2;
-}
-
-__attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
-	/* paranoid */ verify( lanes.count > 0 );
+__attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->sched) {
+	const size_t lanes_count = readyQ.count;
+
+	/* paranoid */ verify( __shard_factor.readyq > 0 );
+	/* paranoid */ verify( lanes_count > 0 );
 	/* paranoid */ verify( kernelTLS().this_processor );
-	/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
+	/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes_count );
 
 	processor * const proc = kernelTLS().this_processor;
 	unsigned this = proc->rdq.id;
-	/* paranoid */ verify( this < lanes.count );
+	/* paranoid */ verify( this < lanes_count );
 	__cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
 
…
 	// Super important: don't write the same value over and over again
 	// We want to maximise our chances that his particular values stays in cache
-	if(lanes.caches[this / __readyq_shard_factor].id != this_cache)
-		__atomic_store_n(&lanes.caches[this / __readyq_shard_factor].id, this_cache, __ATOMIC_RELAXED);
+	if(caches[this / __shard_factor.readyq].id != this_cache)
+		__atomic_store_n(&caches[this / __shard_factor.readyq].id, this_cache, __ATOMIC_RELAXED);
 
 	const unsigned long long ctsc = rdtscl();
…
 		uint64_t chaos = __tls_rand();
 		unsigned ext = chaos & 0xff;
-		unsigned other  = (chaos >> 8) % (lanes.count);
-
-		if(ext < 3 || __atomic_load_n(&lanes.caches[other / __readyq_shard_factor].id, __ATOMIC_RELAXED) == this_cache) {
+		unsigned other  = (chaos >> 8) % (lanes_count);
+
+		if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.readyq].id, __ATOMIC_RELAXED) == this_cache) {
 			proc->rdq.target = other;
 		}
…
 	else {
 		const unsigned target = proc->rdq.target;
-		__cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
-		/* paranoid */ verify( lanes.tscs[target].tv != MAX );
-		if(target < lanes.count) {
-			const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
-			const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
+		__cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, readyQ.tscs[target].tv);
+		/* paranoid */ verify( readyQ.tscs[target].tv != MAX );
+		if(target < lanes_count) {
+			const unsigned long long cutoff = calc_cutoff(ctsc, proc, lanes_count, cltr->sched.readyQ.data, cltr->sched.readyQ.tscs, __shard_factor.readyq);
+			const unsigned long long age = moving_average(ctsc, readyQ.tscs[target].tv, readyQ.tscs[target].ma);
 			__cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
 			if(age > cutoff) {
…
 	}
 
-	for(__readyq_shard_factor) {
-		unsigned i = this + (proc->rdq.itr++ % __readyq_shard_factor);
+	for(__shard_factor.readyq) {
+		unsigned i = this + (proc->rdq.itr++ % __shard_factor.readyq);
 		if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
 	}
…
 
 }
-__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
-	unsigned i = __tls_rand() % lanes.count;
+__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) {
+	unsigned i = __tls_rand() % (cltr->sched.readyQ.count);
 	return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
 }
…
 //-----------------------------------------------------------------------
 // try to pop from a lane given by index w
-static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
-	/* paranoid */ verify( w < lanes.count );
+static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
+	const size_t lanes_count = readyQ.count;
+	/* paranoid */ verify( w < lanes_count );
 	__STATS( stats.attempt++; )
 
 	// Get relevant elements locally
-	__intrusive_lane_t & lane = lanes.data[w];
+	__intrusive_lane_t & lane = readyQ.data[w];
 
 	// If list looks empty retry
…
 	if (tsv != MAX) {
 		unsigned long long now = rdtscl();
-		unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
-		__atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
-		__atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
-	}
-
-	thrd->preferred = w / __readyq_shard_factor;
+		unsigned long long pma = __atomic_load_n(&readyQ.tscs[w].ma, __ATOMIC_RELAXED);
+		__atomic_store_n(&readyQ.tscs[w].tv, tsv, __ATOMIC_RELAXED);
+		__atomic_store_n(&readyQ.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
+	}
+
+	thrd->preferred = w / __shard_factor.readyq;
 
 	// return the popped thread
…
 // try to pop from any lanes making sure you don't miss any threads push
 // before the start of the function
-static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) {
-	/* paranoid */ verify( lanes.count > 0 );
-	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
+static inline struct thread$ * search(struct cluster * cltr) {
+	const size_t lanes_count = cltr->sched.readyQ.count;
+	/* paranoid */ verify( lanes_count > 0 );
+	unsigned count = __atomic_load_n( &lanes_count, __ATOMIC_RELAXED );
 	unsigned offset = __tls_rand();
 	for(i; count) {
…
 //-----------------------------------------------------------------------
 // Given 2 indexes, pick the list with the oldest push an try to pop from it
-static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
+static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
 	// Pick the bet list
 	int w = i;
-	if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
-		w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
+	if( __builtin_expect(!is_empty(readyQ.data[j]), true) ) {
+		w = (ts(readyQ.data[i]) < ts(readyQ.data[j])) ? i : j;
 	}
 
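The file-local calc_cutoff removed above no longer takes the whole __ready_queue_t; the new call site passes the lane count, the lane array, the timestamp array and the shard factor explicitly, presumably so the helper can be shared with the I/O queues (its new definition is not part of this diff). A sketch of what the generalized helper would look like under that assumption, reconstructed from the removed body and the new call calc_cutoff(ctsc, proc, lanes_count, readyQ.data, readyQ.tscs, __shard_factor.readyq); parameter names and the __timestamp_t element type are guesses:

// Sketch only: reconstructed from the removed calc_cutoff and the new call site.
static inline unsigned long long calc_cutoff(
		const unsigned long long ctsc,      // current timestamp, rdtscl() at the call site
		const processor * proc,
		size_t lanes_count,
		__intrusive_lane_t * data,          // lane array, e.g. readyQ.data
		__timestamp_t * tscs,               // per-lane timestamps, e.g. readyQ.tscs
		unsigned shard_factor ) {           // e.g. __shard_factor.readyq
	unsigned start = proc->rdq.id;
	unsigned long long max = 0;
	for(i; shard_factor) {                              // Cforall range loop over this processor's own shard
		unsigned long long ptsc = ts(data[start + i]);  // timestamp reported for the lane by ts()
		if(ptsc != -1ull) {
			/* paranoid */ verify( start + i < lanes_count );
			unsigned long long tsc = moving_average(ctsc, ptsc, tscs[start + i].ma);
			if(tsc > max) max = tsc;
		}
	}
	return (max + 2 * max) / 2;                         // i.e. 1.5 * max: only lanes noticeably older than the local shard qualify
}

pop_fast then compares the moving-average age of the helping target against this cutoff and only helps when age > cutoff, as in the unchanged lines above.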