source: libcfa/src/concurrency/kernel/cluster.cfa@ 4f102fa

ADT ast-experimental
Last change on this file since 4f102fa was 31c967b, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Changed ready-queue so I can easily change the averaging algorithm.
Changed averaging to use logscale.

  • Property mode set to 100644
File size: 18.3 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// cluster.cfa -- file that includes helpers for subsystem that need cluster wide support
8//
9// Author : Thierry Delisle
10// Created On : Fri Mar 11 12:39:24 2022
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17#define _GNU_SOURCE
18
19#include "bits/defs.hfa"
20#include "device/cpu.hfa"
21#include "kernel/cluster.hfa"
22#include "kernel/private.hfa"
23
24#include "stdlib.hfa"
25#include "limits.hfa"
26#include "math.hfa"
27
28#include "ready_subqueue.hfa"
29#include "io/types.hfa"
30
31#include <errno.h>
32#include <unistd.h>
33
34extern "C" {
35 #include <sys/syscall.h> // __NR_xxx
36}
37
// Last-resort processor limit: used by __max_processors when the function is
// not overridden and the CFA_MAX_PROCESSORS environment variable is absent or
// unusable.  Can be set at build time by defining the macro beforehand.
#if !defined(__CFA_MAX_PROCESSORS__)
	#define __CFA_MAX_PROCESSORS__ 1024
#endif
43
// __STATS(...) keeps its arguments when statistics are compiled in and
// expands to nothing when __CFA_NO_STATISTICS__ is defined.
#if defined(__CFA_NO_STATISTICS__)
	#define __STATS(...)
#else
	#define __STATS(...) __VA_ARGS__
#endif
49
// Returns the maximum number of processors the scheduler RWLock supports.
//
// The value is read from the CFA_MAX_PROCESSORS environment variable, which
// must be a plain decimal number in the range [1, 65535].  When the variable
// is missing, malformed or out of range, __CFA_MAX_PROCESSORS__ is returned.
// The symbol is weak, so a program may supply its own definition.
__attribute__((weak)) unsigned __max_processors() libcfa_public {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);

	// Check the syntax before the range: when no digit was consumed strtol
	// returns 0, which would otherwise be reported as an out-of-range value.
	if(endptr == max_cores_s || '\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	// Overflowing input is clamped by strtol to LONG_MIN/LONG_MAX and is
	// therefore rejected here as well.
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}
71
// Select what __kernel_rseq_register / __kernel_rseq_unregister expand to:
//  - CFA_HAVE_LINUX_LIBRSEQ : the rseq_*_current_thread entry points;
//  - CFA_HAVE_LINUX_RSEQ_H  : the raw helpers declared here and defined
//                             further down in this file;
//  - otherwise              : a function that does nothing.
#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// The functions are provided externally, nothing to declare
	#define __kernel_rseq_register rseq_register_current_thread
	#define __kernel_rseq_unregister rseq_unregister_current_thread
#elif defined(CFA_HAVE_LINUX_RSEQ_H)
	// Forward declarations of the raw helpers
	static void __kernel_raw_rseq_register (void);
	static void __kernel_raw_rseq_unregister(void);

	#define __kernel_rseq_register __kernel_raw_rseq_register
	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
#else
	// Nothing to register and nothing to initialize
	static inline void noop(void) {}

	#define __kernel_rseq_register noop
	#define __kernel_rseq_unregister noop
