source: libcfa/src/concurrency/kernel/cluster.cfa @ f678c53b

Last change on this file since f678c53b was 6dd4091, checked in by Peter A. Buhr <pabuhr@…>, 7 months ago

comment out asserts that fail when malloc(0) returns non-null

  • Property mode set to 100644
File size: 15.4 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// cluster.cfa -- file that includes helpers for subsystem that need cluster wide support
8//
9// Author           : Thierry Delisle
10// Created On       : Fri Mar 11 12:39:24 2022
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17
18#include "bits/defs.hfa"
19#include "device/cpu.hfa"
20#include "kernel/cluster.hfa"
21#include "kernel/private.hfa"
22
23#include "stdlib.hfa"
24#include "limits.hfa"
25#include "math.hfa"
26
27#include "ready_subqueue.hfa"
28#include "io/types.hfa"
29
30#include <errno.h>
31#include <unistd.h>
32
33extern "C" {
34        #include <sys/syscall.h>  // __NR_xxx
35}
36
// Last-resort limit on the number of processors: used when __max_processors
// is not overridden, the CFA_MAX_PROCESSORS environment variable gives no
// usable value, and the build did not define __CFA_MAX_PROCESSORS__ itself.
#if !defined(__CFA_MAX_PROCESSORS__)
	#define __CFA_MAX_PROCESSORS__ 1024
#endif
42
// __STATS(...) expands to its arguments when statistics are compiled in and
// to nothing when __CFA_NO_STATISTICS__ is defined.
#if defined(__CFA_NO_STATISTICS__)
	#define __STATS(...)
#else
	#define __STATS(...) __VA_ARGS__
#endif
48
// Returns the maximum number of processors the RWLock supports.
// The symbol is weak, so another definition can replace this one.
// The CFA_MAX_PROCESSORS environment variable is used when it holds a decimal
// number in [1, 65535] with no trailing characters; in every other case the
// compile-time default __CFA_MAX_PROCESSORS__ is returned.
__attribute__((weak)) unsigned __max_processors() libcfa_public {
	const char * const env_value = getenv("CFA_MAX_PROCESSORS");
	if( env_value == 0p ) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * parse_end = 0p;
	const long int requested = strtol(env_value, &parse_end, 10);

	// The range is tested before the trailing characters, so input without
	// any leading digits parses as 0 and is reported as out of range.
	if( !(1 <= requested && requested <= 65535) ) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", requested);
		return __CFA_MAX_PROCESSORS__;
	}

	if( *parse_end != '\0' ) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", env_value);
		return __CFA_MAX_PROCESSORS__;
	}

	return requested;
}
70
71//=======================================================================
72// Cluster wide reader-writer lock
73//=======================================================================
// Ctor: sizes the handle array from __max_processors() and starts with no
// slot allocated, no slot published and the writer lock released.
// The array contents are left as returned by alloc().
void  ?{}(__scheduler_RWLock_t & this) {
	this.lock.max  = __max_processors();
	this.lock.data = alloc(this.lock.max);

	this.lock.alloc      = 0;
	this.lock.ready      = 0;
	this.lock.write_lock = false;

	// Both counters are manipulated with __atomic builtins, make sure that is lock free.
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));
}
// Dtor: releases the handle array allocated by the constructor.
void ^?{}(__scheduler_RWLock_t & this) {
	free( this.lock.data );
}
88
89
90//=======================================================================
91// Lock-Free registering/unregistering of threads
// Registers the calling kernel thread with the scheduler lock and returns the
// index of the slot that now holds the address of its sched_lock.
// A published slot that currently holds null (as left by unregister_proc_id)
// is reused when one can be claimed; otherwise a new slot is allocated and
// published. Aborts when all `max` slots have already been allocated.
unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		// Cheap relaxed load first, the CAS is only attempted on cells that look free.
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	// Early check so the allocation counter is not bumped once the array is full.
	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	// Re-check with the value actually obtained, other threads may have raced past the early check.
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	// `ready` only moves from n to n + 1, so slots are published in allocation order:
	// spin until every earlier slot has been published.
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}
133
// Releases slot `id` of the scheduler lock by storing a null handle into it
// (release ordering). register_proc_id treats a null slot as free.
void unregister_proc_id( unsigned id ) with(__scheduler_lock.lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	// The cast works around a Cforall problem with doubly-volatile pointers.
	bool * volatile * slot = (bool * volatile *)&data[id];
	__atomic_store_n( slot, 0p, __ATOMIC_RELEASE );
}
143
144//-----------------------------------------------------------------------
// Writer side : acquire before changing the ready queue, e.g. when adding
//  or removing queues.
// Takes the global writer lock, then the lock of every registered slot.
// Returns the number of published slots that were scanned
// (NOTE(review): presumably the value to hand to ready_mutate_unlock - confirm at call sites).
uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Global lock first: without it, a processor registering in the middle of
	//   the critical section could simply lock its own lock and enter.
	__atomic_acquire( &write_lock );

	// Self-deadlock check. It can only be done once the writer lock is held,
	// before that point another writer could legitimately have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Per-processor locks second. Processors still registering are not counted
	//   here, but they cannot be in read_lock or in the critical section.
	const uint_fast32_t published = ready;
	for( uint_fast32_t slot = 0; slot < published; slot += 1 ) {
		volatile bool * proc_lock = data[slot];
		// Null slots are unregistered, there is nothing to lock.
		if( proc_lock ) __atomic_acquire( proc_lock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return published;
}
173
// Writer side : release the per-processor locks of slots [0, last_s) and then
// the global writer lock.
void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Local locks are released while the global lock is still held. Otherwise
	//   threads that were created mid critical section could race to lock
	//   their local lock and have the writer immediately unlock it.
	// last_s bounds the scan so that only the first last_s slots are touched.
	for( uint_fast32_t slot = 0; slot < last_s; slot += 1 ) {
		volatile bool * proc_lock = data[slot];
		if( proc_lock ) __atomic_store_n( proc_lock, (bool)false, __ATOMIC_RELEASE );
	}

	// The global lock goes last.
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n( &write_lock, (bool)false, __ATOMIC_RELEASE );

	/* paranoid */ verify( ! __preemption_enabled() );
}
194
195//=======================================================================
196// Cluster growth
// Lower bound on the number of ready-queue lanes: the grow and shrink code
// uses it whenever the processor-based lane count would be smaller.
static const unsigned int __readyq_single_shard = 2;
198
// Ctor: a new timestamp has both t.tv and t.ma set to zero.
void  ?{}(__timestamp_t & this) {
	this.t.tv = 0;
	this.t.ma = 0;
}

