source: libcfa/src/concurrency/kernel/cluster.cfa@ dbe2533

Last change on this file since dbe2533 was adb3ea1, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Some more incremental work towards using timestamps for io fairness

//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// cluster.cfa -- file that includes helpers for subsystems that need cluster-wide support
//
// Author : Thierry Delisle
// Created On : Fri Mar 11 12:39:24 2022
// Last Modified By :
// Last Modified On :
// Update Count :
//

#define __cforall_thread__
#define _GNU_SOURCE

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel/cluster.hfa"
#include "kernel/private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include "ready_subqueue.hfa"

#include <errno.h>
#include <unistd.h>

extern "C" {
	#include <sys/syscall.h> // __NR_xxx
}

// No overridden function, no environment variable, no define:
// fall back to a magic number.
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

#if !defined(__CFA_NO_STATISTICS__)
	#define __STATS(...) __VA_ARGS__
#else
	#define __STATS(...)
#endif

// Returns the maximum number of processors the RWLock supports.
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}
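
// Usage note (illustrative): the limit computed by __max_processors() above can be changed
// at run time by setting CFA_MAX_PROCESSORS in the environment, e.g. CFA_MAX_PROCESSORS=64;
// values outside 1..65535, or strings that are not plain decimal numbers, fall back to
// __CFA_MAX_PROCESSORS__.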

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No forward declaration needed
	#define __kernel_rseq_register rseq_register_current_thread
	#define __kernel_rseq_unregister rseq_unregister_current_thread
#elif defined(CFA_HAVE_LINUX_RSEQ_H)
	static void __kernel_raw_rseq_register  (void);
	static void __kernel_raw_rseq_unregister(void);

	#define __kernel_rseq_register __kernel_raw_rseq_register
	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
#else
	// No forward declaration needed
	// No initialization needed
	static inline void noop(void) {}

	#define __kernel_rseq_register noop
	#define __kernel_rseq_unregister noop
#endif
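
// Note: whichever pair is selected above is invoked from register_proc_id() /
// unregister_proc_id() below; when neither librseq nor <linux/rseq.h> is available,
// both calls are no-ops.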

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__scheduler_RWLock_t & this) {
	this.max = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.data = alloc(this.max);
	this.write_lock = false;

	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.data);
}


//=======================================================================
// Lock-Free registering/unregistering of threads
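//
// Overview of the protocol implemented by register_proc_id() (a summary of the numbered
// steps in the code below):
//   1. scan the first `ready` cells of `data` for a free (null) slot and CAS the local
//      lock handle into it;
//   2. otherwise, fetch-and-add `alloc` to claim a brand new slot;
//   3. store the handle in the new slot, then spin until `ready` can be advanced past it,
//      publishing slots to readers strictly in allocation order.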
unsigned register_proc_id( void ) with(*__scheduler_lock) {
	__kernel_rseq_register();

	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatile causes problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}

void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatile causes problems

	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);

	__kernel_rseq_unregister();
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
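//
// Illustrative usage of the writer side (assuming preemption is already disabled, as the
// verifies below require):
//   uint_fast32_t last_size = ready_mutate_lock();
//   // ... structural changes to the ready queue ...
//   ready_mutate_unlock( last_size );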
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : lock global lock
	// This is needed to prevent processors that register mid critical-section
	// from simply locking their own lock and entering.
	__atomic_acquire( &write_lock );

	// Make sure we won't deadlock ourselves
	// Checking before acquiring the writer lock isn't safe
	// because someone else could have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Step 2 : lock per-proc locks
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted.
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_acquire( llock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : release local locks
	// This must be done while the global lock is held to prevent
	// threads that were created mid critical-section
	// from racing to lock their local locks and having the writer
	// immediately unlock them.
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}

//=======================================================================
// Cluster growth
static const unsigned __readyq_single_shard = 2;
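
// Illustrative sizing (derived from the grow/shrink code below): for a cluster with P
// processors, the ready queue is resharded to max(P * __shard_factor.readyq,
// __readyq_single_shard) lanes and the io arrays to P * __shard_factor.io entries;
// the __readyq_single_shard floor keeps at least two lanes even when the cluster
// momentarily has no processors.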

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
	{
		const unsigned lanes_count = readyQ.count;
		for( idx ; lanes_count ) {
			__intrusive_lane_t & sl = readyQ.data[idx];
			assert(!readyQ.data[idx].lock);

			if(is_empty(sl)) {
				assert( sl.anchor.next == 0p );
				assert( sl.anchor.ts == -1llu );
				assert( mock_head(sl) == sl.prev );
			} else {
				assert( sl.anchor.next != 0p );
				assert( sl.anchor.ts != -1llu );
				assert( mock_head(sl) != sl.prev );
			}
		}
	}
	#endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to the anchor aren't left dangling.
static inline void fix(__intrusive_lane_t & ll) {
	if(is_empty(ll)) {
		verify(ll.anchor.next == 0p);
		ll.prev = mock_head(ll);
	}
}

static void assign_list(unsigned & valrq, unsigned & valio, dlist(processor) & list, unsigned count) {
	processor * it = &list`first;
	for(unsigned i = 0; i < count; i++) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
		it->rdq.id = valrq;
		it->rdq.target = MAX;
		it->io.id = valio;
		it->io.target = MAX;
		valrq += __shard_factor.readyq;
		valio += __shard_factor.io;
		it = &(*it)`next;
	}
}

static void reassign_cltr_id(struct cluster * cltr) {
	unsigned prefrq = 0;
	unsigned prefio = 0;
	assign_list(prefrq, prefio, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
	assign_list(prefrq, prefio, cltr->procs.idles  , cltr->procs.idle );
}

static void assign_io($io_context ** data, size_t count, dlist(processor) & list) {
	processor * it = &list`first;
	while(it) {
		/* paranoid */ verifyf( it, "Unexpected null iterator\n");
		/* paranoid */ verifyf( it->io.id < count, "Processor %p has id %u above count %zu\n", it, it->io.id, count);
		data[it->io.id] = it->io.ctx;
		it = &(*it)`next;
	}
}

static void reassign_cltr_io(struct cluster * cltr) {
	assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
	assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles );
}

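// Descriptive note: fix_times() below (re)initializes the per-lane timestamps after a
// reshard; `tv` is seeded with the current rdtscl() value and the moving average `ma`
// is reset to 0, so no stale values from before the reshard are carried over.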
static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);
	for(i; count) {
		tscs[i].tv = rdtscl();
		tscs[i].ma = 0;
	}
}

// Grow the ready queue
void ready_queue_grow(struct cluster * cltr) {
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );


	// Find new count
	// Make sure we always have at least 1 list
	size_t ocount = cltr->sched.readyQ.count;
	size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);

	// Do we have to do anything?
	if( ocount != ncount ) {

		// grow the ready queue
		with( cltr->sched ) {

			// Allocate new array (uses realloc and memcpies the data)
			readyQ.data = alloc( ncount, readyQ.data`realloc );

			// Fix the moved data
			for( idx; ocount ) {
				fix(readyQ.data[idx]);
			}

			// Construct new data
			for( idx; ocount ~ ncount) {
				(readyQ.data[idx]){};
			}

			// Update original count
			readyQ.count = ncount;
		}


		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// Fix the io times
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// realloc the caches
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// reassign the processor ids.
	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	with( cltr->sched ) {
		// Remember old count
		size_t ocount = readyQ.count;

		// Find new count
		// Make sure we always have at least 1 list
		size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
		/* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
		/* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
		/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

		readyQ.count = ncount;

		// for printing, count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; ncount ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].lock);
			verify(locked);

			// As long as we can pop from this lane, push the threads somewhere else in the queue
			while(!is_empty(readyQ.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(readyQ.data[idx]);

				push(cltr, thrd, true);

				// for printing, count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&readyQ.data[idx].lock);

			// TODO print the queue statistics here

			^(readyQ.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		readyQ.data = alloc( ncount, readyQ.data`realloc );

		// Fix the moved data
		for( idx; ncount ) {
			fix(readyQ.data[idx]);
		}

		fix_times(readyQ.tscs, ncount);
	}
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// Fix the io times
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Make sure that everything is consistent
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}

void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	free( cltr->sched.caches );
}

// Ctor
void ?{}( __intrusive_lane_t & this ) {
	this.lock = false;
	this.prev = mock_head(this);
	this.anchor.next = 0p;
	this.anchor.ts = -1llu;
	#if !defined(__CFA_NO_STATISTICS__)
		this.cnt = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ _Static_assert( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
	/* paranoid */ verify( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.anchor) );
	/* paranoid */ verify( &mock_head(this)->link.next == &this.anchor.next );
	/* paranoid */ verify( &mock_head(this)->link.ts == &this.anchor.ts );
	/* paranoid */ verify( mock_head(this)->link.next == 0p );
	/* paranoid */ verify( mock_head(this)->link.ts == -1llu );
	/* paranoid */ verify( mock_head(this) == this.prev );
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 128 );
	/* paranoid */ verify( __alignof__(this) == 128 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128) );
}
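
// Descriptive note on the assertions above: they capture the "mock head" layout trick, i.e.
// mock_head(this) is a fake thread$ positioned so that its `link` field aliases this.anchor;
// `prev` therefore always points at a valid node, even when the lane is empty.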

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify( this.anchor.next == 0p );
	/* paranoid */ verify( this.anchor.ts == -1llu );
	/* paranoid */ verify( mock_head(this) == this.prev );
}

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No definition needed
#elif defined(CFA_HAVE_LINUX_RSEQ_H)

	#if defined( __x86_64 ) || defined( __i386 )
		#define RSEQ_SIG 0x53053053
	#elif defined( __ARM_ARCH )
		#ifdef __ARMEB__
			#define RSEQ_SIG 0xf3def5e7 /* udf #24035 ; 0x5de3 (ARMv6+) */
		#else
			#define RSEQ_SIG 0xe7f5def3 /* udf #24035 ; 0x5de3 */
		#endif
	#endif

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	static void __kernel_raw_rseq_register  (void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq register not supported");
			case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
			case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
			}
		}
	}

	static void __kernel_raw_rseq_unregister(void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq unregister not supported");
			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
			case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq unregister unexpected return %d", e);
			}
		}
	}
#else
	// No definition needed
#endif