source: libcfa/src/concurrency/ready_queue.cfa@ b798713

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since b798713 was b798713, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Working ready queue

  • Property mode set to 100644
File size: 23.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17
18#include "bits/defs.hfa"
19#include "kernel_private.hfa"
20
21#define _GNU_SOURCE
22#include "stdlib.hfa"
23
24static const size_t cache_line_size = 64;
25
// Default for the maximum number of processors, used when nothing usable
// was provided at run time : the build-time define if there is one,
// otherwise a fixed constant.
static inline unsigned __max_processors_fallback() {
	#if defined(__CFA_MAX_PROCESSORS__)
		const unsigned limit = __CFA_MAX_PROCESSORS__;
	#else
		// No overridden function, no environment variable, no define :
		// fall back to a magic number
		const unsigned limit = 128;
	#endif
	return limit;
}
35
// Maximum number of processors that can be registered.
// The symbol is weak, so another definition can replace this one at link time.
// This default reads the CFA_MAX_PROCESSORS environment variable, which must be
// a plain decimal number in [1, 65535]; any other content (or no variable at
// all) yields __max_processors_fallback().
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
		return __max_processors_fallback();
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);

	// Validate the parse before the range : an empty string or text with no
	// leading digits makes strtol return 0, which would otherwise be
	// misreported as an out-of-range value.
	if('\0' == *max_cores_s || '\0' != *endptr) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
		return __max_processors_fallback();
	}

	// An overflowing value is clamped by strtol to LONG_MAX/LONG_MIN,
	// which this range check rejects as well.
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
		return __max_processors_fallback();
	}

	return max_cores_l;
}
56
// Select one of the set bits of 'mask' using the random number 'rnum' and
// return its index (0 is the least significant bit).
// Returns 0 when 'mask' has no set bit.
// The two implementations below rank the set bits from opposite ends (the
// generic one from the most significant bit, the BMI2 one from the least
// significant bit); since the rank is derived from a random number, either
// way yields some set bit of the mask.
static inline unsigned rand_bit(unsigned rnum, size_t mask) {
	verify(sizeof(mask) == 8);

	// 0-based rank of the bit to select among the set bits
	unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;

#if defined(__BMI2__)
	// Deposit a single 1 at the position of the selected set bit of the mask
	uint64_t picked = _pdep_u64(1ul << bit, mask);
	return picked ? __builtin_ctzl(picked) : 0;
#else
	uint64_t bits = mask;                    // Input value to find position with the given rank.
	unsigned int rank = bit + 1;             // Input: bit's desired rank [1-64].
	unsigned int pos;                        // Output: Resulting position of bit with that rank [1-64]
	uint64_t sum2, sum4, sum8, sum16;        // Intermediate temporaries for bit count.
	unsigned int cnt;                        // Bit count temporary.

	// Do a normal parallel bit count for a 64-bit integer,
	// but store all intermediate steps.
	sum2  = bits - ((bits >> 1) & ~0UL/3);
	sum4  = (sum2 & ~0UL/5) + ((sum2 >> 2) & ~0UL/5);
	sum8  = (sum4 + (sum4 >> 4)) & ~0UL/0x11;
	sum16 = (sum8 + (sum8 >> 8)) & ~0UL/0x101;

	// Number of set bits in the upper 32 bits
	cnt = (sum16 >> 32) + (sum16 >> 48);

	// Now do branchless select!
	// Each step is the branch-free form of : if (rank > cnt) { pos -= width; rank -= cnt; }
	pos = 64;
	pos -= ((cnt - rank) & 256) >> 3; rank -= (cnt & ((cnt - rank) >> 8));
	cnt = (sum16 >> (pos - 16)) & 0xff;
	pos -= ((cnt - rank) & 256) >> 4; rank -= (cnt & ((cnt - rank) >> 8));
	cnt = (sum8 >> (pos - 8)) & 0xf;
	pos -= ((cnt - rank) & 256) >> 5; rank -= (cnt & ((cnt - rank) >> 8));
	cnt = (sum4 >> (pos - 4)) & 0x7;
	pos -= ((cnt - rank) & 256) >> 6; rank -= (cnt & ((cnt - rank) >> 8));
	cnt = (sum2 >> (pos - 2)) & 0x3;
	pos -= ((cnt - rank) & 256) >> 7; rank -= (cnt & ((cnt - rank) >> 8));
	cnt = (bits >> (pos - 1)) & 0x1;
	pos -= ((cnt - rank) & 256) >> 8;

	// pos is 1-based, the result is a 0-based bit index
	return pos - 1;
#endif
}
95
// Value that, and-ed with a queue index, yields the bit position inside one
// bitmask word : the number of bits in a word, minus one.
static inline __cfa_readyQ_mask_t readyQ_mask_full() {
	const __cfa_readyQ_mask_t bits_per_word = 8 * sizeof(__cfa_readyQ_mask_t);
	return bits_per_word - 1;
}
// Number of bits a queue index must be shifted right by to obtain the index
// of its bitmask word, i.e., the number of significant bits in readyQ_mask_full().
// NOTE(review): the leading zeros are counted over an 'unsigned long' but
// subtracted from the width of __cfa_readyQ_mask_t, so both are presumably
// expected to have the same width.
static inline __cfa_readyQ_mask_t readyQ_mask_shit_length() {
	const int leading_zeros = __builtin_clzl(readyQ_mask_full());
	return (8 * sizeof(__cfa_readyQ_mask_t)) - leading_zeros;
}
98
// Split a queue index into its coordinates in the bitmask array.
// Returns [bit, word] : the bit position inside the word, then the word index.
static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
	return [idx & readyQ_mask_full(), idx >> readyQ_mask_shit_length()];
}
104
105//=======================================================================
106// Cluster wide reader-writer lock
107//=======================================================================
// Ctor : allocates one slot per possible processor (as given by
// __max_processors()) and starts with no slot handed out, no slot published
// and the global lock released.
void ?{}(__clusterRWLock_t & this) {
	// Capacity and storage
	this.max  = __max_processors();
	this.data = alloc(this.max);

	// Nothing registered yet, nobody holds the lock
	this.lock  = false;
	this.ready = 0;
	this.alloc = 0;

	// The first slot, and therefore the stride between slots, must be 64-byte aligned
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );

	// Both counters are manipulated with atomic builtins
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
}
// Dtor : gives back the slot array allocated by the constructor.
void ^?{}(__clusterRWLock_t & this) {
	free( this.data );
}
124
// Ctor : a slot starts unlocked and owned by the given processor.
void ?{}( __processor_id & this, struct processor * proc ) {
	this.lock   = false;
	this.handle = proc;
}
129
130//=======================================================================
131// Lock-Free registering/unregistering of threads
// Register 'proc' with the ready lock of 'cltr' and return the index of the
// slot it now owns.
// A published slot whose handle is null is claimed with a compare-and-swap if
// one exists; otherwise a new slot is reserved, constructed and then published
// by bumping 'ready'. Aborts when more than 'max' slots would be needed.
unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		processor * null = 0p; // Re-write every loop since compare thrashes it
		// Cheap relaxed load first, only attempt the CAS on slots that look free
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	// Early check, so the counter isn't bumped when the array is already known to be full
	if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 3 : Mark space as used and then publish it.
	__processor_id * storage = (__processor_id *)&data[n];
	(*storage){ proc };

	// Slots are published in order : spin until 'ready' reaches n, then move it to n + 1
	while(true) {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}
