source: libcfa/src/concurrency/ready_queue.cfa@ b388ee81

Last change on this file since b388ee81 was b388ee81, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Changed ready RW-Lock to be a single global lock instead of per cluster.
This was needed because otherwise, processors outside the cluster could not schedule threads.

[7768b8d]1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
[1b143de]17// #define __CFA_DEBUG_PRINT_READY_QUEUE__
[7768b8d]18
19#include "bits/defs.hfa"
20#include "kernel_private.hfa"
21
22#define _GNU_SOURCE
23#include "stdlib.hfa"
[61d7bec]24#include "math.hfa"
[7768b8d]25
26static const size_t cache_line_size = 64;
27
[dca5802]28// No overridden function, no environment variable, no define
29// fall back to a magic number
30#ifndef __CFA_MAX_PROCESSORS__
[b388ee81]31 #define __CFA_MAX_PROCESSORS__ 1024
[dca5802]32#endif
[7768b8d]33
[dca5802]34// returns the maximum number of processors the RWLock supports
[7768b8d]35__attribute__((weak)) unsigned __max_processors() {
36 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
37 if(!max_cores_s) {
[504a7dc]38 __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
[dca5802]39 return __CFA_MAX_PROCESSORS__;
[7768b8d]40 }
41
42 char * endptr = 0p;
43 long int max_cores_l = strtol(max_cores_s, &endptr, 10);
44 if(max_cores_l < 1 || max_cores_l > 65535) {
[504a7dc]45 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
[dca5802]46 return __CFA_MAX_PROCESSORS__;
[7768b8d]47 }
48 if('\0' != *endptr) {
[504a7dc]49 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
[dca5802]50 return __CFA_MAX_PROCESSORS__;
[7768b8d]51 }
52
53 return max_cores_l;
54}
55
56//=======================================================================
57// Cluster wide reader-writer lock
58//=======================================================================
[b388ee81]59void ?{}(__scheduler_RWLock_t & this) {
[7768b8d]60 this.max = __max_processors();
61 this.alloc = 0;
62 this.ready = 0;
63 this.lock = false;
64 this.data = alloc(this.max);
65
66 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data )) % 64) );
67 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
68 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
69 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
70
71}
[b388ee81]72void ^?{}(__scheduler_RWLock_t & this) {
[7768b8d]73 free(this.data);
74}
75
76void ?{}( __processor_id & this, struct processor * proc ) {
77 this.handle = proc;
78 this.lock = false;
79}
80
81//=======================================================================
82// Lock-Free registering/unregistering of threads
[b388ee81]83unsigned doregister( struct processor * proc ) with(*__scheduler_lock) {
84 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
[504a7dc]85
[7768b8d]86 // Step - 1 : check if there is already space in the data
87 uint_fast32_t s = ready;
88
89 // Check among all the ready entries for a free slot
90 for(uint_fast32_t i = 0; i < s; i++) {
91 processor * null = 0p; // Re-write every iteration since the compare-exchange clobbers it
92 if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
93 && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
94 /*paranoid*/ verify(i < ready);
95 /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
96 /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
97 return i;
98 }
99 }
100
[b388ee81]101 if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);
[7768b8d]102
103 // Step - 2 : F&A to get a new spot in the array.
104 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
[b388ee81]105 if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);
[7768b8d]106
107 // Step - 3 : Mark space as used and then publish it.
108 __processor_id * storage = (__processor_id *)&data[n];
109 (*storage){ proc };
110 while(true) {
111 unsigned copy = n;
112 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
113 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
114 break;
115 asm volatile("pause");
116 }
117
[1b143de]118 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
[504a7dc]119
[7768b8d]120 // Return new spot.
121 /*paranoid*/ verify(n < ready);
122 /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
123 /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
124 return n;
125}
126
[b388ee81]127void unregister( struct processor * proc ) with(*__scheduler_lock) {
[7768b8d]128 unsigned id = proc->id;
129 /*paranoid*/ verify(id < ready);
130 /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
131 __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
[504a7dc]132
133 __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
[7768b8d]134}
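// Illustrative sketch of how the registration API above might be used by a
// processor; the helper names are hypothetical, but proc->id matches the field
// read by unregister:
#if 0
static void example_processor_startup( processor * proc ) {
	proc->id = doregister( proc );   // claim a slot in the RW-lock array
}
static void example_processor_teardown( processor * proc ) {
	unregister( proc );              // release the slot before the processor is destroyed
}
#endif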
135
136//-----------------------------------------------------------------------
137// Writer side : acquire when changing the ready queue, e.g. adding more
138// queues or removing them.
[b388ee81]139uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
[7768b8d]140 // Step 1 : lock global lock
141 // This is needed to prevent processors that register mid critical-section
142 // from simply locking their own lock and entering.
143 __atomic_acquire( &lock );
144
145 // Step 2 : lock per-proc lock
146 // Processors that are currently being registered aren't counted
147 // but can't be in read_lock or in the critical section.
148 // All other processors are counted
149 uint_fast32_t s = ready;
150 for(uint_fast32_t i = 0; i < s; i++) {
151 __atomic_acquire( &data[i].lock );
152 }
153
154 return s;
155}
156
[b388ee81]157void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
[7768b8d]158 // Step 1 : release local locks
159 // This must be done while the global lock is held to prevent
160 // threads that were created mid critical-section
161 // from racing to lock their local locks and having the writer
162 // immediately unlock them
163 // Alternative solution : return s in write_lock and pass it to write_unlock
164 for(uint_fast32_t i = 0; i < last_s; i++) {
165 verify(data[i].lock);
166 __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
167 }
168
169 // Step 2 : release global lock
170 /*paranoid*/ assert(true == lock);
171 __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
172}
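// Illustrative sketch: the writer side brackets structural changes to the
// ready queue, exactly as ready_queue_grow and ready_queue_shrink do further
// down in this file:
#if 0
uint_fast32_t last_size = ready_mutate_lock();
// ... add or remove lanes while no reader can be inside the scheduler ...
ready_mutate_unlock( last_size );
#endif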
173
174//=======================================================================
175// Intrusive Queue used by ready queue
176//=======================================================================
[61d7bec]177// Intrusive lanes used by the relaxed ready queue
178struct __attribute__((aligned(128))) __intrusive_lane_t {
179 // spin lock protecting the queue
180 volatile bool lock;
181
182 // anchor for the head and the tail of the queue
183 struct __sentinel_t {
184 // Linked-list fields
185 // intrusive link field for threads
186 // must be exactly as in $thread
187 __thread_desc_link link;
188 } before, after;
189
190#if defined(__CFA_WITH_VERIFY__)
191 // id of last processor to acquire the lock
192 // needed only to check for mutual exclusion violations
193 unsigned int last_id;
194
195 // number of items on this list
196 // needed only to check for deadlocks
197 unsigned int count;
198#endif
199
200 // Optional statistic counters
201 #if !defined(__CFA_NO_SCHED_STATS__)
202 struct __attribute__((aligned(64))) {
203 // difference between number of push and pops
204 ssize_t diff;
205
206 // total number of pushes and pops
207 size_t push;
208 size_t pop ;
209 } stat;
210 #endif
211};
212
213void ?{}(__intrusive_lane_t & this);
214void ^?{}(__intrusive_lane_t & this);
215
[7768b8d]216// Get the head pointer (one before the first element) from the anchor
[504a7dc]217static inline $thread * head(const __intrusive_lane_t & this) {
218 $thread * rhead = ($thread *)(
219 (uintptr_t)( &this.before ) - offsetof( $thread, link )
[7768b8d]220 );
221 /* paranoid */ verify(rhead);
222 return rhead;
223}
224
225// Get the tail pointer (one after the last element) from the anchor
[504a7dc]226static inline $thread * tail(const __intrusive_lane_t & this) {
227 $thread * rtail = ($thread *)(
228 (uintptr_t)( &this.after ) - offsetof( $thread, link )
[7768b8d]229 );
230 /* paranoid */ verify(rtail);
231 return rtail;
232}
233
234// Ctor
[dca5802]235void ?{}( __intrusive_lane_t & this ) {
[b798713]236 this.lock = false;
[504a7dc]237 #if defined(__CFA_WITH_VERIFY__)
238 this.last_id = -1u;
239 this.count = 0u;
240 #endif
[b798713]241
242 this.before.link.prev = 0p;
243 this.before.link.next = tail(this);
244 this.before.link.ts = 0;
[7768b8d]245
[b798713]246 this.after .link.prev = head(this);
247 this.after .link.next = 0p;
248 this.after .link.ts = 0;
249
250 #if !defined(__CFA_NO_SCHED_STATS__)
251 this.stat.diff = 0;
252 this.stat.push = 0;
253 this.stat.pop = 0;
254 #endif
[7768b8d]255
256 // We add a boat-load of assertions here because the anchor code is very fragile
[504a7dc]257 /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
258 /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
[b798713]259 /* paranoid */ verify(head(this)->link.prev == 0p );
260 /* paranoid */ verify(head(this)->link.next == tail(this) );
261 /* paranoid */ verify(tail(this)->link.next == 0p );
262 /* paranoid */ verify(tail(this)->link.prev == head(this) );
263 /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
264 /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
265 /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
266 /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
[dca5802]267 /* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
[7768b8d]268 /* paranoid */ verify(sizeof(this) == 128);
[dca5802]269 /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
[7768b8d]270 /* paranoid */ verify(__alignof__(this) == 128);
271 /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
272}
273
274// Dtor is trivial
[dca5802]275void ^?{}( __intrusive_lane_t & this ) {
[7768b8d]276 // Make sure the list is empty
[b798713]277 /* paranoid */ verify(head(this)->link.prev == 0p );
278 /* paranoid */ verify(head(this)->link.next == tail(this) );
279 /* paranoid */ verify(tail(this)->link.next == 0p );
280 /* paranoid */ verify(tail(this)->link.prev == head(this) );
[c84b4be]281 /* paranoid */ verify(this.count == 0u );
[7768b8d]282}
283
[dca5802]284// Push a thread onto this lane
285// returns true if the lane was empty before the push, false otherwise
[504a7dc]286bool push(__intrusive_lane_t & this, $thread * node) {
[dca5802]287 #if defined(__CFA_WITH_VERIFY__)
288 /* paranoid */ verify(this.lock);
289 /* paranoid */ verify(node->link.ts != 0);
290 /* paranoid */ verify(node->link.next == 0p);
291 /* paranoid */ verify(node->link.prev == 0p);
[504a7dc]292 /* paranoid */ verify(tail(this)->link.next == 0p);
293 /* paranoid */ verify(head(this)->link.prev == 0p);
[dca5802]294
295 this.count++;
296
297 if(this.before.link.ts == 0l) {
298 /* paranoid */ verify(tail(this)->link.prev == head(this));
299 /* paranoid */ verify(head(this)->link.next == tail(this));
[504a7dc]300 } else {
301 /* paranoid */ verify(tail(this)->link.prev != head(this));
302 /* paranoid */ verify(head(this)->link.next != tail(this));
[dca5802]303 }
304 #endif
[7768b8d]305
306 // Get the relevant nodes locally
[504a7dc]307 $thread * tail = tail(this);
308 $thread * prev = tail->link.prev;
[7768b8d]309
310 // Do the push
[b798713]311 node->link.next = tail;
312 node->link.prev = prev;
313 prev->link.next = node;
314 tail->link.prev = node;
[7768b8d]315
316 // Update stats
[dca5802]317 #if !defined(__CFA_NO_SCHED_STATS__)
[7768b8d]318 this.stat.diff++;
319 this.stat.push++;
320 #endif
321
[b798713]322 verify(node->link.next == tail(this));
323
[7768b8d]324 // Check if the queue used to be empty
[b798713]325 if(this.before.link.ts == 0l) {
326 this.before.link.ts = node->link.ts;
[dca5802]327 /* paranoid */ verify(node->link.prev == head(this));
[7768b8d]328 return true;
329 }
330 return false;
331}
332
[dca5802]333// Pop a thread from this lane (must be non-empty)
334// returns the popped thread
335// returns true if the lane is empty after the pop, false otherwise
[504a7dc]336[$thread *, bool] pop(__intrusive_lane_t & this) {
[dca5802]337 /* paranoid */ verify(this.lock);
338 /* paranoid */ verify(this.before.link.ts != 0ul);
339
340 // Get anchors locally
[504a7dc]341 $thread * head = head(this);
342 $thread * tail = tail(this);
[7768b8d]343
[dca5802]344 // Get the relevant nodes locally
[504a7dc]345 $thread * node = head->link.next;
346 $thread * next = node->link.next;
[7768b8d]347
[dca5802]348 #if defined(__CFA_WITH_VERIFY__)
349 this.count--;
350 /* paranoid */ verify(node != tail);
351 /* paranoid */ verify(node);
352 #endif
[7768b8d]353
[dca5802]354 // Do the pop
[b798713]355 head->link.next = next;
356 next->link.prev = head;
[dca5802]357 node->link.[next, prev] = 0p;
358
359 // Update head time stamp
360 this.before.link.ts = next->link.ts;
[7768b8d]361
[dca5802]362 // Update stats
[7768b8d]363 #ifndef __CFA_NO_SCHED_STATS__
364 this.stat.diff--;
365 this.stat.pop ++;
366 #endif
367
[dca5802]368 // Check if we emptied list and return accordingly
[504a7dc]369 /* paranoid */ verify(tail(this)->link.next == 0p);
370 /* paranoid */ verify(head(this)->link.prev == 0p);
[7768b8d]371 if(next == tail) {
[dca5802]372 /* paranoid */ verify(this.before.link.ts == 0);
373 /* paranoid */ verify(tail(this)->link.prev == head(this));
374 /* paranoid */ verify(head(this)->link.next == tail(this));
[7768b8d]375 return [node, true];
376 }
377 else {
[dca5802]378 /* paranoid */ verify(next->link.ts != 0);
[504a7dc]379 /* paranoid */ verify(tail(this)->link.prev != head(this));
380 /* paranoid */ verify(head(this)->link.next != tail(this));
[dca5802]381 /* paranoid */ verify(this.before.link.ts != 0);
[7768b8d]382 return [node, false];
383 }
384}
385
[dca5802]386// Check whether or not the list is empty
387static inline bool is_empty(__intrusive_lane_t & this) {
[1b143de]388 // Cannot verify here since it may not be locked
[dca5802]389 return this.before.link.ts == 0;
390}
391
392// Return the timestamp
393static inline unsigned long long ts(__intrusive_lane_t & this) {
[1b143de]394 // Cannot verify here since it may not be locked
[b798713]395 return this.before.link.ts;
396}
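// Illustrative sketch of the per-lane protocol used by the ready queue (see
// try_pop below): try to lock the lane, re-check emptiness, pop, unlock. The
// helper name is hypothetical and the SNZI bookkeeping done by the real code
// when 'emptied' is true is omitted:
#if 0
static $thread * example_pop_one( __intrusive_lane_t & lane ) {
	if( !__atomic_try_acquire( &lane.lock ) ) return 0p;                  // contended, caller retries elsewhere
	if( is_empty( lane ) ) { __atomic_unlock( &lane.lock ); return 0p; }  // lost the race to another popper
	$thread * thrd;
	bool emptied;
	[thrd, emptied] = pop( lane );                                        // lane is locked and non-empty
	__atomic_unlock( &lane.lock );
	return thrd;
}
#endif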
397
[61d7bec]398//=======================================================================
399// Scalable Non-Zero counter
400//=======================================================================
401
402union __snzi_val_t {
403 uint64_t _all;
404 struct __attribute__((packed)) {
405 char cnt;
406 uint64_t ver:56;
407 };
408};
409
410bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, char _cnt, uint64_t _ver) {
411 __snzi_val_t t;
412 t.ver = _ver;
413 t.cnt = _cnt;
414 /* paranoid */ verify(t._all == ((_ver << 8) | ((unsigned char)_cnt)));
415 return __atomic_compare_exchange_n(&self._all, &exp._all, t._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
416}
417
418bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, const __snzi_val_t & tar) {
419 return __atomic_compare_exchange_n(&self._all, &exp._all, tar._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
420}
421
422void ?{}( __snzi_val_t & this ) { this._all = 0; }
423void ?{}( __snzi_val_t & this, const volatile __snzi_val_t & o) { this._all = o._all; }
424
425struct __attribute__((aligned(128))) __snzi_node_t {
426 volatile __snzi_val_t value;
427 struct __snzi_node_t * parent;
428 bool is_root;
429};
430
431static inline void arrive( __snzi_node_t & );
432static inline void depart( __snzi_node_t & );
433
434#define __snzi_half -1
435
436//--------------------------------------------------
437// Root node
438static void arrive_r( __snzi_node_t & this ) {
439 /* paranoid */ verify( this.is_root );
440 __atomic_fetch_add(&this.value._all, 1, __ATOMIC_SEQ_CST);
441}
442
443static void depart_r( __snzi_node_t & this ) {
444 /* paranoid */ verify( this.is_root );
445 __atomic_fetch_sub(&this.value._all, 1, __ATOMIC_SEQ_CST);
446}
447
448//--------------------------------------------------
449// Hierarchical node
450static void arrive_h( __snzi_node_t & this ) {
451 int undoArr = 0;
452 bool success = false;
453 while(!success) {
454 __snzi_val_t x = { this.value };
455 /* paranoid */ verify(x.cnt <= 120);
456 if( x.cnt >= 1 ) {
457 if( cas( this.value, x, x.cnt + 1, x.ver ) ) {
458 success = true;
459 }
460 }
461 /* paranoid */ verify(x.cnt <= 120);
462 if( x.cnt == 0 ) {
463 if( cas( this.value, x, __snzi_half, x.ver + 1) ) {
464 success = true;
465 x.cnt = __snzi_half;
466 x.ver = x.ver + 1;
467 }
468 }
469 /* paranoid */ verify(x.cnt <= 120);
470 if( x.cnt == __snzi_half ) {
471 /* paranoid */ verify( this.parent);
472 arrive( *this.parent );
473 if( !cas( this.value, x, 1, x.ver) ) {
474 undoArr = undoArr + 1;
475 }
476 }
477 }
478
479 for(int i = 0; i < undoArr; i++) {
480 /* paranoid */ verify( this.parent );
481 depart( *this.parent );
482 }
483}
484
485static void depart_h( __snzi_node_t & this ) {
486 while(true) {
487 const __snzi_val_t x = { this.value };
488 /* paranoid */ verifyf(x.cnt >= 1, "%d", x.cnt);
489 if( cas( this.value, x, x.cnt - 1, x.ver ) ) {
490 if( x.cnt == 1 ) {
491 /* paranoid */ verify( this.parent );
492 depart( *this.parent );
493 }
494 return;
495 }
496 }
497}
498
499//--------------------------------------------------
500// All nodes
501static inline void arrive( __snzi_node_t & this ) {
502 if(this.is_root) arrive_r( this );
503 else arrive_h( this );
504}
505
506static inline void depart( __snzi_node_t & this ) {
507 if(this.is_root) depart_r( this );
508 else depart_h( this );
509}
510
511static inline bool query( __snzi_node_t & this ) {
512 /* paranoid */ verify( this.is_root );
513 return this.value._all > 0;
514}
515
516//--------------------------------------------------
517// SNZI object
518void ?{}( __snzi_t & this, unsigned depth ) with( this ) {
519 mask = (1 << depth) - 1;
520 root = (1 << (depth + 1)) - 2;
521 nodes = alloc( root + 1 );
522
523 int width = 1 << depth;
524 for(int i = 0; i < root; i++) {
525 nodes[i].value._all = 0;
526 nodes[i].parent = &nodes[(i / 2) + width ];
527 nodes[i].is_root = false;
528 }
529
530 nodes[ root ].value._all = 0;
531 nodes[ root ].parent = 0p;
532 nodes[ root ].is_root = true;
533}
534
535void ^?{}( __snzi_t & this ) {
536 free( this.nodes );
537}
538
539static inline void arrive( __snzi_t & this, int idx) {
540 idx &= this.mask;
541 arrive( this.nodes[idx] );
542}
543
544static inline void depart( __snzi_t & this, int idx) {
545 idx &= this.mask;
546 depart( this.nodes[idx] );
547}
548
549static inline bool query( const __snzi_t & this ) {
550 return query( this.nodes[ this.root ] );
551}
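// Illustrative sketch of how the ready queue below drives the SNZI: a lane
// index arrives when its lane becomes non-empty, departs when the lane is
// emptied, and query answers "is anything ready?" without scanning every lane.
// The names lane_first, i, emptied, w and snzi refer to the push/try_pop code
// further down:
#if 0
if( lane_first ) arrive( snzi, i );   // push() saw the lane go from empty to non-empty
if( emptied    ) depart( snzi, w );   // pop() left the lane empty
if( !query( snzi ) ) { /* no ready threads anywhere on this cluster */ }
#endif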
552
[b798713]553//=======================================================================
554// Cforall Ready Queue used by the CFA scheduler
555//=======================================================================
556
[dca5802]557// Thread local mirror of ready queue statistics
558#if !defined(__CFA_NO_STATISTICS__)
[b798713]559static __attribute__((aligned(128))) thread_local struct {
560 struct {
561 struct {
562 size_t attempt;
563 size_t success;
564 } push;
565 struct {
566 size_t maskrds;
567 size_t attempt;
568 size_t success;
569 } pop;
570 } pick;
571 struct {
572 size_t value;
573 size_t count;
[dca5802]574 } used;
[b798713]575} tls = {
576 /* pick */{
577 /* push */{ 0, 0 },
578 /* pop */{ 0, 0, 0 },
579 },
[dca5802]580 /* used */{ 0, 0 }
[b798713]581};
[dca5802]582#endif
[b798713]583
584//-----------------------------------------------------------------------
585
586void ?{}(__ready_queue_t & this) with (this) {
587
[dca5802]588 lanes.data = alloc(4);
[b798713]589 for( i; 4 ) {
[dca5802]590 (lanes.data[i]){};
[b798713]591 }
[dca5802]592 lanes.count = 4;
[61d7bec]593 snzi{ log2( lanes.count / 8 ) };
[b798713]594
595 #if !defined(__CFA_NO_STATISTICS__)
596 global_stats.pick.push.attempt = 0;
597 global_stats.pick.push.success = 0;
598 global_stats.pick.pop .maskrds = 0;
599 global_stats.pick.pop .attempt = 0;
600 global_stats.pick.pop .success = 0;
601
[dca5802]602 global_stats.used.value = 0;
603 global_stats.used.count = 0;
[b798713]604 #endif
605}
606
607void ^?{}(__ready_queue_t & this) with (this) {
[dca5802]608 verify( 4 == lanes.count );
[61d7bec]609 verify( !query( snzi ) );
610
611 ^(snzi){};
[b798713]612
613 for( i; 4 ) {
[dca5802]614 ^(lanes.data[i]){};
[b798713]615 }
[dca5802]616 free(lanes.data);
617}
618
619//-----------------------------------------------------------------------
[504a7dc]620__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
[61d7bec]621 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
[1b143de]622
[dca5802]623 // write timestamp
[b798713]624 thrd->link.ts = rdtscl();
625
[dca5802]626 // Try to pick a lane and lock it
627 unsigned i;
628 do {
629 // Pick the index of a lane
[504a7dc]630 i = __tls_rand() % lanes.count;
[b798713]631
632 #if !defined(__CFA_NO_STATISTICS__)
633 tls.pick.push.attempt++;
634 #endif
635
636 // If we can't lock it retry
[dca5802]637 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
[b798713]638
[dca5802]639 #if defined(__CFA_WITH_VERIFY__)
640 /* paranoid */ verify(lanes.data[i].last_id == -1u);
641 /* paranoid */ lanes.data[i].last_id = kernelTLS.this_processor->id;
642 #endif
[b798713]643
[dca5802]644 bool first = false;
645
646 // Actually push it
647 bool lane_first = push(lanes.data[i], thrd);
648
649 // If this lane used to be empty we need to do more
650 if(lane_first) {
[504a7dc]651 // Check if the entire queue used to be empty
[61d7bec]652 first = !query(snzi);
653
654 // Update the snzi
655 arrive( snzi, i );
[b798713]656 }
[dca5802]657
658 #if defined(__CFA_WITH_VERIFY__)
659 /* paranoid */ verifyf( lanes.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, lanes.data[i].last_id, kernelTLS.this_processor->id );
660 /* paranoid */ verifyf( lanes.data[i].lock, "List %u is not locked\n", i );
661 /* paranoid */ lanes.data[i].last_id = -1u;
662 #endif
663
664 // Unlock and return
665 __atomic_unlock( &lanes.data[i].lock );
666
[1b143de]667 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
668
[dca5802]669 // Update statistics
670 #if !defined(__CFA_NO_STATISTICS__)
671 tls.pick.push.success++;
672 #endif
673
674 // return whether or not the list was empty before this push
675 return first;
[b798713]676}
677
678//-----------------------------------------------------------------------
[dca5802]679// Given 2 indexes, pick the list with the oldest push and try to pop from it
[504a7dc]680static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
[b798713]681 #if !defined(__CFA_NO_STATISTICS__)
682 tls.pick.pop.attempt++;
683 #endif
684
685 // Pick the best list
686 int w = i;
[dca5802]687 if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
688 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
[b798713]689 }
690
[dca5802]691 // Get relevant elements locally
692 __intrusive_lane_t & lane = lanes.data[w];
693
[b798713]694 // If list looks empty retry
[dca5802]695 if( is_empty(lane) ) return 0p;
[b798713]696
697 // If we can't get the lock retry
[dca5802]698 if( !__atomic_try_acquire(&lane.lock) ) return 0p;
[b798713]699
[dca5802]700 #if defined(__CFA_WITH_VERIFY__)
701 /* paranoid */ verify(lane.last_id == -1u);
702 /* paranoid */ lane.last_id = kernelTLS.this_processor->id;
703 #endif
[b798713]704
705
706 // If list is empty, unlock and retry
[dca5802]707 if( is_empty(lane) ) {
708 #if defined(__CFA_WITH_VERIFY__)
709 /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
710 /* paranoid */ lane.last_id = -1u;
711 #endif
712
713 __atomic_unlock(&lane.lock);
[b798713]714 return 0p;
715 }
716
717 // Actually pop the list
[504a7dc]718 struct $thread * thrd;
[b798713]719 bool emptied;
[dca5802]720 [thrd, emptied] = pop(lane);
[b798713]721
[dca5802]722 /* paranoid */ verify(thrd);
723 /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
724 /* paranoid */ verify(lane.lock);
[b798713]725
[dca5802]726 // If this was the last element in the lane
[b798713]727 if(emptied) {
[61d7bec]728 depart( snzi, w );
[b798713]729 }
730
[dca5802]731 #if defined(__CFA_WITH_VERIFY__)
732 /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
733 /* paranoid */ lane.last_id = -1u;
734 #endif
735
[b798713]736 // Unlock and return
[dca5802]737 __atomic_unlock(&lane.lock);
[b798713]738
[dca5802]739 // Update statistics
[b798713]740 #if !defined(__CFA_NO_STATISTICS__)
741 tls.pick.pop.success++;
742 #endif
743
[dca5802]744 // return the popped thread
[b798713]745 return thrd;
746}
747
[dca5802]748// Pop from the ready queue of a given cluster
[504a7dc]749__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
[dca5802]750 /* paranoid */ verify( lanes.count > 0 );
[b798713]751
[dca5802]752 // As long as the list is not empty, try finding a lane that isn't empty and pop from it
[61d7bec]753 while( query(snzi) ) {
754 // Pick two lists at random
755 int i = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
756 int j = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
757
758 // try popping from the 2 picked lists
759 struct $thread * thrd = try_pop(cltr, i, j);
760 if(thrd) return thrd;
[b798713]761 }
762
[dca5802]763 // All lanes were empty, return 0p
[b798713]764 return 0p;
765}
766
767//-----------------------------------------------------------------------
768
769static void check( __ready_queue_t & q ) with (q) {
770 #if defined(__CFA_WITH_VERIFY__)
771 {
[dca5802]772 for( idx ; lanes.count ) {
773 __intrusive_lane_t & sl = lanes.data[idx];
774 assert(!lanes.data[idx].lock);
[b798713]775
776 assert(head(sl)->link.prev == 0p );
777 assert(head(sl)->link.next->link.prev == head(sl) );
778 assert(tail(sl)->link.next == 0p );
779 assert(tail(sl)->link.prev->link.next == tail(sl) );
780
781 if(sl.before.link.ts == 0l) {
782 assert(tail(sl)->link.prev == head(sl));
783 assert(head(sl)->link.next == tail(sl));
[1b143de]784 } else {
785 assert(tail(sl)->link.prev != head(sl));
786 assert(head(sl)->link.next != tail(sl));
[b798713]787 }
788 }
789 }
790 #endif
791}
792
793// Call this function if the intrusive list was moved using memcpy;
[dca5802]794// it fixes the list so that pointers back to the anchors aren't left dangling
795static inline void fix(__intrusive_lane_t & ll) {
796 // if the list is not empty then follow the pointer and fix its reverse
797 if(!is_empty(ll)) {
[b798713]798 head(ll)->link.next->link.prev = head(ll);
799 tail(ll)->link.prev->link.next = tail(ll);
800 }
801 // Otherwise just reset the list
802 else {
[dca5802]803 verify(tail(ll)->link.next == 0p);
[b798713]804 tail(ll)->link.prev = head(ll);
805 head(ll)->link.next = tail(ll);
[dca5802]806 verify(head(ll)->link.prev == 0p);
[b798713]807 }
808}
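// Illustrative sketch: after 'lanes.data = alloc(lanes.data, ncount)' moves
// the lane array (realloc may memcpy it), the first and last threads of each
// non-empty lane still point at the old before/after sentinels, so every lane
// is repaired, exactly as ready_queue_grow and ready_queue_shrink do below:
#if 0
for( idx; (size_t)lanes.count ) {
	fix( lanes.data[idx] );   // re-aim interior pointers at the moved anchors
}
#endif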
809
[dca5802]810// Grow the ready queue
[b798713]811void ready_queue_grow (struct cluster * cltr) {
[dca5802]812 // Lock the RWlock so no-one pushes/pops while we are changing the queue
[b388ee81]813 uint_fast32_t last_size = ready_mutate_lock();
[dca5802]814
[504a7dc]815 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
[b798713]816
[dca5802]817 // Make sure that everything is consistent
818 /* paranoid */ check( cltr->ready_queue );
819
820 // grow the ready queue
[b798713]821 with( cltr->ready_queue ) {
[61d7bec]822 ^(snzi){};
[b798713]823
[61d7bec]824 size_t ncount = lanes.count;
[b798713]825
[dca5802]826 // increase count
[b798713]827 ncount += 4;
828
[dca5802]829 // Allocate new array (uses realloc and memcpies the data)
830 lanes.data = alloc(lanes.data, ncount);
[b798713]831
832 // Fix the moved data
[dca5802]833 for( idx; (size_t)lanes.count ) {
834 fix(lanes.data[idx]);
[b798713]835 }
836
837 // Construct new data
[dca5802]838 for( idx; (size_t)lanes.count ~ ncount) {
839 (lanes.data[idx]){};
[b798713]840 }
841
842 // Update original
[dca5802]843 lanes.count = ncount;
844
[61d7bec]845 // Re-create the snzi
846 snzi{ log2( lanes.count / 8 ) };
847 for( idx; (size_t)lanes.count ) {
848 if( !is_empty(lanes.data[idx]) ) {
849 arrive(snzi, idx);
850 }
851 }
[b798713]852 }
853
854 // Make sure that everything is consistent
[dca5802]855 /* paranoid */ check( cltr->ready_queue );
856
[504a7dc]857 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
[dca5802]858
859 // Unlock the RWlock
[b388ee81]860 ready_mutate_unlock( last_size );
[b798713]861}
862
[dca5802]863// Shrink the ready queue
[b798713]864void ready_queue_shrink(struct cluster * cltr) {
[dca5802]865 // Lock the RWlock so no-one pushes/pops while we are changing the queue
[b388ee81]866 uint_fast32_t last_size = ready_mutate_lock();
[dca5802]867
[504a7dc]868 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
[dca5802]869
870 // Make sure that everything is consistent
871 /* paranoid */ check( cltr->ready_queue );
872
[b798713]873 with( cltr->ready_queue ) {
[61d7bec]874 ^(snzi){};
875
[dca5802]876 // Make sure that the total thread count stays the same
[c84b4be]877 #if defined(__CFA_WITH_VERIFY__)
878 size_t nthreads = 0;
[dca5802]879 for( idx; (size_t)lanes.count ) {
880 nthreads += lanes.data[idx].count;
[c84b4be]881 }
882 #endif
883
[dca5802]884 size_t ocount = lanes.count;
[b798713]885 // Check that we have some space left
886 if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
887
[dca5802]888 // reduce the actual count so push doesn't use the old queues
889 lanes.count -= 4;
890 verify(ocount > lanes.count);
891
892 // for printing, count the number of displaced threads
[504a7dc]893 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
[dca5802]894 __attribute__((unused)) size_t displaced = 0;
895 #endif
[b798713]896
897 // redistribute old data
[dca5802]898 for( idx; (size_t)lanes.count ~ ocount) {
899 // Lock is not strictly needed but makes checking invariants much easier
[1b143de]900 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
[b798713]901 verify(locked);
[dca5802]902
903 // As long as we can pop from this lane, push the threads somewhere else in the queue
904 while(!is_empty(lanes.data[idx])) {
[504a7dc]905 struct $thread * thrd;
[b798713]906 __attribute__((unused)) bool _;
[dca5802]907 [thrd, _] = pop(lanes.data[idx]);
908
[b798713]909 push(cltr, thrd);
[dca5802]910
911 // for printing, count the number of displaced threads
[504a7dc]912 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
[dca5802]913 displaced++;
914 #endif
[b798713]915 }
916
[dca5802]917 // Unlock the lane
918 __atomic_unlock(&lanes.data[idx].lock);
[b798713]919
920 // TODO print the queue statistics here
921
[dca5802]922 ^(lanes.data[idx]){};
[b798713]923 }
924
[504a7dc]925 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
[c84b4be]926
[dca5802]927 // Allocate new array (uses realloc and memcpies the data)
928 lanes.data = alloc(lanes.data, lanes.count);
[b798713]929
930 // Fix the moved data
[dca5802]931 for( idx; (size_t)lanes.count ) {
932 fix(lanes.data[idx]);
[b798713]933 }
[c84b4be]934
[61d7bec]935 // Re-create the snzi
936 snzi{ log2( lanes.count / 8 ) };
937 for( idx; (size_t)lanes.count ) {
938 if( !is_empty(lanes.data[idx]) ) {
939 arrive(snzi, idx);
940 }
941 }
942
[dca5802]943 // Make sure that the total thread count stayed the same
[c84b4be]944 #if defined(__CFA_WITH_VERIFY__)
[dca5802]945 for( idx; (size_t)lanes.count ) {
946 nthreads -= lanes.data[idx].count;
[c84b4be]947 }
[dca5802]948 verifyf(nthreads == 0, "Shrinking changed number of threads");
[c84b4be]949 #endif
[b798713]950 }
951
952 // Make sure that everything is consistent
[dca5802]953 /* paranoid */ check( cltr->ready_queue );
954
[504a7dc]955 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
[dca5802]956
957 // Unlock the RWlock
[b388ee81]958 ready_mutate_unlock( last_size );
[b798713]959}
960
961//-----------------------------------------------------------------------
962
963#if !defined(__CFA_NO_STATISTICS__)
964void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
965 __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
966 __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
967 __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
968 __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
969 __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
970
[dca5802]971 __atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
972 __atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
[b798713]973}
974#endif