source: libcfa/src/concurrency/ready_queue.cfa@ c84b4be

Last change on this file since c84b4be was c84b4be, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

new ready queue seems to work but halting does not, had to be disabled

1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17
18#include "bits/defs.hfa"
19#include "kernel_private.hfa"
20
21#define _GNU_SOURCE
22#include "stdlib.hfa"
23
24static const size_t cache_line_size = 64;
25
26static inline unsigned __max_processors_fallback() {
27 #ifdef __CFA_MAX_PROCESSORS__
28 return __CFA_MAX_PROCESSORS__;
29 #else
30 // No overridden function, no environment variable, no define
31 // fall back to a magic number
32 return 128;
33 #endif
34}
35
36__attribute__((weak)) unsigned __max_processors() {
37 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
38 if(!max_cores_s) {
39 __cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
40 return __max_processors_fallback();
41 }
42
43 char * endptr = 0p;
44 long int max_cores_l = strtol(max_cores_s, &endptr, 10);
45 if(max_cores_l < 1 || max_cores_l > 65535) {
46 __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
47 return __max_processors_fallback();
48 }
49 if('\0' != *endptr) {
50 __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
51 return __max_processors_fallback();
52 }
53
54 return max_cores_l;
55}
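// Note, for illustration: because __max_processors() is declared weak, an
// application can pin the processor cap by providing its own strong definition,
// e.g.
//	unsigned __max_processors() { return 32; }
// or, without recompiling, by setting the environment variable, e.g.
//	CFA_MAX_PROCESSORS=32 ./a.out
// Otherwise the fallback above applies: the __CFA_MAX_PROCESSORS__ define if
// present, and the magic number 128 as a last resort.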
56
57static inline unsigned rand_bit(unsigned rnum, size_t mask) {
58 verify(sizeof(mask) == 8);
59 unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
60#if !defined(__BMI2__)
61 uint64_t v = mask; // Input value to find position with rank r.
62 unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
63 unsigned int s; // Output: Resulting position of bit with rank r [1-64]
64 uint64_t a, b, c, d; // Intermediate temporaries for bit count.
65 unsigned int t; // Bit count temporary.
66
67 // Do a normal parallel bit count for a 64-bit integer,
68 // but store all intermediate steps.
69 a = v - ((v >> 1) & ~0UL/3);
70 b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
71 c = (b + (b >> 4)) & ~0UL/0x11;
72 d = (c + (c >> 8)) & ~0UL/0x101;
73
74
75 t = (d >> 32) + (d >> 48);
76 // Now do branchless select!
77 s = 64;
78 s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
79 t = (d >> (s - 16)) & 0xff;
80 s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
81 t = (c >> (s - 8)) & 0xf;
82 s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
83 t = (b >> (s - 4)) & 0x7;
84 s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
85 t = (a >> (s - 2)) & 0x3;
86 s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
87 t = (v >> (s - 1)) & 0x1;
88 s -= ((t - r) & 256) >> 8;
89 return s - 1;
90#else
91 uint64_t picked = _pdep_u64(1ul << bit, mask);
92 return picked ? __builtin_ctzl(picked) : 0;
93#endif
94}
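// Reference version of rand_bit, for illustration only; rand_bit_ref is a
// hypothetical name and this helper is not part of the original file. It returns
// the 0-based position of the set bit of `mask` selected by `rnum`, which is what
// both the branchless and the BMI2 paths above compute, and it can serve as a
// slow-but-obvious oracle when testing them.
static inline unsigned rand_bit_ref(unsigned rnum, size_t mask) {
	if(mask == 0) return 0;                                 // degenerate case, callers normally pass a non-zero mask
	unsigned rank = rnum % __builtin_popcountl(mask);       // which set bit to pick, 0-based
	for(unsigned b = 0; b < 8 * sizeof(mask); b++) {
		if(mask & (1ull << b)) {                        // b is a set bit
			if(rank == 0) return b;                 // found the selected set bit
			rank--;
		}
	}
	return 0;                                               // unreachable when mask != 0
}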
95
96static inline __cfa_readyQ_mask_t readyQ_mask_full () { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }
97static inline __cfa_readyQ_mask_t readyQ_mask_shit_length() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(readyQ_mask_full()); }
98
99static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
100 __cfa_readyQ_mask_t word = idx >> readyQ_mask_shit_length();
101 __cfa_readyQ_mask_t bit = idx & readyQ_mask_full();
102 return [bit, word];
103}
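// Worked example, assuming the usual 64-bit __cfa_readyQ_mask_t (so
// readyQ_mask_shit_length() == 6 and readyQ_mask_full() == 63): queue index
// 67 = 0b1000011 is tracked by bit 3 of mask word 1, i.e. extract(67) returns
// [bit = 3, word = 1], and the index can be rebuilt as 3 | (1 << 6) == 67.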
104
105//=======================================================================
106// Cluster wide reader-writer lock
107//=======================================================================
108void ?{}(__clusterRWLock_t & this) {
109 this.max = __max_processors();
110 this.alloc = 0;
111 this.ready = 0;
112 this.lock = false;
113 this.data = alloc(this.max);
114
115 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data )) % 64) );
116 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
117 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
118 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
119
120}
121void ^?{}(__clusterRWLock_t & this) {
122 free(this.data);
123}
124
125void ?{}( __processor_id & this, struct processor * proc ) {
126 this.handle = proc;
127 this.lock = false;
128}
129
130//=======================================================================
131// Lock-Free registering/unregistering of threads
132unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
133 // Step - 1 : check if there is already a free slot in the data array
134 uint_fast32_t s = ready;
135
136 // Check all the ready slots, looking for a free one to reuse
137 for(uint_fast32_t i = 0; i < s; i++) {
138 processor * null = 0p; // Reset every iteration since a failed compare-exchange overwrites it
139 if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
140 && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
141 /*paranoid*/ verify(i < ready);
142 /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
143 /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
144 return i;
145 }
146 }
147
148 if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);
149
150 // Step - 2 : F&A to get a new spot in the array.
151 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
152 if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);
153
154 // Step - 3 : Mark space as used and then publish it.
155 __processor_id * storage = (__processor_id *)&data[n];
156 (*storage){ proc };
157 while(true) {
158 unsigned copy = n;
159 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
160 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
161 break;
162 asm volatile("pause");
163 }
164
165 // Return new spot.
166 /*paranoid*/ verify(n < ready);
167 /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
168 /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
169 return n;
170}
171
172void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
173 unsigned id = proc->id;
174 /*paranoid*/ verify(id < ready);
175 /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
176 __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
177}
178
179//-----------------------------------------------------------------------
180// Writer side : acquire when changing the ready queue, e.g. adding more
181// queues or removing them.
182uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
183 // Step 1 : lock global lock
184 // It is needed to prevent processors that register mid critical-section
185 // from simply locking their own lock and entering.
186 __atomic_acquire( &lock );
187
188 // Step 2 : lock per-proc lock
189 // Processors that are currently being registered aren't counted,
190 // but they also can't be holding their read lock or be in the critical section.
191 // All other processors are counted.
192 uint_fast32_t s = ready;
193 for(uint_fast32_t i = 0; i < s; i++) {
194 __atomic_acquire( &data[i].lock );
195 }
196
197 return s;
198}
199
200void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
201 // Step 1 : release local locks
202 // This must be done while the global lock is held to prevent
203 // threads that were created mid critical-section
204 // from racing to lock their local locks and having the writer
205 // immediately unlock them.
206 // Alternative solution : return s in write_lock and pass it to write_unlock
207 for(uint_fast32_t i = 0; i < last_s; i++) {
208 verify(data[i].lock);
209 __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
210 }
211
212 // Step 2 : release global lock
213 /*paranoid*/ assert(true == lock);
214 __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
215}
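// The matching reader side (taken by processors on the scheduling fast path) is
// not defined in this file; the following is only a minimal sketch of how it
// pairs with the writer above, under the same field layout, and the name
// ready_schedule_lock_sketch is hypothetical.
static inline void ready_schedule_lock_sketch( struct cluster & cltr, struct processor * proc ) with(cltr.ready_lock) {
	unsigned iproc = proc->id;
	// Give writers priority : wait until no writer holds the global lock,
	while( __atomic_load_n(&lock, __ATOMIC_RELAXED) ) asm volatile("pause");
	// then take only this processor's own lock; a writer, which acquires every
	// per-processor lock, will block on this slot until the reader releases it.
	__atomic_acquire( &data[iproc].lock );
}
// The matching read-side unlock would simply be :
//	__atomic_store_n(&data[iproc].lock, (bool)false, __ATOMIC_RELEASE);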
216
217//=======================================================================
218// Intrusive Queue used by ready queue
219//=======================================================================
220// Get the head pointer (one before the first element) from the anchor
221static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
222 thread_desc * rhead = (thread_desc *)(
223 (uintptr_t)( &this.before ) - offsetof( thread_desc, link )
224 );
225 /* paranoid */ verify(rhead);
226 return rhead;
227}
228
229// Get the tail pointer (one after the last element) from the anchor
230static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
231 thread_desc * rtail = (thread_desc *)(
232 (uintptr_t)( &this.after ) - offsetof( thread_desc, link )
233 );
234 /* paranoid */ verify(rtail);
235 return rtail;
236}
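// In other words, `before` and `after` are dummy anchor nodes stored inline in
// the queue header; head()/tail() recover fake thread_desc pointers from them by
// subtracting the offset of the `link` field, so the push/pop code below can walk
// prev/next uniformly through anchors and real threads without special-casing an
// empty list.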
237
238// Ctor
239void ?{}( __intrusive_ready_queue_t & this ) {
240 this.lock = false;
241 this.last_id = -1u;
242 this.count = 0u;
243
244 this.before.link.prev = 0p;
245 this.before.link.next = tail(this);
246 this.before.link.ts = 0;
247
248 this.after .link.prev = head(this);
249 this.after .link.next = 0p;
250 this.after .link.ts = 0;
251
252 #if !defined(__CFA_NO_SCHED_STATS__)
253 this.stat.diff = 0;
254 this.stat.push = 0;
255 this.stat.pop = 0;
256 #endif
257
258 // We add a boat-load of assertions here because the anchor code is very fragile
259 /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
260 /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
261 /* paranoid */ verify(head(this)->link.prev == 0p );
262 /* paranoid */ verify(head(this)->link.next == tail(this) );
263 /* paranoid */ verify(tail(this)->link.next == 0p );
264 /* paranoid */ verify(tail(this)->link.prev == head(this) );
265 /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
266 /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
267 /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
268 /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
269 /* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
270 /* paranoid */ verify(sizeof(this) == 128);
271 /* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
272 /* paranoid */ verify(__alignof__(this) == 128);
273 /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
274
275 /* paranoid */ verifyf(readyQ_mask_shit_length() == 6 , "%zu", readyQ_mask_shit_length());
276 /* paranoid */ verifyf(readyQ_mask_full() == 63, "%zu", readyQ_mask_full());
277}
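// Note on the size/alignment assertions above : with 64-byte cache lines, padding
// and aligning every lane to 128 bytes guarantees that two neighbouring lanes
// never share a cache line, so processors pushing/popping on different lanes do
// not false-share the lock and anchor fields.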
278
279// Dtor is trivial
280void ^?{}( __intrusive_ready_queue_t & this ) {
281 // Make sure the list is empty
282 /* paranoid */ verify(head(this)->link.prev == 0p );
283 /* paranoid */ verify(head(this)->link.next == tail(this) );
284 /* paranoid */ verify(tail(this)->link.next == 0p );
285 /* paranoid */ verify(tail(this)->link.prev == head(this) );
286 /* paranoid */ verify(this.count == 0u );
287}
288
289
290
291bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
292 verify(this.lock);
293 verify(node->link.ts != 0);
294 verify(node->link.next == 0p);
295 verify(node->link.prev == 0p);
296
297 this.count++;
298
299 if(this.before.link.ts == 0l) {
300 verify(tail(this)->link.next == 0p);
301 verify(tail(this)->link.prev == head(this));
302 verify(head(this)->link.next == tail(this));
303 verify(head(this)->link.prev == 0p);
304 }
305
306 // Get the relevant nodes locally
307 thread_desc * tail = tail(this);
308 thread_desc * prev = tail->link.prev;
309
310 // Do the push
311 node->link.next = tail;
312 node->link.prev = prev;
313 prev->link.next = node;
314 tail->link.prev = node;
315
316 // Update stats
317 #ifndef __CFA_NO_SCHED_STATS__
318 this.stat.diff++;
319 this.stat.push++;
320 #endif
321
322 verify(node->link.next == tail(this));
323
324 // Check if the queue used to be empty
325 if(this.before.link.ts == 0l) {
326 this.before.link.ts = node->link.ts;
327 verify(node->link.prev == head(this));
328 return true;
329 }
330 return false;
331}
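// Usage sketch, for illustration only (push_one_sketch is a hypothetical helper;
// the cluster-level push() further below is the real caller) : the lane lock must
// be held, the thread must carry a non-zero timestamp, and its link fields must
// already be 0p (as pop() leaves them). The boolean result reports whether the
// lane just went from empty to non-empty, which the caller uses to maintain the
// empty.count / empty.mask bookkeeping.
static inline bool push_one_sketch(__intrusive_ready_queue_t & lane, thread_desc * thrd) {
	thrd->link.ts = rdtscl();                               // precondition : non-zero timestamp
	while( !__atomic_try_acquire(&lane.lock) ) asm volatile("pause");
	bool first = push(lane, thrd);                          // true => lane was empty before this push
	__atomic_unlock(&lane.lock);
	return first;
}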
332
333[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
334 verify(this.lock);
335 verify(this.before.link.ts != 0ul);
336 thread_desc * head = head(this);
337 thread_desc * tail = tail(this);
338
339 thread_desc * node = head->link.next;
340 thread_desc * next = node->link.next;
341 if(node == tail) {
342 verify(false);
343 verify(this.before.link.ts == 0ul);
344 verify(tail(this)->link.next == 0p);
345 verify(tail(this)->link.prev == head(this));
346 verify(head(this)->link.next == tail(this));
347 verify(head(this)->link.prev == 0p);
348 return [0p, false];
349 }
350
351 this.count--;
352 /* paranoid */ verify(node);
353
354 head->link.next = next;
355 next->link.prev = head;
356
357 #ifndef __CFA_NO_SCHED_STATS__
358 this.stat.diff--;
359 this.stat.pop ++;
360 #endif
361
362 if(next == tail) {
363 this.before.link.ts = 0ul;
364 verify(tail(this)->link.next == 0p);
365 verify(tail(this)->link.prev == head(this));
366 verify(head(this)->link.next == tail(this));
367 verify(head(this)->link.prev == 0p);
368 node->link.[next, prev] = 0p;
369 return [node, true];
370 }
371 else {
372 verify(next->link.ts != 0);
373 this.before.link.ts = next->link.ts;
374 verify(this.before.link.ts != 0);
375 node->link.[next, prev] = 0p;
376 return [node, false];
377 }
378}
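// Note on the pop() result : the boolean is true when this pop emptied the lane;
// in that case the anchor timestamp is reset to 0 so ts() reports the lane as
// empty, and the caller can clear the corresponding empty.mask bit and decrement
// empty.count without re-inspecting the lane.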
379
380static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
381 return this.before.link.ts;
382}
383
384//=======================================================================
385// Cforall Ready Queue used by ready queue
386//=======================================================================
387
388static __attribute__((aligned(128))) thread_local struct {
389 struct {
390 struct {
391 size_t attempt;
392 size_t success;
393 } push;
394 struct {
395 size_t maskrds;
396 size_t attempt;
397 size_t success;
398 } pop;
399 } pick;
400 struct {
401 size_t value;
402 size_t count;
403 } full;
404} tls = {
405 /* pick */{
406 /* push */{ 0, 0 },
407 /* pop */{ 0, 0, 0 },
408 },
409 /* full */{ 0, 0 }
410};
411
412//-----------------------------------------------------------------------
413
414void ?{}(__ready_queue_t & this) with (this) {
415 empty.count = 0;
416 for( i ; __cfa_readyQ_mask_size ) {
417 empty.mask[i] = 0;
418 }
419
420 list.data = alloc(4);
421 for( i; 4 ) {
422 (list.data[i]){};
423 }
424 list.count = 4;
425
426 #if !defined(__CFA_NO_STATISTICS__)
427 global_stats.pick.push.attempt = 0;
428 global_stats.pick.push.success = 0;
429 global_stats.pick.pop .maskrds = 0;
430 global_stats.pick.pop .attempt = 0;
431 global_stats.pick.pop .success = 0;
432
433 global_stats.full.value = 0;
434 global_stats.full.count = 0;
435 #endif
436}
437
438void ^?{}(__ready_queue_t & this) with (this) {
439 verify( 4 == list .count );
440 verify( 0 == empty.count );
441
442 for( i; 4 ) {
443 ^(list.data[i]){};
444 }
445 free(list.data);
446
447
448 #if defined(__CFA_WITH_VERIFY__)
449 for( i ; __cfa_readyQ_mask_size ) {
450 assert( 0 == empty.mask[i] );
451 }
452 #endif
453}
454
455//-----------------------------------------------------------------------
456
457__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
458 thrd->link.ts = rdtscl();
459
460 while(true) {
461 // Pick a random list
462 unsigned i = tls_rand() % list.count;
463
464 #if !defined(__CFA_NO_STATISTICS__)
465 tls.pick.push.attempt++;
466 #endif
467
468 // If we can't lock it retry
469 if( !__atomic_try_acquire( &list.data[i].lock ) ) continue;
470 verify(list.data[i].last_id == -1u);
471 list.data[i].last_id = kernelTLS.this_processor->id;
472
473 __attribute__((unused)) size_t num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
474 bool first = false;
475
476 verify( list.data[i].last_id == kernelTLS.this_processor->id );
477 verify( list.data[i].lock );
478 // Actually push it
479 if(push(list.data[i], thrd)) {
480 size_t ret = __atomic_fetch_add( &empty.count, 1z, __ATOMIC_SEQ_CST);
481 first = (ret == 0);
482
483 __cfa_readyQ_mask_t word;
484 __cfa_readyQ_mask_t bit;
485 [bit, word] = extract(i);
486 verifyf((empty.mask[word] & (1ull << bit)) == 0, "Before set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
487 __attribute__((unused)) bool ret = bts(&empty.mask[word], bit);
488 verify(!(bool)ret);
489 verifyf((empty.mask[word] & (1ull << bit)) != 0, "After set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
490 }
491 verifyf( empty.count <= list.count, "Non-empty count (%zu) exceeds actual count (%zu)\n", empty.count, list.count );
492 verifyf( list.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, list.data[i].last_id, kernelTLS.this_processor->id );
493 verifyf( list.data[i].lock, "List %u is not locked\n", i );
494
495 // Unlock and return
496 list.data[i].last_id = -1u;
497 __atomic_unlock( &list.data[i].lock );
498
499 #if !defined(__CFA_NO_STATISTICS__)
500 tls.pick.push.success++;
501 tls.full.value += num;
502 tls.full.count += 1;
503 #endif
504 return first;
505 }
506}
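// Note : the boolean returned above is true only when empty.count was zero just
// before this push, i.e. when the whole cluster went from "no ready threads
// anywhere" to "at least one ready thread"; a caller can use it as a cheap hint
// that an idle processor may need to be woken.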
507
508//-----------------------------------------------------------------------
509
510static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
511 #if !defined(__CFA_NO_STATISTICS__)
512 tls.pick.pop.attempt++;
513 #endif
514
515 // Pick the better of the two lists (the one with the older head)
516 int w = i;
517 if( __builtin_expect(ts(list.data[j]) != 0, true) ) {
518 w = (ts(list.data[i]) < ts(list.data[j])) ? i : j;
519 }
520
521 __intrusive_ready_queue_t & list = list.data[w];
522 // If list looks empty retry
523 if( ts(list) == 0 ) return 0p;
524
525 // If we can't get the lock retry
526 if( !__atomic_try_acquire(&list.lock) ) return 0p;
527 verify(list.last_id == -1u);
528 list.last_id = kernelTLS.this_processor->id;
529
530 verify(list.last_id == kernelTLS.this_processor->id);
531
532 __attribute__((unused)) int num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
533
534
535 // If list is empty, unlock and retry
536 if( ts(list) == 0 ) {
537 list.last_id = -1u;
538 __atomic_unlock(&list.lock);
539 return 0p;
540 }
541 {
542 __cfa_readyQ_mask_t word;
543 __cfa_readyQ_mask_t bit;
544 [bit, word] = extract(w);
545 verify((empty.mask[word] & (1ull << bit)) != 0);
546 }
547
548 verify(list.last_id == kernelTLS.this_processor->id);
549 verify(list.lock);
550
551 // Actually pop the list
552 struct thread_desc * thrd;
553 bool emptied;
554 [thrd, emptied] = pop(list);
555 verify(thrd);
556
557 verify(list.last_id == kernelTLS.this_processor->id);
558 verify(list.lock);
559
560 if(emptied) {
561 __atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);
562
563 __cfa_readyQ_mask_t word;
564 __cfa_readyQ_mask_t bit;
565 [bit, word] = extract(w);
566 verify((empty.mask[word] & (1ull << bit)) != 0);
567 __attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
568 verify(ret);
569 verify((empty.mask[word] & (1ull << bit)) == 0);
570 }
571
572 verify(list.lock);
573
574 // Unlock and return
575 list.last_id = -1u;
576 __atomic_unlock(&list.lock);
577 verify(empty.count >= 0);
578
579 #if !defined(__CFA_NO_STATISTICS__)
580 tls.pick.pop.success++;
581 tls.full.value += num;
582 tls.full.count += 1;
583 #endif
584
585 return thrd;
586}
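// The i/j pair is a variant of the "power of two choices" heuristic : taking the
// lane whose head carries the older timestamp approximates global FIFO order
// without any global data structure. A minimal restatement of the selection rule,
// for illustration only (pick_older_sketch is a hypothetical helper) :
static inline unsigned pick_older_sketch(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
	if( ts(list.data[j]) == 0 ) return i;                   // j looks empty, fall back to i
	if( ts(list.data[i]) == 0 ) return j;                   // i looks empty, fall back to j
	return (ts(list.data[i]) < ts(list.data[j])) ? i : j;   // otherwise prefer the older head
}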
587
588__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
589 verify( list.count > 0 );
590 while( __atomic_load_n( &empty.count, __ATOMIC_RELAXED ) != 0) {
591 #if !defined(__CFA_READQ_NO_BITMASK__)
592 tls.pick.pop.maskrds++;
593 unsigned i, j;
594 {
595 #if !defined(__CFA_NO_SCHED_STATS__)
596 tls.pick.pop.maskrds++;
597 #endif
598
599 // Pick two lists at random
600 unsigned num = ((__atomic_load_n( &list.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
601
602 unsigned ri = tls_rand();
603 unsigned rj = tls_rand();
604
605 unsigned wdxi = (ri >> 6u) % num;
606 unsigned wdxj = (rj >> 6u) % num;
607
608 size_t maski = __atomic_load_n( &empty.mask[wdxi], __ATOMIC_RELAXED );
609 size_t maskj = __atomic_load_n( &empty.mask[wdxj], __ATOMIC_RELAXED );
610
611 if(maski == 0 && maskj == 0) continue;
612
613 unsigned bi = rand_bit(ri, maski);
614 unsigned bj = rand_bit(rj, maskj);
615
616 verifyf(bi < 64, "%zu %u", maski, bi);
617 verifyf(bj < 64, "%zu %u", maskj, bj);
618
619 i = bi | (wdxi << 6);
620 j = bj | (wdxj << 6);
621
622 verifyf(i < list.count, "%u", wdxi << 6);
623 verifyf(j < list.count, "%u", wdxj << 6);
624 }
625
626 struct thread_desc * thrd = try_pop(cltr, i, j);
627 if(thrd) return thrd;
628 #else
629 // Pick two lists at random
630 int i = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
631 int j = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
632
633 struct thread_desc * thrd = try_pop(cltr, i, j);
634 if(thrd) return thrd;
635 #endif
636 }
637
638 return 0p;
639}
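// Note on the bitmask path above : ri/rj are split into a random mask word
// (wdxi/wdxj) and a random set bit inside that word, and the lane index is then
// rebuilt as bit | (word << 6), the inverse of extract(); e.g. word 1, bit 3
// selects lane 67. Lanes whose mask bit is clear are believed empty (racily, the
// mask is only a hint) and are skipped without ever touching their cache lines.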
640
641//-----------------------------------------------------------------------
642
643static void check( __ready_queue_t & q ) with (q) {
644 #if defined(__CFA_WITH_VERIFY__)
645 {
646 int idx = 0;
647 for( w ; __cfa_readyQ_mask_size ) {
648 for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
649 bool is_empty = idx < list.count ? (ts(list.data[idx]) == 0) : true;
650 bool should_be_empty = 0 == (empty.mask[w] & (1z << b));
651 assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expect : %d, actual is got %d", idx, should_be_empty, (bool)is_empty);
652 assert(__cfa_max_readyQs > idx);
653 idx++;
654 }
655 }
656 }
657
658 {
659 for( idx ; list.count ) {
660 __intrusive_ready_queue_t & sl = list.data[idx];
661 assert(!list.data[idx].lock);
662
663 assert(head(sl)->link.prev == 0p );
664 assert(head(sl)->link.next->link.prev == head(sl) );
665 assert(tail(sl)->link.next == 0p );
666 assert(tail(sl)->link.prev->link.next == tail(sl) );
667
668 if(sl.before.link.ts == 0l) {
669 assert(tail(sl)->link.next == 0p);
670 assert(tail(sl)->link.prev == head(sl));
671 assert(head(sl)->link.next == tail(sl));
672 assert(head(sl)->link.prev == 0p);
673 }
674 }
675 }
676 #endif
677}
678
679// Call this function if the intrusive list was moved using memcpy;
680// it fixes the list so that the pointers back to the anchors aren't left
681// dangling
682static inline void fix(__intrusive_ready_queue_t & ll) {
683 // if the list is not empty then follow the pointers
684 // and fix their reverse pointers
685 if(ll.before.link.ts != 0l) {
686 head(ll)->link.next->link.prev = head(ll);
687 tail(ll)->link.prev->link.next = tail(ll);
688 }
689 // Otherwise just reset the list
690 else {
691 tail(ll)->link.next = 0p;
692 tail(ll)->link.prev = head(ll);
693 head(ll)->link.next = tail(ll);
694 head(ll)->link.prev = 0p;
695 }
696}
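// Why fix() matters : grow/shrink below reallocate the lane array, so a lane may
// be bit-copied to a new address; the first element's prev pointer and the last
// element's next pointer then still reference the anchors inside the *old*
// storage, and fix() re-points them at the relocated head()/tail(). Empty lanes
// are simply reset to the freshly-constructed shape.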
697
698void ready_queue_grow (struct cluster * cltr) {
699 uint_fast32_t last_size = ready_mutate_lock( *cltr );
700 __cfaabi_dbg_print_safe("Kernel : Growing ready queue\n");
701 check( cltr->ready_queue );
702
703 with( cltr->ready_queue ) {
704 size_t ncount = list.count;
705
706 // Check that we have some space left
707 if(ncount + 4 >= __cfa_max_readyQs) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_readyQs);
708
709 ncount += 4;
710
711 // Allocate new array
712 list.data = alloc(list.data, ncount);
713
714 // Fix the moved data
715 for( idx; (size_t)list.count ) {
716 fix(list.data[idx]);
717 }
718
719 // Construct new data
720 for( idx; (size_t)list.count ~ ncount) {
721 (list.data[idx]){};
722 }
723
724 // Update original
725 list.count = ncount;
726 // fields in empty don't need to change
727 }
728
729 // Make sure that everything is consistent
730 check( cltr->ready_queue );
731 __cfaabi_dbg_print_safe("Kernel : Growing ready queue done\n");
732 ready_mutate_unlock( *cltr, last_size );
733}
734
735void ready_queue_shrink(struct cluster * cltr) {
736 uint_fast32_t last_size = ready_mutate_lock( *cltr );
737 __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue\n");
738 with( cltr->ready_queue ) {
739 #if defined(__CFA_WITH_VERIFY__)
740 size_t nthreads = 0;
741 for( idx; (size_t)list.count ) {
742 nthreads += list.data[idx].count;
743 }
744 #endif
745
746 size_t ocount = list.count;
747 // Check that there are enough queues left to remove
748 if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
749
750 list.count -= 4;
751
752 // redistribute old data
753 verify(ocount > list.count);
754 __attribute__((unused)) size_t displaced = 0;
755 for( idx; (size_t)list.count ~ ocount) {
756 // This is not strictly needed but makes checking invariants much easier
757 bool locked = __atomic_try_acquire(&list.data[idx].lock);
758 verify(locked);
759 while(0 != ts(list.data[idx])) {
760 struct thread_desc * thrd;
761 __attribute__((unused)) bool _;
762 [thrd, _] = pop(list.data[idx]);
763 verify(thrd);
764 push(cltr, thrd);
765 displaced++;
766 }
767
768 __atomic_unlock(&list.data[idx].lock);
769
770 // TODO print the queue statistics here
771
772 ^(list.data[idx]){};
773 }
774
775 __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
776
777 // clear the now unused masks
778 {
779 __cfa_readyQ_mask_t fword, fbit, lword, lbit;
780 [fbit, fword] = extract(ocount);
781 [lbit, lword] = extract(list.count);
782
783 // For now assume that all removed queues were covered by the same bitmask word
784 // This is highly probable as long as grow and shrink use groups of 4
785 // exclusively
786 verify(fword == lword);
787 __cfa_readyQ_mask_t clears = ~0;
788
789 for( b ; lbit ~ fbit ) {
790 clears ^= 1 << b;
791 }
792
793 empty.mask[fword] &= clears;
794
795
796 empty.count = 0;
797 for( i ; 0 ~= lword ) {
798 empty.count += __builtin_popcountl(empty.mask[i]);
799 }
800 }
801
802 // Allocate new array
803 list.data = alloc(list.data, list.count);
804
805 // Fix the moved data
806 for( idx; (size_t)list.count ) {
807 fix(list.data[idx]);
808 }
809
810 #if defined(__CFA_WITH_VERIFY__)
811 for( idx; (size_t)list.count ) {
812 nthreads -= list.data[idx].count;
813 }
814 assertf(nthreads == 0, "Shrinking changed number of threads");
815 #endif
816 }
817
818 // Make sure that everything is consistent
819 check( cltr->ready_queue );
820 __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue done\n");
821 ready_mutate_unlock( *cltr, last_size );
822}
823
824//-----------------------------------------------------------------------
825
826#if !defined(__CFA_NO_STATISTICS__)
827void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
828 __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
829 __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
830 __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
831 __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
832 __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
833
834 __atomic_fetch_add( &global_stats.full.value, tls.full.value, __ATOMIC_SEQ_CST );
835 __atomic_fetch_add( &global_stats.full.count, tls.full.count, __ATOMIC_SEQ_CST );
836}
837#endif