source: libcfa/src/concurrency/ready_queue.cfa@ ff79d5e

Last change on this file since ff79d5e was 9b1dcc2, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Changed scheduling API to adapt to non-Processors scheduling threads.

1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17// #define __CFA_DEBUG_PRINT_READY_QUEUE__
18
19#include "bits/defs.hfa"
20#include "kernel_private.hfa"
21
22#define _GNU_SOURCE
23#include "stdlib.hfa"
24#include "math.hfa"
25
26static const size_t cache_line_size = 64;
27
28// No overridden function, no environment variable, no #define
29// fall back to a magic number
30#ifndef __CFA_MAX_PROCESSORS__
31 #define __CFA_MAX_PROCESSORS__ 1024
32#endif
33
34// returns the maximum number of processors the RWLock supports
35__attribute__((weak)) unsigned __max_processors() {
36 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
37 if(!max_cores_s) {
38 __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
39 return __CFA_MAX_PROCESSORS__;
40 }
41
42 char * endptr = 0p;
43 long int max_cores_l = strtol(max_cores_s, &endptr, 10);
44 if(max_cores_l < 1 || max_cores_l > 65535) {
45 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
46 return __CFA_MAX_PROCESSORS__;
47 }
48 if('\0' != *endptr) {
49 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
50 return __CFA_MAX_PROCESSORS__;
51 }
52
53 return max_cores_l;
54}
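// Illustrative sketch (not part of this file): because __max_processors() is a
// weak symbol, an application can change the hard limit either by setting the
// CFA_MAX_PROCESSORS environment variable at run time, e.g.
//
//   CFA_MAX_PROCESSORS=256 ./a.out
//
// or by providing its own strong definition at link time (the value 256 below
// is only an example):
//
//   unsigned __max_processors() {
//       return 256;
//   }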
55
56//=======================================================================
57// Cluster wide reader-writer lock
58//=======================================================================
59void ?{}(__scheduler_RWLock_t & this) {
60 this.max = __max_processors();
61 this.alloc = 0;
62 this.ready = 0;
63 this.lock = false;
64 this.data = alloc(this.max);
65
66 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data )) % 64) );
67 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
68 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
69 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
70
71}
72void ^?{}(__scheduler_RWLock_t & this) {
73 free(this.data);
74}
75
76void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
77 this.handle = proc;
78 this.lock = false;
79}
80
81//=======================================================================
82// Lock-Free registering/unregistering of threads
83unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
84 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
85
86 // Step - 1 : check if there is already space in the data
87 uint_fast32_t s = ready;
88
89 // Check among all the ready
90 for(uint_fast32_t i = 0; i < s; i++) {
91 __processor_id_t * null = 0p; // re-set each iteration since a failed compare-exchange overwrites it
92 if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
93 && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
94 /*paranoid*/ verify(i < ready);
95 /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
96 /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
97 return i;
98 }
99 }
100
101 if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);
102
103 // Step - 2 : F&A to get a new spot in the array.
104 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
105 if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);
106
107 // Step - 3 : Mark space as used and then publish it.
108 __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
109 (*storage){ proc };
110 while(true) {
111 unsigned copy = n;
112 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
113 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
114 break;
115 asm volatile("pause");
116 }
117
118 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
119
120 // Return new spot.
121 /*paranoid*/ verify(n < ready);
122 /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
123 /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
124 return n;
125}
126
127void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
128 unsigned id = proc->id;
129 /*paranoid*/ verify(id < ready);
130 /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
131 __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
132
133 __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
134}
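// Illustrative sketch (not compiled): expected lifecycle of a processor with
// respect to this registration scheme.  The names processor_startup/_shutdown
// are hypothetical; the actual call sites live elsewhere in the runtime.
//
//   void processor_startup( __processor_id_t & this ) {
//       // claim (or reuse) a slot in data[]; the index doubles as the id
//       this.id = doregister( &this );
//   }
//
//   void processor_shutdown( __processor_id_t & this ) {
//       // publish the slot for reuse by a later processor
//       unregister( &this );
//   }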
135
136//-----------------------------------------------------------------------
137// Writer side : acquire when changing the ready queue, e.g. adding more
138// queues or removing them.
139uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
140 // Step 1 : lock global lock
141 // This is needed to prevent processors that register mid critical-section
142 // from simply locking their own lock and entering.
143 __atomic_acquire( &lock );
144
145 // Step 2 : lock per-proc lock
146 // Processors that are currently being registered aren't counted
147 // but can't be in read_lock or in the critical section.
148 // All other processors are counted
149 uint_fast32_t s = ready;
150 for(uint_fast32_t i = 0; i < s; i++) {
151 __atomic_acquire( &data[i].lock );
152 }
153
154 return s;
155}
156
157void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
158 // Step 1 : release local locks
159 // This must be done while the global lock is held to prevent
160 // threads that were created mid critical-section
161 // from racing to lock their local locks and having the writer
162 // immediately unlock them
163 // Alternative solution : return s in write_lock and pass it to write_unlock
164 for(uint_fast32_t i = 0; i < last_s; i++) {
165 verify(data[i].lock);
166 __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
167 }
168
169 // Step 2 : release global lock
170 /*paranoid*/ assert(true == lock);
171 __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
172}
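// Illustrative sketch (not compiled): a writer-side critical section is
// expected to bracket any structural change to the ready queue, exactly as
// ready_queue_grow()/ready_queue_shrink() do below.
//
//   uint_fast32_t last_size = ready_mutate_lock();   // block out all readers
//   // ... add or remove lanes here ...
//   ready_mutate_unlock( last_size );                // let readers back in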
173
174//=======================================================================
175// Intrusive Queue used by ready queue
176//=======================================================================
177// Intrusive lanes used by the relaxed ready queue
178struct __attribute__((aligned(128))) __intrusive_lane_t {
179 // spin lock protecting the queue
180 volatile bool lock;
181
182 // anchor for the head and the tail of the queue
183 struct __sentinel_t {
184 // Linked-list fields:
185 // intrusive link field for threads,
186 // must be laid out exactly as in $thread
187 __thread_desc_link link;
188 } before, after;
189
190 // Optional statistic counters
191 #if !defined(__CFA_NO_SCHED_STATS__)
192 struct __attribute__((aligned(64))) {
193 // difference between the number of pushes and pops
194 ssize_t diff;
195
196 // total number of pushes and pops
197 size_t push;
198 size_t pop ;
199 } stat;
200 #endif
201};
202
203void ?{}(__intrusive_lane_t & this);
204void ^?{}(__intrusive_lane_t & this);
205
206// Get the head pointer (one before the first element) from the anchor
207static inline $thread * head(const __intrusive_lane_t & this) {
208 $thread * rhead = ($thread *)(
209 (uintptr_t)( &this.before ) - offsetof( $thread, link )
210 );
211 /* paranoid */ verify(rhead);
212 return rhead;
213}
214
215// Get the tail pointer (one after the last element) from the anchor
216static inline $thread * tail(const __intrusive_lane_t & this) {
217 $thread * rtail = ($thread *)(
218 (uintptr_t)( &this.after ) - offsetof( $thread, link )
219 );
220 /* paranoid */ verify(rtail);
221 return rtail;
222}
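// Note (illustrative): head() and tail() synthesize fake $thread pointers laid
// out so that their link field overlays this.before.link / this.after.link,
// e.g.
//
//   $thread * h = head(lane);
//   // &h->link.next == &lane.before.link.next, so the sentinel can be linked
//   // exactly like a real thread without ever being used as one.
//
// The constructor below asserts this aliasing explicitly.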
223
224// Ctor
225void ?{}( __intrusive_lane_t & this ) {
226 this.lock = false;
227
228 this.before.link.prev = 0p;
229 this.before.link.next = tail(this);
230 this.before.link.ts = 0;
231
232 this.after .link.prev = head(this);
233 this.after .link.next = 0p;
234 this.after .link.ts = 0;
235
236 #if !defined(__CFA_NO_SCHED_STATS__)
237 this.stat.diff = 0;
238 this.stat.push = 0;
239 this.stat.pop = 0;
240 #endif
241
242 // We add a boat-load of assertions here because the anchor code is very fragile
243 /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
244 /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
245 /* paranoid */ verify(head(this)->link.prev == 0p );
246 /* paranoid */ verify(head(this)->link.next == tail(this) );
247 /* paranoid */ verify(tail(this)->link.next == 0p );
248 /* paranoid */ verify(tail(this)->link.prev == head(this) );
249 /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
250 /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
251 /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
252 /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
253 /* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
254 /* paranoid */ verify(sizeof(this) == 128);
255 /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
256 /* paranoid */ verify(__alignof__(this) == 128);
257 /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
258}
259
260// Dtor is trivial
261void ^?{}( __intrusive_lane_t & this ) {
262 // Make sure the list is empty
263 /* paranoid */ verify(head(this)->link.prev == 0p );
264 /* paranoid */ verify(head(this)->link.next == tail(this) );
265 /* paranoid */ verify(tail(this)->link.next == 0p );
266 /* paranoid */ verify(tail(this)->link.prev == head(this) );
267}
268
269// Push a thread onto this lane
270// returns true if the lane was empty before the push, false otherwise
271bool push(__intrusive_lane_t & this, $thread * node) {
272 #if defined(__CFA_WITH_VERIFY__)
273 /* paranoid */ verify(this.lock);
274 /* paranoid */ verify(node->link.ts != 0);
275 /* paranoid */ verify(node->link.next == 0p);
276 /* paranoid */ verify(node->link.prev == 0p);
277 /* paranoid */ verify(tail(this)->link.next == 0p);
278 /* paranoid */ verify(head(this)->link.prev == 0p);
279
280 if(this.before.link.ts == 0l) {
281 /* paranoid */ verify(tail(this)->link.prev == head(this));
282 /* paranoid */ verify(head(this)->link.next == tail(this));
283 } else {
284 /* paranoid */ verify(tail(this)->link.prev != head(this));
285 /* paranoid */ verify(head(this)->link.next != tail(this));
286 }
287 #endif
288
289 // Get the relevant nodes locally
290 $thread * tail = tail(this);
291 $thread * prev = tail->link.prev;
292
293 // Do the push
294 node->link.next = tail;
295 node->link.prev = prev;
296 prev->link.next = node;
297 tail->link.prev = node;
298
299 // Update stats
300 #if !defined(__CFA_NO_SCHED_STATS__)
301 this.stat.diff++;
302 this.stat.push++;
303 #endif
304
305 verify(node->link.next == tail(this));
306
307 // Check if the queue used to be empty
308 if(this.before.link.ts == 0l) {
309 this.before.link.ts = node->link.ts;
310 /* paranoid */ verify(node->link.prev == head(this));
311 return true;
312 }
313 return false;
314}
315
316// Pop a thread from this lane (must be non-empty)
317// returns the popped thread
318// and true if the lane is empty after the pop, false otherwise
319[$thread *, bool] pop(__intrusive_lane_t & this) {
320 /* paranoid */ verify(this.lock);
321 /* paranoid */ verify(this.before.link.ts != 0ul);
322
323 // Get anchors locally
324 $thread * head = head(this);
325 $thread * tail = tail(this);
326
327 // Get the relevant nodes locally
328 $thread * node = head->link.next;
329 $thread * next = node->link.next;
330
331 /* paranoid */ verify(node != tail);
332 /* paranoid */ verify(node);
333
334 // Do the pop
335 head->link.next = next;
336 next->link.prev = head;
337 node->link.[next, prev] = 0p;
338
339 // Update head time stamp
340 this.before.link.ts = next->link.ts;
341
342 // Update stats
343 #ifndef __CFA_NO_SCHED_STATS__
344 this.stat.diff--;
345 this.stat.pop ++;
346 #endif
347
348 // Check if we emptied the list and return accordingly
349 /* paranoid */ verify(tail(this)->link.next == 0p);
350 /* paranoid */ verify(head(this)->link.prev == 0p);
351 if(next == tail) {
352 /* paranoid */ verify(this.before.link.ts == 0);
353 /* paranoid */ verify(tail(this)->link.prev == head(this));
354 /* paranoid */ verify(head(this)->link.next == tail(this));
355 return [node, true];
356 }
357 else {
358 /* paranoid */ verify(next->link.ts != 0);
359 /* paranoid */ verify(tail(this)->link.prev != head(this));
360 /* paranoid */ verify(head(this)->link.next != tail(this));
361 /* paranoid */ verify(this.before.link.ts != 0);
362 return [node, false];
363 }
364}
365
366// Check whether or not the list is empty
367static inline bool is_empty(__intrusive_lane_t & this) {
368 // Cannot verify here since it may not be locked
369 return this.before.link.ts == 0;
370}
371
372// Return the timestamp
373static inline unsigned long long ts(__intrusive_lane_t & this) {
374 // Cannot verify here since it may not be locked
375 return this.before.link.ts;
376}
377
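// Illustrative sketch (not compiled): a lane is only ever accessed under its
// own spin lock; push()/pop() above assume the lock is already held (note the
// verify(this.lock) at the top of each).  The cluster-level code below follows
// this pattern:
//
//   if( __atomic_try_acquire( &lane.lock ) ) {
//       bool was_empty = push( lane, thrd );
//       __atomic_unlock( &lane.lock );
//   }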
378//=======================================================================
379// Scalable Non-Zero Indicator (SNZI)
380//=======================================================================
381
382union __snzi_val_t {
383 uint64_t _all;
384 struct __attribute__((packed)) {
385 char cnt;
386 uint64_t ver:56;
387 };
388};
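// Layout note (illustrative): _all packs the 8-bit counter in the low byte and
// the 56-bit version in the upper bytes, so for example cnt == 1, ver == 3
// gives _all == (3 << 8) | 1 == 0x301.  The verify() in cas() below checks
// exactly this packing.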
389
390bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, char _cnt, uint64_t _ver) {
391 __snzi_val_t t;
392 t.ver = _ver;
393 t.cnt = _cnt;
394 /* paranoid */ verify(t._all == ((_ver << 8) | ((unsigned char)_cnt)));
395 return __atomic_compare_exchange_n(&self._all, &exp._all, t._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
396}
397
398bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, const __snzi_val_t & tar) {
399 return __atomic_compare_exchange_n(&self._all, &exp._all, tar._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
400}
401
402void ?{}( __snzi_val_t & this ) { this._all = 0; }
403void ?{}( __snzi_val_t & this, const volatile __snzi_val_t & o) { this._all = o._all; }
404
405struct __attribute__((aligned(128))) __snzi_node_t {
406 volatile __snzi_val_t value;
407 struct __snzi_node_t * parent;
408 bool is_root;
409};
410
411static inline void arrive( __snzi_node_t & );
412static inline void depart( __snzi_node_t & );
413
414#define __snzi_half -1
415
416//--------------------------------------------------
417// Root node
418static void arrive_r( __snzi_node_t & this ) {
419 /* paranoid */ verify( this.is_root );
420 __atomic_fetch_add(&this.value._all, 1, __ATOMIC_SEQ_CST);
421}
422
423static void depart_r( __snzi_node_t & this ) {
424 /* paranoid */ verify( this.is_root );
425 __atomic_fetch_sub(&this.value._all, 1, __ATOMIC_SEQ_CST);
426}
427
428//--------------------------------------------------
429// Hierarchical node
430static void arrive_h( __snzi_node_t & this ) {
431 int undoArr = 0;
432 bool success = false;
433 while(!success) {
434 __snzi_val_t x = { this.value };
435 /* paranoid */ verify(x.cnt <= 120);
436 if( x.cnt >= 1 ) {
437 if( cas( this.value, x, x.cnt + 1, x.ver ) ) {
438 success = true;
439 }
440 }
441 /* paranoid */ verify(x.cnt <= 120);
442 if( x.cnt == 0 ) {
443 if( cas( this.value, x, __snzi_half, x.ver + 1) ) {
444 success = true;
445 x.cnt = __snzi_half;
446 x.ver = x.ver + 1;
447 }
448 }
449 /* paranoid */ verify(x.cnt <= 120);
450 if( x.cnt == __snzi_half ) {
451 /* paranoid */ verify( this.parent);
452 arrive( *this.parent );
453 if( !cas( this.value, x, 1, x.ver) ) {
454 undoArr = undoArr + 1;
455 }
456 }
457 }
458
459 for(int i = 0; i < undoArr; i++) {
460 /* paranoid */ verify( this.parent );
461 depart( *this.parent );
462 }
463}
464
465static void depart_h( __snzi_node_t & this ) {
466 while(true) {
467 const __snzi_val_t x = { this.value };
468 /* paranoid */ verifyf(x.cnt >= 1, "%d", x.cnt);
469 if( cas( this.value, x, x.cnt - 1, x.ver ) ) {
470 if( x.cnt == 1 ) {
471 /* paranoid */ verify( this.parent );
472 depart( *this.parent );
473 }
474 return;
475 }
476 }
477}
478
479//--------------------------------------------------
480// All nodes
481static inline void arrive( __snzi_node_t & this ) {
482 if(this.is_root) arrive_r( this );
483 else arrive_h( this );
484}
485
486static inline void depart( __snzi_node_t & this ) {
487 if(this.is_root) depart_r( this );
488 else depart_h( this );
489}
490
491static inline bool query( __snzi_node_t & this ) {
492 /* paranoid */ verify( this.is_root );
493 return this.value._all > 0;
494}
495
496//--------------------------------------------------
497// SNZI object
498void ?{}( __snzi_t & this, unsigned depth ) with( this ) {
499 mask = (1 << depth) - 1;
500 root = (1 << (depth + 1)) - 2;
501 nodes = alloc( root + 1 );
502
503 int width = 1 << depth;
504 for(int i = 0; i < root; i++) {
505 nodes[i].value._all = 0;
506 nodes[i].parent = &nodes[(i / 2) + width ];
507 nodes[i].is_root = false;
508 }
509
510 nodes[ root ].value._all = 0;
511 nodes[ root ].parent = 0p;
512 nodes[ root ].is_root = true;
513}
514
515void ^?{}( __snzi_t & this ) {
516 free( this.nodes );
517}
518
519static inline void arrive( __snzi_t & this, int idx) {
520 idx &= this.mask;
521 arrive( this.nodes[idx] );
522}
523
524static inline void depart( __snzi_t & this, int idx) {
525 idx &= this.mask;
526 depart( this.nodes[idx] );
527}
528
529static inline bool query( const __snzi_t & this ) {
530 return query( this.nodes[ this.root ] );
531}
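// Illustrative sketch (not compiled): the scheduler below uses the SNZI as an
// "is any lane non-empty?" indicator without touching every lane.
//
//   arrive( snzi, i );            // lane i just went from empty to non-empty
//   depart( snzi, i );            // lane i just went from non-empty to empty
//   if( query( snzi ) ) { ... }   // at least one arrive() is still unmatched
//
// Indices are hashed onto the leaves with 'idx & mask', so several lanes may
// share a leaf; that is fine because the arrives and departs for a given lane
// always balance on the same leaf.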
532
533//=======================================================================
534// Cforall Ready Queue implementation
535//=======================================================================
536
537// Thread local mirror of ready queue statistics
538#if !defined(__CFA_NO_STATISTICS__)
539static __attribute__((aligned(128))) thread_local struct {
540 struct {
541 struct {
542 size_t attempt;
543 size_t success;
544 } push;
545 struct {
546 size_t maskrds;
547 size_t attempt;
548 size_t success;
549 } pop;
550 } pick;
551 struct {
552 size_t value;
553 size_t count;
554 } used;
555} tls = {
556 /* pick */{
557 /* push */{ 0, 0 },
558 /* pop */{ 0, 0, 0 },
559 },
560 /* used */{ 0, 0 }
561};
562#endif
563
564//-----------------------------------------------------------------------
565
566void ?{}(__ready_queue_t & this) with (this) {
567
568 lanes.data = alloc(4);
569 for( i; 4 ) {
570 (lanes.data[i]){};
571 }
572 lanes.count = 4;
573 snzi{ log2( lanes.count / 8 ) };
574
575 #if !defined(__CFA_NO_STATISTICS__)
576 global_stats.pick.push.attempt = 0;
577 global_stats.pick.push.success = 0;
578 global_stats.pick.pop .maskrds = 0;
579 global_stats.pick.pop .attempt = 0;
580 global_stats.pick.pop .success = 0;
581
582 global_stats.used.value = 0;
583 global_stats.used.count = 0;
584 #endif
585}
586
587void ^?{}(__ready_queue_t & this) with (this) {
588 verify( 4 == lanes.count );
589 verify( !query( snzi ) );
590
591 ^(snzi){};
592
593 for( i; 4 ) {
594 ^(lanes.data[i]){};
595 }
596 free(lanes.data);
597}
598
599//-----------------------------------------------------------------------
600__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
601 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
602
603 // write timestamp
604 thrd->link.ts = rdtscl();
605
606 // Try to pick a lane and lock it
607 unsigned i;
608 do {
609 // Pick the index of a lane
610 i = __tls_rand() % lanes.count;
611
612 #if !defined(__CFA_NO_STATISTICS__)
613 tls.pick.push.attempt++;
614 #endif
615
616 // If we can't lock it retry
617 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
618
619 bool first = false;
620
621 // Actually push it
622 bool lane_first = push(lanes.data[i], thrd);
623
624 // If this lane used to be empty we need to do more
625 if(lane_first) {
626 // Check if the entire queue used to be empty
627 first = !query(snzi);
628
629 // Update the snzi
630 arrive( snzi, i );
631 }
632
633 // Unlock and return
634 __atomic_unlock( &lanes.data[i].lock );
635
636 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
637
638 // Update statistics
639 #if !defined(__CFA_NO_STATISTICS__)
640 tls.pick.push.success++;
641 #endif
642
643 // return whether or not the list was empty before this push
644 return first;
645}
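// Note (illustrative): the boolean returned above distinguishes "the whole
// ready queue went from empty to non-empty" from an ordinary push; a caller
// can presumably use it as a hint that an idle processor may need to be woken.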
646
647//-----------------------------------------------------------------------
648// Given 2 indexes, pick the list with the oldest push and try to pop from it
649static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
650 #if !defined(__CFA_NO_STATISTICS__)
651 tls.pick.pop.attempt++;
652 #endif
653
654 // Pick the best list (oldest head timestamp)
655 int w = i;
656 if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
657 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
658 }
659
660 // Get relevant elements locally
661 __intrusive_lane_t & lane = lanes.data[w];
662
663 // If list looks empty retry
664 if( is_empty(lane) ) return 0p;
665
666 // If we can't get the lock retry
667 if( !__atomic_try_acquire(&lane.lock) ) return 0p;
668
669
670 // If list is empty, unlock and retry
671 if( is_empty(lane) ) {
672 __atomic_unlock(&lane.lock);
673 return 0p;
674 }
675
676 // Actually pop the list
677 struct $thread * thrd;
678 bool emptied;
679 [thrd, emptied] = pop(lane);
680
681 /* paranoid */ verify(thrd);
682 /* paranoid */ verify(lane.lock);
683
684 // If this was the last element in the lane
685 if(emptied) {
686 depart( snzi, w );
687 }
688
689 // Unlock and return
690 __atomic_unlock(&lane.lock);
691
692 // Update statistics
693 #if !defined(__CFA_NO_STATISTICS__)
694 tls.pick.pop.success++;
695 #endif
696
697 // return the popped thread
698 return thrd;
699}
700
701// Pop a thread from the ready queue of a given cluster
702__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
703 /* paranoid */ verify( lanes.count > 0 );
704
705 // As long as the list is not empty, try finding a lane that isn't empty and pop from it
706 while( query(snzi) ) {
707 // Pick two lists at random
708 int i = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
709 int j = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
710
711 // try popping from the 2 picked lists
712 struct $thread * thrd = try_pop(cltr, i, j);
713 if(thrd) return thrd;
714 }
715
716 // All lanes were empty, return 0p
717 return 0p;
718}
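// Illustrative sketch (not compiled): the pop path above is the classic
// "power of two random choices" heuristic -- pick two lanes uniformly at
// random and prefer the one whose head was pushed longer ago, which keeps the
// lanes roughly balanced without any global coordination.
//
//   unsigned i = __tls_rand() % lanes.count;
//   unsigned j = __tls_rand() % lanes.count;
//   $thread * thrd = try_pop( cltr, i, j );   // try_pop() picks the older of i and j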
719
720//-----------------------------------------------------------------------
721
722static void check( __ready_queue_t & q ) with (q) {
723 #if defined(__CFA_WITH_VERIFY__)
724 {
725 for( idx ; lanes.count ) {
726 __intrusive_lane_t & sl = lanes.data[idx];
727 assert(!lanes.data[idx].lock);
728
729 assert(head(sl)->link.prev == 0p );
730 assert(head(sl)->link.next->link.prev == head(sl) );
731 assert(tail(sl)->link.next == 0p );
732 assert(tail(sl)->link.prev->link.next == tail(sl) );
733
734 if(sl.before.link.ts == 0l) {
735 assert(tail(sl)->link.prev == head(sl));
736 assert(head(sl)->link.next == tail(sl));
737 } else {
738 assert(tail(sl)->link.prev != head(sl));
739 assert(head(sl)->link.next != tail(sl));
740 }
741 }
742 }
743 #endif
744}
745
746// Call this function if the intrusive list was moved using memcpy;
747// it fixes the list so that the pointers back to the anchors aren't left dangling
748static inline void fix(__intrusive_lane_t & ll) {
749 // if the list is not empty then follow the pointers and fix the reverse links
750 if(!is_empty(ll)) {
751 head(ll)->link.next->link.prev = head(ll);
752 tail(ll)->link.prev->link.next = tail(ll);
753 }
754 // Otherwise just reset the list
755 else {
756 verify(tail(ll)->link.next == 0p);
757 tail(ll)->link.prev = head(ll);
758 head(ll)->link.next = tail(ll);
759 verify(head(ll)->link.prev == 0p);
760 }
761}
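// Illustrative sketch (not compiled): fix() is needed because the sentinels
// live inside the lane itself, so after the array of lanes is realloc'd the
// first and last threads still point at the sentinels' old addresses.  The
// grow/shrink code below therefore always pairs the realloc with a fix pass:
//
//   lanes.data = alloc( lanes.data, ncount );   // may memcpy the lanes
//   for( idx; (size_t)lanes.count ) {
//       fix( lanes.data[idx] );                 // re-aim the back pointers
//   }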
762
763// Grow the ready queue
764void ready_queue_grow (struct cluster * cltr) {
765 // Lock the RWlock so no-one pushes/pops while we are changing the queue
766 uint_fast32_t last_size = ready_mutate_lock();
767
768 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
769
770 // Make sure that everything is consistent
771 /* paranoid */ check( cltr->ready_queue );
772
773 // grow the ready queue
774 with( cltr->ready_queue ) {
775 ^(snzi){};
776
777 size_t ncount = lanes.count;
778
779 // increase count
780 ncount += 4;
781
782 // Allocate new array (uses realloc and memcpies the data)
783 lanes.data = alloc(lanes.data, ncount);
784
785 // Fix the moved data
786 for( idx; (size_t)lanes.count ) {
787 fix(lanes.data[idx]);
788 }
789
790 // Construct new data
791 for( idx; (size_t)lanes.count ~ ncount) {
792 (lanes.data[idx]){};
793 }
794
795 // Update original
796 lanes.count = ncount;
797
798 // Re-create the snzi
799 snzi{ log2( lanes.count / 8 ) };
800 for( idx; (size_t)lanes.count ) {
801 if( !is_empty(lanes.data[idx]) ) {
802 arrive(snzi, idx);
803 }
804 }
805 }
806
807 // Make sure that everything is consistent
808 /* paranoid */ check( cltr->ready_queue );
809
810 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
811
812 // Unlock the RWlock
813 ready_mutate_unlock( last_size );
814}
815
816// Shrink the ready queue
817void ready_queue_shrink(struct cluster * cltr) {
818 // Lock the RWlock so no-one pushes/pops while we are changing the queue
819 uint_fast32_t last_size = ready_mutate_lock();
820
821 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
822
823 // Make sure that everything is consistent
824 /* paranoid */ check( cltr->ready_queue );
825
826 with( cltr->ready_queue ) {
827 ^(snzi){};
828
829 size_t ocount = lanes.count;
830 // Check that we have some space left
831 if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
832
833 // reduce the actual count so push doesn't use the old queues
834 lanes.count -= 4;
835 verify(ocount > lanes.count);
836
837 // for printing count the number of displaced threads
838 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
839 __attribute__((unused)) size_t displaced = 0;
840 #endif
841
842 // redistribute old data
843 for( idx; (size_t)lanes.count ~ ocount) {
844 // Lock is not strictly needed but makes checking invariants much easier
845 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
846 verify(locked);
847
848 // As long as this lane is not empty, pop its threads and push them somewhere else in the queue
849 while(!is_empty(lanes.data[idx])) {
850 struct $thread * thrd;
851 __attribute__((unused)) bool _;
852 [thrd, _] = pop(lanes.data[idx]);
853
854 push(cltr, thrd);
855
856 // for printing count the number of displaced threads
857 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
858 displaced++;
859 #endif
860 }
861
862 // Unlock the lane
863 __atomic_unlock(&lanes.data[idx].lock);
864
865 // TODO print the queue statistics here
866
867 ^(lanes.data[idx]){};
868 }
869
870 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
871
872 // Allocate new array (uses realloc and memcpies the data)
873 lanes.data = alloc(lanes.data, lanes.count);
874
875 // Fix the moved data
876 for( idx; (size_t)lanes.count ) {
877 fix(lanes.data[idx]);
878 }
879
880 // Re-create the snzi
881 snzi{ log2( lanes.count / 8 ) };
882 for( idx; (size_t)lanes.count ) {
883 if( !is_empty(lanes.data[idx]) ) {
884 arrive(snzi, idx);
885 }
886 }
887 }
888
889 // Make sure that everything is consistent
890 /* paranoid */ check( cltr->ready_queue );
891
892 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
893
894 // Unlock the RWlock
895 ready_mutate_unlock( last_size );
896}
897
898//-----------------------------------------------------------------------
899
900#if !defined(__CFA_NO_STATISTICS__)
901void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
902 __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
903 __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
904 __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
905 __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
906 __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
907
908 __atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
909 __atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
910}
911#endif