// Dtor: nothing to release.
void ^?{}(__timestamp_t &) {}
201
202//-----------------------------------------------------------------------
// Consistency check over every lane of the cluster's ready queue.
// Compiled to an empty body unless __CFA_WITH_VERIFY__ is defined.
// Each lane must be unlocked; an empty lane must have a null anchor.next,
// a MAX timestamp and prev equal to its mock head, while a non-empty lane
// must have none of those.
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
		const unsigned lanes_count = readyQ.count;
		for( unsigned idx = 0; idx < lanes_count; idx += 1 ) {
			__intrusive_lane_t & sl = readyQ.data[idx];
			assert(!readyQ.data[idx].l.lock);

			if( !is_empty(sl) ) {
				assert( sl.l.anchor.next != 0p );
				assert( sl.l.anchor.ts   != MAX );
				assert( mock_head(sl)  != sl.l.prev );
			} else {
				assert( sl.l.anchor.next == 0p );
				assert( sl.l.anchor.ts   == MAX );
				assert( mock_head(sl)  == sl.l.prev );
			}
		}
	#endif
}
225
// Call this function if the intrusive list was moved using memcpy.
// An empty lane has prev pointing at its own mock head, which is derived from
// the lane itself, so after a move prev is re-pointed at the new mock head.
// Non-empty lanes are left untouched.
static inline void fix(__intrusive_lane_t & ll) {
	if( !is_empty(ll) ) return;

	verify(ll.l.anchor.next == 0p);
	ll.l.prev = mock_head(ll);
}
234
// Hands out ready-queue ids (and io ids when io_uring is available) to the
// first `count` processors of `list`.
// valrq / valio hold the next id to assign and are advanced by the matching
// shard factor for each processor, so a later call continues the sequence.
// Each processor's target is reset to UINT_MAX.
static void assign_list(unsigned & valrq, unsigned & valio, dlist(struct processor) & list, unsigned count) {
	struct processor * it = &list`first;
	for( i; count ) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);

		// ready-queue shard
		it->rdq.id = valrq;
		it->rdq.target = UINT_MAX;
		valrq += __shard_factor.readyq;

		// io shard
		#if defined(CFA_HAVE_LINUX_IO_URING_H)
			it->io.ctx->cq.id = valio;
			it->io.target = UINT_MAX;
			valio += __shard_factor.io;
		#endif

		it = &(*it)`next;
	}
}
250
// Renumbers every processor of the cluster starting from id 0: the active
// processors are numbered first and the idle ones continue the sequence.
static void reassign_cltr_id(struct cluster * cltr) {
	unsigned next_rq = 0;
	unsigned next_io = 0;

	// Everything that is not idle is on the actives list.
	const unsigned active_count = cltr->procs.total - cltr->procs.idle;
	assign_list(next_rq, next_io, cltr->procs.actives, active_count);
	assign_list(next_rq, next_io, cltr->procs.idles, cltr->procs.idle);
}
257
#if defined(CFA_HAVE_LINUX_IO_URING_H)
	// Stores the io context of every processor on `list` into `data`, at the
	// index given by the context's completion-queue id.
	// `count` is the number of entries in `data` and is only used for checking.
	static void assign_io(io_context$ ** data, size_t count, dlist(struct processor) & list) {
		for( struct processor * it = &list`first; it; it = &(*it)`next ) {
			// Report the id that is actually being checked (the io id, not the ready-queue id).
			/* paranoid */ verifyf( it->io.ctx->cq.id < count, "Processor %p has id %u above count %zu\n", it, it->io.ctx->cq.id, count);
			data[it->io.ctx->cq.id] = it->io.ctx;
		}
	}

	// Rebuilds the cluster's io context table from its active and idle processors.
	static void reassign_cltr_io(struct cluster * cltr) {
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles  );
	}