171
// Release the slot owned by 'proc' : its handle is reset to null, which makes
// the slot claimable by a later doregister().
void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	unsigned slot = proc->id;

	// The slot must have been published and must still belong to this processor
	/*paranoid*/ verify(slot < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[slot].handle, __ATOMIC_RELAXED));

	__atomic_store_n(&data[slot].handle, 0p, __ATOMIC_RELEASE);
}
178
179//-----------------------------------------------------------------------
180// Writer side : acquire when changing the ready queue, e.g. adding more
181// queues or removing them.
// Acquire the ready lock as a writer.
// Returns the number of per-processor locks that were taken; that value must
// be handed back to ready_mutate_unlock().
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
	// Step 1 : lock global lock
	// Without it, a processor registering in the middle of the critical
	// section could simply lock its own lock and enter.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc lock
	// Only the slots published so far are counted. Processors still being
	// registered aren't, but they can't be in read_lock or in the critical section.
	uint_fast32_t published = ready;
	uint_fast32_t i = 0;
	while(i < published) {
		__atomic_acquire( &data[i].lock );
		i += 1;
	}

	return published;
}
199
// Release the ready lock held as a writer.
// 'last_s' is the value returned by the matching ready_mutate_lock(), i.e.,
// how many per-processor locks that call acquired.
void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
	// Step 1 : release local locks
	// This must happen while the global lock is still held. Only the locks
	// taken by the matching lock call are released, so a processor registered
	// mid critical section cannot race to take its local lock and have the
	// writer immediately unlock it.
	uint_fast32_t i = 0;
	while(i < last_s) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
		i += 1;
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}
216
217//=======================================================================
218// Intrusive Queue used by ready queue
219//=======================================================================
// Head sentinel of the queue (one before the first element).
// The anchor 'before' only stores link fields; the returned pointer is
// offset so that its 'link' field overlays that anchor.
static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
	const uintptr_t anchor = (uintptr_t)( &this.before );
	thread_desc * rhead = (thread_desc *)( anchor - offsetof( thread_desc, link ) );
	/* paranoid */ verify(rhead);
	return rhead;
}
228
// Tail sentinel of the queue (one after the last element).
// The anchor 'after' only stores link fields; the returned pointer is
// offset so that its 'link' field overlays that anchor.
static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
	const uintptr_t anchor = (uintptr_t)( &this.after );
	thread_desc * rtail = (thread_desc *)( anchor - offsetof( thread_desc, link ) );
	/* paranoid */ verify(rtail);
	return rtail;
}
237
// Ctor
// Builds an empty queue : unlocked, no processor recorded in last_id, the two
// anchors linked to each other and both timestamps at 0.
void ?{}( __intrusive_ready_queue_t & this ) {
	this.lock = false;
	this.last_id = -1u;

	// The sentinels are computed from the anchors, see head() and tail()
	thread_desc * h = head(this);
	thread_desc * t = tail(this);

	this.before.link.prev = 0p;
	this.before.link.next = t;
	this.before.link.ts   = 0;

	this.after .link.prev = h;
	this.after .link.next = 0p;
	this.after .link.ts   = 0;

	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff = 0;
		this.stat.push = 0;
		this.stat.pop  = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile

	// The sentinels overlay the anchors exactly
	/* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
	/* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
	/* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
	/* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
	/* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
	/* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );

	// The list is linked as an empty list
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );

	// Size and alignment of the queue itself
	/* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(sizeof(this) == 128);
	/* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(__alignof__(this) == 128);
	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));

	// The bitmask helpers are expected to describe 64-bit words
	/* paranoid */ verifyf(readyQ_mask_shit_length() == 6 , "%zu", readyQ_mask_shit_length());
	/* paranoid */ verifyf(readyQ_mask_full() == 63, "%zu", readyQ_mask_full());
}
277
// Dtor is trivial : nothing is released, it only checks (in verify builds)
// that the queue being destroyed is empty.
void ^?{}( __intrusive_ready_queue_t & this ) {
	// Make sure the list is empty, i.e., the sentinels only link to each other
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
}
286
287
288
// Append 'node' at the end of the queue. The queue lock must be held, the
// node must be unlinked and must carry a non-zero timestamp.
// Returns true if the queue was empty before the push, false otherwise.
bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
	verify(this.lock);
	verify(node->link.ts != 0);
	verify(node->link.next == 0p);
	verify(node->link.prev == 0p);

	// A timestamp of 0 on the head anchor means the queue is empty,
	// in which case the sentinels must only link to each other
	if(this.before.link.ts == 0l) {
		verify(tail(this)->link.next == 0p);
		verify(tail(this)->link.prev == head(this));
		verify(head(this)->link.next == tail(this));
		verify(head(this)->link.prev == 0p);
	}

	// Get the relevant nodes locally
	thread_desc * end  = tail(this);
	thread_desc * last = end->link.prev;

	// Do the push : splice the node between the last element and the tail sentinel
	node->link.prev = last;
	node->link.next = end;
	last->link.next = node;
	end ->link.prev = node;

	// Update stats
	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff++;
		this.stat.push++;
	#endif

	verify(node->link.next == tail(this));

	// If the queue already had elements there is nothing else to update
	if(this.before.link.ts != 0l) return false;

	// The queue used to be empty : the head anchor takes the timestamp of
	// its new first element
	this.before.link.ts = node->link.ts;
	verify(node->link.prev == head(this));
	return true;
}
328
// Remove the first element of the queue. The queue lock must be held and the
// queue is expected to be non-empty.
// Returns [node, emptied] : the removed element, fully unlinked, and whether
// it was the last element of the queue.
[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
	verify(this.lock);
	verify(this.before.link.ts != 0ul);

	thread_desc * front = head(this);
	thread_desc * end   = tail(this);

	thread_desc * node = front->link.next;
	thread_desc * next = node->link.next;

	// Popping from an empty queue is not expected to happen
	if(node == end) {
		verify(false);
		verify(this.before.link.ts == 0ul);
		verify(tail(this)->link.next == 0p);
		verify(tail(this)->link.prev == head(this));
		verify(head(this)->link.next == tail(this));
		verify(head(this)->link.prev == 0p);
		return [0p, false];
	}

	/* paranoid */ verify(node);

	// Unlink the first element
	front->link.next = next;
	next ->link.prev = front;

	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff--;
		this.stat.pop ++;
	#endif

	// The head anchor carries the timestamp of the first element, 0 when there is none
	bool emptied = (next == end);
	if(emptied) {
		this.before.link.ts = 0ul;
		verify(tail(this)->link.next == 0p);
		verify(tail(this)->link.prev == head(this));
		verify(head(this)->link.next == tail(this));
		verify(head(this)->link.prev == 0p);
	}
	else {
		verify(next->link.ts != 0);
		this.before.link.ts = next->link.ts;
		verify(this.before.link.ts != 0);
	}

	// Hand the node back fully unlinked
	node->link.[next, prev] = 0p;
	return [node, emptied];
}
374
// Timestamp stored in the head anchor of the queue : the timestamp of the
// first element, or 0 when the queue is empty.
static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
	unsigned long long stamp = this.before.link.ts;
	return stamp;
}
378
379//=======================================================================
// Cforall Ready Queue used by ready queue
381//=======================================================================
382
// Per-thread statistics counters, updated by push/try_pop/pop below and
// added to the cluster-wide counters by stats_tls_tally.
static __attribute__((aligned(128))) thread_local struct {
	// Counters about picking a queue
	struct {
		struct {
			size_t attempt;   // iterations of the push loop, i.e., queues picked
			size_t success;   // pushes that went through
		} push;
		struct {
			size_t maskrds;   // iterations of the pop loop that read the bitmask
			size_t attempt;   // calls to try_pop
			size_t success;   // calls to try_pop that returned a thread
		} pop;
	} pick;
	// Samples of the number of non-empty queues (empty.count)
	struct {
		size_t value;     // sum of the sampled values
		size_t count;     // number of samples
	} full;
} tls = {
	/* pick */{ /* push */{ 0, 0 }, /* pop */{ 0, 0, 0 } },
	/* full */{ 0, 0 }
};
406
407//-----------------------------------------------------------------------
408
// Ctor : starts with 4 empty queues, a cleared bitmask and zeroed statistics.
void ?{}(__ready_queue_t & this) with (this) {
	// No queue has any thread yet
	for( i ; __cfa_readyQ_mask_size ) {
		empty.mask[i] = 0;
	}
	empty.count = 0;

	// Allocate and construct the initial queues
	list.data = alloc(4);
	list.count = 4;
	for( i; 4 ) {
		(list.data[i]){};
	}

	#if !defined(__CFA_NO_STATISTICS__)
		global_stats.pick.push.attempt = 0;
		global_stats.pick.push.success = 0;

		global_stats.pick.pop .maskrds = 0;
		global_stats.pick.pop .attempt = 0;
		global_stats.pick.pop .success = 0;

		global_stats.full.value = 0;
		global_stats.full.count = 0;
	#endif
}
432
// Dtor : expects the ready queue to be back to its initial 4 queues with no
// thread left in any of them, then destroys the queues and frees the array.
void ^?{}(__ready_queue_t & this) with (this) {
	verify( 4 == list .count );
	verify( 0 == empty.count );

	// Destroy the queues before giving back their storage
	for( i; 4 ) {
		^(list.data[i]){};
	}
	free( list.data );

	// No bit of the mask may still be set
	#if defined(__CFA_WITH_VERIFY__)
		for( i ; __cfa_readyQ_mask_size ) {
			assert( 0 == empty.mask[i] );
		}
	#endif
}
449
450//-----------------------------------------------------------------------
451
// Push 'thrd' on a randomly picked queue of the cluster's ready queue.
// The thread is timestamped first; queues whose lock is taken are skipped and
// another one is picked. When the chosen queue was empty, the count of
// non-empty queues is incremented and the bit of the queue is set in the mask.
// Returns true if, before this push, the count of non-empty queues was 0.
__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
	thrd->link.ts = rdtscl();

	for(;;) {
		// Pick a random list
		unsigned i = tls_rand() % list.count;
		__intrusive_ready_queue_t & lane = list.data[i];

		#if !defined(__CFA_NO_STATISTICS__)
			tls.pick.push.attempt++;
		#endif

		// If we can't lock it retry
		if( !__atomic_try_acquire( &lane.lock ) ) continue;
		verify(lane.last_id == -1u);
		lane.last_id = kernelTLS.this_processor->id;

		// Sampled for the statistics only
		__attribute__((unused)) size_t num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
		bool first = false;

		verify( lane.last_id == kernelTLS.this_processor->id );
		verify( lane.lock );

		// Actually push it
		if(push(lane, thrd)) {
			// The list was empty : one more non-empty list
			size_t prev_count = __atomic_fetch_add( &empty.count, 1z, __ATOMIC_SEQ_CST);
			first = (prev_count == 0);

			// Flag the list in the mask
			__cfa_readyQ_mask_t word;
			__cfa_readyQ_mask_t bit;
			[bit, word] = extract(i);
			verifyf((empty.mask[word] & (1ull << bit)) == 0, "Before set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
			__attribute__((unused)) bool was_set = bts(&empty.mask[word], bit);
			verify(!was_set);
			verifyf((empty.mask[word] & (1ull << bit)) != 0, "After set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
		}
		verify(empty.count <= (int)list.count);
		verify( lane.last_id == kernelTLS.this_processor->id );
		verify( lane.lock );

		// Unlock and return
		lane.last_id = -1u;
		__atomic_unlock( &lane.lock );

		#if !defined(__CFA_NO_STATISTICS__)
			tls.pick.push.success++;
			tls.full.value += num;
			tls.full.count += 1;
		#endif
		return first;
	}
}
502
503//-----------------------------------------------------------------------
504
// Try to pop a thread from one of the two candidate queues 'i' and 'j'.
// The candidate whose first element has the oldest (smallest) timestamp is
// chosen; an empty candidate (timestamp 0) is never preferred over a
// non-empty one.
// Returns the popped thread, or 0p when the chosen queue is empty or its lock
// could not be taken, in which case the caller is expected to retry.
static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.attempt++;
	#endif

	// Pick the best list
	// A timestamp of 0 means empty and must not be mistaken for "oldest" :
	// comparing it directly would select the empty list over the non-empty one
	// and make the pop fail for no reason.
	const unsigned long long tsi = ts(list.data[i]);
	const unsigned long long tsj = ts(list.data[j]);
	int w = j;
	if( __builtin_expect(tsi != 0, true) ) {
		w = (tsj == 0 || tsi < tsj) ? i : j;
	}

	__intrusive_ready_queue_t & lane = list.data[w];
	// If list looks empty retry
	if( ts(lane) == 0 ) return 0p;

	// If we can't get the lock retry
	if( !__atomic_try_acquire(&lane.lock) ) return 0p;
	verify(lane.last_id == -1u);
	lane.last_id = kernelTLS.this_processor->id;

	verify(lane.last_id == kernelTLS.this_processor->id);

	// Sampled for the statistics only
	__attribute__((unused)) int num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );

	// The list may have been emptied between the unlocked check and the lock :
	// if list is empty, unlock and retry
	if( ts(lane) == 0 ) {
		lane.last_id = -1u;
		__atomic_unlock(&lane.lock);
		return 0p;
	}
	{
		// A non-empty list must be flagged in the mask
		__cfa_readyQ_mask_t word;
		__cfa_readyQ_mask_t bit;
		[bit, word] = extract(w);
		verify((empty.mask[word] & (1ull << bit)) != 0);
	}

	verify(lane.last_id == kernelTLS.this_processor->id);
	verify(lane.lock);

	// Actually pop the list
	struct thread_desc * thrd;
	bool emptied;
	[thrd, emptied] = pop(lane);
	verify(thrd);

	verify(lane.last_id == kernelTLS.this_processor->id);
	verify(lane.lock);

	if(emptied) {
		// That was the last thread of the list : one less non-empty list,
		// and the list is no longer flagged in the mask
		__atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);

		__cfa_readyQ_mask_t word;
		__cfa_readyQ_mask_t bit;
		[bit, word] = extract(w);
		verify((empty.mask[word] & (1ull << bit)) != 0);
		__attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
		verify(ret);
		verify((empty.mask[word] & (1ull << bit)) == 0);
	}

	verify(lane.lock);

	// Unlock and return
	lane.last_id = -1u;
	__atomic_unlock(&lane.lock);
	verify(empty.count >= 0);

	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.success++;
		tls.full.value += num;
		tls.full.count += 1;
	#endif

	return thrd;
}
582
// Pop a thread from the cluster's ready queue.
// As long as the count of non-empty queues is not 0, two candidate queues are
// picked at random and try_pop is attempted on them.
// Returns the popped thread, or 0p once the count of non-empty queues reads 0.
__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
	verify( list.count > 0 );
	while( __atomic_load_n( &empty.count, __ATOMIC_RELAXED ) != 0) {
		#if !defined(__CFA_READQ_NO_BITMASK__)
			unsigned i, j;
			{
				// Counted once per iteration, under the same guard as the
				// other per-thread statistics
				#if !defined(__CFA_NO_STATISTICS__)
					tls.pick.pop.maskrds++;
				#endif

				// Pick two lists at random
				// Number of mask words covering the current lists (64 lists per word)
				unsigned num = ((__atomic_load_n( &list.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;

				unsigned ri = tls_rand();
				unsigned rj = tls_rand();

				// The bits above the low 6 select the word ...
				unsigned wdxi = (ri >> 6u) % num;
				unsigned wdxj = (rj >> 6u) % num;

				size_t maski = __atomic_load_n( &empty.mask[wdxi], __ATOMIC_RELAXED );
				size_t maskj = __atomic_load_n( &empty.mask[wdxj], __ATOMIC_RELAXED );

				// Neither word flags a non-empty list, pick again
				if(maski == 0 && maskj == 0) continue;

				// ... and the random number then selects one of the bits set in that word
				unsigned bi = rand_bit(ri, maski);
				unsigned bj = rand_bit(rj, maskj);

				verifyf(bi < 64, "%zu %u", maski, bi);
				verifyf(bj < 64, "%zu %u", maskj, bj);

				i = bi | (wdxi << 6);
				j = bj | (wdxj << 6);

				verifyf(i < list.count, "%u", wdxi << 6);
				verifyf(j < list.count, "%u", wdxj << 6);
			}

			struct thread_desc * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#else
			// Pick two lists at random
			int i = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
			int j = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );

			struct thread_desc * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#endif
	}

	return 0p;
}
635
636//-----------------------------------------------------------------------
637
// Consistency checks on a ready queue, only compiled in when
// __CFA_WITH_VERIFY__ is defined; asserts on the first inconsistency found.
static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
		// Part 1 : every bit of the mask must agree with the state of its list.
		// Bits beyond the current number of lists must be clear.
		{
			int idx = 0;
			for( w ; __cfa_readyQ_mask_size ) {
				for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
					bool should_be_empty = 0 == (empty.mask[w] & (1z << b));
					bool is_empty = idx < list.count ? (ts(list.data[idx]) == 0) : true;
					assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expect : %d, actual is got %d", idx, should_be_empty, (bool)is_empty);
					assert(__cfa_max_readyQs > idx);
					idx++;
				}
			}
		}

		// Part 2 : every list must be unlocked and correctly linked to its sentinels
		{
			for( idx ; list.count ) {
				__intrusive_ready_queue_t & sl = list.data[idx];
				assert(!list.data[idx].lock);

				// Sentinels and their neighbours point back at each other
				assert(head(sl)->link.prev == 0p );
				assert(head(sl)->link.next->link.prev == head(sl) );
				assert(tail(sl)->link.next == 0p );
				assert(tail(sl)->link.prev->link.next == tail(sl) );

				// A list without timestamp is empty : the sentinels only link to each other
				if(sl.before.link.ts == 0l) {
					assert(tail(sl)->link.next == 0p);
					assert(tail(sl)->link.prev == head(sl));
					assert(head(sl)->link.next == tail(sl));
					assert(head(sl)->link.prev == 0p);
				}
			}
		}
	#endif
}
673
// Call this function if the intrusive list was moved using memcpy :
// it fixes the list so that the pointers back to the anchors aren't left
// dangling.
static inline void fix(__intrusive_ready_queue_t & ll) {
	// An empty list only has its anchors, just reset it
	if(ll.before.link.ts == 0l) {
		tail(ll)->link.next = 0p;
		tail(ll)->link.prev = head(ll);
		head(ll)->link.next = tail(ll);
		head(ll)->link.prev = 0p;
		return;
	}

	// Otherwise follow the pointers to the first and last elements
	// and make them point back at the anchors' new address
	head(ll)->link.next->link.prev = head(ll);
	tail(ll)->link.prev->link.next = tail(ll);
}
692
// Add 4 queues to the ready queue of 'cltr'.
// Runs with the ready lock held as a writer; aborts if the maximum number of
// queues would be reached.
void ready_queue_grow (struct cluster * cltr) {
	uint_fast32_t last_size = ready_mutate_lock( *cltr );
	check( cltr->ready_queue );

	with( cltr->ready_queue ) {
		size_t ocount = list.count;

		// Check that we have some space left
		if(ocount + 4 >= __cfa_max_readyQs) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_readyQs);

		size_t ncount = ocount + 4;

		// Allocate new array
		list.data = alloc(list.data, ncount);

		// Fix the moved data : the existing queues may have changed address
		for( idx; ocount ) {
			fix(list.data[idx]);
		}

		// Construct new data
		for( idx; ocount ~ ncount) {
			(list.data[idx]){};
		}

		// Update original
		// fields in empty don't need to change
		list.count = ncount;
	}

	// Make sure that everything is consistent
	check( cltr->ready_queue );
	ready_mutate_unlock( *cltr, last_size );
}
727
// Remove the last 4 queues from the ready queue of 'cltr'.
// Runs with the ready lock held as a writer. The threads found in the removed
// queues are pushed back on the remaining queues. Aborts if fewer than 8
// queues exist, i.e., if fewer than 4 would remain.
void ready_queue_shrink(struct cluster * cltr) {
	uint_fast32_t last_size = ready_mutate_lock( *cltr );
	with( cltr->ready_queue ) {
		size_t ocount = list.count;
		// Check that we have some space left
		if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");

		list.count -= 4;
		verify(ocount > list.count);

		// Retire the bookkeeping of the removed queues before moving any thread.
		// The raw pop used below updates neither empty.count nor empty.mask, so
		// a non-empty removed queue would otherwise stay counted and flagged
		// forever. Doing this first, and per queue, also keeps empty.count within
		// the new number of queues while threads are pushed back, and does not
		// require all the removed queues to be covered by the same mask word.
		for( idx; (size_t)list.count ~ ocount) {
			if(0 != ts(list.data[idx])) {
				__atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);
			}

			__cfa_readyQ_mask_t word;
			__cfa_readyQ_mask_t bit;
			[bit, word] = extract(idx);
			// The shift is done in the type of the mask, not in int
			empty.mask[word] &= ~(((__cfa_readyQ_mask_t)1) << bit);
		}

		// redistribute old data
		for( idx; (size_t)list.count ~ ocount) {
			// This is not strictly needed but makes checking invariants much easier
			bool locked = __atomic_try_acquire(&list.data[idx].lock);
			verify(locked);

			// Move every thread to one of the remaining queues; the push goes
			// through the regular path, which maintains the count and the mask
			while(0 != ts(list.data[idx])) {
				struct thread_desc * thrd;
				__attribute__((unused)) bool _;
				[thrd, _] = pop(list.data[idx]);
				verify(thrd);
				push(cltr, thrd);
			}

			__atomic_unlock(&list.data[idx].lock);

			// TODO print the queue statistics here

			^(list.data[idx]){};
		}

		// Allocate new array
		list.data = alloc(list.data, list.count);

		// Fix the moved data : the remaining queues may have changed address
		for( idx; (size_t)list.count ) {
			fix(list.data[idx]);
		}
	}

	// Make sure that everything is consistent
	check( cltr->ready_queue );
	ready_mutate_unlock( *cltr, last_size );
}
790
791//-----------------------------------------------------------------------
792
#if !defined(__CFA_NO_STATISTICS__)
// Add the statistics counters of the calling thread to the counters of the
// ready queue of 'cltr'. The per-thread counters are left untouched.
void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
	// Queue picking, push side
	__atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );

	// Queue picking, pop side
	__atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );

	// Samples of the number of non-empty queues
	__atomic_fetch_add( &global_stats.full.value, tls.full.value, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.full.count, tls.full.count, __ATOMIC_SEQ_CST );
}
#endif
Note: See TracBrowser for help on using the repository browser.