source: libcfa/src/concurrency/kernel/cluster.cfa@ 8492b85

Last change on this file since 8492b85 was 6dd4091, checked in by Peter A. Buhr <pabuhr@…>, 19 months ago

comment out asserts that fail when malloc(0) returns non-null

  • Property mode set to 100644
File size: 15.4 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// cluster.cfa -- file that includes helpers for subsystem that need cluster wide support
8//
9// Author : Thierry Delisle
10// Created On : Fri Mar 11 12:39:24 2022
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17
18#include "bits/defs.hfa"
19#include "device/cpu.hfa"
20#include "kernel/cluster.hfa"
21#include "kernel/private.hfa"
22
23#include "stdlib.hfa"
24#include "limits.hfa"
25#include "math.hfa"
26
27#include "ready_subqueue.hfa"
28#include "io/types.hfa"
29
30#include <errno.h>
31#include <unistd.h>
32
33extern "C" {
34 #include <sys/syscall.h> // __NR_xxx
35}
36
// Compile-time fallback for the processor limit: used when there is no
// overriding function, no environment variable and no build-time define.
#if !defined(__CFA_MAX_PROCESSORS__)
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

// __STATS(...) expands to its arguments when statistics are compiled in and
// to nothing when __CFA_NO_STATISTICS__ is defined.
#if defined(__CFA_NO_STATISTICS__)
	#define __STATS(...)
#else
	#define __STATS(...) __VA_ARGS__
