source: libcfa/src/concurrency/kernel/cluster.cfa@ b39e961b

Last change on this file since b39e961b was 708ae38, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Some more cleanup and grow/shrink now readjusts io timestamps.
(They are still unused).

//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// cluster.cfa -- file that includes helpers for subsystems that need cluster-wide support
//
// Author           : Thierry Delisle
// Created On       : Fri Mar 11 12:39:24 2022
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
#define _GNU_SOURCE

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel/cluster.hfa"
#include "kernel/private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include "ready_subqueue.hfa"

#include <errno.h>
#include <unistd.h>

extern "C" {
	#include <sys/syscall.h>  // __NR_xxx
}

// No overridden function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

#if !defined(__CFA_NO_STATISTICS__)
	#define __STATS(...) __VA_ARGS__
#else
	#define __STATS(...)
#endif

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}
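
// A minimal sketch (not part of this file) of the two ways an application can
// change this cap, assuming it links against this runtime: either provide a
// strong definition of the weak symbol above, e.g.
//
//     unsigned __max_processors() { return 64; }
//
// or set the environment variable before starting the program:
//
//     $ CFA_MAX_PROCESSORS=64 ./a.out
//
// Values outside 1..65535, or values that are not decimal numbers, fall back
// to __CFA_MAX_PROCESSORS__ as checked above.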

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No forward declaration needed
	#define __kernel_rseq_register rseq_register_current_thread
	#define __kernel_rseq_unregister rseq_unregister_current_thread
#elif defined(CFA_HAVE_LINUX_RSEQ_H)
	static void __kernel_raw_rseq_register  (void);
	static void __kernel_raw_rseq_unregister(void);

	#define __kernel_rseq_register __kernel_raw_rseq_register
	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
#else
	// No forward declaration needed
	// No initialization needed
	static inline void noop(void) {}

	#define __kernel_rseq_register noop
	#define __kernel_rseq_unregister noop
#endif

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__scheduler_RWLock_t & this) {
	this.max   = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.data  = alloc(this.max);
	this.write_lock = false;

	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.data);
}


//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned register_proc_id( void ) with(*__scheduler_lock) {
	__kernel_rseq_register();

	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatile causes problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}

void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatile causes problems

	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);

	__kernel_rseq_unregister();
}
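
// Sketch of the expected call pattern (the caller is the processor start-up
// code elsewhere in the kernel; shown here only for illustration):
//
//     unsigned id = register_proc_id();   // claim a slot in the lock's data array
//     ... run, reading through the per-proc flag kernelTLS().sched_lock ...
//     unregister_proc_id( id );           // publish the slot as free again
//
// Slots are reused: a new registration first scans for a cell that was
// released (Step 1) before growing the array with the fetch-and-add (Step 2).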

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : lock global lock
	// It is needed to prevent processors that register mid critical-section
	// from simply locking their own lock and entering.
	__atomic_acquire( &write_lock );

	// Make sure we won't deadlock ourselves
	// Checking before acquiring the writer lock isn't safe
	// because someone else could have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_acquire( llock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : release local locks
	// This must be done while the global lock is held so that
	// threads created mid critical section don't race to lock
	// their local locks only to have the writer immediately
	// unlock them.
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}
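
// Writer-side usage sketch (preemption must already be disabled by the caller):
//
//     uint_fast32_t last_s = ready_mutate_lock();
//     ... grow/shrink the ready queue ...
//     ready_mutate_unlock( last_s );
//
// The count returned by the lock is handed back to the unlock so that only
// the per-processor locks actually acquired get released.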

//=======================================================================
// Cluster growth
static const unsigned __readyq_single_shard = 2;

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
	{
		const unsigned lanes_count = readyQ.count;
		for( idx ; lanes_count ) {
			__intrusive_lane_t & sl = readyQ.data[idx];
			assert(!readyQ.data[idx].lock);

			if(is_empty(sl)) {
				assert( sl.anchor.next == 0p );
				assert( sl.anchor.ts   == -1llu );
				assert( mock_head(sl)  == sl.prev );
			} else {
				assert( sl.anchor.next != 0p );
				assert( sl.anchor.ts   != -1llu );
				assert( mock_head(sl)  != sl.prev );
			}
		}
	}
	#endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to anchors aren't left dangling.
static inline void fix(__intrusive_lane_t & ll) {
	if(is_empty(ll)) {
		verify(ll.anchor.next == 0p);
		ll.prev = mock_head(ll);
	}
}
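
// Why the fix is needed (illustrative note): when a lane is empty, its `prev`
// points back into the lane itself through mock_head(), so a realloc that
// memcpys the array leaves `prev` aimed at the old storage. Recomputing
// mock_head() against the lane's new address repairs it. Non-empty lanes need
// no repair here: their `prev` points at the last queued thread, which the
// realloc does not move.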

static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
	processor * it = &list`first;
	for(unsigned i = 0; i < count; i++) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
		it->rdq.id = value;
		it->rdq.target = MAX;
		value += __shard_factor.readyq;
		it = &(*it)`next;
	}
}

static void reassign_cltr_id(struct cluster * cltr) {
	unsigned preferred = 0;
	assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
	assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
}

static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);
	for(i; count) {
		tscs[i].tv = rdtscl();
		tscs[i].ma = 0;
	}
}

// Grow the ready queue
void ready_queue_grow(struct cluster * cltr) {
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );


	// Find new count
	// Make sure we always have at least 1 list
	size_t ocount = cltr->sched.readyQ.count;
	size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);

	// Do we have to do anything?
	if( ocount != ncount ) {

		// grow the ready queue
		with( cltr->sched ) {

			// Allocate new array (uses realloc and memcpies the data)
			readyQ.data = alloc( ncount, readyQ.data`realloc );

			// Fix the moved data
			for( idx; ocount ) {
				fix(readyQ.data[idx]);
			}

			// Construct new data
			for( idx; ocount ~ ncount) {
				(readyQ.data[idx]){};
			}

			// Update original count
			readyQ.count = ncount;
		}


		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// Fix the io times
	cltr->sched.io.count = target;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// realloc the caches
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// reassign the processor ids.
	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}
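
// Worked example of the sizing rule above (numbers are illustrative only;
// __shard_factor.readyq is defined elsewhere): with a shard factor of 2 and a
// cluster growing from 3 to 4 processors, ocount = max(3*2, 2) = 6 and
// ncount = max(4*2, 2) = 8, so two new lanes are constructed. With 0
// processors the queue keeps __readyq_single_shard = 2 lanes.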

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	with( cltr->sched ) {
		// Remember old count
		size_t ocount = readyQ.count;

		// Find new count
		// Make sure we always have at least 1 list
		size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
		/* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
		/* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
		/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

		readyQ.count = ncount;

		// for printing count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; ncount ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].lock);
			verify(locked);

			// As long as we can pop from this lane, push the threads somewhere else in the queue
			while(!is_empty(readyQ.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(readyQ.data[idx]);

				push(cltr, thrd, true);

				// for printing count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&readyQ.data[idx].lock);

			// TODO print the queue statistics here

			^(readyQ.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		readyQ.data = alloc( ncount, readyQ.data`realloc );

		// Fix the moved data
		for( idx; ncount ) {
			fix(readyQ.data[idx]);
		}

		fix_times(readyQ.tscs, ncount);
	}
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// Fix the io times
	cltr->sched.io.count = target;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}

void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	free( cltr->sched.caches );
}

// Ctor
void ?{}( __intrusive_lane_t & this ) {
	this.lock        = false;
	this.prev        = mock_head(this);
	this.anchor.next = 0p;
	this.anchor.ts   = -1llu;
	#if !defined(__CFA_NO_STATISTICS__)
		this.cnt = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ _Static_assert( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
	/* paranoid */ verify( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.anchor) );
	/* paranoid */ verify( &mock_head(this)->link.next == &this.anchor.next );
	/* paranoid */ verify( &mock_head(this)->link.ts   == &this.anchor.ts   );
	/* paranoid */ verify( mock_head(this)->link.next == 0p );
	/* paranoid */ verify( mock_head(this)->link.ts   == -1llu );
	/* paranoid */ verify( mock_head(this) == this.prev );
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 128 );
	/* paranoid */ verify( __alignof__(this) == 128 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128) );
}
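
// Illustrative note on the layout trick checked above: mock_head() yields a
// fake thread$ pointer positioned so that its `link` field overlaps the lane's
// `anchor`. The _Static_assert and the pointer-identity verifies encode that
// assumption; this is why the anchor code is called fragile, since any change
// to the offset of thread$.link or __intrusive_lane_t.anchor breaks it.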

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify( this.anchor.next == 0p );
	/* paranoid */ verify( this.anchor.ts   == -1llu );
	/* paranoid */ verify( mock_head(this)  == this.prev );
}

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No definition needed
#elif defined(CFA_HAVE_LINUX_RSEQ_H)

	#if defined( __x86_64 ) || defined( __i386 )
		#define RSEQ_SIG	0x53053053
	#elif defined( __ARM_ARCH )
		#ifdef __ARMEB__
			#define RSEQ_SIG	0xf3def5e7	/* udf #24035 ; 0x5de3 (ARMv6+) */
		#else
			#define RSEQ_SIG	0xe7f5def3	/* udf #24035 ; 0x5de3 */
		#endif
	#endif

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	static void __kernel_raw_rseq_register  (void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq register not supported");
			case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
			case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
			}
		}
	}

	static void __kernel_raw_rseq_unregister(void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq unregister not supported");
			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
			case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq unregister unexpected return %d", e);
			}
		}
	}
#else
	// No definition needed
#endif