source: libcfa/src/concurrency/ready_queue.cfa @ 6abcb4d

Last change on this file since 6abcb4d was 9cc3a18, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Major clean-up before attempting to add new scheduler

//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
// #define __CFA_DEBUG_PRINT_READY_QUEUE__

// #define USE_MPSC

#define USE_RELAXED_FIFO
// #define USE_WORK_STEALING

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"
#include "math.hfa"

#include <unistd.h>

#include "ready_subqueue.hfa"

static const size_t cache_line_size = 64;

// No overridden function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
        #define __CFA_MAX_PROCESSORS__ 1024
#endif

#if   defined(USE_RELAXED_FIFO)
        #define BIAS 4
        #define READYQ_SHARD_FACTOR 4
#elif defined(USE_WORK_STEALING)
        #define READYQ_SHARD_FACTOR 2
#else
        #error no scheduling strategy selected
#endif

static inline [unsigned, bool] idx_from_r(unsigned r, int preferred);
static inline struct $thread * try_pop(struct cluster * cltr, unsigned w);
static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);

// returns the maximum number of processors the RW-Lock supports
__attribute__((weak)) unsigned __max_processors() {
        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
        if(!max_cores_s) {
                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
                return __CFA_MAX_PROCESSORS__;
        }

        char * endptr = 0p;
        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
        if(max_cores_l < 1 || max_cores_l > 65535) {
                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
                return __CFA_MAX_PROCESSORS__;
        }
        if('\0' != *endptr) {
                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
                return __CFA_MAX_PROCESSORS__;
        }

        return max_cores_l;
}

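// Example (editorial): the size of the registration array can be changed without
// recompiling by setting the environment variable when launching the program,
// e.g. CFA_MAX_PROCESSORS=4096 ./a.out (the program name is a placeholder).
// Non-numeric values or values outside [1, 65535] fall back to __CFA_MAX_PROCESSORS__.
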
//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void  ?{}(__scheduler_RWLock_t & this) {
        this.max   = __max_processors();
        this.alloc = 0;
        this.ready = 0;
        this.lock  = false;
        this.data  = alloc(this.max);

        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
}
void ^?{}(__scheduler_RWLock_t & this) {
        free(this.data);
}

void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
        this.handle = proc;
        this.lock   = false;
        #ifdef __CFA_WITH_VERIFY__
                this.owned  = false;
        #endif
}

//=======================================================================
// Lock-Free registering/unregistering of threads
void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);

        // Step - 1 : check if there is already space in the data
        uint_fast32_t s = ready;

        // Check among all the ready
        for(uint_fast32_t i = 0; i < s; i++) {
                __processor_id_t * null = 0p; // Reset each iteration since compare_exchange overwrites it on failure
                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                        /*paranoid*/ verify(i < ready);
                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
                        proc->id = i;
                        return; // slot reclaimed, do not fall through and allocate a second one
                }
        }

        if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

        // Step - 2 : F&A to get a new spot in the array.
        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
        if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

        // Step - 3 : Mark space as used and then publish it.
        __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
        (*storage){ proc };
        while() {
                unsigned copy = n;
                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
                        break;
                Pause();
        }

        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);

        // Return new spot.
        /*paranoid*/ verify(n < ready);
        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
        proc->id = n;
}

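// Note on the protocol above (editorial summary): a processor first tries to
// reclaim a slot whose handle was reset to 0p by unregister_proc_id (Step 1).
// Failing that, it fetch-and-adds 'alloc' to reserve a fresh slot (Step 2) and
// then spins until it can advance 'ready' past its own index (Step 3), so that
// readers iterating over [0, ready) never observe an unconstructed slot.
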
void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
        unsigned id = proc->id;
        /*paranoid*/ verify(id < ready);
        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);

        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
//  queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
        /* paranoid */ verify( ! __preemption_enabled() );

        // Step 1 : lock global lock
        // It is needed so that processors registering mid critical-section
        //   cannot simply lock their own lock and enter.
        __atomic_acquire( &lock );

        // Step 2 : lock per-proc lock
        // Processors that are currently being registered aren't counted
        //   but can't be in read_lock or in the critical section.
        // All other processors are counted
        uint_fast32_t s = ready;
        for(uint_fast32_t i = 0; i < s; i++) {
                __atomic_acquire( &data[i].lock );
        }

        /* paranoid */ verify( ! __preemption_enabled() );
        return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
        /* paranoid */ verify( ! __preemption_enabled() );

        // Step 1 : release local locks
        // This must be done while the global lock is held to avoid
        //   threads that were created mid critical-section
        //   racing to lock their local locks and having the writer
        //   immediately unlock them.
        // Alternative solution : return s in write_lock and pass it to write_unlock
        for(uint_fast32_t i = 0; i < last_s; i++) {
                verify(data[i].lock);
                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
        }

        // Step 2 : release global lock
        /*paranoid*/ assert(true == lock);
        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);

        /* paranoid */ verify( ! __preemption_enabled() );
}

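// Usage sketch (editorial, not part of the original file): growing or shrinking
// the ready queue is expected to be bracketed by the writer lock, e.g.
//     uint_fast32_t last = ready_mutate_lock();
//     ready_queue_grow( cltr );      // or ready_queue_shrink( cltr )
//     ready_mutate_unlock( last );
// which is what the 'verify( ready_mutate_islocked() )' assertions below rely on.
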
//=======================================================================
// Cforall Ready Queue used for scheduling
//=======================================================================
void ?{}(__ready_queue_t & this) with (this) {
        lanes.data  = 0p;
        lanes.tscs  = 0p;
        lanes.count = 0;
}

void ^?{}(__ready_queue_t & this) with (this) {
        verify( 1 == lanes.count );
        free(lanes.data);
        free(lanes.tscs);
}

//-----------------------------------------------------------------------
__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
        __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

        const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
        /* paranoid */ verify(external || kernelTLS().this_processor->cltr_id < lanes.count );

        // write timestamp
        thrd->link.ts = rdtscl();

        bool local;
        int preferred = external ? -1 : kernelTLS().this_processor->cltr_id;

        // Try to pick a lane and lock it
        unsigned i;
        do {
                // Pick the index of a lane
                unsigned r = __tls_rand_fwd();
                [i, local] = idx_from_r(r, preferred);

                i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

                #if !defined(__CFA_NO_STATISTICS__)
                        if(external) {
                                if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.local, 1, __ATOMIC_RELAXED);
                                __atomic_fetch_add(&cltr->stats->ready.pick.ext.attempt, 1, __ATOMIC_RELAXED);
                        }
                        else {
                                if(local) __tls_stats()->ready.pick.push.local++;
                                __tls_stats()->ready.pick.push.attempt++;
                        }
                #endif

        #if defined(USE_MPSC)
                // mpsc always succeeds
        } while( false );
        #else
                // If we can't lock it retry
        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
        #endif

        // Actually push it
        push(lanes.data[i], thrd);

        #if !defined(USE_MPSC)
                // Unlock and return
                __atomic_unlock( &lanes.data[i].lock );
        #endif

        // Mark the current index in the tls rng instance as having an item
        __tls_rand_advance_bck();

        __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);

        // Update statistics
        #if !defined(__CFA_NO_STATISTICS__)
                if(external) {
                        if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.lsuccess, 1, __ATOMIC_RELAXED);
                        __atomic_fetch_add(&cltr->stats->ready.pick.ext.success, 1, __ATOMIC_RELAXED);
                }
                else {
                        if(local) __tls_stats()->ready.pick.push.lsuccess++;
                        __tls_stats()->ready.pick.push.success++;
                }
        #endif
}

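// Editorial note: push draws its lane index from __tls_rand_fwd() and then calls
// __tls_rand_advance_bck(); pop (below) draws from __tls_rand_bck(). The apparent
// intent is that a processor popping replays its own recent push sequence in
// reverse, so it tends to revisit lanes it just made non-empty. This is an
// interpretation of the code, not a statement from the original authors.
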
// Pop a thread from the ready queue of a given cluster
__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
        /* paranoid */ verify( lanes.count > 0 );
        /* paranoid */ verify(kernelTLS().this_processor->cltr_id < lanes.count );

        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
        int preferred = kernelTLS().this_processor->cltr_id;

        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
        for(25) {
                // Pick two lists at random
                unsigned ri = __tls_rand_bck();
                unsigned rj = __tls_rand_bck();

                unsigned i, j;
                __attribute__((unused)) bool locali, localj;
                [i, locali] = idx_from_r(ri, preferred);
                [j, localj] = idx_from_r(rj, preferred);

                #if !defined(__CFA_NO_STATISTICS__)
                        if(locali && localj) {
                                __tls_stats()->ready.pick.pop.local++;
                        }
                #endif

                i %= count;
                j %= count;

                // try popping from the 2 picked lists
                struct $thread * thrd = try_pop(cltr, i, j);
                if(thrd) {
                        #if !defined(__CFA_NO_STATISTICS__)
                                if( locali || localj ) __tls_stats()->ready.pick.pop.lsuccess++;
                        #endif
                        return thrd;
                }
        }

        // All lanes were empty, return 0p
        return 0p;
}

static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
        lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
        for(i; lanes.count) {
                lanes.tscs[i].tv = ts(lanes.data[i]);
        }
}

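// Editorial note: lanes.tscs caches, per lane, the timestamp of the lane's head
// element; fix_times() rebuilds this cache after the number of lanes changes.
// It appears to exist for the (currently disabled) USE_WORK_STEALING strategy,
// which would consult these timestamps when choosing a victim lane.
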
//=======================================================================
// Various Ready Queue utilities
//=======================================================================
// These functions work the same, or almost the same, whether the scheduler
// uses work-stealing or relaxed-FIFO scheduling.

//-----------------------------------------------------------------------
// get index from random number, with or without bias towards the preferred queues
// (a negative preferred value disables the bias, e.g. for external pushes)
static inline [unsigned, bool] idx_from_r(unsigned r, int preferred) {
        unsigned i;
        bool local;
        unsigned rlow  = r % BIAS;
        unsigned rhigh = r / BIAS;
        if((0 != rlow) && preferred >= 0) {
                // (BIAS - 1) out of BIAS chances
                // Use preferred queues
                i = preferred + (rhigh % READYQ_SHARD_FACTOR);
                local = true;
        }
        else {
                // 1 out of BIAS chances
                // Use all queues
                i = rhigh;
                local = false;
        }
        return [i, local];
}

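// Worked example (editorial): with BIAS == 4 and READYQ_SHARD_FACTOR == 4,
// a processor whose cltr_id (preferred) is 8 and a draw of r == 13 gives
// rlow == 1 and rhigh == 3, so the biased branch is taken and i == 8 + (3 % 4) == 11,
// i.e. one of the 4 lanes "owned" by that processor. Draws with rlow == 0
// (1 time in 4) fall through to the unbiased branch and may pick any lane.
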
//-----------------------------------------------------------------------
// try to pop from a lane given by index w
static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
        // Get relevant elements locally
        __intrusive_lane_t & lane = lanes.data[w];

        // If the list looks empty, retry
        if( is_empty(lane) ) return 0p;

        // If we can't get the lock, retry
        if( !__atomic_try_acquire(&lane.lock) ) return 0p;

        // If the list is empty, unlock and retry
        if( is_empty(lane) ) {
                __atomic_unlock(&lane.lock);
                return 0p;
        }

        // Actually pop the list
        struct $thread * thrd;
        thrd = pop(lane);

        /* paranoid */ verify(thrd);
        /* paranoid */ verify(lane.lock);

        // Unlock and return
        __atomic_unlock(&lane.lock);

        // Update statistics
        #if !defined(__CFA_NO_STATISTICS__)
                __tls_stats()->ready.pick.pop.success++;
        #endif

        #if defined(USE_WORK_STEALING)
                lanes.tscs[w].tv = thrd->link.ts;
        #endif

        // return the popped thread
        return thrd;
}

//-----------------------------------------------------------------------
// try to pop from any lane, making sure not to miss any thread pushed
// before the start of the function
__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
        /* paranoid */ verify( lanes.count > 0 );
        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
        unsigned offset = __tls_rand();
        for(i; count) {
                unsigned idx = (offset + i) % count;
                struct $thread * thrd = try_pop(cltr, idx);
                if(thrd) {
                        return thrd;
                }
        }

        // All lanes were empty, return 0p
        return 0p;
}

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check( __ready_queue_t & q ) with (q) {
        #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
                {
                        for( idx ; lanes.count ) {
                                __intrusive_lane_t & sl = lanes.data[idx];
                                assert(!lanes.data[idx].lock);

                                assert(head(sl)->link.prev == 0p );
                                assert(head(sl)->link.next->link.prev == head(sl) );
                                assert(tail(sl)->link.next == 0p );
                                assert(tail(sl)->link.prev->link.next == tail(sl) );

                                if(is_empty(sl)) {
                                        assert(tail(sl)->link.prev == head(sl));
                                        assert(head(sl)->link.next == tail(sl));
                                } else {
                                        assert(tail(sl)->link.prev != head(sl));
                                        assert(head(sl)->link.next != tail(sl));
                                }
                        }
                }
        #endif
}

//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it
static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
        #if !defined(__CFA_NO_STATISTICS__)
                __tls_stats()->ready.pick.pop.attempt++;
        #endif

        // Pick the best list
        int w = i;
        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
        }

        return try_pop(cltr, w);
}

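// Editorial note: this resembles the classic "power of two choices" heuristic.
// ts() appears to report the timestamp of the lane's oldest element; it is read
// without holding the lane lock, so the comparison is only a hint. Popping from
// the lane with the smaller (older) timestamp roughly approximates FIFO ordering
// across the sharded lanes.
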
// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to the anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
        #if !defined(USE_MPSC)
                // if the list is not empty then follow the pointers and fix their reverse links
                if(!is_empty(ll)) {
                        head(ll)->link.next->link.prev = head(ll);
                        tail(ll)->link.prev->link.next = tail(ll);
                }
                // Otherwise just reset the list
                else {
                        verify(tail(ll)->link.next == 0p);
                        tail(ll)->link.prev = head(ll);
                        head(ll)->link.next = tail(ll);
                        verify(head(ll)->link.prev == 0p);
                }
        #endif
}

static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
        processor * it = &list`first;
        for(unsigned i = 0; i < count; i++) {
                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
                it->cltr_id = value;
                value += READYQ_SHARD_FACTOR;
                it = &(*it)`next;
        }
}

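// Worked example (editorial): with READYQ_SHARD_FACTOR == 4, reassign_cltr_id()
// below gives the first processor cltr_id 0, the second 4, the third 8, and so on;
// each processor therefore "owns" the READYQ_SHARD_FACTOR lanes starting at its
// cltr_id, which is exactly the block idx_from_r() biases towards. Active
// processors are numbered before idle ones.
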
static void reassign_cltr_id(struct cluster * cltr) {
        unsigned preferred = 0;
        assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
        assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
}

// Grow the ready queue
void ready_queue_grow(struct cluster * cltr) {
        size_t ncount;
        int target = cltr->procs.total;

        /* paranoid */ verify( ready_mutate_islocked() );
        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

        // Make sure that everything is consistent
        /* paranoid */ check( cltr->ready_queue );

        // grow the ready queue
        with( cltr->ready_queue ) {
                // Find the new count
                // Make sure we always have at least 1 list
                if(target >= 2) {
                        ncount = target * READYQ_SHARD_FACTOR;
                } else {
                        ncount = 1;
                }

                // Allocate new array (uses realloc and memcpies the data)
                lanes.data = alloc( ncount, lanes.data`realloc );

                // Fix the moved data
                for( idx; (size_t)lanes.count ) {
                        fix(lanes.data[idx]);
                }

                // Construct new data
                for( idx; (size_t)lanes.count ~ ncount) {
                        (lanes.data[idx]){};
                }

                // Update original
                lanes.count = ncount;
        }

        fix_times(cltr);

        reassign_cltr_id(cltr);

        // Make sure that everything is consistent
        /* paranoid */ check( cltr->ready_queue );

        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

        /* paranoid */ verify( ready_mutate_islocked() );
}

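// Sizing example (editorial): under USE_RELAXED_FIFO (READYQ_SHARD_FACTOR == 4),
// a cluster that grows to 3 processors ends up with 3 * 4 == 12 lanes, while a
// cluster with a single processor keeps the minimum of 1 lane, presumably because
// sharding only pays off once several processors contend on the queue.
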
// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
        /* paranoid */ verify( ready_mutate_islocked() );
        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

        // Make sure that everything is consistent
        /* paranoid */ check( cltr->ready_queue );

        int target = cltr->procs.total;

        with( cltr->ready_queue ) {
                // Remember the old count
                size_t ocount = lanes.count;

                // Find the new count
                // Make sure we always have at least 1 list
                lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR : 1;
                /* paranoid */ verify( ocount >= lanes.count );
                /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );

                // for printing, count the number of displaced threads
                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
                        __attribute__((unused)) size_t displaced = 0;
                #endif

                // redistribute old data
                for( idx; (size_t)lanes.count ~ ocount) {
                        // Lock is not strictly needed but makes checking invariants much easier
                        __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
                        verify(locked);

                        // As long as we can pop from this lane, push the threads somewhere else in the queue
                        while(!is_empty(lanes.data[idx])) {
                                struct $thread * thrd;
                                thrd = pop(lanes.data[idx]);

                                push(cltr, thrd);

                                // for printing, count the number of displaced threads
                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
                                        displaced++;
                                #endif
                        }

                        // Unlock the lane
                        __atomic_unlock(&lanes.data[idx].lock);

                        // TODO print the queue statistics here

                        ^(lanes.data[idx]){};
                }

                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

                // Allocate new array (uses realloc and memcpies the data)
                lanes.data = alloc( lanes.count, lanes.data`realloc );

                // Fix the moved data
                for( idx; (size_t)lanes.count ) {
                        fix(lanes.data[idx]);
                }
        }

        fix_times(cltr);

        reassign_cltr_id(cltr);

        // Make sure that everything is consistent
        /* paranoid */ check( cltr->ready_queue );

        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
        /* paranoid */ verify( ready_mutate_islocked() );
}