#else
	// Without io_uring there is no io context table to rebuild.
	static void reassign_cltr_io(struct cluster *) {}
#endif
276
// Resizes the timestamp array `tscs` to `count` entries (through realloc) and
// resets every entry: t.tv takes a fresh rdtscl() reading and t.ma becomes 0.
static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);
	for( unsigned i = 0; i < count; i += 1 ) {
		tscs[i].t.tv = rdtscl();
		tscs[i].t.ma = 0;
	}
}
284
// Grow the ready queue
// Resizes the cluster's scheduling arrays (lanes, timestamps, caches, io table)
// to match cltr->procs.total and renumbers the processors.
// The scheduler lock must be held by the caller (see the verify calls).
void ready_queue_grow(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Everything must be consistent before touching anything
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	// Lane counts before and after, there is always at least __readyq_single_shard lanes
	const size_t lanes_before = cltr->sched.readyQ.count;
	const size_t lanes_after  = max(target * __shard_factor.readyq, __readyq_single_shard);

	// Nothing to do for the lanes when the count is unchanged
	if( lanes_before != lanes_after ) {
		// realloc may move the array, in which case the lanes are memcpied
		cltr->sched.readyQ.data = alloc( lanes_after, cltr->sched.readyQ.data`realloc );

		// Lanes that already existed may have moved: repair them
		for( size_t idx = 0; idx < lanes_before; idx += 1 ) {
			fix(cltr->sched.readyQ.data[idx]);
		}

		// Lanes past the old count are raw memory: construct them
		for( size_t idx = lanes_before; idx < lanes_after; idx += 1 ) {
			(cltr->sched.readyQ.data[idx]){};
		}

		// Publish the new count and resize/reset the lane timestamps accordingly
		cltr->sched.readyQ.count = lanes_after;
		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// io timestamps: one group of __shard_factor.io entries per processor
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// caches: one entry per processor
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// Processor ids are handed out first, the io table is filled from them afterwards
	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Everything must still be consistent
	/* paranoid */ check_readyQ( cltr );
	// The check that caches is null exactly when target is 0 is disabled:
	// it fails when malloc(0) returns non-null.

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}
349
// Shrink the ready queue
// Drops the lanes that are no longer needed for cltr->procs.total processors,
// re-pushing their threads onto the cluster, then resizes the other scheduling
// arrays (timestamps, caches, io table) and renumbers the processors.
// The scheduler lock must be held by the caller (see the verify calls).
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Everything must be consistent before touching anything
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	with( cltr->sched ) {
		// Lane counts before and after, there is always at least __readyq_single_shard lanes
		size_t ocount = readyQ.count;
		size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
		/* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
		/* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
		/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

		// The count is lowered before the lanes are drained
		readyQ.count = ncount;

		// Number of displaced threads, only needed for the debug print
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// Drain and destroy every lane past the new count
		for( size_t idx = ncount; idx < ocount; idx += 1 ) {
			__intrusive_lane_t & lane = readyQ.data[idx];

			// The lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&lane.l.lock);
			verify(locked);

			// Move every thread of this lane somewhere else in the queue
			while( !is_empty(lane) ) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(lane);

				push(cltr, thrd, true);

				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			__atomic_unlock(&lane.l.lock);

			// TODO print the queue statistics here

			^(readyQ.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// realloc may move the array, in which case the lanes are memcpied
		readyQ.data = alloc( ncount, readyQ.data`realloc );

		// The surviving lanes may have moved: repair them
		for( size_t idx = 0; idx < ncount; idx += 1 ) {
			fix(readyQ.data[idx]);
		}

		fix_times(readyQ.tscs, ncount);
	}

	// caches: one entry per processor
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// io timestamps: one group of __shard_factor.io entries per processor
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// Processor ids are handed out first, the io table is filled from them afterwards
	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Everything must still be consistent
	// The check that caches is null exactly when target is 0 is disabled:
	// it fails when malloc(0) returns non-null.
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}
437
// Releases the scheduling arrays owned by the cluster's ready queue: the
// lanes, the lane timestamps, the io timestamps and the caches.
// Every freed pointer is reset to null so that no dangling pointer is left
// behind in the cluster, and the lane count is reset to 0.
// NOTE(review): cltr->sched.io.data and cltr->sched.io.count are not touched
// here; presumably they are handled elsewhere - confirm.
void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	cltr->sched.io.tscs = 0p;

	free( cltr->sched.caches );
	cltr->sched.caches = 0p;
}
448
// Offset of a (possibly nested) field, only needed by the checks in the ctor below.
#define nested_offsetof(type, field) ((off_t)(&(((type*)0)-> field)))