#endif
90
//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
// Constructor: sizes the slot array from __max_processors() and starts with
// no slot allocated, no slot published and the writer lock released.
void ?{}(__scheduler_RWLock_t & this) {
	// Capacity and backing storage
	this.lock.max  = __max_processors();
	this.lock.data = alloc(this.lock.max);

	// Initial state
	this.lock.alloc      = 0;
	this.lock.ready      = 0;
	this.lock.write_lock = false;

	// Both counters are updated with atomic builtins, which must not fall back to locks
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));
}
// Destructor: releases the slot array allocated by the constructor.
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.lock.data);
}
108
109
//=======================================================================
// Lock-Free registering/unregistering of threads
//
// Registers the calling kernel thread with the scheduler RWLock and returns
// the index of the slot in `data` that now holds the address of this thread's
// sched_lock.  A null slot among the published ones is claimed when one is
// found; otherwise a fresh slot is reserved with a fetch-and-add and published
// by advancing `ready`.  Aborts when no slot is left.
unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
	__kernel_rseq_register();

	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		// Cheap relaxed load first, only attempt the CAS on cells that look empty
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	// Early check, avoids bumping alloc when the array is already full
	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	// Slots are published in order: spin until ready reaches n, then move it to n + 1
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}
155
// Gives back the slot obtained from register_proc_id: the slot is reset to
// null with a release store, then the thread is unregistered from rseq.
// `id` must be the value register_proc_id returned to this very thread.
void unregister_proc_id( unsigned id ) with(__scheduler_lock.lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	// Cast works around a Cforall problem with the double volatile
	bool * volatile * slot = (bool * volatile *)&data[id];
	__atomic_store_n(slot, 0p, __ATOMIC_RELEASE);

	__kernel_rseq_unregister();
}
167
//-----------------------------------------------------------------------
// Writer side : taken when the shape of the ready queue changes, e.g. when
// queues are added or removed.
// Returns the number of slots that were examined; that value must be handed
// back to ready_mutate_unlock.  Must be called with preemption disabled.
uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// First the global lock: without it a processor registering in the
	// middle of the critical section could lock its own lock and walk in.
	__atomic_acquire( &write_lock );

	// Self-deadlock check. It is only meaningful now: before the writer
	// lock was held another writer could legitimately have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Then every per-processor lock that is published. Processors still
	// in the middle of registering are not covered, but they cannot be in
	// read_lock or in the critical section either.
	uint_fast32_t published = ready;
	for(uint_fast32_t idx = 0; idx < published; idx++) {
		volatile bool * proc_lock = data[idx];
		if(!proc_lock) continue; // empty slot, nobody to lock out
		__atomic_acquire( proc_lock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return published;
}
197
// Writer side release. `last_s` is the value returned by the matching
// ready_mutate_lock call, i.e. the number of slots that call went over.
// Must be called with preemption disabled.
void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// First the per-processor locks, while the global lock is still held.
	// Only the first last_s slots are visited: a thread that was created
	// during the critical section may be racing to take its own lock and
	// must not have it immediately released by the writer.
	for(uint_fast32_t idx = 0; idx < last_s; idx++) {
		volatile bool * proc_lock = data[idx];
		if(!proc_lock) continue; // empty slot, nothing to release
		__atomic_store_n(proc_lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Then the global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}
218
//=======================================================================
// Cluster growth

// Lower bound on the number of ready-queue lanes: ready_queue_grow and
// ready_queue_shrink never size the queue below this value.
static const unsigned __readyq_single_shard = 2;
222
// Constructor: zeroes both fields of `t` (`tv` and `ma`).
void ?{}(__timestamp_t & this) {
	this.t.tv = 0;
	this.t.ma = 0;
}
// Destructor: a timestamp owns nothing, so there is nothing to release.
void ^?{}(__timestamp_t &) {
}
225
//-----------------------------------------------------------------------
// Sanity check of the intrusive lanes of a cluster's ready queue.
// Each lane must be unlocked and its anchor/prev fields must agree with
// is_empty().  Compiles to nothing unless __CFA_WITH_VERIFY__ is defined.
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
		const unsigned nlanes = readyQ.count;
		for( unsigned idx = 0; idx < nlanes; idx++ ) {
			__intrusive_lane_t & sl = readyQ.data[idx];
			assert(!readyQ.data[idx].l.lock);

			if(!is_empty(sl)) {
				// Non-empty lane: a first node, a real timestamp, and prev away from the anchor
				assert( sl.l.anchor.next != 0p );
				assert( sl.l.anchor.ts   != MAX );
				assert( mock_head(sl)    != sl.l.prev );
				continue;
			}

			// Empty lane: no first node, sentinel timestamp, and prev back on the anchor
			assert( sl.l.anchor.next == 0p );
			assert( sl.l.anchor.ts   == MAX );
			assert( mock_head(sl)    == sl.l.prev );
		}
	#endif
}
249
// Call this function if the intrusive list was moved using memcpy.
// An empty lane's `prev` points back at its own anchor, so after a move it
// must be re-aimed at the new location or it would be left dangling.
// Non-empty lanes are left untouched.
static inline void fix(__intrusive_lane_t & ll) {
	if(!is_empty(ll)) return;

	verify(ll.l.anchor.next == 0p);
	ll.l.prev = mock_head(ll);
}
258
// Hands out ids to the first `count` processors of `list`.
// Each processor gets the current `valrq` as ready-queue id (and, with
// io_uring, the current `valio` as completion-queue id), has its target reset
// to UINT_MAX, and the running values are advanced by the matching shard
// factor.  `valrq` and `valio` are updated in place so that consecutive calls
// keep numbering where the previous one stopped.
static void assign_list(unsigned & valrq, unsigned & valio, dlist(struct processor) & list, unsigned count) {
	struct processor * it = &list`first;
	for(unsigned i = 0; i < count; i++, it = &(*it)`next) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);

		// ready-queue side
		it->rdq.id     = valrq;
		it->rdq.target = UINT_MAX;
		valrq += __shard_factor.readyq;

		// io side
		#if defined(CFA_HAVE_LINUX_IO_URING_H)
			it->io.ctx->cq.id = valio;
			it->io.target     = UINT_MAX;
			valio += __shard_factor.io;
		#endif
	}
}
274
// Renumbers every processor of the cluster, starting from 0:
// the active processors first, then the idle ones.
static void reassign_cltr_id(struct cluster * cltr) {
	unsigned next_rq = 0;
	unsigned next_io = 0;

	// total - idle is the number of processors on the actives list
	assign_list(next_rq, next_io, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
	assign_list(next_rq, next_io, cltr->procs.idles  , cltr->procs.idle );
}
281
#if defined(CFA_HAVE_LINUX_IO_URING_H)
	// Stores the io context of every processor on `list` into `data`, at the
	// index given by the context's `cq.id`.  `count` is the number of entries
	// of `data` and is only used to validate the ids.
	static void assign_io(io_context$ ** data, size_t count, dlist(struct processor) & list) {
		struct processor * it = &list`first;
		while(it) {
			// Report the id that is actually being checked (cq.id, not rdq.id)
			/* paranoid */ verifyf( it->io.ctx->cq.id < count, "Processor %p has id %u above count %zu\n", it, it->io.ctx->cq.id, count);
			data[it->io.ctx->cq.id] = it->io.ctx;
			it = &(*it)`next;
		}
	}

	// Rebuilds the cluster's table of io contexts from its active and idle processors.
	static void reassign_cltr_io(struct cluster * cltr) {
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles  );
	}
