source: libcfa/src/concurrency/ready_queue.cfa@ dca5802

1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17
18#include "bits/defs.hfa"
19#include "kernel_private.hfa"
20
21#define _GNU_SOURCE
22#include "stdlib.hfa"
23
24static const size_t cache_line_size = 64;
25
26// No overridden function, no environment variable, no define
27// fall back to a magic number
28#ifndef __CFA_MAX_PROCESSORS__
29 #define __CFA_MAX_PROCESSORS__ 128
30#endif
31
32// returns the maximum number of processors the RWLock supports
33__attribute__((weak)) unsigned __max_processors() {
34 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
35 if(!max_cores_s) {
36 __cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
37 return __CFA_MAX_PROCESSORS__;
38 }
39
40 char * endptr = 0p;
41 long int max_cores_l = strtol(max_cores_s, &endptr, 10);
42 if(max_cores_l < 1 || max_cores_l > 65535) {
43 __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
44 return __CFA_MAX_PROCESSORS__;
45 }
46 if('\0' != *endptr) {
47 __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
48 return __CFA_MAX_PROCESSORS__;
49 }
50
51 return max_cores_l;
52}
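
// Illustrative sketch (not part of the original file): because __max_processors()
// is declared weak, an application can cap the RWLock size either by exporting
// CFA_MAX_PROCESSORS in its environment or by providing its own strong definition
// in application code, e.g.:
#if 0
unsigned __max_processors() {
	return 32;      // hypothetical application-chosen cap
}
#endif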
53
54// Picks a random 1 bit in 'mask' according to random number 'rnum'.
55static inline unsigned rand_bit(unsigned rnum, __cfa_readyQ_mask_t mask) {
56#if defined( __i386 )
57 static_assert(sizeof(mask) == 4);
58 unsigned bit = mask ? rnum % __builtin_popcount(mask) : 0;
59 #if !defined(__BMI2__)
60 #error rand_bit not implemented for non __BMI2__ i386
61 #else
62 uint32_t picked = _pdep_u32(1ul << bit, mask);
63 return picked ? __builtin_ctz(picked) : 0;
64 #endif
65#elif defined( __x86_64 )
66 static_assert(sizeof(mask) == 8);
67 unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
68 #if !defined(__BMI2__)
69 uint64_t v = mask; // Input value to find position with rank r.
70 unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
71 unsigned int s; // Output: Resulting position of bit with rank r [1-64]
72 uint64_t a, b, c, d; // Intermediate temporaries for bit count.
73 unsigned int t; // Bit count temporary.
74
75 // Do a normal parallel bit count for a 64-bit integer,
76 // but store all intermediate steps.
77 a = v - ((v >> 1) & ~0UL/3);
78 b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
79 c = (b + (b >> 4)) & ~0UL/0x11;
80 d = (c + (c >> 8)) & ~0UL/0x101;
81
82
83 t = (d >> 32) + (d >> 48);
84 // Now do branchless select!
85 s = 64;
86 s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
87 t = (d >> (s - 16)) & 0xff;
88 s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
89 t = (c >> (s - 8)) & 0xf;
90 s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
91 t = (b >> (s - 4)) & 0x7;
92 s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
93 t = (a >> (s - 2)) & 0x3;
94 s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
95 t = (v >> (s - 1)) & 0x1;
96 s -= ((t - r) & 256) >> 8;
97 return s - 1;
98 #else
99 uint64_t picked = _pdep_u64(1ul << bit, mask);
100 return picked ? __builtin_ctzl(picked) : 0;
101 #endif
102#elif defined( __ARM_ARCH )
103 #error rand_bit not implemented for arm
104#else
105 #error unknown hardware architecture
106#endif
107}
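
// Worked example (illustrative): with mask = 0b10110 (bits 1, 2 and 4 set) and
// rnum = 7, popcount(mask) == 3 so bit = 7 % 3 == 1, i.e. the set bit of rank 1
// (0-based).  On the BMI2 path, _pdep_u64(1ul << 1, 0b10110) deposits that bit
// into the second-lowest set position of the mask, giving 0b00100, and ctz
// returns 2.
#if 0
verify( rand_bit(7, 0b10110) == 2 );    // expected result on the BMI2 x86_64 path
#endif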
108
109
110//-----------------------------------------------------------------------------
111// Helpers used by extract
112// (_mask_bitsidx() & X) returns a bit index valid for a __cfa_readyQ_mask_t, where X is any integer
113static inline __cfa_readyQ_mask_t _mask_bitsidx () { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }
114
115// (X >> _mask_shiftidx()) returns an index into an array of __cfa_readyQ_mask_t
116static inline __cfa_readyQ_mask_t _mask_shiftidx() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(_mask_bitsidx()); }
117
118
119// Assuming a large bit mask represented as an array of __cfa_readyQ_mask_t
120// Given an index into the large mask, returns the bit index within a word and the index of that word in the array
121static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
122 __cfa_readyQ_mask_t word = idx >> _mask_shiftidx();
123 __cfa_readyQ_mask_t bit = idx & _mask_bitsidx();
124 return [bit, word];
125}
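
// Worked example (illustrative, assuming 64-bit mask words as the lane
// constructor verifies below): a global lane index of 70 splits into
// word 70 >> 6 == 1 and bit 70 & 63 == 6, i.e. bit 6 of mask word 1.
#if 0
__cfa_readyQ_mask_t bit, word;
[bit, word] = extract(70);
verify( word == 1 && bit == 6 );
#endif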
126
127//=======================================================================
128// Cluster wide reader-writer lock
129//=======================================================================
130void ?{}(__clusterRWLock_t & this) {
131 this.max = __max_processors();
132 this.alloc = 0;
133 this.ready = 0;
134 this.lock = false;
135 this.data = alloc(this.max);
136
137 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data )) % 64) );
138 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
139 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
140 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
141
142}
143void ^?{}(__clusterRWLock_t & this) {
144 free(this.data);
145}
146
147void ?{}( __processor_id & this, struct processor * proc ) {
148 this.handle = proc;
149 this.lock = false;
150}
151
152//=======================================================================
153// Lock-Free registering/unregistering of threads
154unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
155 // Step - 1 : check if there is already space in the data
156 uint_fast32_t s = ready;
157
158 // Check among the already-published ('ready') slots for a free one to reuse
159 for(uint_fast32_t i = 0; i < s; i++) {
160 processor * null = 0p; // Re-initialize every iteration since a failed compare-exchange overwrites it
161 if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
162 && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
163 /*paranoid*/ verify(i < ready);
164 /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
165 /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
166 return i;
167 }
168 }
169
170 if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);
171
172 // Step - 2 : F&A to get a new spot in the array.
173 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
174 if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);
175
176 // Step - 3 : Mark space as used and then publish it.
177 __processor_id * storage = (__processor_id *)&data[n];
178 (*storage){ proc };
179 while(true) {
180 unsigned copy = n;
181 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
182 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
183 break;
184 asm volatile("pause");
185 }
186
187 // Return new spot.
188 /*paranoid*/ verify(n < ready);
189 /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
190 /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
191 return n;
192}
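
// Note on the publication loop above (illustrative): slots become visible in
// allocation order.  A processor that grabbed slot n spins until ready == n and
// then CASes ready to n + 1, so any reader that observes ready == k knows that
// slots 0 .. k-1 are fully constructed.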
193
194void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
195 unsigned id = proc->id;
196 /*paranoid*/ verify(id < ready);
197 /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
198 __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
199}
200
201//-----------------------------------------------------------------------
202// Writer side : acquire when changing the ready queue, e.g. adding more
203// queues or removing them.
204uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
205 // Step 1 : lock global lock
206 // It is needed so that processors which register mid critical-section
207 // cannot simply lock their own lock and enter.
208 __atomic_acquire( &lock );
209
210 // Step 2 : lock per-proc lock
211 // Processors that are currently being registered aren't counted
212 // but can't be in read_lock or in the critical section.
213 // All other processors are counted
214 uint_fast32_t s = ready;
215 for(uint_fast32_t i = 0; i < s; i++) {
216 __atomic_acquire( &data[i].lock );
217 }
218
219 return s;
220}
221
222void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
223 // Step 1 : release local locks
224 // This must be done while the global lock is held to prevent
225 // threads that were created mid critical-section
226 // from racing to lock their local locks and having the writer
227 // immediately unlock them
228 // Alternative solution : return s in write_lock and pass it to write_unlock
229 for(uint_fast32_t i = 0; i < last_s; i++) {
230 verify(data[i].lock);
231 __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
232 }
233
234 // Step 2 : release global lock
235 /*paranoid*/ assert(true == lock);
236 __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
237}
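
// Illustrative usage sketch of the writer side, given some cluster * cltr (this
// is the exact pattern used by ready_queue_grow/shrink below):
#if 0
uint_fast32_t last_size = ready_mutate_lock( *cltr );   // exclude all readers
// ... structural changes to cltr->ready_queue ...
ready_mutate_unlock( *cltr, last_size );                 // let readers back in
#endif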
238
239//=======================================================================
240// Intrusive Queue used by ready queue
241//=======================================================================
242// Get the head pointer (one before the first element) from the anchor
243static inline thread_desc * head(const __intrusive_lane_t & this) {
244 thread_desc * rhead = (thread_desc *)(
245 (uintptr_t)( &this.before ) - offsetof( thread_desc, link )
246 );
247 /* paranoid */ verify(rhead);
248 return rhead;
249}
250
251// Get the tail pointer (one after the last element) from the anchor
252static inline thread_desc * tail(const __intrusive_lane_t & this) {
253 thread_desc * rtail = (thread_desc *)(
254 (uintptr_t)( &this.after ) - offsetof( thread_desc, link )
255 );
256 /* paranoid */ verify(rtail);
257 return rtail;
258}
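
// Illustrative note: head()/tail() return "fake" thread_desc pointers that are
// never dereferenced as whole threads; only their link field is ever touched,
// and that field aliases this.before / this.after (see the paranoid checks in
// the constructor), e.g., inside any of the routines below:
#if 0
verify( &head(this)->link.prev == &this.before.link.prev );
verify( &tail(this)->link.next == &this.after .link.next );
#endif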
259
260// Ctor
261void ?{}( __intrusive_lane_t & this ) {
262 this.lock = false;
263 this.last_id = -1u;
264 this.count = 0u;
265
266 this.before.link.prev = 0p;
267 this.before.link.next = tail(this);
268 this.before.link.ts = 0;
269
270 this.after .link.prev = head(this);
271 this.after .link.next = 0p;
272 this.after .link.ts = 0;
273
274 #if !defined(__CFA_NO_SCHED_STATS__)
275 this.stat.diff = 0;
276 this.stat.push = 0;
277 this.stat.pop = 0;
278 #endif
279
280 // We add a boat-load of assertions here because the anchor code is very fragile
281 /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
282 /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
283 /* paranoid */ verify(head(this)->link.prev == 0p );
284 /* paranoid */ verify(head(this)->link.next == tail(this) );
285 /* paranoid */ verify(tail(this)->link.next == 0p );
286 /* paranoid */ verify(tail(this)->link.prev == head(this) );
287 /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
288 /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
289 /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
290 /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
291 /* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
292 /* paranoid */ verify(sizeof(this) == 128);
293 /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
294 /* paranoid */ verify(__alignof__(this) == 128);
295 /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
296
297 /* paranoid */ verifyf(_mask_shiftidx() == 6 , "%zu", _mask_shiftidx());
298 /* paranoid */ verifyf(_mask_bitsidx () == 63, "%zu", _mask_bitsidx());
299}
300
301// Dtor is trivial
302void ^?{}( __intrusive_lane_t & this ) {
303 // Make sure the list is empty
304 /* paranoid */ verify(head(this)->link.prev == 0p );
305 /* paranoid */ verify(head(this)->link.next == tail(this) );
306 /* paranoid */ verify(tail(this)->link.next == 0p );
307 /* paranoid */ verify(tail(this)->link.prev == head(this) );
308 /* paranoid */ verify(this.count == 0u );
309}
310
311// Push a thread onto this lane
312// returns true if the lane was empty before the push, false otherwise
313bool push(__intrusive_lane_t & this, thread_desc * node) {
314 #if defined(__CFA_WITH_VERIFY__)
315 /* paranoid */ verify(this.lock);
316 /* paranoid */ verify(node->link.ts != 0);
317 /* paranoid */ verify(node->link.next == 0p);
318 /* paranoid */ verify(node->link.prev == 0p);
319
320 this.count++;
321
322 if(this.before.link.ts == 0l) {
323 /* paranoid */ verify(tail(this)->link.next == 0p);
324 /* paranoid */ verify(tail(this)->link.prev == head(this));
325 /* paranoid */ verify(head(this)->link.next == tail(this));
326 /* paranoid */ verify(head(this)->link.prev == 0p);
327 }
328 #endif
329
330 // Get the relevant nodes locally
331 thread_desc * tail = tail(this);
332 thread_desc * prev = tail->link.prev;
333
334 // Do the push
335 node->link.next = tail;
336 node->link.prev = prev;
337 prev->link.next = node;
338 tail->link.prev = node;
339
340 // Update stats
341 #if !defined(__CFA_NO_SCHED_STATS__)
342 this.stat.diff++;
343 this.stat.push++;
344 #endif
345
346 verify(node->link.next == tail(this));
347
348 // Check if the queue used to be empty
349 if(this.before.link.ts == 0l) {
350 this.before.link.ts = node->link.ts;
351 /* paranoid */ verify(node->link.prev == head(this));
352 return true;
353 }
354 return false;
355}
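
// Illustrative sketch of the locking protocol around a lane push, with 'lane'
// and 'thrd' as hypothetical locals (the cluster-level push() below follows
// this pattern):
#if 0
if( __atomic_try_acquire( &lane.lock ) ) {
	bool was_empty = push( lane, thrd );    // true if the lane was empty before
	__atomic_unlock( &lane.lock );
}
#endif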
356
357// Pop a thread from this lane (must be non-empty)
358// returns the popped thread
359// and true if the lane is empty after the pop, false otherwise
360[thread_desc *, bool] pop(__intrusive_lane_t & this) {
361 /* paranoid */ verify(this.lock);
362 /* paranoid */ verify(this.before.link.ts != 0ul);
363
364 // Get anchors locally
365 thread_desc * head = head(this);
366 thread_desc * tail = tail(this);
367
368 // Get the relevant nodes locally
369 thread_desc * node = head->link.next;
370 thread_desc * next = node->link.next;
371
372 #if defined(__CFA_WITH_VERIFY__)
373 this.count--;
374 /* paranoid */ verify(node != tail);
375 /* paranoid */ verify(node);
376 #endif
377
378 // Do the pop
379 head->link.next = next;
380 next->link.prev = head;
381 node->link.[next, prev] = 0p;
382
383 // Update head time stamp
384 this.before.link.ts = next->link.ts;
385
386 // Update stats
387 #ifndef __CFA_NO_SCHED_STATS__
388 this.stat.diff--;
389 this.stat.pop ++;
390 #endif
391
392 // Check if we emptied list and return accordingly
393 if(next == tail) {
394 /* paranoid */ verify(this.before.link.ts == 0);
395 /* paranoid */ verify(tail(this)->link.next == 0p);
396 /* paranoid */ verify(tail(this)->link.prev == head(this));
397 /* paranoid */ verify(head(this)->link.next == tail(this));
398 /* paranoid */ verify(head(this)->link.prev == 0p);
399 return [node, true];
400 }
401 else {
402 /* paranoid */ verify(next->link.ts != 0);
403 /* paranoid */ verify(this.before.link.ts != 0);
404 return [node, false];
405 }
406}
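
// Illustrative usage of the two-value return, with 'lane' as a hypothetical,
// locked, non-empty lane (try_pop() and ready_queue_shrink() below do exactly
// this):
#if 0
thread_desc * thrd;
bool emptied;
[thrd, emptied] = pop( lane );
if( emptied ) { /* clear this lane's bit in used.mask */ }
#endif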
407
408// Check whether or not list is empty
409static inline bool is_empty(__intrusive_lane_t & this) {
410 verify( (this.before.link.ts == 0) == (this.count == 0) );
411 return this.before.link.ts == 0;
412}
413
414// Return the timestamp
415static inline unsigned long long ts(__intrusive_lane_t & this) {
416 verify( this.before.link.ts == this.before.link.next->link.ts );
417 return this.before.link.ts;
418}
419
420//=======================================================================
421// Cforall Ready Queue used by ready queue
422//=======================================================================
423
424// Thread local mirror of ready queue statistics
425#if !defined(__CFA_NO_STATISTICS__)
426static __attribute__((aligned(128))) thread_local struct {
427 struct {
428 struct {
429 size_t attempt;
430 size_t success;
431 } push;
432 struct {
433 size_t maskrds;
434 size_t attempt;
435 size_t success;
436 } pop;
437 } pick;
438 struct {
439 size_t value;
440 size_t count;
441 } used;
442} tls = {
443 /* pick */{
444 /* push */{ 0, 0 },
445 /* pop */{ 0, 0, 0 },
446 },
447 /* used */{ 0, 0 }
448};
449#endif
450
451//-----------------------------------------------------------------------
452
453void ?{}(__ready_queue_t & this) with (this) {
454 used.count = 0;
455 for( i ; __cfa_lane_mask_size ) {
456 used.mask[i] = 0;
457 }
458
459 lanes.data = alloc(4);
460 for( i; 4 ) {
461 (lanes.data[i]){};
462 }
463 lanes.count = 4;
464
465 #if !defined(__CFA_NO_STATISTICS__)
466 global_stats.pick.push.attempt = 0;
467 global_stats.pick.push.success = 0;
468 global_stats.pick.pop .maskrds = 0;
469 global_stats.pick.pop .attempt = 0;
470 global_stats.pick.pop .success = 0;
471
472 global_stats.used.value = 0;
473 global_stats.used.count = 0;
474 #endif
475}
476
477void ^?{}(__ready_queue_t & this) with (this) {
478 verify( 4 == lanes.count );
479 verify( 0 == used .count );
480
481 for( i; 4 ) {
482 ^(lanes.data[i]){};
483 }
484 free(lanes.data);
485
486
487 #if defined(__CFA_WITH_VERIFY__)
488 for( i ; __cfa_lane_mask_size ) {
489 assert( 0 == used.mask[i] );
490 }
491 #endif
492}
493
494//-----------------------------------------------------------------------
495enum mask_strictness {
496 STRICT,
497 NOCHECK
498};
499
500// Set a given bit in the bit mask array
501// strictness determines whether the bit must have been clear beforehand
502static inline void mask_set(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
503 // Extract the array and bit indexes
504 __cfa_readyQ_mask_t word;
505 __cfa_readyQ_mask_t bit;
506 [bit, word] = extract(index);
507
508 // Conditional check
509 verifyf(
510 strict != STRICT || // Only check when STRICT : the bit is expected to be clear
511 ((mask[word] & (1ull << bit)) == 0),
512 "Before set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
513 );
514
515 // Atomically set the bit
516 __attribute__((unused)) bool ret = __atomic_bts(&mask[word], bit);
517
518 // Conditional check
519 verifyf(
520 strict != STRICT || // Only check when STRICT : the bit was expected to be clear
521 !ret,
522 "Bit was not set but bts returned true"
523 );
524
525 // Unconditional check
526 verifyf(
527 (mask[word] & (1ull << bit)) != 0,
528 "After set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
529 );
530}
531
532static inline void mask_clear(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
533 // Extract the array and bit indexes
534 __cfa_readyQ_mask_t word;
535 __cfa_readyQ_mask_t bit;
536 [bit, word] = extract(index);
537
538 // Conditional check
539 verifyf(
540 strict != STRICT || // Only check when STRICT : the bit is expected to be set
541 ((mask[word] & (1ull << bit)) != 0),
542 "Before clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
543 );
544
545 // Atomically clear the bit
546 __attribute__((unused)) bool ret = __atomic_btr(&mask[word], bit);
547
548 // Conditional check
549 verifyf(
550 strict != STRICT || // Only check when STRICT : the bit was expected to be set
551 ret,
552 "Bit was set but btr returned false"
553 );
554
555 // Unconditional check
556 verifyf(
557 (mask[word] & (1ull << bit)) == 0,
558 "After clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
559 );
560}
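
// Note on the primitives used above (assumption, not taken from this file):
// __atomic_bts / __atomic_btr are expected to behave like an atomic fetch_or /
// fetch_and that reports the previous value of the targeted bit, roughly:
#if 0
static inline bool example_bts( __cfa_readyQ_mask_t * word, unsigned bit ) {
	__cfa_readyQ_mask_t old = __atomic_fetch_or( word, (__cfa_readyQ_mask_t)1 << bit, __ATOMIC_SEQ_CST );
	return (old & ((__cfa_readyQ_mask_t)1 << bit)) != 0;   // true if the bit was already set
}
#endif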
561
562//-----------------------------------------------------------------------
563__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
564 // write timestamp
565 thrd->link.ts = rdtscl();
566
567 // Try to pick a lane and lock it
568 unsigned i;
569 do {
570 // Pick the index of a lane
571 i = tls_rand() % lanes.count; // assign the outer 'i'; a new declaration here would shadow it and leave it uninitialized
572
573 #if !defined(__CFA_NO_STATISTICS__)
574 tls.pick.push.attempt++;
575 #endif
576
577 // If we can't lock it retry
578 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
579
580 #if defined(__CFA_WITH_VERIFY__)
581 /* paranoid */ verify(lanes.data[i].last_id == -1u);
582 /* paranoid */ lanes.data[i].last_id = kernelTLS.this_processor->id;
583 #endif
584
585 __attribute__((unused)) size_t num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
586 bool first = false;
587
588 // Actually push it
589 bool lane_first = push(lanes.data[i], thrd);
590
591 // If this lane used to be empty we need to do more
592 if(lane_first) {
593 // Update the global count
594 size_t ret = __atomic_fetch_add( &used.count, 1z, __ATOMIC_SEQ_CST);
595
596 // Check if the entire queue used to be empty
597 first = (ret == 0);
598
599 // Update the bit mask
600 mask_set((__cfa_readyQ_mask_t *)used.mask, i, STRICT);
601 }
602
603 #if defined(__CFA_WITH_VERIFY__)
604 /* paranoid */ verifyf( used.count <= lanes.count, "Non-empty count (%zu) exceeds actual count (%zu)\n", used.count, lanes.count );
605 /* paranoid */ verifyf( lanes.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, lanes.data[i].last_id, kernelTLS.this_processor->id );
606 /* paranoid */ verifyf( lanes.data[i].lock, "List %u is not locked\n", i );
607 /* paranoid */ lanes.data[i].last_id = -1u;
608 #endif
609
610 // Unlock and return
611 __atomic_unlock( &lanes.data[i].lock );
612
613 // Update statistics
614 #if !defined(__CFA_NO_STATISTICS__)
615 tls.pick.push.success++;
616 tls.used.value += num;
617 tls.used.count += 1;
618 #endif
619
620 // return whether or not the list was empty before this push
621 return first;
622}
623
624//-----------------------------------------------------------------------
625// Given 2 indexes, pick the list with the oldest push and try to pop from it
626static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
627 #if !defined(__CFA_NO_STATISTICS__)
628 tls.pick.pop.attempt++;
629 #endif
630
631 // Pick the better list, i.e. the one with the older head timestamp
632 int w = i;
633 if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
634 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
635 }
636
637 // Get relevant elements locally
638 __intrusive_lane_t & lane = lanes.data[w];
639
640 // If list looks empty retry
641 if( is_empty(lane) ) return 0p;
642
643 // If we can't get the lock retry
644 if( !__atomic_try_acquire(&lane.lock) ) return 0p;
645
646 #if defined(__CFA_WITH_VERIFY__)
647 /* paranoid */ verify(lane.last_id == -1u);
648 /* paranoid */ lane.last_id = kernelTLS.this_processor->id;
649 #endif
650
651
652 // If list is empty, unlock and retry
653 if( is_empty(lane) ) {
654 #if defined(__CFA_WITH_VERIFY__)
655 /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
656 /* paranoid */ lane.last_id = -1u;
657 #endif
658
659 __atomic_unlock(&lane.lock);
660 return 0p;
661 }
662
663 // Actually pop the list
664 struct thread_desc * thrd;
665 bool emptied;
666 [thrd, emptied] = pop(lane);
667
668 /* paranoid */ verify(thrd);
669 /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
670 /* paranoid */ verify(lane.lock);
671
672 // If this was the last element in the lane
673 if(emptied) {
674 // Update the global count
675 __atomic_fetch_sub( &used.count, 1z, __ATOMIC_SEQ_CST);
676
677 // Update the bit mask
678 mask_clear((__cfa_readyQ_mask_t *)used.mask, w, STRICT);
679 }
680
681 #if defined(__CFA_WITH_VERIFY__)
682 /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
683 /* paranoid */ lane.last_id = -1u;
684 #endif
685
686 // For statistics, check the count before we release the lock
687 #if !defined(__CFA_NO_STATISTICS__)
688 int num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
689 #endif
690
691 // Unlock and return
692 __atomic_unlock(&lane.lock);
693
694 // Update statistics
695 #if !defined(__CFA_NO_STATISTICS__)
696 tls.pick.pop.success++;
697 tls.used.value += num;
698 tls.used.count += 1;
699 #endif
700
701 // return the popped thread
702 return thrd;
703}
704
705// Pop from the ready queue from a given cluster
706__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
707 /* paranoid */ verify( lanes.count > 0 );
708
709 // As long as the list is not empty, try finding a lane that isn't empty and pop from it
710 while( __atomic_load_n( &used.count, __ATOMIC_RELAXED ) != 0) {
711 #if !defined(__CFA_READQ_NO_BITMASK__)
712 // If using bit masks
713 #if !defined(__CFA_NO_SCHED_STATS__)
714 tls.pick.pop.maskrds++;
715 #endif
716
717 // Pick two lists at random
718 unsigned ri = tls_rand();
719 unsigned rj = tls_rand();
720
721 // Find which __cfa_readyQ_mask_t the two lists belong
722 unsigned num = ((__atomic_load_n( &lanes.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
723 unsigned wdxi = (ri >> 6u) % num;
724 unsigned wdxj = (rj >> 6u) % num;
725
726 // Get the actual __cfa_readyQ_mask_t
727 size_t maski = __atomic_load_n( &used.mask[wdxi], __ATOMIC_RELAXED );
728 size_t maskj = __atomic_load_n( &used.mask[wdxj], __ATOMIC_RELAXED );
729
730 // If both of these masks are empty, retry
731 if(maski == 0 && maskj == 0) continue;
732
733 // Pick one of the non-zero bits in the masks and get the bit indexes
734 unsigned bi = rand_bit(ri, maski);
735 unsigned bj = rand_bit(rj, maskj);
736
737 // some checks
738 /* paranoid */ verifyf(bi < 64, "%zu %u", maski, bi);
739 /* paranoid */ verifyf(bj < 64, "%zu %u", maskj, bj);
740
741 // get the general list index
742 unsigned i = bi | (wdxi << 6);
743 unsigned j = bj | (wdxj << 6);
744
745 // some more checks
746 /* paranoid */ verifyf(i < lanes.count, "%u", wdxi << 6);
747 /* paranoid */ verifyf(j < lanes.count, "%u", wdxj << 6);
748
749 // try popping from the 2 picked lists
750 struct thread_desc * thrd = try_pop(cltr, i, j);
751 if(thrd) return thrd;
752 #else
753 // Pick two lists at random
754 int i = tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
755 int j = tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
756
757 // try popping from the 2 picked lists
758 struct thread_desc * thrd = try_pop(cltr, i, j);
759 if(thrd) return thrd;
760 #endif
761 }
762
763 // All lanes were empty, return 0p
764 return 0p;
765}
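
// Worked example of the mask-based pick above (illustrative): with
// lanes.count == 4 there is a single mask word (num == 1), so wdxi == wdxj == 0.
// If used.mask[0] == 0b0101 (lanes 0 and 2 non-empty) and rand_bit(ri, 0b0101)
// returns 2, then i == 2 | (0 << 6) == 2 and try_pop() is attempted on lane 2
// (together with the second pick j).  Picking two random lanes and popping from
// the one with the older head timestamp is in the spirit of the classic
// "power of two choices" load-balancing heuristic.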
766
767//-----------------------------------------------------------------------
768
769static void check( __ready_queue_t & q ) with (q) {
770 #if defined(__CFA_WITH_VERIFY__)
771 {
772 int idx = 0;
773 for( w ; __cfa_lane_mask_size ) {
774 for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
775 bool is_empty = idx < lanes.count ? (ts(lanes.data[idx]) == 0) : true;
776 bool should_be_empty = 0 == (used.mask[w] & (1z << b));
777 assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expects %d, actual is %d", idx, should_be_empty, (bool)is_empty);
778 assert(__cfa_max_lanes > idx);
779 idx++;
780 }
781 }
782 }
783
784 {
785 for( idx ; lanes.count ) {
786 __intrusive_lane_t & sl = lanes.data[idx];
787 assert(!lanes.data[idx].lock);
788
789 assert(head(sl)->link.prev == 0p );
790 assert(head(sl)->link.next->link.prev == head(sl) );
791 assert(tail(sl)->link.next == 0p );
792 assert(tail(sl)->link.prev->link.next == tail(sl) );
793
794 if(sl.before.link.ts == 0l) {
795 assert(tail(sl)->link.next == 0p);
796 assert(tail(sl)->link.prev == head(sl));
797 assert(head(sl)->link.next == tail(sl));
798 assert(head(sl)->link.prev == 0p);
799 }
800 }
801 }
802 #endif
803}
804
805// Call this function if the intrusive list was moved using memcpy
806// fixes the list so that the pointers back to anchors aren't left dangling
807static inline void fix(__intrusive_lane_t & ll) {
808 // if the list is not empty then follow the pointers and fix their reverse links
809 if(!is_empty(ll)) {
810 head(ll)->link.next->link.prev = head(ll);
811 tail(ll)->link.prev->link.next = tail(ll);
812 }
813 // Otherwise just reset the list
814 else {
815 verify(tail(ll)->link.next == 0p);
816 tail(ll)->link.prev = head(ll);
817 head(ll)->link.next = tail(ll);
818 verify(head(ll)->link.prev == 0p);
819 }
820}
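
// Illustrative example of why fix() is needed: when lanes.data is realloc'ed the
// array may move from address A to address B.  A non-empty lane's first real node
// still has link.prev pointing at the old anchor inside A; fix() re-points it at
// head(ll)/tail(ll) computed from the new address B.  For an empty lane the
// anchors only reference each other, so they are simply reset.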
821
822// Grow the ready queue
823void ready_queue_grow (struct cluster * cltr) {
824 // Lock the RWlock so no-one pushes/pops while we are changing the queue
825 uint_fast32_t last_size = ready_mutate_lock( *cltr );
826
827 __cfaabi_dbg_print_safe("Kernel : Growing ready queue\n");
828
829 // Make sure that everything is consistent
830 /* paranoid */ check( cltr->ready_queue );
831
832 // grow the ready queue
833 with( cltr->ready_queue ) {
834 size_t ncount = lanes.count;
835
836 // Check that we have some space left
837 if(ncount + 4 >= __cfa_max_lanes) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_lanes);
838
839 // increase count
840 ncount += 4;
841
842 // Allocate new array (uses realloc and memcpies the data)
843 lanes.data = alloc(lanes.data, ncount);
844
845 // Fix the moved data
846 for( idx; (size_t)lanes.count ) {
847 fix(lanes.data[idx]);
848 }
849
850 // Construct new data
851 for( idx; (size_t)lanes.count ~ ncount) {
852 (lanes.data[idx]){};
853 }
854
855 // Update original
856 lanes.count = ncount;
857
858 // fields in 'used' don't need to change when growing
859 }
860
861 // Make sure that everything is consistent
862 /* paranoid */ check( cltr->ready_queue );
863
864 __cfaabi_dbg_print_safe("Kernel : Growing ready queue done\n");
865
866 // Unlock the RWlock
867 ready_mutate_unlock( *cltr, last_size );
868}
869
870// Shrink the ready queue
871void ready_queue_shrink(struct cluster * cltr) {
872 // Lock the RWlock so no-one pushes/pops while we are changing the queue
873 uint_fast32_t last_size = ready_mutate_lock( *cltr );
874
875 __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue\n");
876
877 // Make sure that everything is consistent
878 /* paranoid */ check( cltr->ready_queue );
879
880 with( cltr->ready_queue ) {
881 // Make sure that the total thread count stays the same
882 #if defined(__CFA_WITH_VERIFY__)
883 size_t nthreads = 0;
884 for( idx; (size_t)lanes.count ) {
885 nthreads += lanes.data[idx].count;
886 }
887 #endif
888
889 size_t ocount = lanes.count;
890 // Check that we have some space left
891 if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
892
893 // reduce the actual count so push doesn't use the old queues
894 lanes.count -= 4;
895 verify(ocount > lanes.count);
896
897 // for printing count the number of displaced threads
898 #if defined(__CFA_DEBUG_PRINT__)
899 __attribute__((unused)) size_t displaced = 0;
900 #endif
901
902 // redistribute old data
903 for( idx; (size_t)lanes.count ~ ocount) {
904 // Lock is not strictly needed but makes checking invariants much easier
905 bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
906 verify(locked);
907
908 // While this lane is not empty, pop its threads and push them somewhere else in the queue
909 while(!is_empty(lanes.data[idx])) {
910 struct thread_desc * thrd;
911 __attribute__((unused)) bool _;
912 [thrd, _] = pop(lanes.data[idx]);
913
914 push(cltr, thrd);
915
916 // for printing count the number of displaced threads
917 #if defined(__CFA_DEBUG_PRINT__)
918 displaced++;
919 #endif
920 }
921
922 mask_clear((__cfa_readyQ_mask_t *)used.mask, idx, NOCHECK);
923
924 // Unlock the lane
925 __atomic_unlock(&lanes.data[idx].lock);
926
927 // TODO print the queue statistics here
928
929 ^(lanes.data[idx]){};
930 }
931
932 __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
933
934 // recompute the used.count instead of maintaining it
935 used.count = 0;
936 for( i ; __cfa_lane_mask_size ) {
937 used.count += __builtin_popcountl(used.mask[i]);
938 }
939
940 // Allocate new array (uses realloc and memcpies the data)
941 lanes.data = alloc(lanes.data, lanes.count);
942
943 // Fix the moved data
944 for( idx; (size_t)lanes.count ) {
945 fix(lanes.data[idx]);
946 }
947
948 // Make sure that the total thread count stayed the same
949 #if defined(__CFA_WITH_VERIFY__)
950 for( idx; (size_t)lanes.count ) {
951 nthreads -= lanes.data[idx].count;
952 }
953 verifyf(nthreads == 0, "Shrinking changed number of threads");
954 #endif
955 }
956
957 // Make sure that everything is consistent
958 /* paranoid */ check( cltr->ready_queue );
959
960 __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue done\n");
961
962 // Unlock the RWlock
963 ready_mutate_unlock( *cltr, last_size );
964}
965
966//-----------------------------------------------------------------------
967
968#if !defined(__CFA_NO_STATISTICS__)
969void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
970 __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
971 __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
972 __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
973 __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
974 __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
975
976 __atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
977 __atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
978}
979#endif