Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r1f45c7d r75c7252  
    6767#endif
    6868
    69 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
    70 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
    71 static inline struct $thread * search(struct cluster * cltr);
     69static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
     70static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
     71static inline struct thread$ * search(struct cluster * cltr);
    7272static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
    7373
     
    100100        #define __kernel_rseq_unregister rseq_unregister_current_thread
    101101#elif defined(CFA_HAVE_LINUX_RSEQ_H)
    102         void __kernel_raw_rseq_register  (void);
    103         void __kernel_raw_rseq_unregister(void);
     102        static void __kernel_raw_rseq_register  (void);
     103        static void __kernel_raw_rseq_unregister(void);
    104104
    105105        #define __kernel_rseq_register __kernel_raw_rseq_register
     
    246246// Cforall Ready Queue used for scheduling
    247247//=======================================================================
     248unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
     249        const unsigned long long tw = 16;
     250        const unsigned long long nw = 4;
     251        const unsigned long long ow = tw - nw;
     252        return ((nw * nval) + (ow * oval)) / tw;
     253}
     254
    248255void ?{}(__ready_queue_t & this) with (this) {
    249256        #if defined(USE_CPU_WORK_STEALING)
     
    251258                lanes.data = alloc( lanes.count );
    252259                lanes.tscs = alloc( lanes.count );
     260                lanes.help = alloc( cpu_info.hthrd_count );
    253261
    254262                for( idx; (size_t)lanes.count ) {
    255263                        (lanes.data[idx]){};
    256264                        lanes.tscs[idx].tv = rdtscl();
     265                        lanes.tscs[idx].ma = rdtscl();
     266                }
     267                for( idx; (size_t)cpu_info.hthrd_count ) {
     268                        lanes.help[idx].src = 0;
     269                        lanes.help[idx].dst = 0;
     270                        lanes.help[idx].tri = 0;
    257271                }
    258272        #else
    259273                lanes.data  = 0p;
    260274                lanes.tscs  = 0p;
     275                lanes.help  = 0p;
    261276                lanes.count = 0;
    262277        #endif
     
    270285        free(lanes.data);
    271286        free(lanes.tscs);
     287        free(lanes.help);
    272288}
    273289
    274290//-----------------------------------------------------------------------
    275291#if defined(USE_CPU_WORK_STEALING)
    276         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
     292        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    277293                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    278294
    279295                processor * const proc = kernelTLS().this_processor;
    280                 const bool external = !push_local || (!proc) || (cltr != proc->cltr);
    281 
     296                const bool external = (!proc) || (cltr != proc->cltr);
     297
     298                // Figure out the current cpu and make sure it is valid
    282299                const int cpu = __kernel_getcpu();
    283300                /* paranoid */ verify(cpu >= 0);
     
    285302                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
    286303
    287                 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
     304                // Figure out where thread was last time and make sure it's
     305                /* paranoid */ verify(thrd->preferred >= 0);
     306                /* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count);
     307                /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
     308                const int prf = thrd->preferred * READYQ_SHARD_FACTOR;
     309
     310                const cpu_map_entry_t & map;
     311                choose(hint) {
     312                        case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu];
     313                        case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf];
     314                }
    288315                /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
    289316                /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
     
    296323                        if(unlikely(external)) { r = __tls_rand(); }
    297324                        else { r = proc->rdq.its++; }
    298                         i = start + (r % READYQ_SHARD_FACTOR);
     325                        choose(hint) {
     326                                case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR);
     327                                case UNPARK_REMOTE: i = prf   + (r % READYQ_SHARD_FACTOR);
     328                        }
    299329                        // If we can't lock it retry
    300330                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     
    316346
    317347        // Pop from the ready queue from a given cluster
    318         __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     348        __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    319349                /* paranoid */ verify( lanes.count > 0 );
    320350                /* paranoid */ verify( kernelTLS().this_processor );
     
    332362                processor * const proc = kernelTLS().this_processor;
    333363                const int start = map.self * READYQ_SHARD_FACTOR;
     364                const unsigned long long ctsc = rdtscl();
    334365
    335366                // Did we already have a help target
    336367                if(proc->rdq.target == -1u) {
    337                         // if We don't have a
    338                         unsigned long long min = ts(lanes.data[start]);
     368                        unsigned long long max = 0;
    339369                        for(i; READYQ_SHARD_FACTOR) {
    340                                 unsigned long long tsc = ts(lanes.data[start + i]);
    341                                 if(tsc < min) min = tsc;
    342                         }
    343                         proc->rdq.cutoff = min;
    344 
     370                                unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     371                                if(tsc > max) max = tsc;
     372                        }
     373                         proc->rdq.cutoff = (max + 2 * max) / 2;
    345374                        /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
    346375                        /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
    347376
    348                         if(0 == (__tls_rand() % 10_000)) {
     377                        if(0 == (__tls_rand() % 100)) {
    349378                                proc->rdq.target = __tls_rand() % lanes.count;
    350379                        } else {
     
    358387                }
    359388                else {
    360                         const unsigned long long bias = 0; //2_500_000_000;
    361                         const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     389                        unsigned long long max = 0;
     390                        for(i; READYQ_SHARD_FACTOR) {
     391                                unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     392                                if(tsc > max) max = tsc;
     393                        }
     394                        const unsigned long long cutoff = (max + 2 * max) / 2;
    362395                        {
    363396                                unsigned target = proc->rdq.target;
    364397                                proc->rdq.target = -1u;
    365                                 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
    366                                         $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     398                                lanes.help[target / READYQ_SHARD_FACTOR].tri++;
     399                                if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
     400                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    367401                                        proc->rdq.last = target;
    368402                                        if(t) return t;
     403                                        else proc->rdq.target = -1u;
    369404                                }
     405                                else proc->rdq.target = -1u;
    370406                        }
    371407
    372408                        unsigned last = proc->rdq.last;
    373409                        if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
    374                                 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
     410                                thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
    375411                                if(t) return t;
    376412                        }
     
    382418                for(READYQ_SHARD_FACTOR) {
    383419                        unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
    384                         if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     420                        if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    385421                }
    386422
     
    389425        }
    390426
    391         __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     427        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    392428                processor * const proc = kernelTLS().this_processor;
    393429                unsigned last = proc->rdq.last;
    394430                if(last != -1u) {
    395                         struct $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
     431                        struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
    396432                        if(t) return t;
    397433                        proc->rdq.last = -1u;
     
    401437                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    402438        }
    403         __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
     439        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
    404440                return search(cltr);
    405441        }
     
    428464        }
    429465
    430         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
     466        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    431467                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    432468
    433                 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     469                const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    434470                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    435471
     
    475511
    476512        // Pop from the ready queue from a given cluster
    477         __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     513        __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    478514                /* paranoid */ verify( lanes.count > 0 );
    479515                /* paranoid */ verify( kernelTLS().this_processor );
     
    499535
    500536                        // try popping from the 2 picked lists
    501                         struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
     537                        struct thread$ * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
    502538                        if(thrd) {
    503539                                return thrd;
     
    509545        }
    510546
    511         __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
    512         __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
     547        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
     548        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
    513549                return search(cltr);
    514550        }
    515551#endif
    516552#if defined(USE_WORK_STEALING)
    517         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
     553        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    518554                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    519555
    520556                // #define USE_PREFERRED
    521557                #if !defined(USE_PREFERRED)
    522                 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     558                const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    523559                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    524560                #else
    525561                        unsigned preferred = thrd->preferred;
    526                         const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
     562                        const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
    527563                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
    528564
     
    569605
    570606        // Pop from the ready queue from a given cluster
    571         __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     607        __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    572608                /* paranoid */ verify( lanes.count > 0 );
    573609                /* paranoid */ verify( kernelTLS().this_processor );
     
    591627                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
    592628                        if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
    593                                 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     629                                thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    594630                                if(t) return t;
    595631                        }
     
    598634                for(READYQ_SHARD_FACTOR) {
    599635                        unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
    600                         if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     636                        if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    601637                }
    602638                return 0p;
    603639        }
    604640
    605         __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     641        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    606642                unsigned i = __tls_rand() % lanes.count;
    607643                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    608644        }
    609645
    610         __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
     646        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
    611647                return search(cltr);
    612648        }
     
    621657//-----------------------------------------------------------------------
    622658// try to pop from a lane given by index w
    623 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     659static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    624660        __STATS( stats.attempt++; )
    625661
     
    644680
    645681        // Actually pop the list
    646         struct $thread * thrd;
     682        struct thread$ * thrd;
     683        unsigned long long tsc_before = ts(lane);
    647684        unsigned long long tsv;
    648685        [thrd, tsv] = pop(lane);
     
    658695        __STATS( stats.success++; )
    659696
    660         #if defined(USE_WORK_STEALING)
     697        #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     698                unsigned long long now = rdtscl();
    661699                lanes.tscs[w].tv = tsv;
     700                lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);
    662701        #endif
    663702
    664         thrd->preferred = w;
     703        #if defined(USE_CPU_WORK_STEALING)
     704                thrd->preferred = w / READYQ_SHARD_FACTOR;
     705        #else
     706                thrd->preferred = w;
     707        #endif
    665708
    666709        // return the popped thread
     
    671714// try to pop from any lanes making sure you don't miss any threads push
    672715// before the start of the function
    673 static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
     716static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) {
    674717        /* paranoid */ verify( lanes.count > 0 );
    675718        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     
    677720        for(i; count) {
    678721                unsigned idx = (offset + i) % count;
    679                 struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
     722                struct thread$ * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
    680723                if(thrd) {
    681724                        return thrd;
     
    685728        // All lanes where empty return 0p
    686729        return 0p;
     730}
     731
     732//-----------------------------------------------------------------------
     733// get preferred ready for new thread
     734unsigned ready_queue_new_preferred() {
     735        unsigned pref = 0;
     736        if(struct thread$ * thrd = publicTLS_get( this_thread )) {
     737                pref = thrd->preferred;
     738        }
     739        else {
     740                #if defined(USE_CPU_WORK_STEALING)
     741                        pref = __kernel_getcpu();
     742                #endif
     743        }
     744
     745        #if defined(USE_CPU_WORK_STEALING)
     746                /* paranoid */ verify(pref >= 0);
     747                /* paranoid */ verify(pref < cpu_info.hthrd_count);
     748        #endif
     749
     750        return pref;
    687751}
    688752
     
    712776//-----------------------------------------------------------------------
    713777// Given 2 indexes, pick the list with the oldest push an try to pop from it
    714 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     778static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
    715779        // Pick the bet list
    716780        int w = i;
     
    847911                                // As long as we can pop from this lane to push the threads somewhere else in the queue
    848912                                while(!is_empty(lanes.data[idx])) {
    849                                         struct $thread * thrd;
     913                                        struct thread$ * thrd;
    850914                                        unsigned long long _;
    851915                                        [thrd, _] = pop(lanes.data[idx]);
     
    915979        extern void __enable_interrupts_hard();
    916980
    917         void __kernel_raw_rseq_register  (void) {
     981        static void __kernel_raw_rseq_register  (void) {
    918982                /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );
    919983
     
    933997        }
    934998
    935         void __kernel_raw_rseq_unregister(void) {
     999        static void __kernel_raw_rseq_unregister(void) {
    9361000                /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );
    9371001
Note: See TracChangeset for help on using the changeset viewer.