// Ctor: builds an empty, unlocked lane.
void ?{}( __intrusive_lane_t & this ) {
	// Empty state: no successor, MAX timestamp and prev pointing at the mock head
	this.l.anchor.next = 0p;
	this.l.anchor.ts   = MAX;
	this.l.prev = mock_head(this);
	this.l.lock = false;
	#if !defined(__CFA_NO_STATISTICS__)
		this.l.cnt  = 0;
	#endif

	// The anchor code is very fragile, hence the many checks.
	// Layout: the anchor must sit at the same offset as rdy_link does in thread$,
	// so that the mock head's rdy_link is exactly the anchor.
	/* paranoid */ _Static_assert( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, rdy_link )) == (uintptr_t)(&this.l.anchor) );
	/* paranoid */ verify( &mock_head(this)->rdy_link.next == &this.l.anchor.next );
	/* paranoid */ verify( &mock_head(this)->rdy_link.ts   == &this.l.anchor.ts   );

	// State: seen through the mock head, the lane is empty
	/* paranoid */ verify( mock_head(this)->rdy_link.next == 0p );
	/* paranoid */ verify( mock_head(this)->rdy_link.ts   == MAX );
	/* paranoid */ verify( mock_head(this) == this.l.prev );

	// Alignment: lanes are expected to be aligned on 64 bytes
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
	/* paranoid */ verify( __alignof__(this) == 64 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 64) == 0, "Expected address to be aligned %p %% 64 == %zd", &this, ((intptr_t)(&this) % 64) );
}

#undef nested_offsetof
476
// Dtor: releases nothing, it only checks (in verify builds) that the lane is
// empty: null successor, MAX timestamp and prev pointing at the mock head.
void ^?{}( __intrusive_lane_t & this ) {
	/* paranoid */ verify( this.l.anchor.next == 0p );
	/* paranoid */ verify( this.l.anchor.ts   == MAX );
	/* paranoid */ verify( mock_head(this)    == this.l.prev );
}
Note: See TracBrowser for help on using the repository browser.