#else
	// Without io_uring there is no io context table to rebuild.
	static void reassign_cltr_io(struct cluster *) {}
#endif
300
// Resizes the timestamp array `tscs` to `count` entries and resets all of
// them, old and new alike: `tv` to the current rdtscl() value, `ma` to 0.
// `tscs` is updated in place with the (possibly moved) array.
static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);
	for(unsigned i = 0; i < count; i++) {
		tscs[i].t.tv = rdtscl();
		tscs[i].t.ma = 0;
	}
}
308
// Grow the ready queue.
// Resizes the cluster's scheduling arrays to match cltr->procs.total:
// ready-queue lanes and their timestamps, io timestamps, caches and the io
// context table, then renumbers the processors.
// The scheduler writer lock must be held by the caller.
void ready_queue_grow(struct cluster * cltr) {
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// The queue must be coherent before it is touched
	/* paranoid */ check_readyQ( cltr );

	// Lane counts before and after; never go below __readyq_single_shard
	size_t old_lanes = cltr->sched.readyQ.count;
	size_t new_lanes = max(target * __shard_factor.readyq, __readyq_single_shard);

	// Only touch the lanes when their number really changes
	if( old_lanes != new_lanes ) {
		// realloc carries the existing lanes over byte for byte
		cltr->sched.readyQ.data = alloc( new_lanes, cltr->sched.readyQ.data`realloc );

		// Lanes that were carried over may hold pointers into the old array
		for( size_t idx = 0; idx < old_lanes; idx++ ) {
			fix(cltr->sched.readyQ.data[idx]);
		}

		// Lanes past the old end are raw memory and need constructing
		for( size_t idx = old_lanes; idx < new_lanes; idx++ ) {
			(cltr->sched.readyQ.data[idx]){};
		}

		cltr->sched.readyQ.count = new_lanes;

		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// io timestamps follow the number of processors
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// one cache entry per processor
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// hand out fresh ids to every processor
	reassign_cltr_id(cltr);

	// the io context table is indexed by the ids assigned just above
	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// The queue must still be coherent once everything is resized
	/* paranoid */ check_readyQ( cltr );
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}
373
// Shrink the ready queue.
// Resizes the cluster's scheduling arrays to match cltr->procs.total.
// Threads still sitting in lanes that are about to disappear are popped and
// pushed back onto the cluster before those lanes are destroyed.
// The scheduler writer lock must be held by the caller.
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// The queue must be coherent before it is touched
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	with( cltr->sched ) {
		// Lane count before the resize
		size_t ocount = readyQ.count;

		// Lane count after the resize, never below __readyq_single_shard
		size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
		/* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
		/* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
		/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

		// The count is lowered before the doomed lanes are drained
		// (presumably so the pushes below only target surviving lanes)
		readyQ.count = ncount;

		// Number of threads moved, only needed for the debug print
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// Drain and destroy every lane in [ncount, ocount)
		for( size_t idx = ncount; idx < ocount; idx++ ) {
			// Taking the lock is not strictly required, it just makes the invariants easier to check
			__attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].l.lock);
			verify(locked);

			// Move the threads of this lane somewhere else in the queue, one at a time
			while(!is_empty(readyQ.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(readyQ.data[idx]);

				push(cltr, thrd, true);

				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Done with the lane, release it
			__atomic_unlock(&readyQ.data[idx].l.lock);

			// TODO print the queue statistics here

			^(readyQ.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// realloc carries the surviving lanes over byte for byte
		readyQ.data = alloc( ncount, readyQ.data`realloc );

		// Surviving lanes may hold pointers into the old array
		for( size_t idx = 0; idx < ncount; idx++ ) {
			fix(readyQ.data[idx]);
		}

		fix_times(readyQ.tscs, ncount);
	}

	// one cache entry per processor
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// io timestamps follow the number of processors
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// hand out fresh ids to every processor
	reassign_cltr_id(cltr);

	// the io context table is indexed by the ids assigned just above
	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// The queue must still be coherent once everything is resized
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}
460
// Releases the scheduling arrays of a cluster: ready-queue lanes and their
// timestamps, io timestamps and caches.
// Every pointer freed here is reset to null so that a stale value can neither
// be dereferenced nor freed a second time.
// NOTE(review): sched.io.data, which ready_queue_grow/ready_queue_shrink
// (re)allocate, is not released here -- confirm it is freed elsewhere or is
// already null by the time this runs.
void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	cltr->sched.io.tscs = 0p;

	free( cltr->sched.caches );
	cltr->sched.caches = 0p;
}
471
// offsetof for a field that lives inside a nested member, e.g. `l.anchor`
#define nested_offsetof(type, field) ((off_t)(&(((type*)0)-> field)))

