Changeset 12daa43 for libcfa


Ignore:
Timestamp:
Jun 17, 2021, 3:05:54 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
cf85f96
Parents:
fde879b3
Message:

Added a define switch for using cpu workstealing.
Not Fully implemented.

Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/kernel.cfa

    rfde879b3 r12daa43  
    280280
    281281                                // Spin a little on I/O, just in case
    282                                         for(5) {
     282                                for(5) {
    283283                                        __maybe_io_drain( this );
    284284                                        readyThread = pop_fast( this->cltr );
     
    287287
    288288                                // no luck, try stealing a few times
    289                                         for(5) {
     289                                for(5) {
    290290                                        if( __maybe_io_drain( this ) ) {
    291291                                                readyThread = pop_fast( this->cltr );
  • libcfa/src/concurrency/kernel.hfa

    rfde879b3 r12daa43  
    6666                unsigned id;
    6767                unsigned target;
     68                unsigned last;
    6869                unsigned long long int cutoff;
    6970        } rdq;
  • libcfa/src/concurrency/kernel/startup.cfa

    rfde879b3 r12daa43  
    541541        this.rdq.id  = -1u;
    542542        this.rdq.target = -1u;
     543        this.rdq.last = -1u;
    543544        this.rdq.cutoff = 0ull;
    544545        do_terminate = false;
  • libcfa/src/concurrency/ready_queue.cfa

    rfde879b3 r12daa43  
    2020
    2121
    22 #define USE_RELAXED_FIFO
     22// #define USE_RELAXED_FIFO
    2323// #define USE_WORK_STEALING
    2424
    2525#include "bits/defs.hfa"
     26#include "device/cpu.hfa"
    2627#include "kernel_private.hfa"
    2728
     
    4748#endif
    4849
    49 #if   defined(USE_RELAXED_FIFO)
     50#if   defined(USE_CPU_WORK_STEALING)
     51        #define READYQ_SHARD_FACTOR 2
     52#elif defined(USE_RELAXED_FIFO)
    5053        #define BIAS 4
    5154        #define READYQ_SHARD_FACTOR 4
     
    215218//=======================================================================
    216219void ?{}(__ready_queue_t & this) with (this) {
    217         lanes.data  = 0p;
    218         lanes.tscs  = 0p;
    219         lanes.count = 0;
     220        #if defined(USE_CPU_WORK_STEALING)
     221                lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
     222                lanes.data = alloc( lanes.count );
     223                lanes.tscs = alloc( lanes.count );
     224
     225                for( idx; (size_t)lanes.count ) {
     226                        (lanes.data[idx]){};
     227                        lanes.tscs[idx].tv = rdtscl();
     228                }
     229        #else
     230                lanes.data  = 0p;
     231                lanes.tscs  = 0p;
     232                lanes.count = 0;
     233        #endif
    220234}
    221235
    222236void ^?{}(__ready_queue_t & this) with (this) {
    223         verify( SEQUENTIAL_SHARD == lanes.count );
     237        #if !defined(USE_CPU_WORK_STEALING)
     238                verify( SEQUENTIAL_SHARD == lanes.count );
     239        #endif
     240
    224241        free(lanes.data);
    225242        free(lanes.tscs);
     
    227244
    228245//-----------------------------------------------------------------------
     246#if defined(USE_CPU_WORK_STEALING)
     247        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
     248                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     249
     250                processor * const proc = kernelTLS().this_processor;
     251                const bool external = !push_local || (!proc) || (cltr != proc->cltr);
     252
     253                const int cpu = __kernel_getcpu();
     254                /* paranoid */ verify(cpu >= 0);
     255                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     256                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
     257
     258                const int start = cpu * READYQ_SHARD_FACTOR;
     259                unsigned i;
     260                do {
     261                        unsigned r;
     262                        if(unlikely(external)) { r = __tls_rand(); }
     263                        else { r = proc->rdq.its++; }
     264                        i = start + (r % READYQ_SHARD_FACTOR);
     265                        // If we can't lock it retry
     266                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     267
     268                // Actually push it
     269                push(lanes.data[i], thrd);
     270
     271                // Unlock and return
     272                __atomic_unlock( &lanes.data[i].lock );
     273
     274                #if !defined(__CFA_NO_STATISTICS__)
     275                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     276                        else __tls_stats()->ready.push.local.success++;
     277                #endif
     278
     279                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     280
     281        }
     282
     283        // Pop from the ready queue from a given cluster
     284        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     285                /* paranoid */ verify( lanes.count > 0 );
     286                /* paranoid */ verify( kernelTLS().this_processor );
     287
     288                const int cpu = __kernel_getcpu();
     289                /* paranoid */ verify(cpu >= 0);
     290                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
     291                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     292
     293                processor * const proc = kernelTLS().this_processor;
     294                const int start = cpu * READYQ_SHARD_FACTOR;
     295
     296                // Did we already have a help target
     297                if(proc->rdq.target == -1u) {
     298                        // if We don't have a
     299                        unsigned long long min = ts(lanes.data[start]);
     300                        for(i; READYQ_SHARD_FACTOR) {
     301                                unsigned long long tsc = ts(lanes.data[start + i]);
     302                                if(tsc < min) min = tsc;
     303                        }
     304                        proc->rdq.cutoff = min;
     305                        proc->rdq.target = __tls_rand() % lanes.count;
     306                }
     307                else {
     308                        const unsigned long long bias = 0; //2_500_000_000;
     309                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     310                        {
     311                                unsigned target = proc->rdq.target;
     312                                proc->rdq.target = -1u;
     313                                if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
     314                                        $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     315                                        proc->rdq.last = target;
     316                                        if(t) return t;
     317                                }
     318                        }
     319
     320                        unsigned last = proc->rdq.last;
     321                        if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
     322                                $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
     323                                if(t) return t;
     324                        }
     325                        else {
     326                                proc->rdq.last = -1u;
     327                        }
     328                }
     329
     330                for(READYQ_SHARD_FACTOR) {
     331                        unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     332                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     333                }
     334
     335                // All lanes where empty return 0p
     336                return 0p;
     337        }
     338
     339        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     340                processor * const proc = kernelTLS().this_processor;
     341                unsigned last = proc->rdq.last;
     342
     343                unsigned i = __tls_rand() % lanes.count;
     344                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     345        }
     346        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
     347                return search(cltr);
     348        }
     349#endif
    229350#if defined(USE_RELAXED_FIFO)
    230351        //-----------------------------------------------------------------------
     
    580701}
    581702
    582 // Grow the ready queue
    583 void ready_queue_grow(struct cluster * cltr) {
    584         size_t ncount;
    585         int target = cltr->procs.total;
    586 
    587         /* paranoid */ verify( ready_mutate_islocked() );
    588         __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
    589 
    590         // Make sure that everything is consistent
    591         /* paranoid */ check( cltr->ready_queue );
    592 
    593         // grow the ready queue
    594         with( cltr->ready_queue ) {
    595                 // Find new count
    596                 // Make sure we always have atleast 1 list
    597                 if(target >= 2) {
    598                         ncount = target * READYQ_SHARD_FACTOR;
    599                 } else {
    600                         ncount = SEQUENTIAL_SHARD;
    601                 }
    602 
    603                 // Allocate new array (uses realloc and memcpies the data)
    604                 lanes.data = alloc( ncount, lanes.data`realloc );
    605 
    606                 // Fix the moved data
    607                 for( idx; (size_t)lanes.count ) {
    608                         fix(lanes.data[idx]);
    609                 }
    610 
    611                 // Construct new data
    612                 for( idx; (size_t)lanes.count ~ ncount) {
    613                         (lanes.data[idx]){};
    614                 }
    615 
    616                 // Update original
    617                 lanes.count = ncount;
    618         }
    619 
    620         fix_times(cltr);
    621 
    622         reassign_cltr_id(cltr);
    623 
    624         // Make sure that everything is consistent
    625         /* paranoid */ check( cltr->ready_queue );
    626 
    627         __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
    628 
    629         /* paranoid */ verify( ready_mutate_islocked() );
    630 }
    631 
    632 // Shrink the ready queue
    633 void ready_queue_shrink(struct cluster * cltr) {
    634         /* paranoid */ verify( ready_mutate_islocked() );
    635         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
    636 
    637         // Make sure that everything is consistent
    638         /* paranoid */ check( cltr->ready_queue );
    639 
    640         int target = cltr->procs.total;
    641 
    642         with( cltr->ready_queue ) {
    643                 // Remember old count
    644                 size_t ocount = lanes.count;
    645 
    646                 // Find new count
    647                 // Make sure we always have atleast 1 list
    648                 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
    649                 /* paranoid */ verify( ocount >= lanes.count );
    650                 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
    651 
    652                 // for printing count the number of displaced threads
    653                 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
    654                         __attribute__((unused)) size_t displaced = 0;
    655                 #endif
    656 
    657                 // redistribute old data
    658                 for( idx; (size_t)lanes.count ~ ocount) {
    659                         // Lock is not strictly needed but makes checking invariants much easier
    660                         __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
    661                         verify(locked);
    662 
    663                         // As long as we can pop from this lane to push the threads somewhere else in the queue
    664                         while(!is_empty(lanes.data[idx])) {
    665                                 struct $thread * thrd;
    666                                 unsigned long long _;
    667                                 [thrd, _] = pop(lanes.data[idx]);
    668 
    669                                 push(cltr, thrd, true);
    670 
    671                                 // for printing count the number of displaced threads
    672                                 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
    673                                         displaced++;
    674                                 #endif
    675                         }
    676 
    677                         // Unlock the lane
    678                         __atomic_unlock(&lanes.data[idx].lock);
    679 
    680                         // TODO print the queue statistics here
    681 
    682                         ^(lanes.data[idx]){};
    683                 }
    684 
    685                 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
    686 
    687                 // Allocate new array (uses realloc and memcpies the data)
    688                 lanes.data = alloc( lanes.count, lanes.data`realloc );
    689 
    690                 // Fix the moved data
    691                 for( idx; (size_t)lanes.count ) {
    692                         fix(lanes.data[idx]);
    693                 }
    694         }
    695 
    696         fix_times(cltr);
    697 
    698         reassign_cltr_id(cltr);
    699 
    700         // Make sure that everything is consistent
    701         /* paranoid */ check( cltr->ready_queue );
    702 
    703         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
    704         /* paranoid */ verify( ready_mutate_islocked() );
    705 }
     703#if defined(USE_CPU_WORK_STEALING)
     704        // ready_queue size is fixed in this case
     705        void ready_queue_grow(struct cluster * cltr) {}
     706        void ready_queue_shrink(struct cluster * cltr) {}
     707#else
     708        // Grow the ready queue
     709        void ready_queue_grow(struct cluster * cltr) {
     710                size_t ncount;
     711                int target = cltr->procs.total;
     712
     713                /* paranoid */ verify( ready_mutate_islocked() );
     714                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
     715
     716                // Make sure that everything is consistent
     717                /* paranoid */ check( cltr->ready_queue );
     718
     719                // grow the ready queue
     720                with( cltr->ready_queue ) {
     721                        // Find new count
     722                        // Make sure we always have atleast 1 list
     723                        if(target >= 2) {
     724                                ncount = target * READYQ_SHARD_FACTOR;
     725                        } else {
     726                                ncount = SEQUENTIAL_SHARD;
     727                        }
     728
     729                        // Allocate new array (uses realloc and memcpies the data)
     730                        lanes.data = alloc( ncount, lanes.data`realloc );
     731
     732                        // Fix the moved data
     733                        for( idx; (size_t)lanes.count ) {
     734                                fix(lanes.data[idx]);
     735                        }
     736
     737                        // Construct new data
     738                        for( idx; (size_t)lanes.count ~ ncount) {
     739                                (lanes.data[idx]){};
     740                        }
     741
     742                        // Update original
     743                        lanes.count = ncount;
     744                }
     745
     746                fix_times(cltr);
     747
     748                reassign_cltr_id(cltr);
     749
     750                // Make sure that everything is consistent
     751                /* paranoid */ check( cltr->ready_queue );
     752
     753                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
     754
     755                /* paranoid */ verify( ready_mutate_islocked() );
     756        }
     757
     758        // Shrink the ready queue
     759        void ready_queue_shrink(struct cluster * cltr) {
     760                /* paranoid */ verify( ready_mutate_islocked() );
     761                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     762
     763                // Make sure that everything is consistent
     764                /* paranoid */ check( cltr->ready_queue );
     765
     766                int target = cltr->procs.total;
     767
     768                with( cltr->ready_queue ) {
     769                        // Remember old count
     770                        size_t ocount = lanes.count;
     771
     772                        // Find new count
     773                        // Make sure we always have atleast 1 list
     774                        lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
     775                        /* paranoid */ verify( ocount >= lanes.count );
     776                        /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
     777
     778                        // for printing count the number of displaced threads
     779                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     780                                __attribute__((unused)) size_t displaced = 0;
     781                        #endif
     782
     783                        // redistribute old data
     784                        for( idx; (size_t)lanes.count ~ ocount) {
     785                                // Lock is not strictly needed but makes checking invariants much easier
     786                                __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
     787                                verify(locked);
     788
     789                                // As long as we can pop from this lane to push the threads somewhere else in the queue
     790                                while(!is_empty(lanes.data[idx])) {
     791                                        struct $thread * thrd;
     792                                        unsigned long long _;
     793                                        [thrd, _] = pop(lanes.data[idx]);
     794
     795                                        push(cltr, thrd, true);
     796
     797                                        // for printing count the number of displaced threads
     798                                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     799                                                displaced++;
     800                                        #endif
     801                                }
     802
     803                                // Unlock the lane
     804                                __atomic_unlock(&lanes.data[idx].lock);
     805
     806                                // TODO print the queue statistics here
     807
     808                                ^(lanes.data[idx]){};
     809                        }
     810
     811                        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
     812
     813                        // Allocate new array (uses realloc and memcpies the data)
     814                        lanes.data = alloc( lanes.count, lanes.data`realloc );
     815
     816                        // Fix the moved data
     817                        for( idx; (size_t)lanes.count ) {
     818                                fix(lanes.data[idx]);
     819                        }
     820                }
     821
     822                fix_times(cltr);
     823
     824                reassign_cltr_id(cltr);
     825
     826                // Make sure that everything is consistent
     827                /* paranoid */ check( cltr->ready_queue );
     828
     829                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
     830                /* paranoid */ verify( ready_mutate_islocked() );
     831        }
     832#endif
    706833
    707834#if !defined(__CFA_NO_STATISTICS__)
Note: See TracChangeset for help on using the changeset viewer.