source: libcfa/src/concurrency/kernel/cluster.cfa @ 9d5eacb

Last change on this file since 9d5eacb was 6dd4091, checked in by Peter A. Buhr <pabuhr@…>, 7 months ago

comment out asserts that fail when malloc(0) returns non-null

//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// cluster.cfa -- helpers for subsystems that need cluster-wide support
//
// Author           : Thierry Delisle
// Created On       : Fri Mar 11 12:39:24 2022
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel/cluster.hfa"
#include "kernel/private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include "ready_subqueue.hfa"
#include "io/types.hfa"

#include <errno.h>
#include <unistd.h>

extern "C" {
        #include <sys/syscall.h>  // __NR_xxx
}

// No overridden function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
        #define __CFA_MAX_PROCESSORS__ 1024
#endif

#if !defined(__CFA_NO_STATISTICS__)
        #define __STATS(...) __VA_ARGS__
#else
        #define __STATS(...)
#endif

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() libcfa_public {
        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
        if(!max_cores_s) {
                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
                return __CFA_MAX_PROCESSORS__;
        }

        char * endptr = 0p;
        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
        if(max_cores_l < 1 || max_cores_l > 65535) {
                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
                return __CFA_MAX_PROCESSORS__;
        }
        if('\0' != *endptr) {
                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
                return __CFA_MAX_PROCESSORS__;
        }

        return max_cores_l;
}

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
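// Layout : 'data' holds one pointer per registered processor, each pointing at that
//   processor's thread-local 'sched_lock' flag.  Readers only acquire their own flag;
//   the writer (ready_mutate_lock below) takes the global 'write_lock' and then every
//   registered flag, excluding all readers at once.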
void  ?{}(__scheduler_RWLock_t & this) {
        this.lock.max   = __max_processors();
        this.lock.alloc = 0;
        this.lock.ready = 0;
        this.lock.data  = alloc(this.lock.max);
        this.lock.write_lock  = false;

        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
        free(this.lock.data);
}


//=======================================================================
// Lock-Free registering/unregistering of threads
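// Registration publishes a processor's TLS lock flag into the 'data' array:
//   first try to reuse a slot freed by unregister_proc_id (CAS from 0p), otherwise
//   fetch-and-add 'alloc' to claim a fresh slot and spin until 'ready' can be
//   advanced past it, which makes the slot visible to the writer side.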
unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
        bool * handle = (bool *)&kernelTLS().sched_lock;

        // Step - 1 : check if there is already space in the data
        uint_fast32_t s = ready;

        // Check among all the ready
        for(uint_fast32_t i = 0; i < s; i++) {
                bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatile causes problems
                /* paranoid */ verify( handle != *cell );

                bool * null = 0p; // reset every iteration since a failed compare-exchange overwrites it
                if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
                        && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                        /* paranoid */ verify(i < ready);
                        /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
                        return i;
                }
        }

        if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

        // Step - 2 : F&A to get a new spot in the array.
        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
        if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

        // Step - 3 : Mark space as used and then publish it.
        data[n] = handle;
        while() {
                unsigned copy = n;
                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
                        break;
                Pause();
        }

        // Return new spot.
        /* paranoid */ verify(n < ready);
        /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
        return n;
}

void unregister_proc_id( unsigned id ) with(__scheduler_lock.lock) {
        /* paranoid */ verify(id < ready);
        /* paranoid */ verify(id == kernelTLS().sched_id);
        /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

        bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatile causes problems

        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
//  queues or removing them.
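// Typical pairing (sketch) : keep the returned count and hand it back to
//   ready_mutate_unlock so only the locks acquired here are released, e.g.
//     uint_fast32_t last_size = ready_mutate_lock();
//     ... modify the cluster's scheduler data ...
//     ready_mutate_unlock( last_size );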
uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
        /* paranoid */ verify( ! __preemption_enabled() );

        // Step 1 : lock global lock
        // Needed so that processors which register mid critical-section
        //   cannot simply lock their own lock and enter.
        __atomic_acquire( &write_lock );

        // Make sure we won't deadlock ourselves.
        // Checking before acquiring the writer lock isn't safe
        // because someone else could have locked us.
        /* paranoid */ verify( ! kernelTLS().sched_lock );

        // Step 2 : lock per-proc lock
        // Processors that are currently being registered aren't counted
        //   but can't be in read_lock or in the critical section.
        // All other processors are counted.
        uint_fast32_t s = ready;
        for(uint_fast32_t i = 0; i < s; i++) {
                volatile bool * llock = data[i];
                if(llock) __atomic_acquire( llock );
        }

        /* paranoid */ verify( ! __preemption_enabled() );
        return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
        /* paranoid */ verify( ! __preemption_enabled() );

        // Step 1 : release local locks
        // This must be done while the global lock is held to avoid
        //   threads that were created mid critical-section
        //   racing to lock their local locks and having the writer
        //   immediately unlock them.
        // Alternative solution : return s in write_lock and pass it to write_unlock
        for(uint_fast32_t i = 0; i < last_s; i++) {
                volatile bool * llock = data[i];
                if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
        }

        // Step 2 : release global lock
        /*paranoid*/ assert(true == write_lock);
        __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

        /* paranoid */ verify( ! __preemption_enabled() );
}

//=======================================================================
// Cluster growth
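// Lower bound on the number of ready-queue subqueues, so even a cluster with no
//   processors keeps a usable (if tiny) sharded queue.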
197static const unsigned __readyq_single_shard = 2;
198
[31c967b]199void  ?{}(__timestamp_t & this) { this.t.tv = 0; this.t.ma = 0; }
200void ^?{}(__timestamp_t &) {}
201
[c42b8a1]202//-----------------------------------------------------------------------
203// Check that all the intrusive queues in the data structure are still consistent
[884f3f67]204static void check_readyQ( cluster * cltr ) with (cltr->sched) {
[c42b8a1]205        #if defined(__CFA_WITH_VERIFY__)
206                {
[884f3f67]207                        const unsigned lanes_count = readyQ.count;
208                        for( idx ; lanes_count ) {
209                                __intrusive_lane_t & sl = readyQ.data[idx];
[2af1943]210                                assert(!readyQ.data[idx].l.lock);
[c42b8a1]211
212                                        if(is_empty(sl)) {
[2af1943]213                                                assert( sl.l.anchor.next == 0p );
214                                                assert( sl.l.anchor.ts   == MAX );
215                                                assert( mock_head(sl)  == sl.l.prev );
[c42b8a1]216                                        } else {
[2af1943]217                                                assert( sl.l.anchor.next != 0p );
218                                                assert( sl.l.anchor.ts   != MAX );
219                                                assert( mock_head(sl)  != sl.l.prev );
[c42b8a1]220                                        }
221                        }
222                }
223        #endif
224}
225
// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to the anchor aren't left dangling.
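// Only the empty case needs repair : an empty lane's 'prev' points at its own mock
//   head (an address inside the lane itself), which moved with the memcpy, whereas a
//   non-empty lane's 'prev' points at a thread object that did not move.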
static inline void fix(__intrusive_lane_t & ll) {
        if(is_empty(ll)) {
                verify(ll.l.anchor.next == 0p);
                ll.l.prev = mock_head(ll);
        }
}

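// Walk a list of processors and hand each one the first index of its block of
//   ready-queue (and, with io_uring, completion-queue) shards; 'valrq'/'valio' are
//   advanced by the shard factors so consecutive processors get disjoint blocks.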
static void assign_list(unsigned & valrq, unsigned & valio, dlist(struct processor) & list, unsigned count) {
        struct processor * it = &list`first;
        for(unsigned i = 0; i < count; i++) {
                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
                it->rdq.id = valrq;
                it->rdq.target = UINT_MAX;
                valrq += __shard_factor.readyq;
                #if defined(CFA_HAVE_LINUX_IO_URING_H)
                        it->io.ctx->cq.id = valio;
                        it->io.target = UINT_MAX;
                        valio += __shard_factor.io;
                #endif
                it = &(*it)`next;
        }
}

static void reassign_cltr_id(struct cluster * cltr) {
        unsigned prefrq = 0;
        unsigned prefio = 0;
        assign_list(prefrq, prefio, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
        assign_list(prefrq, prefio, cltr->procs.idles  , cltr->procs.idle );
}

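// With io_uring, the cluster also keeps an array of io_context$ pointers indexed by
//   completion-queue id; rebuild it from the ids handed out in assign_list above.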
#if defined(CFA_HAVE_LINUX_IO_URING_H)
        static void assign_io(io_context$ ** data, size_t count, dlist(struct processor) & list) {
                struct processor * it = &list`first;
                while(it) {
                        /* paranoid */ verifyf( it, "Unexpected null iterator\n");
                        /* paranoid */ verifyf( it->io.ctx->cq.id < count, "Processor %p has id %u above count %zu\n", it, it->io.ctx->cq.id, count);
                        data[it->io.ctx->cq.id] = it->io.ctx;
                        it = &(*it)`next;
                }
        }

        static void reassign_cltr_io(struct cluster * cltr) {
                assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
                assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles  );
        }
#else
        static void reassign_cltr_io(struct cluster *) {}
#endif

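// (Re)allocate the timestamp array to 'count' entries and reset each entry
//   (tv to the current tsc, ma to 0) so stale readings are not carried across a resize.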
static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
        tscs = alloc(count, tscs`realloc);
        for(i; count) {
                tscs[i].t.tv = rdtscl();
                tscs[i].t.ma = 0;
        }
}

// Grow the ready queue
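// Reshards the subqueues to match the current processor count; must be called with
//   the ready-queue mutate lock held (see the paranoid checks below).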
void ready_queue_grow(struct cluster * cltr) {
        int target = cltr->procs.total;

        /* paranoid */ verify( ready_mutate_islocked() );
        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

        // Make sure that everything is consistent
        /* paranoid */ check_readyQ( cltr );


        // Find new count
        // Make sure we always have at least 1 list
        size_t ocount = cltr->sched.readyQ.count;
        size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);

        // Do we have to do anything?
        if( ocount != ncount ) {

                // grow the ready queue
                with( cltr->sched ) {

                        // Allocate new array (uses realloc and memcpies the data)
                        readyQ.data = alloc( ncount, readyQ.data`realloc );

                        // Fix the moved data
                        for( idx; ocount ) {
                                fix(readyQ.data[idx]);
                        }

                        // Construct new data
                        for( idx; ocount ~ ncount) {
                                (readyQ.data[idx]){};
                        }

                        // Update original count
                        readyQ.count = ncount;
                }


                fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
        }

        // Fix the io times
        cltr->sched.io.count = target * __shard_factor.io;
        fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

        // realloc the caches
        cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

        // reassign the processor ids
        reassign_cltr_id(cltr);

        cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
        reassign_cltr_io(cltr);

        // Make sure that everything is consistent
        /* paranoid */ check_readyQ( cltr );
//      /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

        /* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
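// Same locking contract as ready_queue_grow; threads sitting in lanes that are being
//   removed are popped and re-pushed so no work is lost.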
void ready_queue_shrink(struct cluster * cltr) {
        /* paranoid */ verify( ready_mutate_islocked() );
        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

        // Make sure that everything is consistent
        /* paranoid */ check_readyQ( cltr );

        int target = cltr->procs.total;

        with( cltr->sched ) {
                // Remember old count
                size_t ocount = readyQ.count;

                // Find new count
                // Make sure we always have at least 1 list
                size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
                /* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
                /* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
                /* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

                readyQ.count = ncount;

                // for printing count the number of displaced threads
                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
                        __attribute__((unused)) size_t displaced = 0;
                #endif

                // redistribute old data
                for( idx; ncount ~ ocount) {
                        // Lock is not strictly needed but makes checking invariants much easier
                        __attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].l.lock);
                        verify(locked);

                        // As long as we can pop from this lane, push the threads somewhere else in the queue
                        while(!is_empty(readyQ.data[idx])) {
                                struct thread$ * thrd;
                                unsigned long long _;
                                [thrd, _] = pop(readyQ.data[idx]);

                                push(cltr, thrd, true);

                                // for printing count the number of displaced threads
                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
                                        displaced++;
                                #endif
                        }

                        // Unlock the lane
                        __atomic_unlock(&readyQ.data[idx].l.lock);

                        // TODO print the queue statistics here

                        ^(readyQ.data[idx]){};
                }

                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

                // Allocate new array (uses realloc and memcpies the data)
                readyQ.data = alloc( ncount, readyQ.data`realloc );

                // Fix the moved data
                for( idx; ncount ) {
                        fix(readyQ.data[idx]);
                }

                fix_times(readyQ.tscs, ncount);
        }

        cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

        // Fix the io times
        cltr->sched.io.count = target * __shard_factor.io;
        fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

        reassign_cltr_id(cltr);

        cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
        reassign_cltr_io(cltr);

        // Make sure that everything is consistent
//      /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
        /* paranoid */ check_readyQ( cltr );

        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
        /* paranoid */ verify( ready_mutate_islocked() );
}

void ready_queue_close(struct cluster * cltr) {
        free( cltr->sched.readyQ.data );
        free( cltr->sched.readyQ.tscs );
        cltr->sched.readyQ.data = 0p;
        cltr->sched.readyQ.tscs = 0p;
        cltr->sched.readyQ.count = 0;

        free( cltr->sched.io.tscs );
        free( cltr->sched.caches );
}

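// offsetof-style helper that accepts a nested member path such as 'l.anchor';
//   #undef'd below once the constructor's assertions no longer need it.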
#define nested_offsetof(type, field) ((off_t)(&(((type*)0)-> field)))

// Ctor
void ?{}( __intrusive_lane_t & this ) {
        this.l.lock = false;
        this.l.prev = mock_head(this);
        this.l.anchor.next = 0p;
        this.l.anchor.ts   = MAX;
        #if !defined(__CFA_NO_STATISTICS__)
                this.l.cnt  = 0;
        #endif

        // We add a boat-load of assertions here because the anchor code is very fragile
        /* paranoid */ _Static_assert( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
        /* paranoid */ verify( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
        /* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, rdy_link )) == (uintptr_t)(&this.l.anchor) );
        /* paranoid */ verify( &mock_head(this)->rdy_link.next == &this.l.anchor.next );
        /* paranoid */ verify( &mock_head(this)->rdy_link.ts   == &this.l.anchor.ts   );
        /* paranoid */ verify( mock_head(this)->rdy_link.next == 0p );
        /* paranoid */ verify( mock_head(this)->rdy_link.ts   == MAX );
        /* paranoid */ verify( mock_head(this) == this.l.prev );
        /* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
        /* paranoid */ verify( __alignof__(this) == 64 );
        /* paranoid */ verifyf( ((intptr_t)(&this) % 64) == 0, "Expected address to be aligned %p %% 64 == %zd", &this, ((intptr_t)(&this) % 64) );
}

#undef nested_offsetof

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
        // Make sure the list is empty
        /* paranoid */ verify( this.l.anchor.next == 0p );
        /* paranoid */ verify( this.l.anchor.ts   == MAX );
        /* paranoid */ verify( mock_head(this)    == this.l.prev );
}