Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r0fb3ee5 r75c7252  
    2020
    2121
    22 // #define USE_RELAXED_FIFO
     22#define USE_RELAXED_FIFO
    2323// #define USE_WORK_STEALING
    2424// #define USE_CPU_WORK_STEALING
    25 #define USE_AWARE_STEALING
    2625
    2726#include "bits/defs.hfa"
     
    3029
    3130#include "stdlib.hfa"
    32 #include "limits.hfa"
    3331#include "math.hfa"
    3432
     
    5654#endif
    5755
    58 #if   defined(USE_AWARE_STEALING)
    59         #define READYQ_SHARD_FACTOR 2
    60         #define SEQUENTIAL_SHARD 2
    61 #elif defined(USE_CPU_WORK_STEALING)
     56#if   defined(USE_CPU_WORK_STEALING)
    6257        #define READYQ_SHARD_FACTOR 2
    6358#elif defined(USE_RELAXED_FIFO)
     
    143138        __kernel_rseq_register();
    144139
     140        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    145141        bool * handle = (bool *)&kernelTLS().sched_lock;
    146142
     
    178174        }
    179175
     176        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
     177
    180178        // Return new spot.
    181179        /* paranoid */ verify(n < ready);
     
    192190
    193191        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
     192
     193        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
    194194
    195195        __kernel_rseq_unregister();
     
    244244
    245245//=======================================================================
    246 // caches handling
    247 
    248 struct __attribute__((aligned(128))) __ready_queue_caches_t {
    249         // Count States:
    250         // - 0  : No one is looking after this cache
    251         // - 1  : No one is looking after this cache, BUT it's not empty
    252         // - 2+ : At least one processor is looking after this cache
    253         volatile unsigned count;
    254 };
    255 
    256 void  ?{}(__ready_queue_caches_t & this) { this.count = 0; }
    257 void ^?{}(__ready_queue_caches_t & this) {}
    258 
    259 static inline void depart(__ready_queue_caches_t & cache) {
    260         /* paranoid */ verify( cache.count > 1);
    261         __atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);
    262         /* paranoid */ verify( cache.count != 0);
    263         /* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.
    264 }
    265 
    266 static inline void arrive(__ready_queue_caches_t & cache) {
    267         // for() {
    268         //      unsigned expected = cache.count;
    269         //      unsigned desired  = 0 == expected ? 2 : expected + 1;
    270         // }
    271 }
    272 
    273 //=======================================================================
    274246// Cforall Ready Queue used for scheduling
    275247//=======================================================================
    276 unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
    277         /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
    278         /* paranoid */ verifyf( instsc  < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
    279         /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );
    280 
    281         const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
    282         const unsigned long long total_weight = 16;
    283         const unsigned long long new_weight   = 4;
    284         const unsigned long long old_weight = total_weight - new_weight;
    285         const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
    286         return ret;
     248unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
     249        const unsigned long long tw = 16;
     250        const unsigned long long nw = 4;
     251        const unsigned long long ow = tw - nw;
     252        return ((nw * nval) + (ow * oval)) / tw;
    287253}
    288254
     
    305271                }
    306272        #else
    307                 lanes.data   = 0p;
    308                 lanes.tscs   = 0p;
    309                 lanes.caches = 0p;
    310                 lanes.help   = 0p;
    311                 lanes.count  = 0;
     273                lanes.data  = 0p;
     274                lanes.tscs  = 0p;
     275                lanes.help  = 0p;
     276                lanes.count = 0;
    312277        #endif
    313278}
     
    320285        free(lanes.data);
    321286        free(lanes.tscs);
    322         free(lanes.caches);
    323287        free(lanes.help);
    324288}
    325289
    326290//-----------------------------------------------------------------------
    327 #if defined(USE_AWARE_STEALING)
    328         __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    329                 processor * const proc = kernelTLS().this_processor;
    330                 const bool external = (!proc) || (cltr != proc->cltr);
    331                 const bool remote   = hint == UNPARK_REMOTE;
    332 
    333                 unsigned i;
    334                 if( external || remote ) {
    335                         // Figure out where thread was last time and make sure it's valid
    336                         /* paranoid */ verify(thrd->preferred >= 0);
    337                         if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
    338                                 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
    339                                 unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
    340                                 do {
    341                                         unsigned r = __tls_rand();
    342                                         i = start + (r % READYQ_SHARD_FACTOR);
    343                                         /* paranoid */ verify( i < lanes.count );
    344                                         // If we can't lock it retry
    345                                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    346                         } else {
    347                                 do {
    348                                         i = __tls_rand() % lanes.count;
    349                                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    350                         }
    351                 } else {
    352                         do {
    353                                 unsigned r = proc->rdq.its++;
    354                                 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
    355                                 /* paranoid */ verify( i < lanes.count );
    356                                 // If we can't lock it retry
    357                         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    358                 }
    359 
    360                 // Actually push it
    361                 push(lanes.data[i], thrd);
    362 
    363                 // Unlock and return
    364                 __atomic_unlock( &lanes.data[i].lock );
    365 
    366                 #if !defined(__CFA_NO_STATISTICS__)
    367                         if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
    368                         else __tls_stats()->ready.push.local.success++;
    369                 #endif
    370         }
    371 
    372         static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
    373                 unsigned start = proc->rdq.id;
    374                 unsigned long long max = 0;
    375                 for(i; READYQ_SHARD_FACTOR) {
    376                         unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
    377                         if(ptsc != -1ull) {
    378                                 /* paranoid */ verify( start + i < rdq.lanes.count );
    379                                 unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
    380                                 if(tsc > max) max = tsc;
    381                         }
    382                 }
    383                 return (max + 2 * max) / 2;
    384         }
    385 
    386         __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    387                 /* paranoid */ verify( lanes.count > 0 );
    388                 /* paranoid */ verify( kernelTLS().this_processor );
    389                 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
    390 
    391                 processor * const proc = kernelTLS().this_processor;
    392                 unsigned this = proc->rdq.id;
    393                 /* paranoid */ verify( this < lanes.count );
    394                 __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
    395 
    396                 // Figure out the current cpu and make sure it is valid
    397                 const int cpu = __kernel_getcpu();
    398                 /* paranoid */ verify(cpu >= 0);
    399                 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
    400                 unsigned this_cache = cpu_info.llc_map[cpu].cache;
    401 
    402                 // Super important: don't write the same value over and over again
    403                 // We want to maximise our chances that his particular values stays in cache
    404                 if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)
    405                         __atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);
    406 
    407                 const unsigned long long ctsc = rdtscl();
    408 
    409                 if(proc->rdq.target == MAX) {
    410                         uint64_t chaos = __tls_rand();
    411                         unsigned ext = chaos & 0xff;
    412                         unsigned other  = (chaos >> 8) % (lanes.count);
    413 
    414                         if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
    415                                 proc->rdq.target = other;
    416                         }
    417                 }
    418                 else {
    419                         const unsigned target = proc->rdq.target;
    420                         __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
    421                         /* paranoid */ verify( lanes.tscs[target].tv != MAX );
    422                         if(target < lanes.count) {
    423                                 const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
    424                                 const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
    425                                 __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
    426                                 if(age > cutoff) {
    427                                         thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    428                                         if(t) return t;
    429                                 }
    430                         }
    431                         proc->rdq.target = MAX;
    432                 }
    433 
    434                 for(READYQ_SHARD_FACTOR) {
    435                         unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
    436                         if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    437                 }
    438 
    439                 // All lanes where empty return 0p
    440                 return 0p;
    441 
    442         }
    443         __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    444                 unsigned i = __tls_rand() % lanes.count;
    445                 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    446         }
    447         __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
    448                 return search(cltr);
    449         }
    450 #endif
    451291#if defined(USE_CPU_WORK_STEALING)
    452292        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     
    510350                /* paranoid */ verify( kernelTLS().this_processor );
    511351
    512                 processor * const proc = kernelTLS().this_processor;
    513352                const int cpu = __kernel_getcpu();
    514353                /* paranoid */ verify(cpu >= 0);
     
    521360                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
    522361
     362                processor * const proc = kernelTLS().this_processor;
    523363                const int start = map.self * READYQ_SHARD_FACTOR;
    524364                const unsigned long long ctsc = rdtscl();
    525365
    526366                // Did we already have a help target
    527                 if(proc->rdq.target == MAX) {
     367                if(proc->rdq.target == -1u) {
    528368                        unsigned long long max = 0;
    529369                        for(i; READYQ_SHARD_FACTOR) {
    530                                 unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     370                                unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    531371                                if(tsc > max) max = tsc;
    532372                        }
    533                         // proc->rdq.cutoff = (max + 2 * max) / 2;
     373                        proc->rdq.cutoff = (max + 2 * max) / 2;
    534374                        /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
    535375                        /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
     
    544384                        }
    545385
    546                         /* paranoid */ verify(proc->rdq.target != MAX);
     386                        /* paranoid */ verify(proc->rdq.target != -1u);
    547387                }
    548388                else {
    549389                        unsigned long long max = 0;
    550390                        for(i; READYQ_SHARD_FACTOR) {
    551                                 unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     391                                unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    552392                                if(tsc > max) max = tsc;
    553393                        }
     
    555395                        {
    556396                                unsigned target = proc->rdq.target;
    557                                 proc->rdq.target = MAX;
     397                                proc->rdq.target = -1u;
    558398                                lanes.help[target / READYQ_SHARD_FACTOR].tri++;
    559                                 if(moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
     399                                if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
    560400                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    561401                                        proc->rdq.last = target;
    562402                                        if(t) return t;
     403                                        else proc->rdq.target = -1u;
    563404                                }
    564                                 proc->rdq.target = MAX;
     405                                else proc->rdq.target = -1u;
    565406                        }
    566407
    567408                        unsigned last = proc->rdq.last;
    568                         if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) {
     409                        if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
    569410                                thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
    570411                                if(t) return t;
    571412                        }
    572413                        else {
    573                                 proc->rdq.last = MAX;
     414                                proc->rdq.last = -1u;
    574415                        }
    575416                }
     
    587428                processor * const proc = kernelTLS().this_processor;
    588429                unsigned last = proc->rdq.last;
    589                 if(last != MAX) {
     430                if(last != -1u) {
    590431                        struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
    591432                        if(t) return t;
    592                         proc->rdq.last = MAX;
     433                        proc->rdq.last = -1u;
    593434                }
    594435
     
    719560                #else
    720561                        unsigned preferred = thrd->preferred;
    721                         const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr;
     562                        const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
    722563                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
    723564
     
    771612                processor * proc = kernelTLS().this_processor;
    772613
    773                 if(proc->rdq.target == MAX) {
     614                if(proc->rdq.target == -1u) {
    774615                        unsigned long long min = ts(lanes.data[proc->rdq.id]);
    775616                        for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
     
    782623                else {
    783624                        unsigned target = proc->rdq.target;
    784                         proc->rdq.target = MAX;
     625                        proc->rdq.target = -1u;
    785626                        const unsigned long long bias = 0; //2_500_000_000;
    786627                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     
    817658// try to pop from a lane given by index w
    818659static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    819         /* paranoid */ verify( w < lanes.count );
    820660        __STATS( stats.attempt++; )
    821661
     
    841681        // Actually pop the list
    842682        struct thread$ * thrd;
    843         #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
    844                 unsigned long long tsc_before = ts(lane);
    845         #endif
     683        unsigned long long tsc_before = ts(lane);
    846684        unsigned long long tsv;
    847685        [thrd, tsv] = pop(lane);
     
    857695        __STATS( stats.success++; )
    858696
    859         #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
    860                 if (tsv != MAX) {
    861                         unsigned long long now = rdtscl();
    862                         unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
    863                         __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
    864                         __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
    865                 }
     697        #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     698                unsigned long long now = rdtscl();
     699                lanes.tscs[w].tv = tsv;
     700                lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);
    866701        #endif
    867702
    868         #if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)
     703        #if defined(USE_CPU_WORK_STEALING)
    869704                thrd->preferred = w / READYQ_SHARD_FACTOR;
    870705        #else
     
    965800                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
    966801                it->rdq.id = value;
    967                 it->rdq.target = MAX;
     802                it->rdq.target = -1u;
    968803                value += READYQ_SHARD_FACTOR;
    969804                it = &(*it)`next;
     
    978813
    979814static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
    980         #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING)
     815        #if defined(USE_WORK_STEALING)
    981816                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    982817                for(i; lanes.count) {
    983                         lanes.tscs[i].tv = rdtscl();
    984                         lanes.tscs[i].ma = 0;
     818                        unsigned long long tsc1 = ts(lanes.data[i]);
     819                        unsigned long long tsc2 = rdtscl();
     820                        lanes.tscs[i].tv = min(tsc1, tsc2);
    985821                }
    986822        #endif
     
    1028864                        // Update original
    1029865                        lanes.count = ncount;
    1030 
    1031                         lanes.caches = alloc( target, lanes.caches`realloc );
    1032866                }
    1033867
     
    1106940                                fix(lanes.data[idx]);
    1107941                        }
    1108 
    1109                         lanes.caches = alloc( target, lanes.caches`realloc );
    1110942                }
    1111943
    1112944                fix_times(cltr);
    1113 
    1114945
    1115946                reassign_cltr_id(cltr);
Note: See TracChangeset for help on using the changeset viewer.