Ignore:
Timestamp:
Jun 19, 2021, 3:53:18 PM (5 years ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
Children:
15f769c
Parents:
6992f95 (diff), c7d8696a (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/kernel.cfa

    r6992f95 re319fc5  
    280280
    281281                                // Spin a little on I/O, just in case
    282                                         for(5) {
     282                                for(5) {
    283283                                        __maybe_io_drain( this );
    284284                                        readyThread = pop_fast( this->cltr );
     
    287287
    288288                                // no luck, try stealing a few times
    289                                         for(5) {
     289                                for(5) {
    290290                                        if( __maybe_io_drain( this ) ) {
    291291                                                readyThread = pop_fast( this->cltr );
  • libcfa/src/concurrency/kernel.hfa

    r6992f95 re319fc5  
    6666                unsigned id;
    6767                unsigned target;
     68                unsigned last;
    6869                unsigned long long int cutoff;
    6970        } rdq;
  • libcfa/src/concurrency/kernel/startup.cfa

    r6992f95 re319fc5  
    541541        this.rdq.id  = -1u;
    542542        this.rdq.target = -1u;
     543        this.rdq.last = -1u;
    543544        this.rdq.cutoff = 0ull;
    544545        do_terminate = false;
  • libcfa/src/concurrency/ready_queue.cfa

    r6992f95 re319fc5  
    2424
    2525#include "bits/defs.hfa"
     26#include "device/cpu.hfa"
    2627#include "kernel_private.hfa"
    2728
     
    4748#endif
    4849
    49 #if   defined(USE_RELAXED_FIFO)
     50#if   defined(USE_CPU_WORK_STEALING)
     51        #define READYQ_SHARD_FACTOR 2
     52#elif defined(USE_RELAXED_FIFO)
    5053        #define BIAS 4
    5154        #define READYQ_SHARD_FACTOR 4
     
    215218//=======================================================================
    216219void ?{}(__ready_queue_t & this) with (this) {
    217         lanes.data  = 0p;
    218         lanes.tscs  = 0p;
    219         lanes.count = 0;
     220        #if defined(USE_CPU_WORK_STEALING)
     221                lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
     222                lanes.data = alloc( lanes.count );
     223                lanes.tscs = alloc( lanes.count );
     224
     225                for( idx; (size_t)lanes.count ) {
     226                        (lanes.data[idx]){};
     227                        lanes.tscs[idx].tv = rdtscl();
     228                }
     229        #else
     230                lanes.data  = 0p;
     231                lanes.tscs  = 0p;
     232                lanes.count = 0;
     233        #endif
    220234}
    221235
    222236void ^?{}(__ready_queue_t & this) with (this) {
    223         verify( SEQUENTIAL_SHARD == lanes.count );
     237        #if !defined(USE_CPU_WORK_STEALING)
     238                verify( SEQUENTIAL_SHARD == lanes.count );
     239        #endif
     240
    224241        free(lanes.data);
    225242        free(lanes.tscs);
     
    227244
    228245//-----------------------------------------------------------------------
     246#if defined(USE_CPU_WORK_STEALING)
     247        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
     248                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     249
     250                processor * const proc = kernelTLS().this_processor;
     251                const bool external = !push_local || (!proc) || (cltr != proc->cltr);
     252
     253                const int cpu = __kernel_getcpu();
     254                /* paranoid */ verify(cpu >= 0);
     255                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     256                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
     257
     258                const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
     259                /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
     260                /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
     261                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
     262
     263                const int start = map.self * READYQ_SHARD_FACTOR;
     264                unsigned i;
     265                do {
     266                        unsigned r;
     267                        if(unlikely(external)) { r = __tls_rand(); }
     268                        else { r = proc->rdq.its++; }
     269                        i = start + (r % READYQ_SHARD_FACTOR);
     270                        // If we can't lock it retry
     271                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     272
     273                // Actually push it
     274                push(lanes.data[i], thrd);
     275
     276                // Unlock and return
     277                __atomic_unlock( &lanes.data[i].lock );
     278
     279                #if !defined(__CFA_NO_STATISTICS__)
     280                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     281                        else __tls_stats()->ready.push.local.success++;
     282                #endif
     283
     284                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     285
     286        }
     287
     288        // Pop from the ready queue from a given cluster
     289        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     290                /* paranoid */ verify( lanes.count > 0 );
     291                /* paranoid */ verify( kernelTLS().this_processor );
     292
     293                const int cpu = __kernel_getcpu();
     294                /* paranoid */ verify(cpu >= 0);
     295                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     296                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
     297
     298                const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
     299                /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
     300                /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
     301                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
     302
     303                processor * const proc = kernelTLS().this_processor;
     304                const int start = map.self * READYQ_SHARD_FACTOR;
     305
     306                // Did we already have a help target
     307                if(proc->rdq.target == -1u) {
     308                        // if We don't have a
     309                        unsigned long long min = ts(lanes.data[start]);
     310                        for(i; READYQ_SHARD_FACTOR) {
     311                                unsigned long long tsc = ts(lanes.data[start + i]);
     312                                if(tsc < min) min = tsc;
     313                        }
     314                        proc->rdq.cutoff = min;
     315                        proc->rdq.target = (map.start * READYQ_SHARD_FACTOR) + (__tls_rand() % (map.count* READYQ_SHARD_FACTOR));
     316                }
     317                else {
     318                        const unsigned long long bias = 0; //2_500_000_000;
     319                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     320                        {
     321                                unsigned target = proc->rdq.target;
     322                                proc->rdq.target = -1u;
     323                                if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
     324                                        $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     325                                        proc->rdq.last = target;
     326                                        if(t) return t;
     327                                }
     328                        }
     329
     330                        unsigned last = proc->rdq.last;
     331                        if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
     332                                $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
     333                                if(t) return t;
     334                        }
     335                        else {
     336                                proc->rdq.last = -1u;
     337                        }
     338                }
     339
     340                for(READYQ_SHARD_FACTOR) {
     341                        unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     342                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     343                }
     344
     345                // All lanes where empty return 0p
     346                return 0p;
     347        }
     348
     349        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     350                processor * const proc = kernelTLS().this_processor;
     351                unsigned last = proc->rdq.last;
     352
     353                unsigned i = __tls_rand() % lanes.count;
     354                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     355        }
     356        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
     357                return search(cltr);
     358        }
     359#endif
    229360#if defined(USE_RELAXED_FIFO)
    230361        //-----------------------------------------------------------------------
     
    580711}
    581712
    582 // Grow the ready queue
    583 void ready_queue_grow(struct cluster * cltr) {
    584         size_t ncount;
    585         int target = cltr->procs.total;
    586 
    587         /* paranoid */ verify( ready_mutate_islocked() );
    588         __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
    589 
    590         // Make sure that everything is consistent
    591         /* paranoid */ check( cltr->ready_queue );
    592 
    593         // grow the ready queue
    594         with( cltr->ready_queue ) {
    595                 // Find new count
    596                 // Make sure we always have atleast 1 list
    597                 if(target >= 2) {
    598                         ncount = target * READYQ_SHARD_FACTOR;
    599                 } else {
    600                         ncount = SEQUENTIAL_SHARD;
    601                 }
    602 
    603                 // Allocate new array (uses realloc and memcpies the data)
    604                 lanes.data = alloc( ncount, lanes.data`realloc );
    605 
    606                 // Fix the moved data
    607                 for( idx; (size_t)lanes.count ) {
    608                         fix(lanes.data[idx]);
    609                 }
    610 
    611                 // Construct new data
    612                 for( idx; (size_t)lanes.count ~ ncount) {
    613                         (lanes.data[idx]){};
    614                 }
    615 
    616                 // Update original
    617                 lanes.count = ncount;
    618         }
    619 
    620         fix_times(cltr);
    621 
    622         reassign_cltr_id(cltr);
    623 
    624         // Make sure that everything is consistent
    625         /* paranoid */ check( cltr->ready_queue );
    626 
    627         __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
    628 
    629         /* paranoid */ verify( ready_mutate_islocked() );
    630 }
    631 
    632 // Shrink the ready queue
    633 void ready_queue_shrink(struct cluster * cltr) {
    634         /* paranoid */ verify( ready_mutate_islocked() );
    635         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
    636 
    637         // Make sure that everything is consistent
    638         /* paranoid */ check( cltr->ready_queue );
    639 
    640         int target = cltr->procs.total;
    641 
    642         with( cltr->ready_queue ) {
    643                 // Remember old count
    644                 size_t ocount = lanes.count;
    645 
    646                 // Find new count
    647                 // Make sure we always have atleast 1 list
    648                 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
    649                 /* paranoid */ verify( ocount >= lanes.count );
    650                 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
    651 
    652                 // for printing count the number of displaced threads
    653                 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
    654                         __attribute__((unused)) size_t displaced = 0;
    655                 #endif
    656 
    657                 // redistribute old data
    658                 for( idx; (size_t)lanes.count ~ ocount) {
    659                         // Lock is not strictly needed but makes checking invariants much easier
    660                         __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
    661                         verify(locked);
    662 
    663                         // As long as we can pop from this lane to push the threads somewhere else in the queue
    664                         while(!is_empty(lanes.data[idx])) {
    665                                 struct $thread * thrd;
    666                                 unsigned long long _;
    667                                 [thrd, _] = pop(lanes.data[idx]);
    668 
    669                                 push(cltr, thrd, true);
    670 
    671                                 // for printing count the number of displaced threads
    672                                 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
    673                                         displaced++;
    674                                 #endif
    675                         }
    676 
    677                         // Unlock the lane
    678                         __atomic_unlock(&lanes.data[idx].lock);
    679 
    680                         // TODO print the queue statistics here
    681 
    682                         ^(lanes.data[idx]){};
    683                 }
    684 
    685                 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
    686 
    687                 // Allocate new array (uses realloc and memcpies the data)
    688                 lanes.data = alloc( lanes.count, lanes.data`realloc );
    689 
    690                 // Fix the moved data
    691                 for( idx; (size_t)lanes.count ) {
    692                         fix(lanes.data[idx]);
    693                 }
    694         }
    695 
    696         fix_times(cltr);
    697 
    698         reassign_cltr_id(cltr);
    699 
    700         // Make sure that everything is consistent
    701         /* paranoid */ check( cltr->ready_queue );
    702 
    703         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
    704         /* paranoid */ verify( ready_mutate_islocked() );
    705 }
     713#if defined(USE_CPU_WORK_STEALING)
     714        // ready_queue size is fixed in this case
     715        void ready_queue_grow(struct cluster * cltr) {}
     716        void ready_queue_shrink(struct cluster * cltr) {}
     717#else
     718        // Grow the ready queue
     719        void ready_queue_grow(struct cluster * cltr) {
     720                size_t ncount;
     721                int target = cltr->procs.total;
     722
     723                /* paranoid */ verify( ready_mutate_islocked() );
     724                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
     725
     726                // Make sure that everything is consistent
     727                /* paranoid */ check( cltr->ready_queue );
     728
     729                // grow the ready queue
     730                with( cltr->ready_queue ) {
     731                        // Find new count
     732                        // Make sure we always have atleast 1 list
     733                        if(target >= 2) {
     734                                ncount = target * READYQ_SHARD_FACTOR;
     735                        } else {
     736                                ncount = SEQUENTIAL_SHARD;
     737                        }
     738
     739                        // Allocate new array (uses realloc and memcpies the data)
     740                        lanes.data = alloc( ncount, lanes.data`realloc );
     741
     742                        // Fix the moved data
     743                        for( idx; (size_t)lanes.count ) {
     744                                fix(lanes.data[idx]);
     745                        }
     746
     747                        // Construct new data
     748                        for( idx; (size_t)lanes.count ~ ncount) {
     749                                (lanes.data[idx]){};
     750                        }
     751
     752                        // Update original
     753                        lanes.count = ncount;
     754                }
     755
     756                fix_times(cltr);
     757
     758                reassign_cltr_id(cltr);
     759
     760                // Make sure that everything is consistent
     761                /* paranoid */ check( cltr->ready_queue );
     762
     763                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
     764
     765                /* paranoid */ verify( ready_mutate_islocked() );
     766        }
     767
     768        // Shrink the ready queue
     769        void ready_queue_shrink(struct cluster * cltr) {
     770                /* paranoid */ verify( ready_mutate_islocked() );
     771                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     772
     773                // Make sure that everything is consistent
     774                /* paranoid */ check( cltr->ready_queue );
     775
     776                int target = cltr->procs.total;
     777
     778                with( cltr->ready_queue ) {
     779                        // Remember old count
     780                        size_t ocount = lanes.count;
     781
     782                        // Find new count
     783                        // Make sure we always have atleast 1 list
     784                        lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
     785                        /* paranoid */ verify( ocount >= lanes.count );
     786                        /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
     787
     788                        // for printing count the number of displaced threads
     789                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     790                                __attribute__((unused)) size_t displaced = 0;
     791                        #endif
     792
     793                        // redistribute old data
     794                        for( idx; (size_t)lanes.count ~ ocount) {
     795                                // Lock is not strictly needed but makes checking invariants much easier
     796                                __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
     797                                verify(locked);
     798
     799                                // As long as we can pop from this lane to push the threads somewhere else in the queue
     800                                while(!is_empty(lanes.data[idx])) {
     801                                        struct $thread * thrd;
     802                                        unsigned long long _;
     803                                        [thrd, _] = pop(lanes.data[idx]);
     804
     805                                        push(cltr, thrd, true);
     806
     807                                        // for printing count the number of displaced threads
     808                                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     809                                                displaced++;
     810                                        #endif
     811                                }
     812
     813                                // Unlock the lane
     814                                __atomic_unlock(&lanes.data[idx].lock);
     815
     816                                // TODO print the queue statistics here
     817
     818                                ^(lanes.data[idx]){};
     819                        }
     820
     821                        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
     822
     823                        // Allocate new array (uses realloc and memcpies the data)
     824                        lanes.data = alloc( lanes.count, lanes.data`realloc );
     825
     826                        // Fix the moved data
     827                        for( idx; (size_t)lanes.count ) {
     828                                fix(lanes.data[idx]);
     829                        }
     830                }
     831
     832                fix_times(cltr);
     833
     834                reassign_cltr_id(cltr);
     835
     836                // Make sure that everything is consistent
     837                /* paranoid */ check( cltr->ready_queue );
     838
     839                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
     840                /* paranoid */ verify( ready_mutate_islocked() );
     841        }
     842#endif
    706843
    707844#if !defined(__CFA_NO_STATISTICS__)
Note: See TracChangeset for help on using the changeset viewer.