source: libcfa/src/concurrency/ready_queue.cfa@ 7f51b9d

Last change on this file since 7f51b9d was 62502cc4, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Fixed a deadlock where threads could acquire the central scheduler lock for writing while preemption was enabled, causing any subsequent attempt to run a thread to deadlock.
Also added runtime checks to catch new code that forgets to disable interrupts.
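The invariant behind the fix can be sketched as follows (an illustrative sketch only; `disable_interrupts`/`enable_interrupts` stand in for whatever the kernel actually wraps around the lock, and the runtime guards mentioned above are the `verify( ! kernelTLS.preemption_state.enabled )` checks in `ready_mutate_lock`/`ready_mutate_unlock` below):

	disable_interrupts();                            // writer must not be preemptible
	uint_fast32_t last_size = ready_mutate_lock();   // asserts preemption is disabled
	// ... structural changes to the ready queue ...
	ready_mutate_unlock( last_size );                // asserts preemption is still disabled
	enable_interrupts();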

//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
// #define __CFA_DEBUG_PRINT_READY_QUEUE__

// #define USE_SNZI

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"
#include "math.hfa"

#include <unistd.h>

#include "snzi.hfa"
#include "ready_subqueue.hfa"

static const size_t cache_line_size = 64;

// No overridden function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

#define BIAS 16

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}

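// Illustrative note (not part of the original file): because __max_processors() is a weak
// symbol, an application can also cap the RWLock size at compile time by providing its own
// strong definition instead of setting the CFA_MAX_PROCESSORS environment variable.
// A minimal sketch, assuming the program never creates more than 64 processors:
//
//   unsigned __max_processors() {
//       return 64;
//   }
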
//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__scheduler_RWLock_t & this) {
	this.max   = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.lock  = false;
	this.data  = alloc(this.max);

	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.data);
}

void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
	this.handle = proc;
	this.lock   = false;
	#ifdef __CFA_WITH_VERIFY__
		this.owned = false;
	#endif
}

//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		__processor_id_t * null = 0p; // Re-write every iteration since the compare-exchange clobbers it
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 3 : Mark space as used and then publish it.
	__scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
	(*storage){ proc };
	while(true) {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}

void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
	unsigned id = proc->id;
	/*paranoid*/ verify(id < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);

	__cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
}

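// Illustrative lifecycle (a sketch, not code from this file; the wrapper name is hypothetical):
// each processor registers once to claim a slot id for the reader side of the lock, keeps that
// id while it runs, and clears the slot on shutdown so it can be reused.
//
//   void example_processor_lifetime( __processor_id_t * proc ) {
//       proc->id = doregister( proc );   // reuse a freed slot or publish a new one
//       // ... run threads, taking the reader side of the lock through this slot ...
//       unregister( proc );              // handle goes back to 0p, slot becomes reusable
//   }
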
//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );

	// Step 1 : lock global lock
	// It is needed to prevent processors that register mid critical-section
	// from simply locking their own lock and entering.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		__atomic_acquire( &data[i].lock );
	}

	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );

	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	// threads that were created mid critical-section
	// racing to lock their local locks and having the writer
	// immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
}

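// Usage note (explanatory comment, not original to this file): the value returned by
// ready_mutate_lock() is the number of per-processor locks actually taken and must be handed
// back to ready_mutate_unlock(); while it is held no reader can be inside its critical section,
// so the lanes array below can be reallocated safely. ready_queue_grow()/ready_queue_shrink()
// expect to run with this lock already held (they verify ready_mutate_islocked()).
//
//   uint_fast32_t last_size = ready_mutate_lock();
//   // ... add or remove lanes ...
//   ready_mutate_unlock( last_size );
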
//=======================================================================
// Cforall Ready Queue used for scheduling
//=======================================================================
void ?{}(__ready_queue_t & this) with (this) {
	lanes.data = 0p;
	lanes.count = 0;
}

void ^?{}(__ready_queue_t & this) with (this) {
	verify( 1 == lanes.count );
	#ifdef USE_SNZI
		verify( !query( snzi ) );
	#endif
	free(lanes.data);
}

//-----------------------------------------------------------------------
__attribute__((hot)) bool query(struct cluster * cltr) {
	#ifdef USE_SNZI
		return query(cltr->ready_queue.snzi);
	#endif
	return true;
}

//-----------------------------------------------------------------------
__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
	__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

	// write timestamp
	thrd->link.ts = rdtscl();

	#if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
		bool local = false;
		int preferred =
			//*
			kernelTLS.this_processor ? kernelTLS.this_processor->id * 4 : -1;
			/*/
			thrd->link.preferred * 4;
			//*/

	#endif

	// Try to pick a lane and lock it
	unsigned i;
	do {
		// Pick the index of a lane
		#if defined(BIAS)
			unsigned r = __tls_rand();
			unsigned rlow  = r % BIAS;
			unsigned rhigh = r / BIAS;
			if((0 != rlow) && preferred >= 0) {
				// (BIAS - 1) out of BIAS chances
				// Use preferred queues
				i = preferred + (rhigh % 4);

				#if !defined(__CFA_NO_STATISTICS__)
					local = true;
					__tls_stats()->ready.pick.push.local++;
				#endif
			}
			else {
				// 1 out of BIAS chances
				// Use all queues
				i = rhigh;
				local = false;
			}
		#else
			i = __tls_rand();
		#endif

		i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

		#if !defined(__CFA_NO_STATISTICS__)
			__tls_stats()->ready.pick.push.attempt++;
		#endif

	// If we can't lock it retry
	} while( !__atomic_try_acquire( &lanes.data[i].lock ) );

	bool first = false;

	// Actually push it
	bool lane_first = push(lanes.data[i], thrd);

	#ifdef USE_SNZI
		// If this lane used to be empty we need to do more
		if(lane_first) {
			// Check if the entire queue used to be empty
			first = !query(snzi);

			// Update the snzi
			arrive( snzi, i );
		}
	#endif

	// Unlock and return
	__atomic_unlock( &lanes.data[i].lock );

	__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		#if defined(BIAS)
			if( local ) __tls_stats()->ready.pick.push.lsuccess++;
		#endif
		__tls_stats()->ready.pick.push.success++;
	#endif

	// return whether or not the list was empty before this push
	return first;
}

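// How the biased lane choice in push() works (illustrative breakdown, not code from this file):
// one random draw is split so that (BIAS - 1) times out of BIAS the push stays on one of the 4
// lanes associated with the current processor, and 1 time out of BIAS it spreads over all lanes.
//
//   unsigned r     = __tls_rand();   // single random draw
//   unsigned rlow  = r % BIAS;       // rlow == 0  =>  1/BIAS chance: use any lane
//   unsigned rhigh = r / BIAS;       // remaining randomness picks the actual lane
//   // biased   : i = preferred + (rhigh % 4);      (then i %= lanes.count)
//   // unbiased : i = rhigh;                        (then i %= lanes.count)
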
static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
static struct $thread * try_pop(struct cluster * cltr, unsigned i);

// Pop from the ready queue from a given cluster
__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
	#if defined(BIAS)
		// Don't bother trying locally too much
		int local_tries = 8;
	#endif

	// As long as the list is not empty, try finding a lane that isn't empty and pop from it
	#ifdef USE_SNZI
		while( query(snzi) ) {
	#else
		for(25) {
	#endif
		// Pick two lists at random
		unsigned i,j;
		#if defined(BIAS)
			#if !defined(__CFA_NO_STATISTICS__)
				bool local = false;
			#endif
			uint64_t r = __tls_rand();
			unsigned rlow = r % BIAS;
			uint64_t rhigh = r / BIAS;
			if(local_tries && 0 != rlow) {
				// (BIAS - 1) out of BIAS chances
				// Use preferred queues
				unsigned pid = kernelTLS.this_processor->id * 4;
				i = pid + (rhigh % 4);
				j = pid + ((rhigh >> 32ull) % 4);

				// count the tries
				local_tries--;

				#if !defined(__CFA_NO_STATISTICS__)
					local = true;
					__tls_stats()->ready.pick.pop.local++;
				#endif
			}
			else {
				// 1 out of BIAS chances
				// Use all queues
				i = rhigh;
				j = rhigh >> 32ull;
			}
		#else
			i = __tls_rand();
			j = __tls_rand();
		#endif

		i %= count;
		j %= count;

		// try popping from the 2 picked lists
		struct $thread * thrd = try_pop(cltr, i, j);
		if(thrd) {
			#if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
				if( local ) __tls_stats()->ready.pick.pop.lsuccess++;
			#endif
			return thrd;
		}
	}

	// All lanes were empty, return 0p
	return 0p;
}

__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
	unsigned offset = __tls_rand();
	for(i; count) {
		unsigned idx = (offset + i) % count;
		struct $thread * thrd = try_pop(cltr, idx);
		if(thrd) {
			return thrd;
		}
	}

	// All lanes were empty, return 0p
	return 0p;
}


//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it
static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
	#if !defined(__CFA_NO_STATISTICS__)
		__tls_stats()->ready.pick.pop.attempt++;
	#endif

	// Pick the best list
	int w = i;
	if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
		w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
	}

	return try_pop(cltr, w);
}

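// Note on the pair-wise pop above (explanatory comment, not original to this file): this is the
// classic "power of two choices" heuristic. ts() presumably reads the rdtscl() stamp written by
// push(), so comparing the two sampled lanes and popping from the one with the older head
// approximates global FIFO order without scanning every lane.
//   e.g. if ts(lanes.data[i]) == 1000 and ts(lanes.data[j]) == 400, lane j holds the older
//   head, so w == j and that is the lane try_pop(cltr, w) attempts.
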
static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
	// Get relevant elements locally
	__intrusive_lane_t & lane = lanes.data[w];

	// If list looks empty retry
	if( is_empty(lane) ) return 0p;

	// If we can't get the lock retry
	if( !__atomic_try_acquire(&lane.lock) ) return 0p;


	// If list is empty, unlock and retry
	if( is_empty(lane) ) {
		__atomic_unlock(&lane.lock);
		return 0p;
	}

	// Actually pop the list
	struct $thread * thrd;
	thrd = pop(lane);

	/* paranoid */ verify(thrd);
	/* paranoid */ verify(lane.lock);

	#ifdef USE_SNZI
		// If this was the last element in the lane
		if(emptied) {
			depart( snzi, w );
		}
	#endif

	// Unlock and return
	__atomic_unlock(&lane.lock);

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		__tls_stats()->ready.pick.pop.success++;
	#endif

	// Update the thread bias
	thrd->link.preferred = w / 4;

	// return the popped thread
	return thrd;
}
//-----------------------------------------------------------------------

bool remove_head(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
	for(i; lanes.count) {
		__intrusive_lane_t & lane = lanes.data[i];

		bool removed = false;

		__atomic_acquire(&lane.lock);
		if(head(lane)->link.next == thrd) {
			$thread * pthrd;
			pthrd = pop(lane);

			/* paranoid */ verify( pthrd == thrd );

			removed = true;
			#ifdef USE_SNZI
				if(emptied) {
					depart( snzi, i );
				}
			#endif
		}
		__atomic_unlock(&lane.lock);

		if( removed ) return true;
	}
	return false;
}

//-----------------------------------------------------------------------

static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
	{
		for( idx ; lanes.count ) {
			__intrusive_lane_t & sl = lanes.data[idx];
			assert(!lanes.data[idx].lock);

			assert(head(sl)->link.prev == 0p );
			assert(head(sl)->link.next->link.prev == head(sl) );
			assert(tail(sl)->link.next == 0p );
			assert(tail(sl)->link.prev->link.next == tail(sl) );

			if(sl.before.link.ts == 0l) {
				assert(tail(sl)->link.prev == head(sl));
				assert(head(sl)->link.next == tail(sl));
			} else {
				assert(tail(sl)->link.prev != head(sl));
				assert(head(sl)->link.next != tail(sl));
			}
		}
	}
	#endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to the anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
	// if the list is not empty then follow the pointers and fix their reverse
	if(!is_empty(ll)) {
		head(ll)->link.next->link.prev = head(ll);
		tail(ll)->link.prev->link.next = tail(ll);
	}
	// Otherwise just reset the list
	else {
		verify(tail(ll)->link.next == 0p);
		tail(ll)->link.prev = head(ll);
		head(ll)->link.next = tail(ll);
		verify(head(ll)->link.prev == 0p);
	}
}

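// Why fix() is needed (explanatory comment, not original to this file): each lane appears to
// keep its head and tail anchor nodes inline in the __intrusive_lane_t itself, and the first and
// last real elements point back at those anchors. ready_queue_grow()/ready_queue_shrink() below
// realloc lanes.data and memcpy the lanes, which moves the anchors to new addresses; fix()
// rewrites the back-pointers (or re-links an empty lane's anchors to each other) so nothing is
// left pointing into the old allocation.
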
// Grow the ready queue
void ready_queue_grow (struct cluster * cltr, int target) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	// grow the ready queue
	with( cltr->ready_queue ) {
		#ifdef USE_SNZI
			^(snzi){};
		#endif

		// Find new count
		// Make sure we always have at least 1 list
		size_t ncount = target >= 2 ? target * 4 : 1;

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, ncount);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Construct new data
		for( idx; (size_t)lanes.count ~ ncount) {
			(lanes.data[idx]){};
		}

		// Update original
		lanes.count = ncount;

		#ifdef USE_SNZI
			// Re-create the snzi
			snzi{ log2( lanes.count / 8 ) };
			for( idx; (size_t)lanes.count ) {
				if( !is_empty(lanes.data[idx]) ) {
					arrive(snzi, idx);
				}
			}
		#endif
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}

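// Sizing note (illustrative, assuming target is the cluster's processor count): with 4 lanes per
// processor, target == 1 keeps the single shared lane, target == 2 gives 8 lanes, target == 8
// gives 32 lanes.
//
//   size_t ncount = target >= 2 ? target * 4 : 1;   // e.g. target 8 -> ncount 32
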
// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr, int target) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	with( cltr->ready_queue ) {
		#ifdef USE_SNZI
			^(snzi){};
		#endif

		// Remember old count
		size_t ocount = lanes.count;

		// Find new count
		// Make sure we always have at least 1 list
		lanes.count = target >= 2 ? target * 4 : 1;
		/* paranoid */ verify( ocount >= lanes.count );
		/* paranoid */ verify( lanes.count == target * 4 || target < 2 );

		// for printing, count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; (size_t)lanes.count ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
			verify(locked);

			// As long as we can pop from this lane, push the threads somewhere else in the queue
			while(!is_empty(lanes.data[idx])) {
				struct $thread * thrd;
				thrd = pop(lanes.data[idx]);

				push(cltr, thrd);

				// for printing, count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&lanes.data[idx].lock);

			// TODO print the queue statistics here

			^(lanes.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, lanes.count);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		#ifdef USE_SNZI
			// Re-create the snzi
			snzi{ log2( lanes.count / 8 ) };
			for( idx; (size_t)lanes.count ) {
				if( !is_empty(lanes.data[idx]) ) {
					arrive(snzi, idx);
				}
			}
		#endif
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}