Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r07b4970 r43784ac  
    2424
    2525#include "bits/defs.hfa"
    26 #include "device/cpu.hfa"
    2726#include "kernel_private.hfa"
    2827
     
    4847#endif
    4948
    50 #if   defined(USE_CPU_WORK_STEALING)
    51         #define READYQ_SHARD_FACTOR 2
    52 #elif defined(USE_RELAXED_FIFO)
     49#if   defined(USE_RELAXED_FIFO)
    5350        #define BIAS 4
    5451        #define READYQ_SHARD_FACTOR 4
     
    218215//=======================================================================
    219216void ?{}(__ready_queue_t & this) with (this) {
    220         #if defined(USE_CPU_WORK_STEALING)
    221                 lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
    222                 lanes.data = alloc( lanes.count );
    223                 lanes.tscs = alloc( lanes.count );
    224 
    225                 for( idx; (size_t)lanes.count ) {
    226                         (lanes.data[idx]){};
    227                         lanes.tscs[idx].tv = rdtscl();
    228                 }
    229         #else
    230                 lanes.data  = 0p;
    231                 lanes.tscs  = 0p;
    232                 lanes.count = 0;
    233         #endif
     217        lanes.data  = 0p;
     218        lanes.tscs  = 0p;
     219        lanes.count = 0;
    234220}
    235221
    236222void ^?{}(__ready_queue_t & this) with (this) {
    237         #if !defined(USE_CPU_WORK_STEALING)
    238                 verify( SEQUENTIAL_SHARD == lanes.count );
    239         #endif
    240 
     223        verify( SEQUENTIAL_SHARD == lanes.count );
    241224        free(lanes.data);
    242225        free(lanes.tscs);
     
    244227
    245228//-----------------------------------------------------------------------
    246 #if defined(USE_CPU_WORK_STEALING)
    247         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
    248                 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    249 
    250                 processor * const proc = kernelTLS().this_processor;
    251                 const bool external = !push_local || (!proc) || (cltr != proc->cltr);
    252 
    253                 const int cpu = __kernel_getcpu();
    254                 /* paranoid */ verify(cpu >= 0);
    255                 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
    256                 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
    257 
    258                 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
    259                 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
    260                 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
    261                 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
    262 
    263                 const int start = map.self * READYQ_SHARD_FACTOR;
    264                 unsigned i;
    265                 do {
    266                         unsigned r;
    267                         if(unlikely(external)) { r = __tls_rand(); }
    268                         else { r = proc->rdq.its++; }
    269                         i = start + (r % READYQ_SHARD_FACTOR);
    270                         // If we can't lock it retry
    271                 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    272 
    273                 // Actually push it
    274                 push(lanes.data[i], thrd);
    275 
    276                 // Unlock and return
    277                 __atomic_unlock( &lanes.data[i].lock );
    278 
    279                 #if !defined(__CFA_NO_STATISTICS__)
    280                         if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
    281                         else __tls_stats()->ready.push.local.success++;
    282                 #endif
    283 
    284                 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
    285 
    286         }
    287 
    288         // Pop from the ready queue from a given cluster
    289         __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
    290                 /* paranoid */ verify( lanes.count > 0 );
    291                 /* paranoid */ verify( kernelTLS().this_processor );
    292 
    293                 const int cpu = __kernel_getcpu();
    294                 /* paranoid */ verify(cpu >= 0);
    295                 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
    296                 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
    297 
    298                 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
    299                 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
    300                 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
    301                 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
    302 
    303                 processor * const proc = kernelTLS().this_processor;
    304                 const int start = map.self * READYQ_SHARD_FACTOR;
    305 
    306                 // Did we already have a help target
    307                 if(proc->rdq.target == -1u) {
    308                         // if We don't have a
    309                         unsigned long long min = ts(lanes.data[start]);
    310                         for(i; READYQ_SHARD_FACTOR) {
    311                                 unsigned long long tsc = ts(lanes.data[start + i]);
    312                                 if(tsc < min) min = tsc;
    313                         }
    314                         proc->rdq.cutoff = min;
    315                         proc->rdq.target = (map.start * READYQ_SHARD_FACTOR) + (__tls_rand() % (map.count* READYQ_SHARD_FACTOR));
    316                 }
    317                 else {
    318                         const unsigned long long bias = 0; //2_500_000_000;
    319                         const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
    320                         {
    321                                 unsigned target = proc->rdq.target;
    322                                 proc->rdq.target = -1u;
    323                                 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
    324                                         $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    325                                         proc->rdq.last = target;
    326                                         if(t) return t;
    327                                 }
    328                         }
    329 
    330                         unsigned last = proc->rdq.last;
    331                         if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
    332                                 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
    333                                 if(t) return t;
    334                         }
    335                         else {
    336                                 proc->rdq.last = -1u;
    337                         }
    338                 }
    339 
    340                 for(READYQ_SHARD_FACTOR) {
    341                         unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
    342                         if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    343                 }
    344 
    345                 // All lanes where empty return 0p
    346                 return 0p;
    347         }
    348 
    349         __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
    350                 processor * const proc = kernelTLS().this_processor;
    351                 unsigned last = proc->rdq.last;
    352 
    353                 unsigned i = __tls_rand() % lanes.count;
    354                 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    355         }
    356         __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
    357                 return search(cltr);
    358         }
    359 #endif
    360229#if defined(USE_RELAXED_FIFO)
    361230        //-----------------------------------------------------------------------
     
    711580}
    712581
    713 #if defined(USE_CPU_WORK_STEALING)
    714         // ready_queue size is fixed in this case
    715         void ready_queue_grow(struct cluster * cltr) {}
    716         void ready_queue_shrink(struct cluster * cltr) {}
    717 #else
    718         // Grow the ready queue
    719         void ready_queue_grow(struct cluster * cltr) {
    720                 size_t ncount;
    721                 int target = cltr->procs.total;
    722 
    723                 /* paranoid */ verify( ready_mutate_islocked() );
    724                 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
    725 
    726                 // Make sure that everything is consistent
    727                 /* paranoid */ check( cltr->ready_queue );
    728 
    729                 // grow the ready queue
    730                 with( cltr->ready_queue ) {
    731                         // Find new count
    732                         // Make sure we always have atleast 1 list
    733                         if(target >= 2) {
    734                                 ncount = target * READYQ_SHARD_FACTOR;
    735                         } else {
    736                                 ncount = SEQUENTIAL_SHARD;
     582// Grow the ready queue
     583void ready_queue_grow(struct cluster * cltr) {
     584        size_t ncount;
     585        int target = cltr->procs.total;
     586
     587        /* paranoid */ verify( ready_mutate_islocked() );
     588        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
     589
     590        // Make sure that everything is consistent
     591        /* paranoid */ check( cltr->ready_queue );
     592
     593        // grow the ready queue
     594        with( cltr->ready_queue ) {
     595                // Find new count
     596                // Make sure we always have atleast 1 list
     597                if(target >= 2) {
     598                        ncount = target * READYQ_SHARD_FACTOR;
     599                } else {
     600                        ncount = SEQUENTIAL_SHARD;
     601                }
     602
     603                // Allocate new array (uses realloc and memcpies the data)
     604                lanes.data = alloc( ncount, lanes.data`realloc );
     605
     606                // Fix the moved data
     607                for( idx; (size_t)lanes.count ) {
     608                        fix(lanes.data[idx]);
     609                }
     610
     611                // Construct new data
     612                for( idx; (size_t)lanes.count ~ ncount) {
     613                        (lanes.data[idx]){};
     614                }
     615
     616                // Update original
     617                lanes.count = ncount;
     618        }
     619
     620        fix_times(cltr);
     621
     622        reassign_cltr_id(cltr);
     623
     624        // Make sure that everything is consistent
     625        /* paranoid */ check( cltr->ready_queue );
     626
     627        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
     628
     629        /* paranoid */ verify( ready_mutate_islocked() );
     630}
     631
     632// Shrink the ready queue
     633void ready_queue_shrink(struct cluster * cltr) {
     634        /* paranoid */ verify( ready_mutate_islocked() );
     635        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     636
     637        // Make sure that everything is consistent
     638        /* paranoid */ check( cltr->ready_queue );
     639
     640        int target = cltr->procs.total;
     641
     642        with( cltr->ready_queue ) {
     643                // Remember old count
     644                size_t ocount = lanes.count;
     645
     646                // Find new count
     647                // Make sure we always have atleast 1 list
     648                lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
     649                /* paranoid */ verify( ocount >= lanes.count );
     650                /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
     651
     652                // for printing count the number of displaced threads
     653                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     654                        __attribute__((unused)) size_t displaced = 0;
     655                #endif
     656
     657                // redistribute old data
     658                for( idx; (size_t)lanes.count ~ ocount) {
     659                        // Lock is not strictly needed but makes checking invariants much easier
     660                        __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
     661                        verify(locked);
     662
     663                        // As long as we can pop from this lane to push the threads somewhere else in the queue
     664                        while(!is_empty(lanes.data[idx])) {
     665                                struct $thread * thrd;
     666                                unsigned long long _;
     667                                [thrd, _] = pop(lanes.data[idx]);
     668
     669                                push(cltr, thrd, true);
     670
     671                                // for printing count the number of displaced threads
     672                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
     673                                        displaced++;
     674                                #endif
    737675                        }
    738676
    739                         // Allocate new array (uses realloc and memcpies the data)
    740                         lanes.data = alloc( ncount, lanes.data`realloc );
    741 
    742                         // Fix the moved data
    743                         for( idx; (size_t)lanes.count ) {
    744                                 fix(lanes.data[idx]);
    745                         }
    746 
    747                         // Construct new data
    748                         for( idx; (size_t)lanes.count ~ ncount) {
    749                                 (lanes.data[idx]){};
    750                         }
    751 
    752                         // Update original
    753                         lanes.count = ncount;
    754                 }
    755 
    756                 fix_times(cltr);
    757 
    758                 reassign_cltr_id(cltr);
    759 
    760                 // Make sure that everything is consistent
    761                 /* paranoid */ check( cltr->ready_queue );
    762 
    763                 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
    764 
    765                 /* paranoid */ verify( ready_mutate_islocked() );
    766         }
    767 
    768         // Shrink the ready queue
    769         void ready_queue_shrink(struct cluster * cltr) {
    770                 /* paranoid */ verify( ready_mutate_islocked() );
    771                 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
    772 
    773                 // Make sure that everything is consistent
    774                 /* paranoid */ check( cltr->ready_queue );
    775 
    776                 int target = cltr->procs.total;
    777 
    778                 with( cltr->ready_queue ) {
    779                         // Remember old count
    780                         size_t ocount = lanes.count;
    781 
    782                         // Find new count
    783                         // Make sure we always have atleast 1 list
    784                         lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
    785                         /* paranoid */ verify( ocount >= lanes.count );
    786                         /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
    787 
    788                         // for printing count the number of displaced threads
    789                         #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
    790                                 __attribute__((unused)) size_t displaced = 0;
    791                         #endif
    792 
    793                         // redistribute old data
    794                         for( idx; (size_t)lanes.count ~ ocount) {
    795                                 // Lock is not strictly needed but makes checking invariants much easier
    796                                 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
    797                                 verify(locked);
    798 
    799                                 // As long as we can pop from this lane to push the threads somewhere else in the queue
    800                                 while(!is_empty(lanes.data[idx])) {
    801                                         struct $thread * thrd;
    802                                         unsigned long long _;
    803                                         [thrd, _] = pop(lanes.data[idx]);
    804 
    805                                         push(cltr, thrd, true);
    806 
    807                                         // for printing count the number of displaced threads
    808                                         #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
    809                                                 displaced++;
    810                                         #endif
    811                                 }
    812 
    813                                 // Unlock the lane
    814                                 __atomic_unlock(&lanes.data[idx].lock);
    815 
    816                                 // TODO print the queue statistics here
    817 
    818                                 ^(lanes.data[idx]){};
    819                         }
    820 
    821                         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
    822 
    823                         // Allocate new array (uses realloc and memcpies the data)
    824                         lanes.data = alloc( lanes.count, lanes.data`realloc );
    825 
    826                         // Fix the moved data
    827                         for( idx; (size_t)lanes.count ) {
    828                                 fix(lanes.data[idx]);
    829                         }
    830                 }
    831 
    832                 fix_times(cltr);
    833 
    834                 reassign_cltr_id(cltr);
    835 
    836                 // Make sure that everything is consistent
    837                 /* paranoid */ check( cltr->ready_queue );
    838 
    839                 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
    840                 /* paranoid */ verify( ready_mutate_islocked() );
    841         }
    842 #endif
     677                        // Unlock the lane
     678                        __atomic_unlock(&lanes.data[idx].lock);
     679
     680                        // TODO print the queue statistics here
     681
     682                        ^(lanes.data[idx]){};
     683                }
     684
     685                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
     686
     687                // Allocate new array (uses realloc and memcpies the data)
     688                lanes.data = alloc( lanes.count, lanes.data`realloc );
     689
     690                // Fix the moved data
     691                for( idx; (size_t)lanes.count ) {
     692                        fix(lanes.data[idx]);
     693                }
     694        }
     695
     696        fix_times(cltr);
     697
     698        reassign_cltr_id(cltr);
     699
     700        // Make sure that everything is consistent
     701        /* paranoid */ check( cltr->ready_queue );
     702
     703        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
     704        /* paranoid */ verify( ready_mutate_islocked() );
     705}
    843706
    844707#if !defined(__CFA_NO_STATISTICS__)
Note: See TracChangeset for help on using the changeset viewer.