source: libcfa/src/concurrency/kernel/cluster.cfa@ 14f6a3cb

Last change on this file since 14f6a3cb was f5f2768, checked in by Peter A. Buhr <pabuhr@…>, 3 years ago

make _GNU_SOURCE default, change IO to use SOCKADDR_ARG and CONST_SOCKADDR_ARG, move sys/socket.h to first include because of anonymous naming problem

//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// cluster.cfa -- file that includes helpers for subsystems that need cluster-wide support
//
// Author           : Thierry Delisle
// Created On       : Fri Mar 11 12:39:24 2022
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel/cluster.hfa"
#include "kernel/private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include "ready_subqueue.hfa"
#include "io/types.hfa"

#include <errno.h>
#include <unistd.h>

extern "C" {
    #include <sys/syscall.h> // __NR_xxx
}

// No overriding function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
    #define __CFA_MAX_PROCESSORS__ 1024
#endif

#if !defined(__CFA_NO_STATISTICS__)
    #define __STATS(...) __VA_ARGS__
#else
    #define __STATS(...)
#endif
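
// Illustrative use (the counter name below is a placeholder, not an actual
// field): statements wrapped in __STATS( ... ) are compiled in only when
// statistics are enabled, e.g.
//     __STATS( stats->grow_count++; )
// expands to nothing when __CFA_NO_STATISTICS__ is defined.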

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() libcfa_public {
    const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
    if(!max_cores_s) {
        __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
        return __CFA_MAX_PROCESSORS__;
    }

    char * endptr = 0p;
    long int max_cores_l = strtol(max_cores_s, &endptr, 10);
    if(max_cores_l < 1 || max_cores_l > 65535) {
        __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
        return __CFA_MAX_PROCESSORS__;
    }
    if('\0' != *endptr) {
        __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
        return __CFA_MAX_PROCESSORS__;
    }

    return max_cores_l;
}
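
// Illustrative: the limit can be raised without recompiling by setting the
// environment variable before launching the program, e.g.
//     CFA_MAX_PROCESSORS=4096 ./a.out
// Out-of-range values (outside 1..65535) or values with trailing characters
// are rejected and the compile-time default is used instead.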

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
    // No forward declaration needed
    #define __kernel_rseq_register rseq_register_current_thread
    #define __kernel_rseq_unregister rseq_unregister_current_thread
#elif defined(CFA_HAVE_LINUX_RSEQ_H)
    static void __kernel_raw_rseq_register  (void);
    static void __kernel_raw_rseq_unregister(void);

    #define __kernel_rseq_register __kernel_raw_rseq_register
    #define __kernel_rseq_unregister __kernel_raw_rseq_unregister
#else
    // No forward declaration needed
    // No initialization needed
    static inline void noop(void) {}

    #define __kernel_rseq_register noop
    #define __kernel_rseq_unregister noop
#endif

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__scheduler_RWLock_t & this) {
    this.lock.max = __max_processors();
    this.lock.alloc = 0;
    this.lock.ready = 0;
    this.lock.data = alloc(this.lock.max);
    this.lock.write_lock = false;

    /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
    /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
    free(this.lock.data);
}


//=======================================================================
// Lock-Free registering/unregistering of threads
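// Registration summary: each processor publishes a pointer to its TLS
// sched_lock flag in 'data'. A previously freed slot in [0, ready) is
// reclaimed with a CAS (Step 1); otherwise a fresh slot is reserved with a
// fetch-and-add on 'alloc' (Step 2), filled in, and published by advancing
// 'ready' past it (Step 3), so readers never see an uninitialized cell.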
unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
    __kernel_rseq_register();

    bool * handle = (bool *)&kernelTLS().sched_lock;

    // Step - 1 : check if there is already space in the data
    uint_fast32_t s = ready;

    // Check among all the ready
    for(uint_fast32_t i = 0; i < s; i++) {
        bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatile causes problems
        /* paranoid */ verify( handle != *cell );

        bool * null = 0p; // Re-initialized every iteration since a failed compare-exchange overwrites it
        if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
            && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
            /* paranoid */ verify(i < ready);
            /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
            return i;
        }
    }

    if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

    // Step - 2 : F&A to get a new spot in the array.
    uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
    if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

    // Step - 3 : Mark space as used and then publish it.
    data[n] = handle;
    while() {
        unsigned copy = n;
        if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
            && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
            break;
        Pause();
    }

    // Return new spot.
    /* paranoid */ verify(n < ready);
    /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
    return n;
}

void unregister_proc_id( unsigned id ) with(__scheduler_lock.lock) {
    /* paranoid */ verify(id < ready);
    /* paranoid */ verify(id == kernelTLS().sched_id);
    /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

    bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatile causes problems

    __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);

    __kernel_rseq_unregister();
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
    /* paranoid */ verify( ! __preemption_enabled() );

    // Step 1 : lock global lock
    // It is needed to prevent processors that register mid critical-section
    // from simply locking their own lock and entering.
    __atomic_acquire( &write_lock );

    // Make sure we won't deadlock ourselves
    // Checking before acquiring the writer lock isn't safe
    // because someone else could have locked us.
    /* paranoid */ verify( ! kernelTLS().sched_lock );

    // Step 2 : lock per-proc lock
    // Processors that are currently being registered aren't counted
    // but can't be in read_lock or in the critical section.
    // All other processors are counted
    uint_fast32_t s = ready;
    for(uint_fast32_t i = 0; i < s; i++) {
        volatile bool * llock = data[i];
        if(llock) __atomic_acquire( llock );
    }

    /* paranoid */ verify( ! __preemption_enabled() );
    return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
    /* paranoid */ verify( ! __preemption_enabled() );

    // Step 1 : release local locks
    // This must be done while the global lock is held to prevent
    // threads that were created mid critical-section
    // from racing to lock their local locks only to have the writer
    // immediately unlock them.
    // Alternative solution : return s in write_lock and pass it to write_unlock
    for(uint_fast32_t i = 0; i < last_s; i++) {
        volatile bool * llock = data[i];
        if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
    }

    // Step 2 : release global lock
    /*paranoid*/ assert(true == write_lock);
    __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

    /* paranoid */ verify( ! __preemption_enabled() );
}
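
// Illustrative writer-side usage (a sketch; the real call sites live in the
// scheduler code that changes the ready queue):
//     uint_fast32_t last = ready_mutate_lock();
//     ... mutate the ready queue ...
//     ready_mutate_unlock( last );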

//=======================================================================
// Cluster growth
static const unsigned __readyq_single_shard = 2;

void ?{}(__timestamp_t & this) { this.t.tv = 0; this.t.ma = 0; }
void ^?{}(__timestamp_t &) {}

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
    #if defined(__CFA_WITH_VERIFY__)
    {
        const unsigned lanes_count = readyQ.count;
        for( idx ; lanes_count ) {
            __intrusive_lane_t & sl = readyQ.data[idx];
            assert(!readyQ.data[idx].l.lock);

            if(is_empty(sl)) {
                assert( sl.l.anchor.next == 0p );
                assert( sl.l.anchor.ts == MAX );
                assert( mock_head(sl) == sl.l.prev );
            } else {
                assert( sl.l.anchor.next != 0p );
                assert( sl.l.anchor.ts != MAX );
                assert( mock_head(sl) != sl.l.prev );
            }
        }
    }
    #endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
    if(is_empty(ll)) {
        verify(ll.l.anchor.next == 0p);
        ll.l.prev = mock_head(ll);
    }
}

static void assign_list(unsigned & valrq, unsigned & valio, dlist(struct processor) & list, unsigned count) {
    struct processor * it = &list`first;
    for(unsigned i = 0; i < count; i++) {
        /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
        it->rdq.id = valrq;
        it->rdq.target = UINT_MAX;
        valrq += __shard_factor.readyq;
        #if defined(CFA_HAVE_LINUX_IO_URING_H)
            it->io.ctx->cq.id = valio;
            it->io.target = UINT_MAX;
            valio += __shard_factor.io;
        #endif
        it = &(*it)`next;
    }
}

static void reassign_cltr_id(struct cluster * cltr) {
    unsigned prefrq = 0;
    unsigned prefio = 0;
    assign_list(prefrq, prefio, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
    assign_list(prefrq, prefio, cltr->procs.idles , cltr->procs.idle );
}
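
// Illustrative numbering: if the ready-queue shard factor were 2 (the actual
// value comes from __shard_factor, defined elsewhere), processors would get
// rdq.id = 0, 2, 4, ..., i.e. each processor owns a contiguous block of
// subqueues starting at its id, with active processors numbered before idle ones.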

#if defined(CFA_HAVE_LINUX_IO_URING_H)
    static void assign_io(io_context$ ** data, size_t count, dlist(struct processor) & list) {
        struct processor * it = &list`first;
        while(it) {
            /* paranoid */ verifyf( it, "Unexpected null iterator\n");
            /* paranoid */ verifyf( it->io.ctx->cq.id < count, "Processor %p has id %u above count %zu\n", it, it->io.ctx->cq.id, count);
            data[it->io.ctx->cq.id] = it->io.ctx;
            it = &(*it)`next;
        }
    }

    static void reassign_cltr_io(struct cluster * cltr) {
        assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
        assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles );
    }
#else
    static void reassign_cltr_io(struct cluster *) {}
#endif

static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
    tscs = alloc(count, tscs`realloc);
    for(i; count) {
        tscs[i].t.tv = rdtscl();
        tscs[i].t.ma = 0;
    }
}

// Grow the ready queue
void ready_queue_grow(struct cluster * cltr) {
    int target = cltr->procs.total;

    /* paranoid */ verify( ready_mutate_islocked() );
    __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

    // Make sure that everything is consistent
    /* paranoid */ check_readyQ( cltr );


    // Find new count
    // Make sure we always have at least 1 list
    size_t ocount = cltr->sched.readyQ.count;
    size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
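    // Worked example (illustrative): with 4 processors and a readyq shard
    // factor of 2, ncount = max(4 * 2, 2) = 8 subqueues; with 0 processors,
    // __readyq_single_shard keeps a floor of 2 subqueues.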

    // Do we have to do anything?
    if( ocount != ncount ) {

        // grow the ready queue
        with( cltr->sched ) {

            // Allocate new array (uses realloc and memcpies the data)
            readyQ.data = alloc( ncount, readyQ.data`realloc );

            // Fix the moved data
            for( idx; ocount ) {
                fix(readyQ.data[idx]);
            }

            // Construct new data
            for( idx; ocount ~ ncount) {
                (readyQ.data[idx]){};
            }

            // Update original count
            readyQ.count = ncount;
        }


        fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
    }

    // Fix the io times
    cltr->sched.io.count = target * __shard_factor.io;
    fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

    // realloc the caches
    cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

    // reassign the processor ids.
    reassign_cltr_id(cltr);

    cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
    reassign_cltr_io(cltr);

    // Make sure that everything is consistent
    /* paranoid */ check_readyQ( cltr );
    /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

    __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

    /* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
    /* paranoid */ verify( ready_mutate_islocked() );
    __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

    // Make sure that everything is consistent
    /* paranoid */ check_readyQ( cltr );

    int target = cltr->procs.total;

    with( cltr->sched ) {
        // Remember old count
        size_t ocount = readyQ.count;

        // Find new count
        // Make sure we always have at least 1 list
        size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
        /* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
        /* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
        /* paranoid */     "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

        readyQ.count = ncount;

        // For printing, count the number of displaced threads
        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
            __attribute__((unused)) size_t displaced = 0;
        #endif

        // redistribute old data
        for( idx; ncount ~ ocount) {
            // Lock is not strictly needed but makes checking invariants much easier
            __attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].l.lock);
            verify(locked);

            // As long as we can pop from this lane, push the threads somewhere else in the queue
            while(!is_empty(readyQ.data[idx])) {
                struct thread$ * thrd;
                unsigned long long _;
                [thrd, _] = pop(readyQ.data[idx]);

                push(cltr, thrd, true);

                // For printing, count the number of displaced threads
                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
                    displaced++;
                #endif
            }

            // Unlock the lane
            __atomic_unlock(&readyQ.data[idx].l.lock);

            // TODO print the queue statistics here

            ^(readyQ.data[idx]){};
        }

        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

        // Allocate new array (uses realloc and memcpies the data)
        readyQ.data = alloc( ncount, readyQ.data`realloc );

        // Fix the moved data
        for( idx; ncount ) {
            fix(readyQ.data[idx]);
        }

        fix_times(readyQ.tscs, ncount);
    }
    cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

    // Fix the io times
    cltr->sched.io.count = target * __shard_factor.io;
    fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

    reassign_cltr_id(cltr);

    cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
    reassign_cltr_io(cltr);

    // Make sure that everything is consistent
    /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
    /* paranoid */ check_readyQ( cltr );

    __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
    /* paranoid */ verify( ready_mutate_islocked() );
}

void ready_queue_close(struct cluster * cltr) {
    free( cltr->sched.readyQ.data );
    free( cltr->sched.readyQ.tscs );
    cltr->sched.readyQ.data = 0p;
    cltr->sched.readyQ.tscs = 0p;
    cltr->sched.readyQ.count = 0;

    free( cltr->sched.io.tscs );
    free( cltr->sched.caches );
}

#define nested_offsetof(type, field) ((off_t)(&(((type*)0)-> field)))
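// nested_offsetof generalizes offsetof to a nested field path, computed the
// classic way by taking the address of the field on a null base pointer, e.g.
//     nested_offsetof(__intrusive_lane_t, l.anchor)
// is the byte offset of 'anchor', inside 'l', from the start of the lane.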

// Ctor
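// Layout note: 'prev' and 'anchor' are arranged so the anchor can be read as
// the rdy_link field of a pseudo thread$ returned by mock_head; an empty lane
// then looks like a list holding a single sentinel node, which lets the queue
// code avoid a special case for emptiness. The assertions below verify that
// this aliasing actually holds.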
void ?{}( __intrusive_lane_t & this ) {
    this.l.lock = false;
    this.l.prev = mock_head(this);
    this.l.anchor.next = 0p;
    this.l.anchor.ts = MAX;
    #if !defined(__CFA_NO_STATISTICS__)
        this.l.cnt = 0;
    #endif

    // We add a boat-load of assertions here because the anchor code is very fragile
    /* paranoid */ _Static_assert( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
    /* paranoid */ verify( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
    /* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, rdy_link )) == (uintptr_t)(&this.l.anchor) );
    /* paranoid */ verify( &mock_head(this)->rdy_link.next == &this.l.anchor.next );
    /* paranoid */ verify( &mock_head(this)->rdy_link.ts == &this.l.anchor.ts );
    /* paranoid */ verify( mock_head(this)->rdy_link.next == 0p );
    /* paranoid */ verify( mock_head(this)->rdy_link.ts == MAX );
    /* paranoid */ verify( mock_head(this) == this.l.prev );
    /* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
    /* paranoid */ verify( __alignof__(this) == 64 );
    /* paranoid */ verifyf( ((intptr_t)(&this) % 64) == 0, "Expected address to be aligned %p %% 64 == %zd", &this, ((intptr_t)(&this) % 64) );
}

#undef nested_offsetof

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
    // Make sure the list is empty
    /* paranoid */ verify( this.l.anchor.next == 0p );
    /* paranoid */ verify( this.l.anchor.ts == MAX );
    /* paranoid */ verify( mock_head(this) == this.l.prev );
}

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
    // No definition needed
#elif defined(CFA_HAVE_LINUX_RSEQ_H)

    #if defined( __x86_64 ) || defined( __i386 )
        #define RSEQ_SIG 0x53053053
    #elif defined( __ARM_ARCH )
        #ifdef __ARMEB__
            #define RSEQ_SIG 0xf3def5e7 /* udf #24035 ; 0x5de3 (ARMv6+) */
        #else
            #define RSEQ_SIG 0xe7f5def3 /* udf #24035 ; 0x5de3 */
        #endif
    #endif
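
    // RSEQ_SIG is the signature passed to every rseq syscall below; the kernel
    // also expects to find this exact value immediately before the abort handler
    // of any rseq critical section, which is why it is architecture specific.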

    extern void __disable_interrupts_hard();
    extern void __enable_interrupts_hard();

    static void __kernel_raw_rseq_register (void) {
        /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );

        // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
        int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
        if(ret != 0) {
            int e = errno;
            switch(e) {
                case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
                case ENOSYS: abort("KERNEL ERROR: rseq register not supported");
                case EFAULT: abort("KERNEL ERROR: rseq register with invalid address");
                case EBUSY : abort("KERNEL ERROR: rseq register already registered");
                case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
                default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
            }
        }
    }

    static void __kernel_raw_rseq_unregister(void) {
        /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );

        // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
        int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
        if(ret != 0) {
            int e = errno;
            switch(e) {
                case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
                case ENOSYS: abort("KERNEL ERROR: rseq unregister not supported");
                case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid address");
                case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
                case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
                default: abort("KERNEL ERROR: rseq unregister unexpected return %d", e);
            }
        }
    }
#else
    // No definition needed
#endif