Ignore:
Timestamp:
Jan 18, 2022, 1:16:23 PM (2 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
Children:
1e8b4b49, adfd125
Parents:
21a5bfb7 (diff), 91a72ef (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r21a5bfb7 r175f9f4  
    2020
    2121
    22 #define USE_RELAXED_FIFO
     22// #define USE_RELAXED_FIFO
    2323// #define USE_WORK_STEALING
    2424// #define USE_CPU_WORK_STEALING
     25#define USE_AWARE_STEALING
    2526
    2627#include "bits/defs.hfa"
     
    2930
    3031#include "stdlib.hfa"
     32#include "limits.hfa"
    3133#include "math.hfa"
    3234
     
    5456#endif
    5557
    56 #if   defined(USE_CPU_WORK_STEALING)
     58#if   defined(USE_AWARE_STEALING)
     59        #define READYQ_SHARD_FACTOR 2
     60        #define SEQUENTIAL_SHARD 2
     61#elif defined(USE_CPU_WORK_STEALING)
    5762        #define READYQ_SHARD_FACTOR 2
    5863#elif defined(USE_RELAXED_FIFO)
     
    138143        __kernel_rseq_register();
    139144
    140         __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    141145        bool * handle = (bool *)&kernelTLS().sched_lock;
    142146
     
    174178        }
    175179
    176         __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
    177 
    178180        // Return new spot.
    179181        /* paranoid */ verify(n < ready);
     
    190192
    191193        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
    192 
    193         __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
    194194
    195195        __kernel_rseq_unregister();
     
    244244
    245245//=======================================================================
     246// caches handling
     247
     248struct __attribute__((aligned(128))) __ready_queue_caches_t {
     249        // Count States:
     250        // - 0  : No one is looking after this cache
     251        // - 1  : No one is looking after this cache, BUT it's not empty
     252        // - 2+ : At least one processor is looking after this cache
     253        volatile unsigned count;
     254};
     255
     256void  ?{}(__ready_queue_caches_t & this) { this.count = 0; }
     257void ^?{}(__ready_queue_caches_t & this) {}
     258
     259static inline void depart(__ready_queue_caches_t & cache) {
     260        /* paranoid */ verify( cache.count > 1);
     261        __atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);
     262        /* paranoid */ verify( cache.count != 0);
     263        /* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.
     264}
     265
     266static inline void arrive(__ready_queue_caches_t & cache) {
     267        // for() {
     268        //      unsigned expected = cache.count;
     269        //      unsigned desired  = 0 == expected ? 2 : expected + 1;
     270        // }
     271}
     272
     273//=======================================================================
    246274// Cforall Ready Queue used for scheduling
    247275//=======================================================================
    248 unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
    249         const unsigned long long tw = 16;
    250         const unsigned long long nw = 4;
    251         const unsigned long long ow = tw - nw;
    252         return ((nw * nval) + (ow * oval)) / tw;
     276unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
     277        /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
     278        /* paranoid */ verifyf( instsc  < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
     279        /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );
     280
     281        const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
     282        const unsigned long long total_weight = 16;
     283        const unsigned long long new_weight   = 4;
     284        const unsigned long long old_weight = total_weight - new_weight;
     285        const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
     286        return ret;
    253287}
    254288
     
    271305                }
    272306        #else
    273                 lanes.data  = 0p;
    274                 lanes.tscs  = 0p;
    275                 lanes.help  = 0p;
    276                 lanes.count = 0;
     307                lanes.data   = 0p;
     308                lanes.tscs   = 0p;
     309                lanes.caches = 0p;
     310                lanes.help   = 0p;
     311                lanes.count  = 0;
    277312        #endif
    278313}
     
    285320        free(lanes.data);
    286321        free(lanes.tscs);
     322        free(lanes.caches);
    287323        free(lanes.help);
    288324}
    289325
    290326//-----------------------------------------------------------------------
     327#if defined(USE_AWARE_STEALING)
     328        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     329                processor * const proc = kernelTLS().this_processor;
     330                const bool external = (!proc) || (cltr != proc->cltr);
     331                const bool remote   = hint == UNPARK_REMOTE;
     332
     333                unsigned i;
     334                if( external || remote ) {
     335                        // Figure out where thread was last time and make sure it's valid
     336                        /* paranoid */ verify(thrd->preferred >= 0);
     337                        if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
     338                                /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
     339                                unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
     340                                do {
     341                                        unsigned r = __tls_rand();
     342                                        i = start + (r % READYQ_SHARD_FACTOR);
     343                                        /* paranoid */ verify( i < lanes.count );
     344                                        // If we can't lock it retry
     345                                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     346                        } else {
     347                                do {
     348                                        i = __tls_rand() % lanes.count;
     349                                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     350                        }
     351                } else {
     352                        do {
     353                                unsigned r = proc->rdq.its++;
     354                                i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     355                                /* paranoid */ verify( i < lanes.count );
     356                                // If we can't lock it retry
     357                        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     358                }
     359
     360                // Actually push it
     361                push(lanes.data[i], thrd);
     362
     363                // Unlock and return
     364                __atomic_unlock( &lanes.data[i].lock );
     365
     366                #if !defined(__CFA_NO_STATISTICS__)
     367                        if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     368                        else __tls_stats()->ready.push.local.success++;
     369                #endif
     370        }
     371
     372        static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
     373                unsigned start = proc->rdq.id;
     374                unsigned long long max = 0;
     375                for(i; READYQ_SHARD_FACTOR) {
     376                        unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
     377                        if(ptsc != -1ull) {
     378                                /* paranoid */ verify( start + i < rdq.lanes.count );
     379                                unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
     380                                if(tsc > max) max = tsc;
     381                        }
     382                }
     383                return (max + 2 * max) / 2;
     384        }
     385
     386        __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     387                /* paranoid */ verify( lanes.count > 0 );
     388                /* paranoid */ verify( kernelTLS().this_processor );
     389                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     390
     391                processor * const proc = kernelTLS().this_processor;
     392                unsigned this = proc->rdq.id;
     393                /* paranoid */ verify( this < lanes.count );
     394                __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
     395
     396                // Figure out the current cpu and make sure it is valid
     397                const int cpu = __kernel_getcpu();
     398                /* paranoid */ verify(cpu >= 0);
     399                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     400                unsigned this_cache = cpu_info.llc_map[cpu].cache;
     401
     402                // Super important: don't write the same value over and over again
     403                // We want to maximise our chances that his particular values stays in cache
     404                if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)
     405                        __atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);
     406
     407                const unsigned long long ctsc = rdtscl();
     408
     409                if(proc->rdq.target == MAX) {
     410                        uint64_t chaos = __tls_rand();
     411                        unsigned ext = chaos & 0xff;
     412                        unsigned other  = (chaos >> 8) % (lanes.count);
     413
     414                        if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
     415                                proc->rdq.target = other;
     416                        }
     417                }
     418                else {
     419                        const unsigned target = proc->rdq.target;
     420                        __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
     421                        /* paranoid */ verify( lanes.tscs[target].tv != MAX );
     422                        if(target < lanes.count) {
     423                                const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
     424                                const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
     425                                __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
     426                                if(age > cutoff) {
     427                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     428                                        if(t) return t;
     429                                }
     430                        }
     431                        proc->rdq.target = MAX;
     432                }
     433
     434                for(READYQ_SHARD_FACTOR) {
     435                        unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     436                        if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     437                }
     438
     439                // All lanes where empty return 0p
     440                return 0p;
     441
     442        }
     443        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     444                unsigned i = __tls_rand() % lanes.count;
     445                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     446        }
     447        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
     448                return search(cltr);
     449        }
     450#endif
    291451#if defined(USE_CPU_WORK_STEALING)
    292452        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     
    350510                /* paranoid */ verify( kernelTLS().this_processor );
    351511
     512                processor * const proc = kernelTLS().this_processor;
    352513                const int cpu = __kernel_getcpu();
    353514                /* paranoid */ verify(cpu >= 0);
     
    360521                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
    361522
    362                 processor * const proc = kernelTLS().this_processor;
    363523                const int start = map.self * READYQ_SHARD_FACTOR;
    364524                const unsigned long long ctsc = rdtscl();
    365525
    366526                // Did we already have a help target
    367                 if(proc->rdq.target == -1u) {
     527                if(proc->rdq.target == MAX) {
    368528                        unsigned long long max = 0;
    369529                        for(i; READYQ_SHARD_FACTOR) {
    370                                 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     530                                unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    371531                                if(tsc > max) max = tsc;
    372532                        }
    373                         proc->rdq.cutoff = (max + 2 * max) / 2;
     533                        // proc->rdq.cutoff = (max + 2 * max) / 2;
    374534                        /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
    375535                        /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
     
    384544                        }
    385545
    386                         /* paranoid */ verify(proc->rdq.target != -1u);
     546                        /* paranoid */ verify(proc->rdq.target != MAX);
    387547                }
    388548                else {
    389549                        unsigned long long max = 0;
    390550                        for(i; READYQ_SHARD_FACTOR) {
    391                                 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     551                                unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    392552                                if(tsc > max) max = tsc;
    393553                        }
     
    395555                        {
    396556                                unsigned target = proc->rdq.target;
    397                                 proc->rdq.target = -1u;
     557                                proc->rdq.target = MAX;
    398558                                lanes.help[target / READYQ_SHARD_FACTOR].tri++;
    399                                 if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
     559                                if(moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
    400560                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    401561                                        proc->rdq.last = target;
    402562                                        if(t) return t;
    403                                         else proc->rdq.target = -1u;
    404563                                }
    405                                 else proc->rdq.target = -1u;
     564                                proc->rdq.target = MAX;
    406565                        }
    407566
    408567                        unsigned last = proc->rdq.last;
    409                         if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
     568                        if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) {
    410569                                thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
    411570                                if(t) return t;
    412571                        }
    413572                        else {
    414                                 proc->rdq.last = -1u;
     573                                proc->rdq.last = MAX;
    415574                        }
    416575                }
     
    428587                processor * const proc = kernelTLS().this_processor;
    429588                unsigned last = proc->rdq.last;
    430                 if(last != -1u) {
     589                if(last != MAX) {
    431590                        struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
    432591                        if(t) return t;
    433                         proc->rdq.last = -1u;
     592                        proc->rdq.last = MAX;
    434593                }
    435594
     
    560719                #else
    561720                        unsigned preferred = thrd->preferred;
    562                         const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
     721                        const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr;
    563722                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
    564723
     
    612771                processor * proc = kernelTLS().this_processor;
    613772
    614                 if(proc->rdq.target == -1u) {
     773                if(proc->rdq.target == MAX) {
    615774                        unsigned long long min = ts(lanes.data[proc->rdq.id]);
    616775                        for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
     
    623782                else {
    624783                        unsigned target = proc->rdq.target;
    625                         proc->rdq.target = -1u;
     784                        proc->rdq.target = MAX;
    626785                        const unsigned long long bias = 0; //2_500_000_000;
    627786                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     
    658817// try to pop from a lane given by index w
    659818static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     819        /* paranoid */ verify( w < lanes.count );
    660820        __STATS( stats.attempt++; )
    661821
     
    681841        // Actually pop the list
    682842        struct thread$ * thrd;
    683         unsigned long long tsc_before = ts(lane);
     843        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     844                unsigned long long tsc_before = ts(lane);
     845        #endif
    684846        unsigned long long tsv;
    685847        [thrd, tsv] = pop(lane);
     
    695857        __STATS( stats.success++; )
    696858
    697         #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
    698                 unsigned long long now = rdtscl();
    699                 lanes.tscs[w].tv = tsv;
    700                 lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);
     859        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     860                if (tsv != MAX) {
     861                        unsigned long long now = rdtscl();
     862                        unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
     863                        __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
     864                        __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
     865                }
    701866        #endif
    702867
    703         #if defined(USE_CPU_WORK_STEALING)
     868        #if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)
    704869                thrd->preferred = w / READYQ_SHARD_FACTOR;
    705870        #else
     
    800965                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
    801966                it->rdq.id = value;
    802                 it->rdq.target = -1u;
     967                it->rdq.target = MAX;
    803968                value += READYQ_SHARD_FACTOR;
    804969                it = &(*it)`next;
     
    813978
    814979static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
    815         #if defined(USE_WORK_STEALING)
     980        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING)
    816981                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    817982                for(i; lanes.count) {
    818                         unsigned long long tsc1 = ts(lanes.data[i]);
    819                         unsigned long long tsc2 = rdtscl();
    820                         lanes.tscs[i].tv = min(tsc1, tsc2);
     983                        lanes.tscs[i].tv = rdtscl();
     984                        lanes.tscs[i].ma = 0;
    821985                }
    822986        #endif
     
    8641028                        // Update original
    8651029                        lanes.count = ncount;
     1030
     1031                        lanes.caches = alloc( target, lanes.caches`realloc );
    8661032                }
    8671033
     
    9401106                                fix(lanes.data[idx]);
    9411107                        }
     1108
     1109                        lanes.caches = alloc( target, lanes.caches`realloc );
    9421110                }
    9431111
    9441112                fix_times(cltr);
     1113
    9451114
    9461115                reassign_cltr_id(cltr);
Note: See TracChangeset for help on using the changeset viewer.