Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r4479890 r46bbcaf  
    2020
    2121
     22// #define USE_RELAXED_FIFO
     23// #define USE_WORK_STEALING
     24// #define USE_CPU_WORK_STEALING
    2225#define USE_AWARE_STEALING
    2326
    2427#include "bits/defs.hfa"
    2528#include "device/cpu.hfa"
    26 #include "kernel/cluster.hfa"
    27 #include "kernel/private.hfa"
    28 
    29 // #include <errno.h>
    30 // #include <unistd.h>
     29#include "kernel_private.hfa"
     30
     31#include "stdlib.hfa"
     32#include "limits.hfa"
     33#include "math.hfa"
     34
     35#include <errno.h>
     36#include <unistd.h>
     37
     38extern "C" {
     39        #include <sys/syscall.h>  // __NR_xxx
     40}
    3141
    3242#include "ready_subqueue.hfa"
     
    4050#endif
    4151
     52// No overriden function, no environment variable, no define
     53// fall back to a magic number
     54#ifndef __CFA_MAX_PROCESSORS__
     55        #define __CFA_MAX_PROCESSORS__ 1024
     56#endif
     57
     58#if   defined(USE_AWARE_STEALING)
     59        #define READYQ_SHARD_FACTOR 2
     60        #define SEQUENTIAL_SHARD 2
     61#elif defined(USE_CPU_WORK_STEALING)
     62        #define READYQ_SHARD_FACTOR 2
     63#elif defined(USE_RELAXED_FIFO)
     64        #define BIAS 4
     65        #define READYQ_SHARD_FACTOR 4
     66        #define SEQUENTIAL_SHARD 1
     67#elif defined(USE_WORK_STEALING)
     68        #define READYQ_SHARD_FACTOR 2
     69        #define SEQUENTIAL_SHARD 2
     70#else
     71        #error no scheduling strategy selected
     72#endif
     73
    4274static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
    4375static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
    4476static inline struct thread$ * search(struct cluster * cltr);
     77static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
     78
     79
     80// returns the maximum number of processors the RWLock support
     81__attribute__((weak)) unsigned __max_processors() {
     82        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
     83        if(!max_cores_s) {
     84                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
     85                return __CFA_MAX_PROCESSORS__;
     86        }
     87
     88        char * endptr = 0p;
     89        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
     90        if(max_cores_l < 1 || max_cores_l > 65535) {
     91                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
     92                return __CFA_MAX_PROCESSORS__;
     93        }
     94        if('\0' != *endptr) {
     95                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
     96                return __CFA_MAX_PROCESSORS__;
     97        }
     98
     99        return max_cores_l;
     100}
     101
     102#if   defined(CFA_HAVE_LINUX_LIBRSEQ)
     103        // No forward declaration needed
     104        #define __kernel_rseq_register rseq_register_current_thread
     105        #define __kernel_rseq_unregister rseq_unregister_current_thread
     106#elif defined(CFA_HAVE_LINUX_RSEQ_H)
     107        static void __kernel_raw_rseq_register  (void);
     108        static void __kernel_raw_rseq_unregister(void);
     109
     110        #define __kernel_rseq_register __kernel_raw_rseq_register
     111        #define __kernel_rseq_unregister __kernel_raw_rseq_unregister
     112#else
     113        // No forward declaration needed
     114        // No initialization needed
     115        static inline void noop(void) {}
     116
     117        #define __kernel_rseq_register noop
     118        #define __kernel_rseq_unregister noop
     119#endif
     120
     121//=======================================================================
     122// Cluster wide reader-writer lock
     123//=======================================================================
     124void  ?{}(__scheduler_RWLock_t & this) {
     125        this.max   = __max_processors();
     126        this.alloc = 0;
     127        this.ready = 0;
     128        this.data  = alloc(this.max);
     129        this.write_lock  = false;
     130
     131        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
     132        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
     133
     134}
     135void ^?{}(__scheduler_RWLock_t & this) {
     136        free(this.data);
     137}
     138
     139
     140//=======================================================================
     141// Lock-Free registering/unregistering of threads
     142unsigned register_proc_id( void ) with(*__scheduler_lock) {
     143        __kernel_rseq_register();
     144
     145        bool * handle = (bool *)&kernelTLS().sched_lock;
     146
     147        // Step - 1 : check if there is already space in the data
     148        uint_fast32_t s = ready;
     149
     150        // Check among all the ready
     151        for(uint_fast32_t i = 0; i < s; i++) {
     152                bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
     153                /* paranoid */ verify( handle != *cell );
     154
     155                bool * null = 0p; // Re-write every loop since compare thrashes it
     156                if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
     157                        && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     158                        /* paranoid */ verify(i < ready);
     159                        /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
     160                        return i;
     161                }
     162        }
     163
     164        if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock->max);
     165
     166        // Step - 2 : F&A to get a new spot in the array.
     167        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
     168        if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock->max);
     169
     170        // Step - 3 : Mark space as used and then publish it.
     171        data[n] = handle;
     172        while() {
     173                unsigned copy = n;
     174                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
     175                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
     176                        break;
     177                Pause();
     178        }
     179
     180        // Return new spot.
     181        /* paranoid */ verify(n < ready);
     182        /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
     183        return n;
     184}
     185
     186void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
     187        /* paranoid */ verify(id < ready);
     188        /* paranoid */ verify(id == kernelTLS().sched_id);
     189        /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
     190
     191        bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
     192
     193        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
     194
     195        __kernel_rseq_unregister();
     196}
     197
     198//-----------------------------------------------------------------------
     199// Writer side : acquire when changing the ready queue, e.g. adding more
     200//  queues or removing them.
     201uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
     202        /* paranoid */ verify( ! __preemption_enabled() );
     203
     204        // Step 1 : lock global lock
     205        // It is needed to avoid processors that register mid Critical-Section
     206        //   to simply lock their own lock and enter.
     207        __atomic_acquire( &write_lock );
     208
     209        // Make sure we won't deadlock ourself
     210        // Checking before acquiring the writer lock isn't safe
     211        // because someone else could have locked us.
     212        /* paranoid */ verify( ! kernelTLS().sched_lock );
     213
     214        // Step 2 : lock per-proc lock
     215        // Processors that are currently being registered aren't counted
     216        //   but can't be in read_lock or in the critical section.
     217        // All other processors are counted
     218        uint_fast32_t s = ready;
     219        for(uint_fast32_t i = 0; i < s; i++) {
     220                volatile bool * llock = data[i];
     221                if(llock) __atomic_acquire( llock );
     222        }
     223
     224        /* paranoid */ verify( ! __preemption_enabled() );
     225        return s;
     226}
     227
     228void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
     229        /* paranoid */ verify( ! __preemption_enabled() );
     230
     231        // Step 1 : release local locks
     232        // This must be done while the global lock is held to avoid
     233        //   threads that where created mid critical section
     234        //   to race to lock their local locks and have the writer
     235        //   immidiately unlock them
     236        // Alternative solution : return s in write_lock and pass it to write_unlock
     237        for(uint_fast32_t i = 0; i < last_s; i++) {
     238                volatile bool * llock = data[i];
     239                if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
     240        }
     241
     242        // Step 2 : release global lock
     243        /*paranoid*/ assert(true == write_lock);
     244        __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
     245
     246        /* paranoid */ verify( ! __preemption_enabled() );
     247}
     248
     249//=======================================================================
     250// caches handling
     251
     252struct __attribute__((aligned(128))) __ready_queue_caches_t {
     253        // Count States:
     254        // - 0  : No one is looking after this cache
     255        // - 1  : No one is looking after this cache, BUT it's not empty
     256        // - 2+ : At least one processor is looking after this cache
     257        volatile unsigned count;
     258};
     259
     260void  ?{}(__ready_queue_caches_t & this) { this.count = 0; }
     261void ^?{}(__ready_queue_caches_t & this) {}
     262
     263static inline void depart(__ready_queue_caches_t & cache) {
     264        /* paranoid */ verify( cache.count > 1);
     265        __atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);
     266        /* paranoid */ verify( cache.count != 0);
     267        /* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.
     268}
     269
     270static inline void arrive(__ready_queue_caches_t & cache) {
     271        // for() {
     272        //      unsigned expected = cache.count;
     273        //      unsigned desired  = 0 == expected ? 2 : expected + 1;
     274        // }
     275}
    45276
    46277//=======================================================================
    47278// Cforall Ready Queue used for scheduling
    48279//=======================================================================
    49 // void ?{}(__ready_queue_t & this) with (this) {
    50 //      lanes.data   = 0p;
    51 //      lanes.tscs   = 0p;
    52 //      lanes.caches = 0p;
    53 //      lanes.count  = 0;
    54 // }
    55 
    56 // void ^?{}(__ready_queue_t & this) with (this) {
    57 //      free(lanes.data);
    58 //      free(lanes.tscs);
    59 //      free(lanes.caches);
    60 // }
     280unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
     281        /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
     282        /* paranoid */ verifyf( instsc  < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
     283        /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );
     284
     285        const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
     286        const unsigned long long total_weight = 16;
     287        const unsigned long long new_weight   = 4;
     288        const unsigned long long old_weight = total_weight - new_weight;
     289        const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
     290        return ret;
     291}
     292
     293void ?{}(__ready_queue_t & this) with (this) {
     294        #if defined(USE_CPU_WORK_STEALING)
     295                lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
     296                lanes.data = alloc( lanes.count );
     297                lanes.tscs = alloc( lanes.count );
     298                lanes.help = alloc( cpu_info.hthrd_count );
     299
     300                for( idx; (size_t)lanes.count ) {
     301                        (lanes.data[idx]){};
     302                        lanes.tscs[idx].tv = rdtscl();
     303                        lanes.tscs[idx].ma = rdtscl();
     304                }
     305                for( idx; (size_t)cpu_info.hthrd_count ) {
     306                        lanes.help[idx].src = 0;
     307                        lanes.help[idx].dst = 0;
     308                        lanes.help[idx].tri = 0;
     309                }
     310        #else
     311                lanes.data   = 0p;
     312                lanes.tscs   = 0p;
     313                lanes.caches = 0p;
     314                lanes.help   = 0p;
     315                lanes.count  = 0;
     316        #endif
     317}
     318
     319void ^?{}(__ready_queue_t & this) with (this) {
     320        #if !defined(USE_CPU_WORK_STEALING)
     321                verify( SEQUENTIAL_SHARD == lanes.count );
     322        #endif
     323
     324        free(lanes.data);
     325        free(lanes.tscs);
     326        free(lanes.caches);
     327        free(lanes.help);
     328}
    61329
    62330//-----------------------------------------------------------------------
    63 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->sched) {
    64         processor * const proc = kernelTLS().this_processor;
    65         const bool external = (!proc) || (cltr != proc->cltr);
    66         const bool remote   = hint == UNPARK_REMOTE;
    67         const size_t lanes_count = readyQ.count;
    68 
    69         /* paranoid */ verify( __shard_factor.readyq > 0 );
    70         /* paranoid */ verify( lanes_count > 0 );
    71 
    72         unsigned i;
    73         if( external || remote ) {
    74                 // Figure out where thread was last time and make sure it's valid
    75                 /* paranoid */ verify(thrd->preferred >= 0);
    76                 unsigned start = thrd->preferred * __shard_factor.readyq;
    77                 if(start < lanes_count) {
    78                         do {
    79                                 unsigned r = __tls_rand();
    80                                 i = start + (r % __shard_factor.readyq);
    81                                 /* paranoid */ verify( i < lanes_count );
    82                                 // If we can't lock it retry
    83                         } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
     331#if defined(USE_AWARE_STEALING)
     332        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     333                processor * const proc = kernelTLS().this_processor;
     334                const bool external = (!proc) || (cltr != proc->cltr);
     335                const bool remote   = hint == UNPARK_REMOTE;
     336
     337                unsigned i;
     338                if( external || remote ) {
     339                        // Figure out where thread was last time and make sure it's valid
     340                        /* paranoid */ verify(thrd->preferred >= 0);
     341                        if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
     342                                /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
     343                                unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
     344                                do {
     345                                        unsigned r = __tls_rand();
     346                                        i = start + (r % READYQ_SHARD_FACTOR);
     347                                        /* paranoid */ verify( i < lanes.count );
     348                                        // If we can't lock it retry
     349                                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     350                        } else {
     351                                do {
     352                                        i = __tls_rand() % lanes.count;
     353                                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     354                        }
    84355                } else {
    85356                        do {
    86                                 i = __tls_rand() % lanes_count;
    87                         } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
    88                 }
    89         } else {
     357                                unsigned r = proc->rdq.its++;
     358                                i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     359                                /* paranoid */ verify( i < lanes.count );
     360                                // If we can't lock it retry
     361                        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     362                }
     363
     364                // Actually push it
     365                push(lanes.data[i], thrd);
     366
     367                // Unlock and return
     368                __atomic_unlock( &lanes.data[i].lock );
     369
     370                #if !defined(__CFA_NO_STATISTICS__)
     371                        if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     372                        else __tls_stats()->ready.push.local.success++;
     373                #endif
     374        }
     375
     376        static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
     377                unsigned start = proc->rdq.id;
     378                unsigned long long max = 0;
     379                for(i; READYQ_SHARD_FACTOR) {
     380                        unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
     381                        if(ptsc != -1ull) {
     382                                /* paranoid */ verify( start + i < rdq.lanes.count );
     383                                unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
     384                                if(tsc > max) max = tsc;
     385                        }
     386                }
     387                return (max + 2 * max) / 2;
     388        }
     389
     390        __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     391                /* paranoid */ verify( lanes.count > 0 );
     392                /* paranoid */ verify( kernelTLS().this_processor );
     393                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     394
     395                processor * const proc = kernelTLS().this_processor;
     396                unsigned this = proc->rdq.id;
     397                /* paranoid */ verify( this < lanes.count );
     398                __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
     399
     400                // Figure out the current cpu and make sure it is valid
     401                const int cpu = __kernel_getcpu();
     402                /* paranoid */ verify(cpu >= 0);
     403                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     404                unsigned this_cache = cpu_info.llc_map[cpu].cache;
     405
     406                // Super important: don't write the same value over and over again
     407                // We want to maximise our chances that his particular values stays in cache
     408                if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)
     409                        __atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);
     410
     411                const unsigned long long ctsc = rdtscl();
     412
     413                if(proc->rdq.target == MAX) {
     414                        uint64_t chaos = __tls_rand();
     415                        unsigned ext = chaos & 0xff;
     416                        unsigned other  = (chaos >> 8) % (lanes.count);
     417
     418                        if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
     419                                proc->rdq.target = other;
     420                        }
     421                }
     422                else {
     423                        const unsigned target = proc->rdq.target;
     424                        __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
     425                        /* paranoid */ verify( lanes.tscs[target].tv != MAX );
     426                        if(target < lanes.count) {
     427                                const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
     428                                const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
     429                                __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
     430                                if(age > cutoff) {
     431                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     432                                        if(t) return t;
     433                                }
     434                        }
     435                        proc->rdq.target = MAX;
     436                }
     437
     438                for(READYQ_SHARD_FACTOR) {
     439                        unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     440                        if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     441                }
     442
     443                // All lanes where empty return 0p
     444                return 0p;
     445
     446        }
     447        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     448                unsigned i = __tls_rand() % lanes.count;
     449                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     450        }
     451        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
     452                return search(cltr);
     453        }
     454#endif
     455#if defined(USE_CPU_WORK_STEALING)
     456        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     457                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     458
     459                processor * const proc = kernelTLS().this_processor;
     460                const bool external = (!proc) || (cltr != proc->cltr);
     461
     462                // Figure out the current cpu and make sure it is valid
     463                const int cpu = __kernel_getcpu();
     464                /* paranoid */ verify(cpu >= 0);
     465                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     466                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
     467
     468                // Figure out where thread was last time and make sure it's
     469                /* paranoid */ verify(thrd->preferred >= 0);
     470                /* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count);
     471                /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
     472                const int prf = thrd->preferred * READYQ_SHARD_FACTOR;
     473
     474                const cpu_map_entry_t & map;
     475                choose(hint) {
     476                        case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu];
     477                        case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf];
     478                }
     479                /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
     480                /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
     481                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
     482
     483                const int start = map.self * READYQ_SHARD_FACTOR;
     484                unsigned i;
    90485                do {
    91                         unsigned r = proc->rdq.its++;
    92                         i = proc->rdq.id + (r % __shard_factor.readyq);
    93                         /* paranoid */ verify( i < lanes_count );
     486                        unsigned r;
     487                        if(unlikely(external)) { r = __tls_rand(); }
     488                        else { r = proc->rdq.its++; }
     489                        choose(hint) {
     490                                case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR);
     491                                case UNPARK_REMOTE: i = prf   + (r % READYQ_SHARD_FACTOR);
     492                        }
    94493                        // If we can't lock it retry
    95                 } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
    96         }
    97 
    98         // Actually push it
    99         push(readyQ.data[i], thrd);
    100 
    101         // Unlock and return
    102         __atomic_unlock( &readyQ.data[i].lock );
    103 
    104         #if !defined(__CFA_NO_STATISTICS__)
    105                 if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
    106                 else __tls_stats()->ready.push.local.success++;
    107         #endif
    108 }
    109 
    110 __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->sched) {
    111         const size_t lanes_count = readyQ.count;
    112 
    113         /* paranoid */ verify( __shard_factor.readyq > 0 );
    114         /* paranoid */ verify( lanes_count > 0 );
    115         /* paranoid */ verify( kernelTLS().this_processor );
    116         /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes_count );
    117 
    118         processor * const proc = kernelTLS().this_processor;
    119         unsigned this = proc->rdq.id;
    120         /* paranoid */ verify( this < lanes_count );
    121         __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
    122 
    123         // Figure out the current cache is
    124         const unsigned this_cache = cache_id(cltr, this / __shard_factor.readyq);
    125         const unsigned long long ctsc = rdtscl();
    126 
    127         if(proc->rdq.target == MAX) {
    128                 uint64_t chaos = __tls_rand();
    129                 unsigned ext = chaos & 0xff;
    130                 unsigned other  = (chaos >> 8) % (lanes_count);
    131 
    132                 if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.readyq].id, __ATOMIC_RELAXED) == this_cache) {
    133                         proc->rdq.target = other;
    134                 }
    135         }
    136         else {
    137                 const unsigned target = proc->rdq.target;
    138                 __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, readyQ.tscs[target].tv);
    139                 /* paranoid */ verify( readyQ.tscs[target].tv != MAX );
    140                 if(target < lanes_count) {
    141                         const unsigned long long cutoff = calc_cutoff(ctsc, proc->rdq.id, lanes_count, cltr->sched.readyQ.data, cltr->sched.readyQ.tscs, __shard_factor.readyq);
    142                         const unsigned long long age = moving_average(ctsc, readyQ.tscs[target].tv, readyQ.tscs[target].ma);
    143                         __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
    144                         if(age > cutoff) {
     494                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     495
     496                // Actually push it
     497                push(lanes.data[i], thrd);
     498
     499                // Unlock and return
     500                __atomic_unlock( &lanes.data[i].lock );
     501
     502                #if !defined(__CFA_NO_STATISTICS__)
     503                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     504                        else __tls_stats()->ready.push.local.success++;
     505                #endif
     506
     507                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     508
     509        }
     510
     511        // Pop from the ready queue from a given cluster
     512        __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     513                /* paranoid */ verify( lanes.count > 0 );
     514                /* paranoid */ verify( kernelTLS().this_processor );
     515
     516                processor * const proc = kernelTLS().this_processor;
     517                const int cpu = __kernel_getcpu();
     518                /* paranoid */ verify(cpu >= 0);
     519                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     520                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
     521
     522                const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
     523                /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
     524                /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
     525                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
     526
     527                const int start = map.self * READYQ_SHARD_FACTOR;
     528                const unsigned long long ctsc = rdtscl();
     529
     530                // Did we already have a help target
     531                if(proc->rdq.target == MAX) {
     532                        unsigned long long max = 0;
     533                        for(i; READYQ_SHARD_FACTOR) {
     534                                unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     535                                if(tsc > max) max = tsc;
     536                        }
     537                        //  proc->rdq.cutoff = (max + 2 * max) / 2;
     538                        /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
     539                        /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
     540
     541                        if(0 == (__tls_rand() % 100)) {
     542                                proc->rdq.target = __tls_rand() % lanes.count;
     543                        } else {
     544                                unsigned cpu_chaos = map.start + (__tls_rand() % map.count);
     545                                proc->rdq.target = (cpu_chaos * READYQ_SHARD_FACTOR) + (__tls_rand() % READYQ_SHARD_FACTOR);
     546                                /* paranoid */ verify(proc->rdq.target >= (map.start * READYQ_SHARD_FACTOR));
     547                                /* paranoid */ verify(proc->rdq.target <  ((map.start + map.count) * READYQ_SHARD_FACTOR));
     548                        }
     549
     550                        /* paranoid */ verify(proc->rdq.target != MAX);
     551                }
     552                else {
     553                        unsigned long long max = 0;
     554                        for(i; READYQ_SHARD_FACTOR) {
     555                                unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     556                                if(tsc > max) max = tsc;
     557                        }
     558                        const unsigned long long cutoff = (max + 2 * max) / 2;
     559                        {
     560                                unsigned target = proc->rdq.target;
     561                                proc->rdq.target = MAX;
     562                                lanes.help[target / READYQ_SHARD_FACTOR].tri++;
     563                                if(moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
     564                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     565                                        proc->rdq.last = target;
     566                                        if(t) return t;
     567                                }
     568                                proc->rdq.target = MAX;
     569                        }
     570
     571                        unsigned last = proc->rdq.last;
     572                        if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) {
     573                                thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
     574                                if(t) return t;
     575                        }
     576                        else {
     577                                proc->rdq.last = MAX;
     578                        }
     579                }
     580
     581                for(READYQ_SHARD_FACTOR) {
     582                        unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     583                        if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     584                }
     585
     586                // All lanes where empty return 0p
     587                return 0p;
     588        }
     589
     590        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     591                processor * const proc = kernelTLS().this_processor;
     592                unsigned last = proc->rdq.last;
     593                if(last != MAX) {
     594                        struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
     595                        if(t) return t;
     596                        proc->rdq.last = MAX;
     597                }
     598
     599                unsigned i = __tls_rand() % lanes.count;
     600                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     601        }
     602        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
     603                return search(cltr);
     604        }
     605#endif
     606#if defined(USE_RELAXED_FIFO)
     607        //-----------------------------------------------------------------------
     608        // get index from random number with or without bias towards queues
     609        static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
     610                unsigned i;
     611                bool local;
     612                unsigned rlow  = r % BIAS;
     613                unsigned rhigh = r / BIAS;
     614                if((0 != rlow) && preferred >= 0) {
     615                        // (BIAS - 1) out of BIAS chances
     616                        // Use perferred queues
     617                        i = preferred + (rhigh % READYQ_SHARD_FACTOR);
     618                        local = true;
     619                }
     620                else {
     621                        // 1 out of BIAS chances
     622                        // Use all queues
     623                        i = rhigh;
     624                        local = false;
     625                }
     626                return [i, local];
     627        }
     628
     629        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     630                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     631
     632                const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     633                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     634
     635                bool local;
     636                int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
     637
     638                // Try to pick a lane and lock it
     639                unsigned i;
     640                do {
     641                        // Pick the index of a lane
     642                        unsigned r = __tls_rand_fwd();
     643                        [i, local] = idx_from_r(r, preferred);
     644
     645                        i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     646
     647                        #if !defined(__CFA_NO_STATISTICS__)
     648                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
     649                                else if(local) __tls_stats()->ready.push.local.attempt++;
     650                                else __tls_stats()->ready.push.share.attempt++;
     651                        #endif
     652
     653                        // If we can't lock it retry
     654                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     655
     656                // Actually push it
     657                push(lanes.data[i], thrd);
     658
     659                // Unlock and return
     660                __atomic_unlock( &lanes.data[i].lock );
     661
     662                // Mark the current index in the tls rng instance as having an item
     663                __tls_rand_advance_bck();
     664
     665                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     666
     667                // Update statistics
     668                #if !defined(__CFA_NO_STATISTICS__)
     669                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     670                        else if(local) __tls_stats()->ready.push.local.success++;
     671                        else __tls_stats()->ready.push.share.success++;
     672                #endif
     673        }
     674
     675        // Pop from the ready queue from a given cluster
     676        __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     677                /* paranoid */ verify( lanes.count > 0 );
     678                /* paranoid */ verify( kernelTLS().this_processor );
     679                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     680
     681                unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     682                int preferred = kernelTLS().this_processor->rdq.id;
     683
     684
     685                // As long as the list is not empty, try finding a lane that isn't empty and pop from it
     686                for(25) {
     687                        // Pick two lists at random
     688                        unsigned ri = __tls_rand_bck();
     689                        unsigned rj = __tls_rand_bck();
     690
     691                        unsigned i, j;
     692                        __attribute__((unused)) bool locali, localj;
     693                        [i, locali] = idx_from_r(ri, preferred);
     694                        [j, localj] = idx_from_r(rj, preferred);
     695
     696                        i %= count;
     697                        j %= count;
     698
     699                        // try popping from the 2 picked lists
     700                        struct thread$ * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
     701                        if(thrd) {
     702                                return thrd;
     703                        }
     704                }
     705
     706                // All lanes where empty return 0p
     707                return 0p;
     708        }
     709
     710        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
     711        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
     712                return search(cltr);
     713        }
     714#endif
     715#if defined(USE_WORK_STEALING)
     716        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     717                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     718
     719                // #define USE_PREFERRED
     720                #if !defined(USE_PREFERRED)
     721                const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     722                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     723                #else
     724                        unsigned preferred = thrd->preferred;
     725                        const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr;
     726                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
     727
     728                        unsigned r = preferred % READYQ_SHARD_FACTOR;
     729                        const unsigned start = preferred - r;
     730                #endif
     731
     732                // Try to pick a lane and lock it
     733                unsigned i;
     734                do {
     735                        #if !defined(__CFA_NO_STATISTICS__)
     736                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
     737                                else __tls_stats()->ready.push.local.attempt++;
     738                        #endif
     739
     740                        if(unlikely(external)) {
     741                                i = __tls_rand() % lanes.count;
     742                        }
     743                        else {
     744                                #if !defined(USE_PREFERRED)
     745                                        processor * proc = kernelTLS().this_processor;
     746                                        unsigned r = proc->rdq.its++;
     747                                        i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     748                                #else
     749                                        i = start + (r++ % READYQ_SHARD_FACTOR);
     750                                #endif
     751                        }
     752                        // If we can't lock it retry
     753                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     754
     755                // Actually push it
     756                push(lanes.data[i], thrd);
     757
     758                // Unlock and return
     759                __atomic_unlock( &lanes.data[i].lock );
     760
     761                #if !defined(__CFA_NO_STATISTICS__)
     762                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     763                        else __tls_stats()->ready.push.local.success++;
     764                #endif
     765
     766                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     767        }
     768
     769        // Pop from the ready queue from a given cluster
     770        __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     771                /* paranoid */ verify( lanes.count > 0 );
     772                /* paranoid */ verify( kernelTLS().this_processor );
     773                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     774
     775                processor * proc = kernelTLS().this_processor;
     776
     777                if(proc->rdq.target == MAX) {
     778                        unsigned long long min = ts(lanes.data[proc->rdq.id]);
     779                        for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
     780                                unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
     781                                if(tsc < min) min = tsc;
     782                        }
     783                        proc->rdq.cutoff = min;
     784                        proc->rdq.target = __tls_rand() % lanes.count;
     785                }
     786                else {
     787                        unsigned target = proc->rdq.target;
     788                        proc->rdq.target = MAX;
     789                        const unsigned long long bias = 0; //2_500_000_000;
     790                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     791                        if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
    145792                                thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    146793                                if(t) return t;
    147794                        }
    148795                }
    149                 proc->rdq.target = MAX;
    150         }
    151 
    152         for(__shard_factor.readyq) {
    153                 unsigned i = this + (proc->rdq.itr++ % __shard_factor.readyq);
    154                 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    155         }
    156 
    157         // All lanes where empty return 0p
    158         return 0p;
    159 
    160 }
    161 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) {
    162         unsigned i = __tls_rand() % (cltr->sched.readyQ.count);
    163         return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    164 }
    165 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
    166         return search(cltr);
    167 }
     796
     797                for(READYQ_SHARD_FACTOR) {
     798                        unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     799                        if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     800                }
     801                return 0p;
     802        }
     803
     804        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     805                unsigned i = __tls_rand() % lanes.count;
     806                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     807        }
     808
     809        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
     810                return search(cltr);
     811        }
     812#endif
    168813
    169814//=======================================================================
     
    175820//-----------------------------------------------------------------------
    176821// try to pop from a lane given by index w
    177 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
    178         /* paranoid */ verify( w < readyQ.count );
     822static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     823        /* paranoid */ verify( w < lanes.count );
    179824        __STATS( stats.attempt++; )
    180825
    181826        // Get relevant elements locally
    182         __intrusive_lane_t & lane = readyQ.data[w];
     827        __intrusive_lane_t & lane = lanes.data[w];
    183828
    184829        // If list looks empty retry
     
    200845        // Actually pop the list
    201846        struct thread$ * thrd;
    202         unsigned long long ts_prev = ts(lane);
    203         unsigned long long ts_next;
    204         [thrd, ts_next] = pop(lane);
     847        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     848                unsigned long long tsc_before = ts(lane);
     849        #endif
     850        unsigned long long tsv;
     851        [thrd, tsv] = pop(lane);
    205852
    206853        /* paranoid */ verify(thrd);
    207         /* paranoid */ verify(ts_next);
     854        /* paranoid */ verify(tsv);
    208855        /* paranoid */ verify(lane.lock);
    209856
     
    214861        __STATS( stats.success++; )
    215862
    216         touch_tsc(readyQ.tscs, w, ts_prev, ts_next);
    217 
    218         thrd->preferred = w / __shard_factor.readyq;
     863        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     864                if (tsv != MAX) {
     865                        unsigned long long now = rdtscl();
     866                        unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
     867                        __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
     868                        __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
     869                }
     870        #endif
     871
     872        #if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)
     873                thrd->preferred = w / READYQ_SHARD_FACTOR;
     874        #else
     875                thrd->preferred = w;
     876        #endif
    219877
    220878        // return the popped thread
     
    225883// try to pop from any lanes making sure you don't miss any threads push
    226884// before the start of the function
    227 static inline struct thread$ * search(struct cluster * cltr) {
    228         const size_t lanes_count = cltr->sched.readyQ.count;
    229         /* paranoid */ verify( lanes_count > 0 );
    230         unsigned count = __atomic_load_n( &lanes_count, __ATOMIC_RELAXED );
     885static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) {
     886        /* paranoid */ verify( lanes.count > 0 );
     887        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    231888        unsigned offset = __tls_rand();
    232889        for(i; count) {
     
    245902// get preferred ready for new thread
    246903unsigned ready_queue_new_preferred() {
    247         unsigned pref = MAX;
     904        unsigned pref = 0;
    248905        if(struct thread$ * thrd = publicTLS_get( this_thread )) {
    249906                pref = thrd->preferred;
    250907        }
     908        else {
     909                #if defined(USE_CPU_WORK_STEALING)
     910                        pref = __kernel_getcpu();
     911                #endif
     912        }
     913
     914        #if defined(USE_CPU_WORK_STEALING)
     915                /* paranoid */ verify(pref >= 0);
     916                /* paranoid */ verify(pref < cpu_info.hthrd_count);
     917        #endif
    251918
    252919        return pref;
     920}
     921
     922//-----------------------------------------------------------------------
     923// Check that all the intrusive queues in the data structure are still consistent
     924static void check( __ready_queue_t & q ) with (q) {
     925        #if defined(__CFA_WITH_VERIFY__)
     926                {
     927                        for( idx ; lanes.count ) {
     928                                __intrusive_lane_t & sl = lanes.data[idx];
     929                                assert(!lanes.data[idx].lock);
     930
     931                                        if(is_empty(sl)) {
     932                                                assert( sl.anchor.next == 0p );
     933                                                assert( sl.anchor.ts   == -1llu );
     934                                                assert( mock_head(sl)  == sl.prev );
     935                                        } else {
     936                                                assert( sl.anchor.next != 0p );
     937                                                assert( sl.anchor.ts   != -1llu );
     938                                                assert( mock_head(sl)  != sl.prev );
     939                                        }
     940                        }
     941                }
     942        #endif
    253943}
    254944
    255945//-----------------------------------------------------------------------
    256946// Given 2 indexes, pick the list with the oldest push an try to pop from it
    257 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
     947static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    258948        // Pick the bet list
    259949        int w = i;
    260         if( __builtin_expect(!is_empty(readyQ.data[j]), true) ) {
    261                 w = (ts(readyQ.data[i]) < ts(readyQ.data[j])) ? i : j;
     950        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
     951                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
    262952        }
    263953
    264954        return try_pop(cltr, w __STATS(, stats));
    265955}
     956
     957// Call this function of the intrusive list was moved using memcpy
     958// fixes the list so that the pointers back to anchors aren't left dangling
     959static inline void fix(__intrusive_lane_t & ll) {
     960                        if(is_empty(ll)) {
     961                                verify(ll.anchor.next == 0p);
     962                                ll.prev = mock_head(ll);
     963                        }
     964}
     965
     966static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
     967        processor * it = &list`first;
     968        for(unsigned i = 0; i < count; i++) {
     969                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
     970                it->rdq.id = value;
     971                it->rdq.target = MAX;
     972                value += READYQ_SHARD_FACTOR;
     973                it = &(*it)`next;
     974        }
     975}
     976
     977static void reassign_cltr_id(struct cluster * cltr) {
     978        unsigned preferred = 0;
     979        assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
     980        assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
     981}
     982
     983static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
     984        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING)
     985                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
     986                for(i; lanes.count) {
     987                        lanes.tscs[i].tv = rdtscl();
     988                        lanes.tscs[i].ma = 0;
     989                }
     990        #endif
     991}
     992
     993#if defined(USE_CPU_WORK_STEALING)
     994        // ready_queue size is fixed in this case
     995        void ready_queue_grow(struct cluster * cltr) {}
     996        void ready_queue_shrink(struct cluster * cltr) {}
     997#else
     998        // Grow the ready queue
     999        void ready_queue_grow(struct cluster * cltr) {
     1000                size_t ncount;
     1001                int target = cltr->procs.total;
     1002
     1003                /* paranoid */ verify( ready_mutate_islocked() );
     1004                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
     1005
     1006                // Make sure that everything is consistent
     1007                /* paranoid */ check( cltr->ready_queue );
     1008
     1009                // grow the ready queue
     1010                with( cltr->ready_queue ) {
     1011                        // Find new count
     1012                        // Make sure we always have atleast 1 list
     1013                        if(target >= 2) {
     1014                                ncount = target * READYQ_SHARD_FACTOR;
     1015                        } else {
     1016                                ncount = SEQUENTIAL_SHARD;
     1017                        }
     1018
     1019                        // Allocate new array (uses realloc and memcpies the data)
     1020                        lanes.data = alloc( ncount, lanes.data`realloc );
     1021
     1022                        // Fix the moved data
     1023                        for( idx; (size_t)lanes.count ) {
     1024                                fix(lanes.data[idx]);
     1025                        }
     1026
     1027                        // Construct new data
     1028                        for( idx; (size_t)lanes.count ~ ncount) {
     1029                                (lanes.data[idx]){};
     1030                        }
     1031
     1032                        // Update original
     1033                        lanes.count = ncount;
     1034
     1035                        lanes.caches = alloc( target, lanes.caches`realloc );
     1036                }
     1037
     1038                fix_times(cltr);
     1039
     1040                reassign_cltr_id(cltr);
     1041
     1042                // Make sure that everything is consistent
     1043                /* paranoid */ check( cltr->ready_queue );
     1044
     1045                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
     1046
     1047                /* paranoid */ verify( ready_mutate_islocked() );
     1048        }
     1049
     1050        // Shrink the ready queue
     1051        void ready_queue_shrink(struct cluster * cltr) {
     1052                /* paranoid */ verify( ready_mutate_islocked() );
     1053                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     1054
     1055                // Make sure that everything is consistent
     1056                /* paranoid */ check( cltr->ready_queue );
     1057
     1058                int target = cltr->procs.total;
     1059
     1060                with( cltr->ready_queue ) {
     1061                        // Remember old count
     1062                        size_t ocount = lanes.count;
     1063
     1064                        // Find new count
     1065                        // Make sure we always have atleast 1 list
     1066                        lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
     1067                        /* paranoid */ verify( ocount >= lanes.count );
     1068                        /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
     1069
     1070                        // for printing count the number of displaced threads
     1071                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     1072                                __attribute__((unused)) size_t displaced = 0;
     1073                        #endif
     1074
     1075                        // redistribute old data
     1076                        for( idx; (size_t)lanes.count ~ ocount) {
     1077                                // Lock is not strictly needed but makes checking invariants much easier
     1078                                __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
     1079                                verify(locked);
     1080
     1081                                // As long as we can pop from this lane to push the threads somewhere else in the queue
     1082                                while(!is_empty(lanes.data[idx])) {
     1083                                        struct thread$ * thrd;
     1084                                        unsigned long long _;
     1085                                        [thrd, _] = pop(lanes.data[idx]);
     1086
     1087                                        push(cltr, thrd, true);
     1088
     1089                                        // for printing count the number of displaced threads
     1090                                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     1091                                                displaced++;
     1092                                        #endif
     1093                                }
     1094
     1095                                // Unlock the lane
     1096                                __atomic_unlock(&lanes.data[idx].lock);
     1097
     1098                                // TODO print the queue statistics here
     1099
     1100                                ^(lanes.data[idx]){};
     1101                        }
     1102
     1103                        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
     1104
     1105                        // Allocate new array (uses realloc and memcpies the data)
     1106                        lanes.data = alloc( lanes.count, lanes.data`realloc );
     1107
     1108                        // Fix the moved data
     1109                        for( idx; (size_t)lanes.count ) {
     1110                                fix(lanes.data[idx]);
     1111                        }
     1112
     1113                        lanes.caches = alloc( target, lanes.caches`realloc );
     1114                }
     1115
     1116                fix_times(cltr);
     1117
     1118
     1119                reassign_cltr_id(cltr);
     1120
     1121                // Make sure that everything is consistent
     1122                /* paranoid */ check( cltr->ready_queue );
     1123
     1124                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
     1125                /* paranoid */ verify( ready_mutate_islocked() );
     1126        }
     1127#endif
     1128
     1129#if !defined(__CFA_NO_STATISTICS__)
     1130        unsigned cnt(const __ready_queue_t & this, unsigned idx) {
     1131                /* paranoid */ verify(this.lanes.count > idx);
     1132                return this.lanes.data[idx].cnt;
     1133        }
     1134#endif
     1135
     1136
     1137#if   defined(CFA_HAVE_LINUX_LIBRSEQ)
     1138        // No definition needed
     1139#elif defined(CFA_HAVE_LINUX_RSEQ_H)
     1140
     1141        #if defined( __x86_64 ) || defined( __i386 )
     1142                #define RSEQ_SIG        0x53053053
     1143        #elif defined( __ARM_ARCH )
     1144                #ifdef __ARMEB__
     1145                #define RSEQ_SIG    0xf3def5e7      /* udf    #24035    ; 0x5de3 (ARMv6+) */
     1146                #else
     1147                #define RSEQ_SIG    0xe7f5def3      /* udf    #24035    ; 0x5de3 */
     1148                #endif
     1149        #endif
     1150
     1151        extern void __disable_interrupts_hard();
     1152        extern void __enable_interrupts_hard();
     1153
     1154        static void __kernel_raw_rseq_register  (void) {
     1155                /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );
     1156
     1157                // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
     1158                int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
     1159                if(ret != 0) {
     1160                        int e = errno;
     1161                        switch(e) {
     1162                        case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
     1163                        case ENOSYS: abort("KERNEL ERROR: rseq register no supported");
     1164                        case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
     1165                        case EBUSY : abort("KERNEL ERROR: rseq register already registered");
     1166                        case EPERM : abort("KERNEL ERROR: rseq register sig  argument  on unregistration does not match the signature received on registration");
     1167                        default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
     1168                        }
     1169                }
     1170        }
     1171
     1172        static void __kernel_raw_rseq_unregister(void) {
     1173                /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );
     1174
     1175                // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
     1176                int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
     1177                if(ret != 0) {
     1178                        int e = errno;
     1179                        switch(e) {
     1180                        case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
     1181                        case ENOSYS: abort("KERNEL ERROR: rseq unregister no supported");
     1182                        case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
     1183                        case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
     1184                        case EPERM : abort("KERNEL ERROR: rseq unregister sig  argument  on unregistration does not match the signature received on registration");
     1185                        default: abort("KERNEL ERROR: rseq unregisteunexpected return %d", e);
     1186                        }
     1187                }
     1188        }
     1189#else
     1190        // No definition needed
     1191#endif
Note: See TracChangeset for help on using the changeset viewer.