Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r46bbcaf r078fb05  
    2020
    2121
    22 // #define USE_RELAXED_FIFO
     22#define USE_RELAXED_FIFO
    2323// #define USE_WORK_STEALING
    2424// #define USE_CPU_WORK_STEALING
    25 #define USE_AWARE_STEALING
    2625
    2726#include "bits/defs.hfa"
     
    3029
    3130#include "stdlib.hfa"
    32 #include "limits.hfa"
    3331#include "math.hfa"
    3432
     
    5654#endif
    5755
    58 #if   defined(USE_AWARE_STEALING)
    59         #define READYQ_SHARD_FACTOR 2
    60         #define SEQUENTIAL_SHARD 2
    61 #elif defined(USE_CPU_WORK_STEALING)
     56#if   defined(USE_CPU_WORK_STEALING)
    6257        #define READYQ_SHARD_FACTOR 2
    6358#elif defined(USE_RELAXED_FIFO)
     
    143138        __kernel_rseq_register();
    144139
     140        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    145141        bool * handle = (bool *)&kernelTLS().sched_lock;
    146142
     
    178174        }
    179175
     176        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
     177
    180178        // Return new spot.
    181179        /* paranoid */ verify(n < ready);
     
    192190
    193191        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
     192
     193        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
    194194
    195195        __kernel_rseq_unregister();
     
    201201uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
    202202        /* paranoid */ verify( ! __preemption_enabled() );
     203        /* paranoid */ verify( ! kernelTLS().sched_lock );
    203204
    204205        // Step 1 : lock global lock
     
    206207        //   to simply lock their own lock and enter.
    207208        __atomic_acquire( &write_lock );
    208 
    209         // Make sure we won't deadlock ourself
    210         // Checking before acquiring the writer lock isn't safe
    211         // because someone else could have locked us.
    212         /* paranoid */ verify( ! kernelTLS().sched_lock );
    213209
    214210        // Step 2 : lock per-proc lock
     
    248244
    249245//=======================================================================
    250 // caches handling
    251 
    252 struct __attribute__((aligned(128))) __ready_queue_caches_t {
    253         // Count States:
    254         // - 0  : No one is looking after this cache
    255         // - 1  : No one is looking after this cache, BUT it's not empty
    256         // - 2+ : At least one processor is looking after this cache
    257         volatile unsigned count;
    258 };
    259 
    260 void  ?{}(__ready_queue_caches_t & this) { this.count = 0; }
    261 void ^?{}(__ready_queue_caches_t & this) {}
    262 
    263 static inline void depart(__ready_queue_caches_t & cache) {
    264         /* paranoid */ verify( cache.count > 1);
    265         __atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);
    266         /* paranoid */ verify( cache.count != 0);
    267         /* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.
    268 }
    269 
    270 static inline void arrive(__ready_queue_caches_t & cache) {
    271         // for() {
    272         //      unsigned expected = cache.count;
    273         //      unsigned desired  = 0 == expected ? 2 : expected + 1;
    274         // }
    275 }
    276 
    277 //=======================================================================
    278246// Cforall Ready Queue used for scheduling
    279247//=======================================================================
    280 unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
    281         /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
    282         /* paranoid */ verifyf( instsc  < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
    283         /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );
    284 
    285         const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
    286         const unsigned long long total_weight = 16;
    287         const unsigned long long new_weight   = 4;
    288         const unsigned long long old_weight = total_weight - new_weight;
    289         const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
    290         return ret;
     248unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
     249        const unsigned long long tw = 16;
     250        const unsigned long long nw = 4;
     251        const unsigned long long ow = tw - nw;
     252        return ((nw * nval) + (ow * oval)) / tw;
    291253}
    292254
     
    309271                }
    310272        #else
    311                 lanes.data   = 0p;
    312                 lanes.tscs   = 0p;
    313                 lanes.caches = 0p;
    314                 lanes.help   = 0p;
    315                 lanes.count  = 0;
     273                lanes.data  = 0p;
     274                lanes.tscs  = 0p;
     275                lanes.help  = 0p;
     276                lanes.count = 0;
    316277        #endif
    317278}
     
    324285        free(lanes.data);
    325286        free(lanes.tscs);
    326         free(lanes.caches);
    327287        free(lanes.help);
    328288}
    329289
    330290//-----------------------------------------------------------------------
    331 #if defined(USE_AWARE_STEALING)
    332         __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    333                 processor * const proc = kernelTLS().this_processor;
    334                 const bool external = (!proc) || (cltr != proc->cltr);
    335                 const bool remote   = hint == UNPARK_REMOTE;
    336 
    337                 unsigned i;
    338                 if( external || remote ) {
    339                         // Figure out where thread was last time and make sure it's valid
    340                         /* paranoid */ verify(thrd->preferred >= 0);
    341                         if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
    342                                 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
    343                                 unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
    344                                 do {
    345                                         unsigned r = __tls_rand();
    346                                         i = start + (r % READYQ_SHARD_FACTOR);
    347                                         /* paranoid */ verify( i < lanes.count );
    348                                         // If we can't lock it retry
    349                                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    350                         } else {
    351                                 do {
    352                                         i = __tls_rand() % lanes.count;
    353                                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    354                         }
    355                 } else {
    356                         do {
    357                                 unsigned r = proc->rdq.its++;
    358                                 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
    359                                 /* paranoid */ verify( i < lanes.count );
    360                                 // If we can't lock it retry
    361                         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    362                 }
    363 
    364                 // Actually push it
    365                 push(lanes.data[i], thrd);
    366 
    367                 // Unlock and return
    368                 __atomic_unlock( &lanes.data[i].lock );
    369 
    370                 #if !defined(__CFA_NO_STATISTICS__)
    371                         if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
    372                         else __tls_stats()->ready.push.local.success++;
    373                 #endif
    374         }
    375 
    376         static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
    377                 unsigned start = proc->rdq.id;
    378                 unsigned long long max = 0;
    379                 for(i; READYQ_SHARD_FACTOR) {
    380                         unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
    381                         if(ptsc != -1ull) {
    382                                 /* paranoid */ verify( start + i < rdq.lanes.count );
    383                                 unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
    384                                 if(tsc > max) max = tsc;
    385                         }
    386                 }
    387                 return (max + 2 * max) / 2;
    388         }
    389 
    390         __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    391                 /* paranoid */ verify( lanes.count > 0 );
    392                 /* paranoid */ verify( kernelTLS().this_processor );
    393                 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
    394 
    395                 processor * const proc = kernelTLS().this_processor;
    396                 unsigned this = proc->rdq.id;
    397                 /* paranoid */ verify( this < lanes.count );
    398                 __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
    399 
    400                 // Figure out the current cpu and make sure it is valid
    401                 const int cpu = __kernel_getcpu();
    402                 /* paranoid */ verify(cpu >= 0);
    403                 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
    404                 unsigned this_cache = cpu_info.llc_map[cpu].cache;
    405 
    406                 // Super important: don't write the same value over and over again
    407                 // We want to maximise our chances that his particular values stays in cache
    408                 if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)
    409                         __atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);
    410 
    411                 const unsigned long long ctsc = rdtscl();
    412 
    413                 if(proc->rdq.target == MAX) {
    414                         uint64_t chaos = __tls_rand();
    415                         unsigned ext = chaos & 0xff;
    416                         unsigned other  = (chaos >> 8) % (lanes.count);
    417 
    418                         if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
    419                                 proc->rdq.target = other;
    420                         }
    421                 }
    422                 else {
    423                         const unsigned target = proc->rdq.target;
    424                         __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
    425                         /* paranoid */ verify( lanes.tscs[target].tv != MAX );
    426                         if(target < lanes.count) {
    427                                 const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
    428                                 const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
    429                                 __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
    430                                 if(age > cutoff) {
    431                                         thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    432                                         if(t) return t;
    433                                 }
    434                         }
    435                         proc->rdq.target = MAX;
    436                 }
    437 
    438                 for(READYQ_SHARD_FACTOR) {
    439                         unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
    440                         if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    441                 }
    442 
    443                 // All lanes where empty return 0p
    444                 return 0p;
    445 
    446         }
    447         __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    448                 unsigned i = __tls_rand() % lanes.count;
    449                 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    450         }
    451         __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
    452                 return search(cltr);
    453         }
    454 #endif
    455291#if defined(USE_CPU_WORK_STEALING)
    456292        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     
    514350                /* paranoid */ verify( kernelTLS().this_processor );
    515351
    516                 processor * const proc = kernelTLS().this_processor;
    517352                const int cpu = __kernel_getcpu();
    518353                /* paranoid */ verify(cpu >= 0);
     
    525360                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
    526361
     362                processor * const proc = kernelTLS().this_processor;
    527363                const int start = map.self * READYQ_SHARD_FACTOR;
    528364                const unsigned long long ctsc = rdtscl();
    529365
    530366                // Did we already have a help target
    531                 if(proc->rdq.target == MAX) {
     367                if(proc->rdq.target == -1u) {
    532368                        unsigned long long max = 0;
    533369                        for(i; READYQ_SHARD_FACTOR) {
    534                                 unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     370                                unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    535371                                if(tsc > max) max = tsc;
    536372                        }
    537                         // proc->rdq.cutoff = (max + 2 * max) / 2;
     373                        proc->rdq.cutoff = (max + 2 * max) / 2;
    538374                        /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
    539375                        /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
     
    548384                        }
    549385
    550                         /* paranoid */ verify(proc->rdq.target != MAX);
     386                        /* paranoid */ verify(proc->rdq.target != -1u);
    551387                }
    552388                else {
    553389                        unsigned long long max = 0;
    554390                        for(i; READYQ_SHARD_FACTOR) {
    555                                 unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     391                                unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    556392                                if(tsc > max) max = tsc;
    557393                        }
     
    559395                        {
    560396                                unsigned target = proc->rdq.target;
    561                                 proc->rdq.target = MAX;
     397                                proc->rdq.target = -1u;
    562398                                lanes.help[target / READYQ_SHARD_FACTOR].tri++;
    563                                 if(moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
     399                                if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
    564400                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    565401                                        proc->rdq.last = target;
    566402                                        if(t) return t;
     403                                        else proc->rdq.target = -1u;
    567404                                }
    568                                 proc->rdq.target = MAX;
     405                                else proc->rdq.target = -1u;
    569406                        }
    570407
    571408                        unsigned last = proc->rdq.last;
    572                         if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) {
     409                        if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
    573410                                thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
    574411                                if(t) return t;
    575412                        }
    576413                        else {
    577                                 proc->rdq.last = MAX;
     414                                proc->rdq.last = -1u;
    578415                        }
    579416                }
     
    591428                processor * const proc = kernelTLS().this_processor;
    592429                unsigned last = proc->rdq.last;
    593                 if(last != MAX) {
     430                if(last != -1u) {
    594431                        struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
    595432                        if(t) return t;
    596                         proc->rdq.last = MAX;
     433                        proc->rdq.last = -1u;
    597434                }
    598435
     
    723560                #else
    724561                        unsigned preferred = thrd->preferred;
    725                         const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr;
     562                        const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
    726563                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
    727564
     
    775612                processor * proc = kernelTLS().this_processor;
    776613
    777                 if(proc->rdq.target == MAX) {
     614                if(proc->rdq.target == -1u) {
    778615                        unsigned long long min = ts(lanes.data[proc->rdq.id]);
    779616                        for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
     
    786623                else {
    787624                        unsigned target = proc->rdq.target;
    788                         proc->rdq.target = MAX;
     625                        proc->rdq.target = -1u;
    789626                        const unsigned long long bias = 0; //2_500_000_000;
    790627                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     
    821658// try to pop from a lane given by index w
    822659static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    823         /* paranoid */ verify( w < lanes.count );
    824660        __STATS( stats.attempt++; )
    825661
     
    845681        // Actually pop the list
    846682        struct thread$ * thrd;
    847         #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     683        #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
    848684                unsigned long long tsc_before = ts(lane);
    849685        #endif
     
    861697        __STATS( stats.success++; )
    862698
    863         #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
    864                 if (tsv != MAX) {
    865                         unsigned long long now = rdtscl();
    866                         unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
    867                         __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
    868                         __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
    869                 }
     699        #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     700                unsigned long long now = rdtscl();
     701                lanes.tscs[w].tv = tsv;
     702                lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);
    870703        #endif
    871704
    872         #if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)
     705        #if defined(USE_CPU_WORK_STEALING)
    873706                thrd->preferred = w / READYQ_SHARD_FACTOR;
    874707        #else
     
    969802                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
    970803                it->rdq.id = value;
    971                 it->rdq.target = MAX;
     804                it->rdq.target = -1u;
    972805                value += READYQ_SHARD_FACTOR;
    973806                it = &(*it)`next;
     
    982815
    983816static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
    984         #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING)
     817        #if defined(USE_WORK_STEALING)
    985818                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    986819                for(i; lanes.count) {
    987                         lanes.tscs[i].tv = rdtscl();
    988                         lanes.tscs[i].ma = 0;
     820                        unsigned long long tsc1 = ts(lanes.data[i]);
     821                        unsigned long long tsc2 = rdtscl();
     822                        lanes.tscs[i].tv = min(tsc1, tsc2);
    989823                }
    990824        #endif
     
    1032866                        // Update original
    1033867                        lanes.count = ncount;
    1034 
    1035                         lanes.caches = alloc( target, lanes.caches`realloc );
    1036868                }
    1037869
     
    1110942                                fix(lanes.data[idx]);
    1111943                        }
    1112 
    1113                         lanes.caches = alloc( target, lanes.caches`realloc );
    1114944                }
    1115945
    1116946                fix_times(cltr);
    1117 
    1118947
    1119948                reassign_cltr_id(cltr);
Note: See TracChangeset for help on using the changeset viewer.