#endif
48
// Returns the maximum number of processors the scheduler RWLock supports.
//
// The limit is read from the CFA_MAX_PROCESSORS environment variable, which
// must hold a plain decimal integer in the range [1, 65535].  When the
// variable is unset, is not a well-formed decimal number, or is out of range,
// the compile-time default __CFA_MAX_PROCESSORS__ is returned instead.
// The definition is weak so that it can be overridden.
__attribute__((weak)) unsigned __max_processors() libcfa_public {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);

	// Check the syntax before the range: strtol yields 0 for text with no
	// digits, which would otherwise be misreported as an out-of-range value.
	// endptr == max_cores_s means no digit was consumed (e.g. empty string).
	if(endptr == max_cores_s || '\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	// On overflow strtol saturates to LONG_MIN/LONG_MAX, which this range check also rejects.
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}

	// Range-checked above, so the narrowing conversion is lossless.
	return (unsigned)max_cores_l;
}
70
//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
// Constructor: sizes the slot array from __max_processors() and starts with
// no slot handed out (alloc), no slot published (ready) and the writer lock
// released.
void ?{}(__scheduler_RWLock_t & this) {
	// Counters and writer flag start in the "empty, unlocked" state
	this.lock.write_lock = false;
	this.lock.ready = 0;
	this.lock.alloc = 0;

	// Capacity must be known before the slot array is allocated
	this.lock.max = __max_processors();
	this.lock.data = alloc(this.lock.max);

	// Both counters are updated with atomic builtins, check they are lock free
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
}
// Destructor: releases the slot array allocated by the constructor.
void ^?{}(__scheduler_RWLock_t & this) {
	free( this.lock.data );
}
88
89
//=======================================================================
// Lock-Free registering/unregistering of threads
//
// Registers the calling kernel thread's local lock (kernelTLS().sched_lock)
// in the scheduler lock's slot array and returns the index of the slot.
//
// Slots that were published earlier and have since been cleared back to null
// are reused first; otherwise a fresh slot is reserved at the end of the array
// and then published by advancing `ready`.  Aborts when the array is full
// (more than `max` slots would be needed).
unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		// Cheap relaxed load first, only attempt the CAS when the cell looks free
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	// Early check, avoids bumping `alloc` when the array is already known to be full
	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	// Re-check with the value actually obtained, another thread may have taken the last spot
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	// Slots are published in order: wait until `ready` reaches n, then move it to n + 1
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}
133
// Gives back slot `id`, previously returned by register_proc_id, by storing
// null into it so that a later registration can claim the slot again.
void unregister_proc_id( unsigned id ) {
	/* paranoid */ verify(id < __scheduler_lock.lock.ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(__scheduler_lock.lock.data[id] == &kernelTLS().sched_lock);

	// The cast drops one level of volatile, Cforall has problems with the doubly volatile type
	bool * volatile * slot = (bool * volatile *)&__scheduler_lock.lock.data[id];
	__atomic_store_n(slot, 0p, __ATOMIC_RELEASE);
}
143
//-----------------------------------------------------------------------
// Writer side : taken when the shape of the ready queue changes, e.g. when
// lanes are added or removed.
// Returns the number of published slots that were visited; the same value
// must be handed to ready_mutate_unlock.
uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// First the global writer lock.
	// Without it, a processor registering in the middle of the critical
	// section could simply take its own lock and enter.
	__atomic_acquire( &write_lock );

	// Check we are not about to deadlock on our own lock.
	// This can only be checked once the writer lock is held, before that
	// another writer could still be holding our local lock.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Then every per-processor lock.
	// Processors still in the middle of registering are not part of the count,
	// but they cannot be in read_lock or in the critical section either.
	const uint_fast32_t published = ready;
	uint_fast32_t idx = 0;
	while( idx < published ) {
		volatile bool * proc_lock = data[idx];
		// Slots that were unregistered hold null and are skipped
		if(proc_lock) __atomic_acquire( proc_lock );
		idx += 1;
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return published;
}
173
// Writer side release : undoes ready_mutate_lock.
// last_s is the slot count returned by the matching ready_mutate_lock call.
void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// First release the local locks.
	// The global lock must still be held at this point, otherwise threads
	// created during the critical section could race to take their local
	// lock only to have the writer immediately release it.
	// Only the first last_s slots are released, i.e. the ones the lock call visited.
	uint_fast32_t idx = 0;
	while( idx < last_s ) {
		volatile bool * proc_lock = data[idx];
		if(proc_lock) __atomic_store_n(proc_lock, (bool)false, __ATOMIC_RELEASE);
		idx += 1;
	}

	// Then release the global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}
194
//=======================================================================
// Cluster growth

// Lower bound on the number of ready-queue lanes computed by grow/shrink
static const unsigned __readyq_single_shard = 2;

// Timestamp constructor : both fields start at zero
void ?{}(__timestamp_t & this) {
	this.t.ma = 0;
	this.t.tv = 0;
}

// Timestamp destructor : nothing to do
void ^?{}(__timestamp_t &) {}
201
//-----------------------------------------------------------------------
// Consistency check over every intrusive lane of the cluster's ready queue.
// Only does work when __CFA_WITH_VERIFY__ is defined: each lane must be
// unlocked and its anchor/prev fields must agree with is_empty().
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
		const unsigned lane_total = readyQ.count;
		for( unsigned lane_idx = 0; lane_idx < lane_total; lane_idx++ ) {
			__intrusive_lane_t & lane = readyQ.data[lane_idx];
			assert(!lane.l.lock);

			if(!is_empty(lane)) {
				// Non-empty lane : anchor links to a thread and has a real timestamp
				assert( lane.l.anchor.next != 0p );
				assert( lane.l.anchor.ts != MAX );
				assert( mock_head(lane) != lane.l.prev );
			} else {
				// Empty lane : anchor is cleared and prev points back at the mock head
				assert( lane.l.anchor.next == 0p );
				assert( lane.l.anchor.ts == MAX );
				assert( mock_head(lane) == lane.l.prev );
			}
		}
	#endif
}
225
// Call this function if the intrusive list was moved using memcpy.
// An empty lane's prev pointer refers to the lane's own mock head, so it must
// be recomputed for the new address or it is left dangling.
static inline void fix(__intrusive_lane_t & ll) {
	// Nothing to repair for a lane that holds threads
	if(!is_empty(ll)) return;

	verify(ll.l.anchor.next == 0p);
	ll.l.prev = mock_head(ll);
}
234
// Hands out ready-queue ids (and, with io_uring support, io ids) to the first
// `count` processors of `list`.
// valrq/valio are running counters passed by reference : each processor gets
// the current value and the counter then advances by the matching shard
// factor.  Each processor's target is reset to UINT_MAX.
static void assign_list(unsigned & valrq, unsigned & valio, dlist(struct processor) & list, unsigned count) {
	struct processor * proc = &list`first;
	for(unsigned idx = 0; idx < count; idx += 1) {
		/* paranoid */ verifyf( proc, "Unexpected null iterator, at index %u of %u\n", idx, count);

		// Ready-queue side
		proc->rdq.target = UINT_MAX;
		proc->rdq.id = valrq;
		valrq += __shard_factor.readyq;

		// I/O side, only when io_uring is available
		#if defined(CFA_HAVE_LINUX_IO_URING_H)
			proc->io.target = UINT_MAX;
			proc->io.ctx->cq.id = valio;
			valio += __shard_factor.io;
		#endif

		proc = &(*proc)`next;
	}
}
250
// Renumbers every processor of the cluster from zero : active processors
// first, idle processors after, sharing the same running counters.
static void reassign_cltr_id(struct cluster * cltr) {
	unsigned next_rq = 0;
	unsigned next_io = 0;

	// Whatever is not idle is active
	const unsigned active_count = cltr->procs.total - cltr->procs.idle;
	const unsigned idle_count   = cltr->procs.idle;

	assign_list(next_rq, next_io, cltr->procs.actives, active_count);
	assign_list(next_rq, next_io, cltr->procs.idles  , idle_count  );
}
257
#if defined(CFA_HAVE_LINUX_IO_URING_H)
	// Stores the io context of every processor on `list` into `data`, at the
	// index given by that context's completion-queue id (io.ctx->cq.id).
	// `count` is the number of entries in `data`, ids must be below it.
	static void assign_io(io_context$ ** data, size_t count, dlist(struct processor) & list) {
		struct processor * it = &list`first;
		while(it) {
			// Report the id that is actually checked (the completion-queue id),
			// not the processor's ready-queue id.
			// NOTE(review): %u assumes cq.id is an unsigned int, as assigned in assign_list -- confirm
			/* paranoid */ verifyf( it->io.ctx->cq.id < count, "Processor %p has id %u above count %zu\n", it, it->io.ctx->cq.id, count);
			data[it->io.ctx->cq.id] = it->io.ctx;
			it = &(*it)`next;
		}
	}

	// Rebuilds the cluster's io context table from its active and idle processors.
	static void reassign_cltr_io(struct cluster * cltr) {
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles  );
	}
