source: libcfa/src/concurrency/kernel/cluster.cfa@ cefd0b9

Last change on this file since cefd0b9 was cd3fc46, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Changed scheduler lock to remove one level of pointer.

//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// cluster.cfa -- helpers for subsystems that need cluster-wide support
//
// Author           : Thierry Delisle
// Created On       : Fri Mar 11 12:39:24 2022
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
#define _GNU_SOURCE

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel/cluster.hfa"
#include "kernel/private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include "ready_subqueue.hfa"
#include "io/types.hfa"

#include <errno.h>
#include <unistd.h>

extern "C" {
	#include <sys/syscall.h> // __NR_xxx
}

// No overridden function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

#if !defined(__CFA_NO_STATISTICS__)
	#define __STATS(...) __VA_ARGS__
#else
	#define __STATS(...)
#endif

// Returns the maximum number of processors the RWLock supports.
__attribute__((weak)) unsigned __max_processors() libcfa_public {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}
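
// Usage note (illustrative, not from this file): setting the environment variable,
// e.g. `CFA_MAX_PROCESSORS=64 ./a.out`, caps the RWLock at 64 slots; an unset,
// out-of-range, or non-decimal value falls back to __CFA_MAX_PROCESSORS__.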

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No forward declaration needed
	#define __kernel_rseq_register rseq_register_current_thread
	#define __kernel_rseq_unregister rseq_unregister_current_thread
#elif defined(CFA_HAVE_LINUX_RSEQ_H)
	static void __kernel_raw_rseq_register  (void);
	static void __kernel_raw_rseq_unregister(void);

	#define __kernel_rseq_register __kernel_raw_rseq_register
	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
#else
	// No forward declaration needed
	// No initialization needed
	static inline void noop(void) {}

	#define __kernel_rseq_register noop
	#define __kernel_rseq_unregister noop
#endif

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
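// Protocol sketch (summarizing the registration and writer code in this file):
// every processor registers itself and receives a slot in `data` that points at
// its thread-local `sched_lock` flag; readers go through their own flag, while a
// writer acquires `write_lock` and then every registered per-processor flag
// before mutating the ready queue.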
void ?{}(__scheduler_RWLock_t & this) {
	this.lock.max = __max_processors();
	this.lock.alloc = 0;
	this.lock.ready = 0;
	this.lock.data = alloc(this.lock.max);
	this.lock.write_lock = false;

	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.lock.data);
}


//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
	__kernel_rseq_register();

	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles cause problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}

void unregister_proc_id( unsigned id ) with(__scheduler_lock.lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles cause problems

	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);

	__kernel_rseq_unregister();
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : lock global lock
	// It is needed so that processors which register mid critical-section
	// cannot simply lock their own lock and enter.
	__atomic_acquire( &write_lock );

	// Make sure we won't deadlock ourselves
	// Checking before acquiring the writer lock isn't safe
	// because someone else could have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_acquire( llock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	// threads that were created mid critical-section
	// racing to lock their local locks and having the writer
	// immediately unlock them.
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}
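
// Typical pairing (sketch, based on the two functions above):
//     uint_fast32_t s = ready_mutate_lock();
//     ... mutate the ready queue ...
//     ready_mutate_unlock( s );
// The count of per-processor locks acquired is carried from lock to unlock.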

//=======================================================================
// Cluster growth
static const unsigned __readyq_single_shard = 2;

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
		{
			const unsigned lanes_count = readyQ.count;
			for( idx ; lanes_count ) {
				__intrusive_lane_t & sl = readyQ.data[idx];
				assert(!readyQ.data[idx].l.lock);

				if(is_empty(sl)) {
					assert( sl.l.anchor.next == 0p );
					assert( sl.l.anchor.ts == MAX );
					assert( mock_head(sl) == sl.l.prev );
				} else {
					assert( sl.l.anchor.next != 0p );
					assert( sl.l.anchor.ts != MAX );
					assert( mock_head(sl) != sl.l.prev );
				}
			}
		}
	#endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to the anchor aren't left dangling.
static inline void fix(__intrusive_lane_t & ll) {
	if(is_empty(ll)) {
		verify(ll.l.anchor.next == 0p);
		ll.l.prev = mock_head(ll);
	}
}

static void assign_list(unsigned & valrq, unsigned & valio, dlist(processor) & list, unsigned count) {
	processor * it = &list`first;
	for(unsigned i = 0; i < count; i++) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
		it->rdq.id = valrq;
		it->rdq.target = UINT_MAX;
		valrq += __shard_factor.readyq;
		#if defined(CFA_HAVE_LINUX_IO_URING_H)
			it->io.ctx->cq.id = valio;
			it->io.target = UINT_MAX;
			valio += __shard_factor.io;
		#endif
		it = &(*it)`next;
	}
}

static void reassign_cltr_id(struct cluster * cltr) {
	unsigned prefrq = 0;
	unsigned prefio = 0;
	assign_list(prefrq, prefio, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
	assign_list(prefrq, prefio, cltr->procs.idles  , cltr->procs.idle );
}
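
// Each processor is assigned a starting index into the sharded ready queue (and,
// with io_uring, into the io context array); consecutive processors are spaced
// __shard_factor.readyq (resp. __shard_factor.io) slots apart, active processors
// first, idle processors after.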

#if defined(CFA_HAVE_LINUX_IO_URING_H)
	static void assign_io(io_context$ ** data, size_t count, dlist(processor) & list) {
		processor * it = &list`first;
		while(it) {
			/* paranoid */ verifyf( it, "Unexpected null iterator\n");
			/* paranoid */ verifyf( it->io.ctx->cq.id < count, "Processor %p has id %u above count %zu\n", it, it->io.ctx->cq.id, count);
			data[it->io.ctx->cq.id] = it->io.ctx;
			it = &(*it)`next;
		}
	}

	static void reassign_cltr_io(struct cluster * cltr) {
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles  );
	}
#else
	static void reassign_cltr_io(struct cluster *) {}
#endif

static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);
	for(i; count) {
		tscs[i].t.tv = rdtscl();
		tscs[i].t.ma = 0;
	}
}
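
// After a resize every timestamp is reset to the current rdtscl() value and the
// moving average is cleared, so values from before the resize are discarded.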

// Grow the ready queue
void ready_queue_grow(struct cluster * cltr) {
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	// Find new count
	// Make sure we always have at least 1 list
	size_t ocount = cltr->sched.readyQ.count;
	size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);

	// Do we have to do anything?
	if( ocount != ncount ) {

		// grow the ready queue
		with( cltr->sched ) {

			// Allocate new array (uses realloc and memcpies the data)
			readyQ.data = alloc( ncount, readyQ.data`realloc );

			// Fix the moved data
			for( idx; ocount ) {
				fix(readyQ.data[idx]);
			}

			// Construct the new data
			for( idx; ocount ~ ncount) {
				(readyQ.data[idx]){};
			}

			// Update the count
			readyQ.count = ncount;
		}

		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// Fix the io times
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// realloc the caches
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// reassign the processor ids
	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}
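
// Sizing example (illustrative; the actual value of __shard_factor.readyq is
// defined elsewhere): with 4 processors and a readyq shard factor of 2, the
// queue is resized to max(4 * 2, __readyq_single_shard) == 8 sub-queues.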

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	with( cltr->sched ) {
		// Remember the old count
		size_t ocount = readyQ.count;

		// Find new count
		// Make sure we always have at least 1 list
		size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
		/* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
		/* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
		/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

		readyQ.count = ncount;

		// For printing, count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// Redistribute the old data
		for( idx; ncount ~ ocount) {
			// The lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].l.lock);
			verify(locked);

			// As long as this lane is not empty, pop threads and push them somewhere else in the queue
			while(!is_empty(readyQ.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(readyQ.data[idx]);

				push(cltr, thrd, true);

				// For printing, count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&readyQ.data[idx].l.lock);

			// TODO: print the queue statistics here

			^(readyQ.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate the new array (uses realloc and memcpies the data)
		readyQ.data = alloc( ncount, readyQ.data`realloc );

		// Fix the moved data
		for( idx; ncount ) {
			fix(readyQ.data[idx]);
		}

		fix_times(readyQ.tscs, ncount);
	}
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// Fix the io times
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Make sure that everything is consistent
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}

void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	free( cltr->sched.caches );
}

#define nested_offsetof(type, field) ((off_t)(&(((type*)0)-> field)))
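// nested_offsetof (defined above) computes the byte offset of a nested member
// path (e.g. `l.anchor`) by taking the address of that member through a null
// pointer of the given type; it is used below where the member is reached
// through more than one level of nesting.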

// Ctor
void ?{}( __intrusive_lane_t & this ) {
	this.l.lock = false;
	this.l.prev = mock_head(this);
	this.l.anchor.next = 0p;
	this.l.anchor.ts = MAX;
	#if !defined(__CFA_NO_STATISTICS__)
		this.l.cnt = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ _Static_assert( offsetof( thread$, link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( offsetof( thread$, link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.l.anchor) );
	/* paranoid */ verify( &mock_head(this)->link.next == &this.l.anchor.next );
	/* paranoid */ verify( &mock_head(this)->link.ts == &this.l.anchor.ts );
	/* paranoid */ verify( mock_head(this)->link.next == 0p );
	/* paranoid */ verify( mock_head(this)->link.ts == MAX );
	/* paranoid */ verify( mock_head(this) == this.l.prev );
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
	/* paranoid */ verify( __alignof__(this) == 64 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 64) == 0, "Expected address to be aligned %p %% 64 == %zd", &this, ((intptr_t)(&this) % 64) );
}

#undef nested_offsetof

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify( this.l.anchor.next == 0p );
	/* paranoid */ verify( this.l.anchor.ts == MAX );
	/* paranoid */ verify( mock_head(this) == this.l.prev );
}

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No definition needed
#elif defined(CFA_HAVE_LINUX_RSEQ_H)

	#if defined( __x86_64 ) || defined( __i386 )
		#define RSEQ_SIG 0x53053053
	#elif defined( __ARM_ARCH )
		#ifdef __ARMEB__
			#define RSEQ_SIG 0xf3def5e7 /* udf #24035 ; 0x5de3 (ARMv6+) */
		#else
			#define RSEQ_SIG 0xe7f5def3 /* udf #24035 ; 0x5de3 */
		#endif
	#endif
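
	// RSEQ_SIG is the signature passed to the rseq syscall; the kernel checks that
	// the same value precedes any abort handler referenced by a critical section.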

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	static void __kernel_raw_rseq_register  (void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq register not supported");
			case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
			case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
			}
		}
	}

	static void __kernel_raw_rseq_unregister(void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq unregister not supported");
			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
			case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq unregister unexpected return %d", e);
			}
		}
	}
#else
	// No definition needed
#endif