// Constructor: builds an empty, unlocked lane.
// "Empty" means no first node, the MAX sentinel as timestamp, and `prev`
// pointing at the mock head that overlays the anchor.
void ?{}( __intrusive_lane_t & this ) {
	this.l.lock = false;

	// empty list
	this.l.anchor.next = 0p;
	this.l.anchor.ts   = MAX;
	this.l.prev        = mock_head(this);

	__STATS( this.l.cnt = 0; )

	// The anchor trick only works if the anchor lines up exactly with the
	// `link` field of a thread$, hence the long list of checks below.
	/* paranoid */ _Static_assert( offsetof( thread$, link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( offsetof( thread$, link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.l.anchor) );
	/* paranoid */ verify( &mock_head(this)->link.next == &this.l.anchor.next );
	/* paranoid */ verify( &mock_head(this)->link.ts   == &this.l.anchor.ts   );
	/* paranoid */ verify( mock_head(this)->link.next == 0p );
	/* paranoid */ verify( mock_head(this)->link.ts   == MAX );
	/* paranoid */ verify( mock_head(this) == this.l.prev );

	// Lanes are expected to be 64-byte aligned, both as a type and as an object
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
	/* paranoid */ verify( __alignof__(this) == 64 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 64) == 0, "Expected address to be aligned %p %% 64 == %zd", &this, ((intptr_t)(&this) % 64) );
}

#undef nested_offsetof
499
// Destructor: releases nothing, it only checks (when verification is
// enabled) that the lane is empty at the time it is destroyed.
void ^?{}( __intrusive_lane_t & this ) {
	/* paranoid */ verify( this.l.anchor.next == 0p );
	/* paranoid */ verify( this.l.anchor.ts   == MAX );
	/* paranoid */ verify( mock_head(this)    == this.l.prev );
}
507
#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No definition needed
#elif defined(CFA_HAVE_LINUX_RSEQ_H)

	// Signature passed to the rseq system call, per architecture.
	// The same value is used for registration and unregistration.
	#if defined( __x86_64 ) || defined( __i386 )
		#define RSEQ_SIG 0x53053053
	#elif defined( __ARM_ARCH )
		#ifdef __ARMEB__
			#define RSEQ_SIG 0xf3def5e7 /* udf #24035 ; 0x5de3 (ARMv6+) */
		#else
			#define RSEQ_SIG 0xe7f5def3 /* udf #24035 ; 0x5de3 */
		#endif
	#endif

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	// Registers __cfaabi_rseq for the calling thread with a direct rseq
	// system call.  Any failure is fatal: the process is aborted with a
	// message describing the errno value.
	static void __kernel_raw_rseq_register  (void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
		if(ret != 0) {
			// Save errno right away, before anything else can overwrite it
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq register not supported");
			case EFAULT: abort("KERNEL ERROR: rseq register with invalid address");
			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
			case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
			}
		}
	}

	// Unregisters __cfaabi_rseq for the calling thread with a direct rseq
	// system call.  Any failure is fatal: the process is aborted with a
	// message describing the errno value.
	static void __kernel_raw_rseq_unregister(void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
		if(ret != 0) {
			// Save errno right away, before anything else can overwrite it
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq unregister not supported");
			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid address");
			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
			case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq unregister unexpected return %d", e);
			}
		}
	}
#else
	// No definition needed
#endif
Note: See TracBrowser for help on using the repository browser.