#else
	// Without io_uring there is no io context table to rebuild.
	static void reassign_cltr_io(struct cluster *) {}
#endif
276
// Resizes the timestamp array `tscs` to `count` entries (realloc semantics)
// and then resets every entry : tv to the current rdtscl() reading, ma to zero.
static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);

	for(unsigned slot = 0; slot < count; slot += 1) {
		tscs[slot].t.tv = rdtscl();
		tscs[slot].t.ma = 0;
	}
}
284
// Grow the ready queue
// Resizes the cluster's scheduling arrays (lanes, timestamps, caches and io
// table) to match cltr->procs.total and renumbers the processors.
// The scheduler writer lock must be held by the caller.
void ready_queue_grow(struct cluster * cltr) {
	int nprocs = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Everything must be consistent before touching anything
	/* paranoid */ check_readyQ( cltr );

	// Lane count before and after, never going below __readyq_single_shard
	size_t old_lanes = cltr->sched.readyQ.count;
	size_t new_lanes = max(nprocs * __shard_factor.readyq, __readyq_single_shard);

	// The lanes are only touched when their number actually changes
	if( old_lanes != new_lanes ) {
		// Resize the lane array, realloc may move (memcpy) the existing lanes
		cltr->sched.readyQ.data = alloc( new_lanes, cltr->sched.readyQ.data`realloc );

		// Lanes that already existed may have moved, repair their self references
		for( size_t idx = 0; idx < old_lanes; idx++ ) {
			fix(cltr->sched.readyQ.data[idx]);
		}

		// Lanes past the old count are brand new, run their constructor
		for( size_t idx = old_lanes; idx < new_lanes; idx++ ) {
			(cltr->sched.readyQ.data[idx]){};
		}

		// Only now make the new count visible
		cltr->sched.readyQ.count = new_lanes;

		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// The io timestamps follow the processor count
	cltr->sched.io.count = nprocs * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// One cache entry per processor
	cltr->sched.caches = alloc( nprocs, cltr->sched.caches`realloc );

	// Renumber the processors
	reassign_cltr_id(cltr);

	// Resize and refill the io context table
	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Everything must be consistent again.
	// The check (nprocs == 0) == (cltr->sched.caches == 0p) is intentionally
	// not performed : it fails when a zero-sized allocation returns non-null.
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}
349
// Shrink the ready queue
// Lowers the lane count to match cltr->procs.total, moves the threads found
// in the removed lanes back into the remaining ones, then resizes the other
// scheduling arrays and renumbers the processors.
// The scheduler writer lock must be held by the caller.
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Everything must be consistent before touching anything
	/* paranoid */ check_readyQ( cltr );

	int nprocs = cltr->procs.total;

	// Lane count before and after, never going below __readyq_single_shard
	size_t old_lanes = cltr->sched.readyQ.count;
	size_t new_lanes = max(nprocs * __shard_factor.readyq, __readyq_single_shard);
	/* paranoid */ verifyf( old_lanes >= new_lanes, "Error in shrinking size calculation, %zu >= %zu", old_lanes, new_lanes );
	/* paranoid */ verifyf( new_lanes == nprocs * __shard_factor.readyq || new_lanes == __readyq_single_shard,
	/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", nprocs * __shard_factor.readyq, __readyq_single_shard, new_lanes );

	// Lower the count first : the pushes below then only see the surviving lanes
	cltr->sched.readyQ.count = new_lanes;

	// Number of threads moved, only tracked for the debug print
	#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
		__attribute__((unused)) size_t displaced = 0;
	#endif

	// Empty every lane that is about to disappear
	for( size_t idx = new_lanes; idx < old_lanes; idx++ ) {
		// Taking the lock is not strictly needed but makes checking invariants much easier
		__attribute__((unused)) bool locked = __atomic_try_acquire(&cltr->sched.readyQ.data[idx].l.lock);
		verify(locked);

		// Pop threads out of this lane and push them back into the cluster until it is empty
		while(!is_empty(cltr->sched.readyQ.data[idx])) {
			struct thread$ * thrd;
			unsigned long long _;
			[thrd, _] = pop(cltr->sched.readyQ.data[idx]);

			push(cltr, thrd, true);

			#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
				displaced++;
			#endif
		}

		// Release the lane
		__atomic_unlock(&cltr->sched.readyQ.data[idx].l.lock);

		// TODO print the queue statistics here

		// The lane is empty now, destroy it
		^(cltr->sched.readyQ.data[idx]){};
	}

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

	// Resize the lane array, realloc may move (memcpy) the surviving lanes
	cltr->sched.readyQ.data = alloc( new_lanes, cltr->sched.readyQ.data`realloc );

	// Repair the self references of the lanes that may have moved
	for( size_t idx = 0; idx < new_lanes; idx++ ) {
		fix(cltr->sched.readyQ.data[idx]);
	}

	fix_times(cltr->sched.readyQ.tscs, new_lanes);

	// One cache entry per processor
	cltr->sched.caches = alloc( nprocs, cltr->sched.caches`realloc );

	// The io timestamps follow the processor count
	cltr->sched.io.count = nprocs * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// Renumber the processors
	reassign_cltr_id(cltr);

	// Resize and refill the io context table
	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Everything must be consistent again.
	// The check (nprocs == 0) == (cltr->sched.caches == 0p) is intentionally
	// not performed : it fails when a zero-sized allocation returns non-null.
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}
437
// Releases the scheduling arrays owned by the cluster : ready-queue lanes,
// ready-queue timestamps, io timestamps and caches.
// Every freed pointer is reset to null so that no dangling pointer is left
// behind in the cluster.
// NOTE(review): cltr->sched.io.data is not released here; presumably it is
// freed elsewhere -- confirm, otherwise it leaks.
void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	free( cltr->sched.caches );
	cltr->sched.io.tscs = 0p;
	cltr->sched.caches = 0p;
}
448
// offsetof that also accepts a nested member path such as l.anchor
#define nested_offsetof(type, field) ((off_t)(&(((type*)0)-> field)))

