source: libcfa/src/concurrency/kernel/cluster.cfa@ c7f2d9b

//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// cluster.cfa -- file that includes helpers for subsystems that need
// cluster wide support
//
// Author           : Thierry Delisle
// Created On       : Fri 03 11 12:39:24 2022
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel_private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include "ready_subqueue.hfa"

#include <errno.h>
#include <unistd.h>

extern "C" {
	#include <sys/syscall.h> // __NR_xxx
}

// No overridden function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

#if !defined(__CFA_NO_STATISTICS__)
	#define __STATS(...) __VA_ARGS__
#else
	#define __STATS(...)
#endif

// Returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}

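// Note: the limit can be raised at runtime without recompiling, e.g. by
// launching the program with CFA_MAX_PROCESSORS=4096 in the environment
// (illustrative value; any integer in [1, 65535] is accepted above).
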
#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No forward declaration needed
	#define __kernel_rseq_register rseq_register_current_thread
	#define __kernel_rseq_unregister rseq_unregister_current_thread
#elif defined(CFA_HAVE_LINUX_RSEQ_H)
	static void __kernel_raw_rseq_register  (void);
	static void __kernel_raw_rseq_unregister(void);

	#define __kernel_rseq_register __kernel_raw_rseq_register
	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
#else
	// No forward declaration needed
	// No initialization needed
	static inline void noop(void) {}

	#define __kernel_rseq_register noop
	#define __kernel_rseq_unregister noop
#endif

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
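// Scheme: each registered processor owns one flag in data[]. The writer
// (ready_mutate_lock below) acquires write_lock and then every published
// per-processor flag; the reader side, defined elsewhere, only needs its
// own flag, keeping the common read path uncontended.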
void ?{}(__scheduler_RWLock_t & this) {
	this.max = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.data = alloc(this.max);
	this.write_lock = false;

	/* paranoid */ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/* paranoid */ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.data);
}

//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned register_proc_id( void ) with(*__scheduler_lock) {
	__kernel_rseq_register();

	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the published slots
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles cause problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every iteration since the compare-exchange clobbers it
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
		 && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
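	// Publish slots strictly in order: wait until every slot below n is
	// visible (ready == n) before bumping ready to n + 1, so readers never
	// observe an index for which data[] has not been written yet.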
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
		 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}

void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles cause problems

	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);

	__kernel_rseq_unregister();
}
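
// Typical lifetime (illustrative): a processor calls register_proc_id() once
// at startup to obtain its slot id, holds it while running, and calls
// unregister_proc_id(id) at teardown so the slot can be reused.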

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : lock global lock
	// Needed so processors that register mid critical-section cannot simply
	// lock their own lock and enter.
	__atomic_acquire( &write_lock );

	// Make sure we won't deadlock ourselves
	// Checking before acquiring the writer lock isn't safe
	// because someone else could have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_acquire( llock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	// threads that were created mid critical-section
	// racing to lock their local locks and having the writer
	// immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/* paranoid */ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}
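
// Usage sketch (illustrative): callers bracket structural changes to the
// ready queue with the pair above, passing the count returned by the lock
// to the unlock so exactly the locks that were taken are released.
//
//     uint_fast32_t last_size = ready_mutate_lock();
//         // ... add or remove shards ...
//     ready_mutate_unlock( last_size );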

//=======================================================================
// Cluster growth
static const unsigned __readyq_single_shard = 2;

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
	{
		const unsigned lanes_count = readyQ.count;
		for( idx ; lanes_count ) {
			__intrusive_lane_t & sl = readyQ.data[idx];
			assert(!readyQ.data[idx].lock);

			if(is_empty(sl)) {
				assert( sl.anchor.next == 0p );
				assert( sl.anchor.ts   == -1llu );
				assert( mock_head(sl)  == sl.prev );
			} else {
				assert( sl.anchor.next != 0p );
				assert( sl.anchor.ts   != -1llu );
				assert( mock_head(sl)  != sl.prev );
			}
		}
	}
	#endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to the anchor aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
	if(is_empty(ll)) {
		verify(ll.anchor.next == 0p);
		ll.prev = mock_head(ll);
	}
}

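// Walk 'count' processors of 'list' and give each one the index of its
// preferred ready-queue shard, spacing assignments __shard_factor.readyq apart.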
static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
	processor * it = &list`first;
	for(unsigned i = 0; i < count; i++) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
		it->rdq.id = value;
		it->rdq.target = MAX;
		value += __shard_factor.readyq;
		it = &(*it)`next;
	}
}

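// Reassign preferred shard ids to every processor of the cluster, actives
// first and then idles, after the shard count has changed.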
static void reassign_cltr_id(struct cluster * cltr) {
	unsigned preferred = 0;
	assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
	assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
}

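// (Re)allocate the per-shard timestamp array and reset each entry: tv is set
// to the current rdtscl() value and the ma field is cleared.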
static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);
	for(i; count) {
		tscs[i].tv = rdtscl();
		tscs[i].ma = 0;
	}
}

// Grow the ready queue
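// The shard count is kept proportional to the processor count
// (__shard_factor.readyq shards per processor), with a floor of
// __readyq_single_shard lanes.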
void ready_queue_grow(struct cluster * cltr) {
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	// Find new count
	// Make sure we always have at least 1 list
	size_t ocount = cltr->sched.readyQ.count;
	size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);

	// Do we have to do anything?
	if( ocount != ncount ) {

		// grow the ready queue
		with( cltr->sched ) {

			// Allocate new array (uses realloc and memcpies the data)
			readyQ.data = alloc( ncount, readyQ.data`realloc );

			// Fix the moved data
			for( idx; ocount ) {
				fix(readyQ.data[idx]);
			}

			// Construct new data
			for( idx; ocount ~ ncount) {
				(readyQ.data[idx]){};
			}

			// Update original count
			readyQ.count = ncount;
		}

		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// realloc the caches
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// reassign the preferred shard ids
	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
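// Lanes above the new count are drained one by one: their threads are popped
// and re-pushed through the regular push path so no work is lost, then the
// surplus lanes are destructed and the backing array is shrunk in place.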
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	with( cltr->sched ) {
		// Remember old count
		size_t ocount = readyQ.count;

		// Find new count
		// Make sure we always have at least 1 list
		size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
		/* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
		/* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
		/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

		readyQ.count = ncount;

		// for printing count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; ncount ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].lock);
			verify(locked);

			// As long as we can pop from this lane, push the threads somewhere else in the queue
			while(!is_empty(readyQ.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(readyQ.data[idx]);

				push(cltr, thrd, true);

				// for printing count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&readyQ.data[idx].lock);

			// TODO print the queue statistics here

			^(readyQ.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		readyQ.data = alloc( ncount, readyQ.data`realloc );

		// Fix the moved data
		for( idx; ncount ) {
			fix(readyQ.data[idx]);
		}

		fix_times(readyQ.tscs, ncount);
	}
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}

void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	free( cltr->sched.caches );
}

// Ctor
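// The anchor field is laid out so that it overlaps thread$.link exactly,
// letting mock_head() treat the lane itself as a fake first node; push and
// pop therefore never need an empty-list special case. The assertions below
// pin down that layout because the trick silently breaks if it drifts.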
void ?{}( __intrusive_lane_t & this ) {
	this.lock = false;
	this.prev = mock_head(this);
	this.anchor.next = 0p;
	this.anchor.ts   = -1llu;
	#if !defined(__CFA_NO_STATISTICS__)
		this.cnt = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ _Static_assert( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
	/* paranoid */ verify( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.anchor) );
	/* paranoid */ verify( &mock_head(this)->link.next == &this.anchor.next );
	/* paranoid */ verify( &mock_head(this)->link.ts   == &this.anchor.ts );
	/* paranoid */ verify( mock_head(this)->link.next == 0p );
	/* paranoid */ verify( mock_head(this)->link.ts   == -1llu );
	/* paranoid */ verify( mock_head(this) == this.prev );
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 128 );
	/* paranoid */ verify( __alignof__(this) == 128 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128) );
}

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify( this.anchor.next == 0p );
	/* paranoid */ verify( this.anchor.ts   == -1llu );
	/* paranoid */ verify( mock_head(this)  == this.prev );
}

#if defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No definition needed
#elif defined(CFA_HAVE_LINUX_RSEQ_H)

	#if defined( __x86_64 ) || defined( __i386 )
		#define RSEQ_SIG	0x53053053
	#elif defined( __ARM_ARCH )
		#ifdef __ARMEB__
			#define RSEQ_SIG	0xf3def5e7	/* udf #24035 ; 0x5de3 (ARMv6+) */
		#else
			#define RSEQ_SIG	0xe7f5def3	/* udf #24035 ; 0x5de3 */
		#endif
	#endif
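	// RSEQ_SIG is the signature the kernel checks at registration and expects
	// to find immediately before the abort handler of a restartable sequence;
	// the values above are believed to match the reference definitions used by
	// librseq and the kernel's rseq selftests.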

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	static void __kernel_raw_rseq_register (void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq register not supported");
			case EFAULT: abort("KERNEL ERROR: rseq register with invalid address");
			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
			case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
			}
		}
	}

	static void __kernel_raw_rseq_unregister(void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq unregister not supported");
			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid address");
			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
			case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq unregister unexpected return %d", e);
			}
		}
	}
#else
	// No definition needed
#endif