Ignore:
Timestamp:
Mar 14, 2022, 2:24:51 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
Children:
bfb9bf5
Parents:
c42b8a1
Message:

Change how the ready queue is initialized to make it common with I/O

Location:
libcfa/src/concurrency/kernel
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/kernel/cluster.cfa

    rc42b8a1 r884f3f67  
    221221//-----------------------------------------------------------------------
    222222// Check that all the intrusive queues in the data structure are still consistent
    223 static void check( __ready_queue_t & q ) with (q) {
     223static void check_readyQ( cluster * cltr ) with (cltr->sched) {
    224224        #if defined(__CFA_WITH_VERIFY__)
    225225                {
    226                         for( idx ; lanes.count ) {
    227                                 __intrusive_lane_t & sl = lanes.data[idx];
    228                                 assert(!lanes.data[idx].lock);
     226                        const unsigned lanes_count = readyQ.count;
     227                        for( idx ; lanes_count ) {
     228                                __intrusive_lane_t & sl = readyQ.data[idx];
     229                                assert(!readyQ.data[idx].lock);
    229230
    230231                                        if(is_empty(sl)) {
     
    257258                it->rdq.id = value;
    258259                it->rdq.target = MAX;
    259                 value += __readyq_shard_factor;
     260                value += __shard_factor.readyq;
    260261                it = &(*it)`next;
    261262        }
     
    268269}
    269270
    270 static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
    271         lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    272         for(i; lanes.count) {
    273                 lanes.tscs[i].tv = rdtscl();
    274                 lanes.tscs[i].ma = 0;
     271static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
     272        tscs = alloc(count, tscs`realloc);
     273        for(i; count) {
     274                tscs[i].tv = rdtscl();
     275                tscs[i].ma = 0;
    275276        }
    276277}
     
    278279// Grow the ready queue
    279280void ready_queue_grow(struct cluster * cltr) {
    280         size_t ncount;
    281281        int target = cltr->procs.total;
    282282
     
    285285
    286286        // Make sure that everything is consistent
    287         /* paranoid */ check( cltr->ready_queue );
    288 
    289         // grow the ready queue
    290         with( cltr->ready_queue ) {
    291                 // Find new count
    292                 // Make sure we always have atleast 1 list
    293                 if(target >= 2) {
    294                         ncount = target * __readyq_shard_factor;
    295                 } else {
    296                         ncount = __readyq_single_shard;
    297                 }
    298 
    299                 // Allocate new array (uses realloc and memcpies the data)
    300                 lanes.data = alloc( ncount, lanes.data`realloc );
    301 
    302                 // Fix the moved data
    303                 for( idx; (size_t)lanes.count ) {
    304                         fix(lanes.data[idx]);
    305                 }
    306 
    307                 // Construct new data
    308                 for( idx; (size_t)lanes.count ~ ncount) {
    309                         (lanes.data[idx]){};
    310                 }
    311 
    312                 // Update original
    313                 lanes.count = ncount;
    314 
    315                 lanes.caches = alloc( target, lanes.caches`realloc );
    316         }
    317 
    318         fix_times(cltr);
    319 
     287        /* paranoid */ check_readyQ( cltr );
     288
     289
     290        // Find new count
     291        // Make sure we always have atleast 1 list
     292        size_t ocount = cltr->sched.readyQ.count;
     293        size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
     294
     295        // Do we have to do anything?
     296        if( ocount != ncount ) {
     297
     298                // grow the ready queue
     299                with( cltr->sched ) {
     300
     301                        // Allocate new array (uses realloc and memcpies the data)
     302                        readyQ.data = alloc( ncount, readyQ.data`realloc );
     303
     304                        // Fix the moved data
     305                        for( idx; ocount ) {
     306                                fix(readyQ.data[idx]);
     307                        }
     308
     309                        // Construct new data
     310                        for( idx; ocount ~ ncount) {
     311                                (readyQ.data[idx]){};
     312                        }
     313
     314                        // Update original count
     315                        readyQ.count = ncount;
     316                }
     317
     318
     319                fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
     320        }
     321
     322        // realloc the caches
     323        cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );
     324
     325        // reassign the clusters.
    320326        reassign_cltr_id(cltr);
    321327
    322328        // Make sure that everything is consistent
    323         /* paranoid */ check( cltr->ready_queue );
     329        /* paranoid */ check_readyQ( cltr );
     330        /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
    324331
    325332        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
     
    334341
    335342        // Make sure that everything is consistent
    336         /* paranoid */ check( cltr->ready_queue );
     343        /* paranoid */ check_readyQ( cltr );
    337344
    338345        int target = cltr->procs.total;
    339346
    340         with( cltr->ready_queue ) {
     347        with( cltr->sched ) {
    341348                // Remember old count
    342                 size_t ocount = lanes.count;
     349                size_t ocount = readyQ.count;
    343350
    344351                // Find new count
    345352                // Make sure we always have atleast 1 list
    346                 lanes.count = target >= 2 ? target * __readyq_shard_factor: __readyq_single_shard;
    347                 /* paranoid */ verify( ocount >= lanes.count );
    348                 /* paranoid */ verify( lanes.count == target * __readyq_shard_factor || target < 2 );
     353                size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
     354                /* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
     355                /* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
     356                /* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, ncount );
     357
     358                readyQ.count = ncount;
    349359
    350360                // for printing count the number of displaced threads
     
    354364
    355365                // redistribute old data
    356                 for( idx; (size_t)lanes.count ~ ocount) {
     366                for( idx; ncount ~ ocount) {
    357367                        // Lock is not strictly needed but makes checking invariants much easier
    358                         __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
     368                        __attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].lock);
    359369                        verify(locked);
    360370
    361371                        // As long as we can pop from this lane to push the threads somewhere else in the queue
    362                         while(!is_empty(lanes.data[idx])) {
     372                        while(!is_empty(readyQ.data[idx])) {
    363373                                struct thread$ * thrd;
    364374                                unsigned long long _;
    365                                 [thrd, _] = pop(lanes.data[idx]);
     375                                [thrd, _] = pop(readyQ.data[idx]);
    366376
    367377                                push(cltr, thrd, true);
     
    374384
    375385                        // Unlock the lane
    376                         __atomic_unlock(&lanes.data[idx].lock);
     386                        __atomic_unlock(&readyQ.data[idx].lock);
    377387
    378388                        // TODO print the queue statistics here
    379389
    380                         ^(lanes.data[idx]){};
     390                        ^(readyQ.data[idx]){};
    381391                }
    382392
     
    384394
    385395                // Allocate new array (uses realloc and memcpies the data)
    386                 lanes.data = alloc( lanes.count, lanes.data`realloc );
     396                readyQ.data = alloc( ncount, readyQ.data`realloc );
    387397
    388398                // Fix the moved data
    389                 for( idx; (size_t)lanes.count ) {
    390                         fix(lanes.data[idx]);
    391                 }
    392 
    393                 lanes.caches = alloc( target, lanes.caches`realloc );
    394         }
    395 
    396         fix_times(cltr);
     399                for( idx; ncount ) {
     400                        fix(readyQ.data[idx]);
     401                }
     402
     403                fix_times(readyQ.tscs, ncount);
     404        }
     405        cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );
     406
    397407
    398408
     
    400410
    401411        // Make sure that everything is consistent
    402         /* paranoid */ check( cltr->ready_queue );
     412        /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
     413        /* paranoid */ check_readyQ( cltr );
    403414
    404415        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
    405416        /* paranoid */ verify( ready_mutate_islocked() );
     417}
     418
     419void ready_queue_close(struct cluster * cltr) {
     420        free( cltr->sched.readyQ.data );
     421        free( cltr->sched.readyQ.tscs );
     422        cltr->sched.readyQ.data = 0p;
     423        cltr->sched.readyQ.tscs = 0p;
     424        cltr->sched.readyQ.count = 0;
     425
     426        free( cltr->sched.io.tscs );
     427        free( cltr->sched.caches );
    406428}
    407429
  • libcfa/src/concurrency/kernel/startup.cfa

    rc42b8a1 r884f3f67  
    515515        this.rdq.its = 0;
    516516        this.rdq.itr = 0;
    517         this.rdq.id  = MAX;
     517        this.rdq.id  = 0;
    518518        this.rdq.target = MAX;
    519519        this.rdq.last = MAX;
     
    605605        this.name = name;
    606606        this.preemption_rate = preemption_rate;
    607         ready_queue{};
     607        this.sched.readyQ.data = 0p;
     608        this.sched.readyQ.tscs = 0p;
     609        this.sched.readyQ.count = 0;
     610        this.sched.io.tscs = 0p;
     611        this.sched.caches = 0p;
    608612
    609613        #if !defined(__CFA_NO_STATISTICS__)
     
    644648        // Unlock the RWlock
    645649        ready_mutate_unlock( last_size );
     650
     651        ready_queue_close( &this );
     652        /* paranoid */ verify( this.sched.readyQ.data == 0p );
     653        /* paranoid */ verify( this.sched.readyQ.tscs == 0p );
     654        /* paranoid */ verify( this.sched.readyQ.count == 0 );
     655        /* paranoid */ verify( this.sched.io.tscs == 0p );
     656        /* paranoid */ verify( this.sched.caches == 0p );
     657
    646658        enable_interrupts( false ); // Don't poll, could be in main cluster
     659
    647660
    648661        #if !defined(__CFA_NO_STATISTICS__)
Note: See TracChangeset for help on using the changeset viewer.