// Lane constructor : builds an empty, unlocked lane.
// An empty lane has a cleared anchor (next == 0p, ts == MAX) and a prev
// pointer that refers to the lane's own mock head.
void ?{}( __intrusive_lane_t & this ) {
	this.l.anchor.ts = MAX;
	this.l.anchor.next = 0p;
	this.l.prev = mock_head(this);
	this.l.lock = false;
	#if !defined(__CFA_NO_STATISTICS__)
		this.l.cnt = 0;
	#endif

	// The anchor trick is very fragile, hence the many checks.
	// The anchor must sit at the same offset as rdy_link inside thread$ ...
	/* paranoid */ _Static_assert( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	// ... so that the mock head's rdy_link aliases the anchor exactly ...
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, rdy_link )) == (uintptr_t)(&this.l.anchor) );
	/* paranoid */ verify( &mock_head(this)->rdy_link.next == &this.l.anchor.next );
	/* paranoid */ verify( &mock_head(this)->rdy_link.ts   == &this.l.anchor.ts   );
	// ... and shows the empty state that was just written.
	/* paranoid */ verify( mock_head(this)->rdy_link.next == 0p );
	/* paranoid */ verify( mock_head(this)->rdy_link.ts   == MAX );
	/* paranoid */ verify( mock_head(this) == this.l.prev );
	// Lanes are expected to be 64-byte aligned, in type and in fact
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
	/* paranoid */ verify( __alignof__(this) == 64 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 64) == 0, "Expected address to be aligned %p %% 64 == %zd", &this, ((intptr_t)(&this) % 64) );
}

#undef nested_offsetof
476
// Lane destructor : releases nothing, it only checks (in verify builds) that
// the lane is back in the empty state the constructor produced.
void ^?{}( __intrusive_lane_t & this ) {
	/* paranoid */ verify( mock_head(this) == this.l.prev );
	/* paranoid */ verify( this.l.anchor.ts   == MAX );
	/* paranoid */ verify( this.l.anchor.next == 0p );
}
Note: See TracBrowser for help on using the repository browser.