source: libcfa/src/concurrency/ready_queue.cfa @ 4fa44e7

Last change on this file since 4fa44e7 was 504a7dc, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Some fixes after the merge, compiles but still has livelocks

//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author : Thierry Delisle
// Created On : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count :
//

#define __cforall_thread__
#define __CFA_DEBUG_PRINT_READY_QUEUE__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"

static const size_t cache_line_size = 64;

// No overridden function, no environment variable, no define
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 128
#endif

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}

// Picks a random 1 bit in 'mask' according to random number 'rnum'.
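// The bit is chosen by taking rank 'rnum % popcount(mask)' among the set bits. With BMI2,
// _pdep deposits a single 1 into the n-th set position of 'mask' and ctz recovers its index;
// the non-BMI2 fallback below is a branchless rank-select over the intermediate sums of a
// parallel popcount.
// Example: mask = 0b10110, rnum = 7 -> popcount = 3, rank = 7 % 3 = 1, so the second set bit
// (bit index 2) is returned.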
static inline unsigned rand_bit(unsigned rnum, __cfa_readyQ_mask_t mask) {
#if defined( __i386 )
	static_assert(sizeof(mask) == 4);
	unsigned bit = mask ? rnum % __builtin_popcount(mask) : 0;
	#if !defined(__BMI2__)
		#error rand_bit not implemented for non __BMI2__ i386
	#else
		uint32_t picked = _pdep_u32(1ul << bit, mask);
		return picked ? __builtin_ctz(picked) : 0;
	#endif
#elif defined( __x86_64 )
	static_assert(sizeof(mask) == 8);
	unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
	#if !defined(__BMI2__)
		uint64_t v = mask; // Input value to find position with rank r.
		unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
		unsigned int s; // Output: Resulting position of bit with rank r [1-64]
		uint64_t a, b, c, d; // Intermediate temporaries for bit count.
		unsigned int t; // Bit count temporary.

		// Do a normal parallel bit count for a 64-bit integer,
		// but store all intermediate steps.
		a = v - ((v >> 1) & ~0UL/3);
		b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
		c = (b + (b >> 4)) & ~0UL/0x11;
		d = (c + (c >> 8)) & ~0UL/0x101;


		t = (d >> 32) + (d >> 48);
		// Now do branchless select!
		s = 64;
		s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
		t = (d >> (s - 16)) & 0xff;
		s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
		t = (c >> (s - 8)) & 0xf;
		s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
		t = (b >> (s - 4)) & 0x7;
		s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
		t = (a >> (s - 2)) & 0x3;
		s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
		t = (v >> (s - 1)) & 0x1;
		s -= ((t - r) & 256) >> 8;
		return s - 1;
	#else
		uint64_t picked = _pdep_u64(1ul << bit, mask);
		return picked ? __builtin_ctzl(picked) : 0;
	#endif
#elif defined( __ARM_ARCH )
	#error rand_bit not implemented for arm
#else
	#error unknown hardware architecture
#endif
}


//-----------------------------------------------------------------------------
// Helpers used by extract
// (X & _mask_bitsidx()) returns a bit index valid for a __cfa_readyQ_mask_t, where X is any integer
static inline __cfa_readyQ_mask_t _mask_bitsidx () { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }

// (X >> _mask_shiftidx()) returns an index into an array of __cfa_readyQ_mask_t
static inline __cfa_readyQ_mask_t _mask_shiftidx() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(_mask_bitsidx()); }


// Assuming a large bit mask represented as an array of __cfa_readyQ_mask_t
// Given an index into the large mask, returns the bit index and which __cfa_readyQ_mask_t index in the array
static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
	__cfa_readyQ_mask_t word = idx >> _mask_shiftidx();
	__cfa_readyQ_mask_t bit  = idx & _mask_bitsidx();
	return [bit, word];
}
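// For example, with 64-bit mask words (_mask_shiftidx() == 6, _mask_bitsidx() == 63),
// extract(70) yields [bit = 6, word = 1]: lane 70 lives in bit 6 of the second mask word.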

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__clusterRWLock_t & this) {
	this.max = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.lock = false;
	this.data = alloc(this.max);

	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__clusterRWLock_t & this) {
	free(this.data);
}

void ?{}( __processor_id & this, struct processor * proc ) {
	this.handle = proc;
	this.lock = false;
}
//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned doregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p with cluster %p\n", proc, cltr);

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		processor * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %ud processors", cltr->ready_lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %ud processors", cltr->ready_lock.max);

	// Step - 3 : Mark space as used and then publish it.
	__processor_id * storage = (__processor_id *)&data[n];
	(*storage){ proc };
	while(true) {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %u\n", proc, n);

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}

void unregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	unsigned id = proc->id;
	/*paranoid*/ verify(id < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);

	__cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
	// Step 1 : lock global lock
	// It is needed to avoid processors that register mid Critical-Section
	// to simply lock their own lock and enter.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		__atomic_acquire( &data[i].lock );
	}

	return s;
}

void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	// threads that were created mid critical section
	// to race to lock their local locks and have the writer
	// immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}
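
// Typical writer-side usage (a minimal sketch; ready_queue_grow/shrink below follow this pattern):
//
//   uint_fast32_t last_size = ready_mutate_lock( *cltr ); // stop all concurrent push/pop
//   ... resize or otherwise restructure cltr->ready_queue ...
//   ready_mutate_unlock( *cltr, last_size );              // resume readers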

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================
// Get the head pointer (one before the first element) from the anchor
static inline $thread * head(const __intrusive_lane_t & this) {
	$thread * rhead = ($thread *)(
		(uintptr_t)( &this.before ) - offsetof( $thread, link )
	);
	/* paranoid */ verify(rhead);
	return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline $thread * tail(const __intrusive_lane_t & this) {
	$thread * rtail = ($thread *)(
		(uintptr_t)( &this.after ) - offsetof( $thread, link )
	);
	/* paranoid */ verify(rtail);
	return rtail;
}
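
// Note: 'before' and 'after' are not real threads, they are anchor nodes embedded in the lane.
// head()/tail() subtract offsetof($thread, link) so the anchors can be manipulated through the
// same 'link' field as real nodes, which lets push/pop avoid special-casing an empty list.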

// Ctor
void ?{}( __intrusive_lane_t & this ) {
	this.lock = false;
	#if defined(__CFA_WITH_VERIFY__)
		this.last_id = -1u;
		this.count = 0u;
	#endif

	this.before.link.prev = 0p;
	this.before.link.next = tail(this);
	this.before.link.ts = 0;

	this.after .link.prev = head(this);
	this.after .link.next = 0p;
	this.after .link.ts = 0;

	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff = 0;
		this.stat.push = 0;
		this.stat.pop = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
	/* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
	/* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
	/* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
	/* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
	/* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
	/* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
	/* paranoid */ verify(sizeof(this) == 128);
	/* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
	/* paranoid */ verify(__alignof__(this) == 128);
	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));

	/* paranoid */ verifyf(_mask_shiftidx() == 6 , "%zu", _mask_shiftidx());
	/* paranoid */ verifyf(_mask_bitsidx () == 63, "%zu", _mask_bitsidx());
}
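
// Note: the size/alignment assertions above pin each lane to its own pair of cache lines
// (2 * cache_line_size == 128), presumably so that concurrent operations on different lanes
// do not suffer false sharing.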

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
	/* paranoid */ verify(this.count == 0u );
}

// Push a thread onto this lane
// returns true if the lane was empty before the push, false otherwise
bool push(__intrusive_lane_t & this, $thread * node) {
	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(this.lock);
		/* paranoid */ verify(node->link.ts != 0);
		/* paranoid */ verify(node->link.next == 0p);
		/* paranoid */ verify(node->link.prev == 0p);
		/* paranoid */ verify(tail(this)->link.next == 0p);
		/* paranoid */ verify(head(this)->link.prev == 0p);

		this.count++;

		if(this.before.link.ts == 0l) {
			/* paranoid */ verify(tail(this)->link.prev == head(this));
			/* paranoid */ verify(head(this)->link.next == tail(this));
		} else {
			/* paranoid */ verify(tail(this)->link.prev != head(this));
			/* paranoid */ verify(head(this)->link.next != tail(this));
		}
	#endif

	// Get the relevant nodes locally
	$thread * tail = tail(this);
	$thread * prev = tail->link.prev;

	// Do the push
	node->link.next = tail;
	node->link.prev = prev;
	prev->link.next = node;
	tail->link.prev = node;

	// Update stats
	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff++;
		this.stat.push++;
	#endif

	verify(node->link.next == tail(this));

	// Check if the queue used to be empty
	if(this.before.link.ts == 0l) {
		this.before.link.ts = node->link.ts;
		/* paranoid */ verify(node->link.prev == head(this));
		return true;
	}
	return false;
}

// Pop a thread from this lane (must be non-empty)
// returns the popped thread
// and whether the lane became empty as a result of the pop
[$thread *, bool] pop(__intrusive_lane_t & this) {
	/* paranoid */ verify(this.lock);
	/* paranoid */ verify(this.before.link.ts != 0ul);

	// Get anchors locally
	$thread * head = head(this);
	$thread * tail = tail(this);

	// Get the relevant nodes locally
	$thread * node = head->link.next;
	$thread * next = node->link.next;

	#if defined(__CFA_WITH_VERIFY__)
		this.count--;
		/* paranoid */ verify(node != tail);
		/* paranoid */ verify(node);
	#endif

	// Do the pop
	head->link.next = next;
	next->link.prev = head;
	node->link.[next, prev] = 0p;

	// Update head time stamp
	this.before.link.ts = next->link.ts;

	// Update stats
	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff--;
		this.stat.pop ++;
	#endif

	// Check if we emptied list and return accordingly
	/* paranoid */ verify(tail(this)->link.next == 0p);
	/* paranoid */ verify(head(this)->link.prev == 0p);
	if(next == tail) {
		/* paranoid */ verify(this.before.link.ts == 0);
		/* paranoid */ verify(tail(this)->link.prev == head(this));
		/* paranoid */ verify(head(this)->link.next == tail(this));
		return [node, true];
	}
	else {
		/* paranoid */ verify(next->link.ts != 0);
		/* paranoid */ verify(tail(this)->link.prev != head(this));
		/* paranoid */ verify(head(this)->link.next != tail(this));
		/* paranoid */ verify(this.before.link.ts != 0);
		return [node, false];
	}
}

// Check whether or not list is empty
static inline bool is_empty(__intrusive_lane_t & this) {
	verify( (this.before.link.ts == 0) == (this.count == 0) );
	return this.before.link.ts == 0;
}

// Return the timestamp
static inline unsigned long long ts(__intrusive_lane_t & this) {
	verify( this.before.link.ts == this.before.link.next->link.ts );
	return this.before.link.ts;
}
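
// Note: this.before.link.ts does double duty: 0 means "lane is empty", otherwise it caches the
// timestamp of the oldest thread in the lane, which try_pop() uses to prefer the staler of two lanes.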

//=======================================================================
// Cforall Ready Queue used by ready queue
//=======================================================================

// Thread local mirror of ready queue statistics
#if !defined(__CFA_NO_STATISTICS__)
static __attribute__((aligned(128))) thread_local struct {
	struct {
		struct {
			size_t attempt;
			size_t success;
		} push;
		struct {
			size_t maskrds;
			size_t attempt;
			size_t success;
		} pop;
	} pick;
	struct {
		size_t value;
		size_t count;
	} used;
} tls = {
	/* pick */{
		/* push */{ 0, 0 },
		/* pop  */{ 0, 0, 0 },
	},
	/* used */{ 0, 0 }
};
#endif

//-----------------------------------------------------------------------

void ?{}(__ready_queue_t & this) with (this) {
	used.count = 0;
	for( i ; __cfa_lane_mask_size ) {
		used.mask[i] = 0;
	}

	lanes.data = alloc(4);
	for( i; 4 ) {
		(lanes.data[i]){};
	}
	lanes.count = 4;

	#if !defined(__CFA_NO_STATISTICS__)
		global_stats.pick.push.attempt = 0;
		global_stats.pick.push.success = 0;
		global_stats.pick.pop .maskrds = 0;
		global_stats.pick.pop .attempt = 0;
		global_stats.pick.pop .success = 0;

		global_stats.used.value = 0;
		global_stats.used.count = 0;
	#endif
}

void ^?{}(__ready_queue_t & this) with (this) {
	verify( 4 == lanes.count );
	verify( 0 == used .count );

	for( i; 4 ) {
		^(lanes.data[i]){};
	}
	free(lanes.data);


	#if defined(__CFA_WITH_VERIFY__)
		for( i ; __cfa_lane_mask_size ) {
			assert( 0 == used.mask[i] );
		}
	#endif
}

//-----------------------------------------------------------------------
enum mask_strictness {
	STRICT,
	NOCHECK
};
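
// STRICT  : the caller guarantees the bit is actually changing state (set on a cleared bit,
//           clear on a set bit), so the helpers verify the transition.
// NOCHECK : no such guarantee; the bit may already be in the requested state
//           (used by ready_queue_shrink when clearing the masks of removed lanes).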

// Set a given bit in the bit mask array
// strictness determines if the bit had to be cleared before
static inline void mask_set(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
	// Extract the array and bit indexes
	__cfa_readyQ_mask_t word;
	__cfa_readyQ_mask_t bit;
	[bit, word] = extract(index);

	// Conditional check
	verifyf(
		strict != STRICT || // Conditional check if it was expected to be cleared
		((mask[word] & (1ull << bit)) == 0),
		"Before set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);

	// Atomically set the bit
	__attribute__((unused)) bool ret = __atomic_bts(&mask[word], bit);

	// Conditional check
	verifyf(
		strict != STRICT || // Conditional check if it was expected to be cleared
		!ret,
		"Bit was not set but bts returned true"
	);

	// Unconditional check
	verifyf(
		(mask[word] & (1ull << bit)) != 0,
		"After set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);
}

static inline void mask_clear(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
	// Extract the array and bit indexes
	__cfa_readyQ_mask_t word;
	__cfa_readyQ_mask_t bit;
	[bit, word] = extract(index);

	// Conditional check
	verifyf(
		strict != STRICT || // Conditional check if it was expected to be set
		((mask[word] & (1ull << bit)) != 0),
		"Before clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);

	// Atomically clear the bit
	__attribute__((unused)) bool ret = __atomic_btr(&mask[word], bit);

	// Conditional check
	verifyf(
		strict != STRICT || // Conditional check if it was expected to be set
		ret,
		"Bit was set but btr returned false"
	);

	// Unconditional check
	verifyf(
		(mask[word] & (1ull << bit)) == 0,
		"After clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);
}

//-----------------------------------------------------------------------
__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
	// write timestamp
	thrd->link.ts = rdtscl();

	// Try to pick a lane and lock it
	unsigned i;
	do {
		// Pick the index of a lane
		i = __tls_rand() % lanes.count;

		#if !defined(__CFA_NO_STATISTICS__)
			tls.pick.push.attempt++;
		#endif

	// If we can't lock it retry
	} while( !__atomic_try_acquire( &lanes.data[i].lock ) );

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(lanes.data[i].last_id == -1u);
		/* paranoid */ lanes.data[i].last_id = kernelTLS.this_processor->id;
	#endif

	__attribute__((unused)) size_t num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
	bool first = false;

	// Actually push it
	bool lane_first = push(lanes.data[i], thrd);

	// If this lane used to be empty we need to do more
	if(lane_first) {
		// Update the global count
		size_t ret = __atomic_fetch_add( &used.count, 1z, __ATOMIC_SEQ_CST);

		// Check if the entire queue used to be empty
		first = (ret == 0);

		// Update the bit mask
		mask_set((__cfa_readyQ_mask_t *)used.mask, i, STRICT);
	}

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verifyf( used.count <= lanes.count, "Non-empty count (%zu) exceeds actual count (%zu)\n", used.count, lanes.count );
		/* paranoid */ verifyf( lanes.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, lanes.data[i].last_id, kernelTLS.this_processor->id );
		/* paranoid */ verifyf( lanes.data[i].lock, "List %u is not locked\n", i );
		/* paranoid */ lanes.data[i].last_id = -1u;
	#endif

	// Unlock and return
	__atomic_unlock( &lanes.data[i].lock );

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.push.success++;
		tls.used.value += num;
		tls.used.count += 1;
	#endif

	// return whether or not the list was empty before this push
	return first;
}

//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it
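// (a power-of-two-choices style pick: of the two candidate lanes, prefer the one whose
// oldest thread has the smaller timestamp, i.e. has been waiting longer)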
static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.attempt++;
	#endif

	// Pick the best list
	int w = i;
	if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
		w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
	}

	// Get relevant elements locally
	__intrusive_lane_t & lane = lanes.data[w];

	// If list looks empty retry
	if( is_empty(lane) ) return 0p;

	// If we can't get the lock retry
	if( !__atomic_try_acquire(&lane.lock) ) return 0p;

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(lane.last_id == -1u);
		/* paranoid */ lane.last_id = kernelTLS.this_processor->id;
	#endif


	// If list is empty, unlock and retry
	if( is_empty(lane) ) {
		#if defined(__CFA_WITH_VERIFY__)
			/* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
			/* paranoid */ lane.last_id = -1u;
		#endif

		__atomic_unlock(&lane.lock);
		return 0p;
	}

	// Actually pop the list
	struct $thread * thrd;
	bool emptied;
	[thrd, emptied] = pop(lane);

	/* paranoid */ verify(thrd);
	/* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
	/* paranoid */ verify(lane.lock);

	// If this was the last element in the lane
	if(emptied) {
		// Update the global count
		__atomic_fetch_sub( &used.count, 1z, __ATOMIC_SEQ_CST);

		// Update the bit mask
		mask_clear((__cfa_readyQ_mask_t *)used.mask, w, STRICT);
	}

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
		/* paranoid */ lane.last_id = -1u;
	#endif

	// For statistics, check the count before we release the lock
	#if !defined(__CFA_NO_STATISTICS__)
		int num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
	#endif

	// Unlock and return
	__atomic_unlock(&lane.lock);

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.success++;
		tls.used.value += num;
		tls.used.count += 1;
	#endif

	// return the popped thread
	return thrd;
}

// Pop from the ready queue from a given cluster
__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );

	// As long as the list is not empty, try finding a lane that isn't empty and pop from it
	while( __atomic_load_n( &used.count, __ATOMIC_RELAXED ) != 0) {
		#if !defined(__CFA_READQ_NO_BITMASK__)
			// If using bit masks
			#if !defined(__CFA_NO_SCHED_STATS__)
				tls.pick.pop.maskrds++;
			#endif

			// Pick two lists at random
			unsigned ri = __tls_rand();
			unsigned rj = __tls_rand();

			// Find which __cfa_readyQ_mask_t the two lists belong to
			unsigned num = ((__atomic_load_n( &lanes.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
			unsigned wdxi = (ri >> 6u) % num;
			unsigned wdxj = (rj >> 6u) % num;

			// Get the actual __cfa_readyQ_mask_t
			size_t maski = __atomic_load_n( &used.mask[wdxi], __ATOMIC_RELAXED );
			size_t maskj = __atomic_load_n( &used.mask[wdxj], __ATOMIC_RELAXED );

			// If both of these masks are empty, retry
			if(maski == 0 && maskj == 0) continue;

			// Pick one of the non-zero bits in the masks and get the bit indexes
			unsigned bi = rand_bit(ri, maski);
			unsigned bj = rand_bit(rj, maskj);

			// some checks
			/* paranoid */ verifyf(bi < 64, "%zu %u", maski, bi);
			/* paranoid */ verifyf(bj < 64, "%zu %u", maskj, bj);

			// get the general list index
			unsigned i = bi | (wdxi << 6);
			unsigned j = bj | (wdxj << 6);

			// some more checks
			/* paranoid */ verifyf(i < lanes.count, "%u", wdxi << 6);
			/* paranoid */ verifyf(j < lanes.count, "%u", wdxj << 6);

			// try popping from the 2 picked lists
			struct $thread * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#else
			// Pick two lists at random
			int i = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
			int j = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

			// try popping from the 2 picked lists
			struct $thread * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#endif
	}

	// All lanes were empty, return 0p
	return 0p;
}

//-----------------------------------------------------------------------

static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
		{
			int idx = 0;
			for( w ; __cfa_lane_mask_size ) {
				for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
					bool is_empty = idx < lanes.count ? (ts(lanes.data[idx]) == 0) : true;
					bool should_be_empty = 0 == (used.mask[w] & (1z << b));
					assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expected %d, actual %d", idx, should_be_empty, (bool)is_empty);
					assert(__cfa_max_lanes > idx);
					idx++;
				}
			}
		}

		{
			for( idx ; lanes.count ) {
				__intrusive_lane_t & sl = lanes.data[idx];
				assert(!lanes.data[idx].lock);

				assert(head(sl)->link.prev == 0p );
				assert(head(sl)->link.next->link.prev == head(sl) );
				assert(tail(sl)->link.next == 0p );
				assert(tail(sl)->link.prev->link.next == tail(sl) );

				if(sl.before.link.ts == 0l) {
					assert(tail(sl)->link.next == 0p);
					assert(tail(sl)->link.prev == head(sl));
					assert(head(sl)->link.next == tail(sl));
					assert(head(sl)->link.prev == 0p);
				}
			}
		}
	#endif
}

// Call this function if the intrusive list was moved using memcpy
// fixes the list so that the pointers back to anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
	// if the list is not empty then follow the pointers and fix their reverse
	if(!is_empty(ll)) {
		head(ll)->link.next->link.prev = head(ll);
		tail(ll)->link.prev->link.next = tail(ll);
	}
	// Otherwise just reset the list
	else {
		verify(tail(ll)->link.next == 0p);
		tail(ll)->link.prev = head(ll);
		head(ll)->link.next = tail(ll);
		verify(head(ll)->link.prev == 0p);
	}
}

// Grow the ready queue
void ready_queue_grow (struct cluster * cltr) {
	// Lock the RWlock so no-one pushes/pops while we are changing the queue
	uint_fast32_t last_size = ready_mutate_lock( *cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	// grow the ready queue
	with( cltr->ready_queue ) {
		size_t ncount = lanes.count;

		// Check that we have some space left
		if(ncount + 4 >= __cfa_max_lanes) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_lanes);

		// increase count
		ncount += 4;

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, ncount);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Construct new data
		for( idx; (size_t)lanes.count ~ ncount) {
			(lanes.data[idx]){};
		}

		// Update original
		lanes.count = ncount;

		// fields in 'used' don't need to change when growing
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	// Unlock the RWlock
	ready_mutate_unlock( *cltr, last_size );
}

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	// Lock the RWlock so no-one pushes/pops while we are changing the queue
	uint_fast32_t last_size = ready_mutate_lock( *cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	with( cltr->ready_queue ) {
		// Make sure that the total thread count stays the same
		#if defined(__CFA_WITH_VERIFY__)
			size_t nthreads = 0;
			for( idx; (size_t)lanes.count ) {
				nthreads += lanes.data[idx].count;
			}
		#endif

		size_t ocount = lanes.count;
		// Check that there are queues left to remove
		if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");

		// reduce the actual count so push doesn't use the old queues
		lanes.count -= 4;
		verify(ocount > lanes.count);

		// for printing count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; (size_t)lanes.count ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
			verify(locked);

			// As long as we can pop from this lane to push the threads somewhere else in the queue
			while(!is_empty(lanes.data[idx])) {
				struct $thread * thrd;
				__attribute__((unused)) bool _;
				[thrd, _] = pop(lanes.data[idx]);

				push(cltr, thrd);

				// for printing count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			mask_clear((__cfa_readyQ_mask_t *)used.mask, idx, NOCHECK);

			// Unlock the lane
			__atomic_unlock(&lanes.data[idx].lock);

			// TODO print the queue statistics here

			^(lanes.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// recompute the used.count instead of maintaining it
		used.count = 0;
		for( i ; __cfa_lane_mask_size ) {
			used.count += __builtin_popcountl(used.mask[i]);
		}

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, lanes.count);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Make sure that the total thread count stayed the same
		#if defined(__CFA_WITH_VERIFY__)
			for( idx; (size_t)lanes.count ) {
				nthreads -= lanes.data[idx].count;
			}
			verifyf(nthreads == 0, "Shrinking changed number of threads");
		#endif
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");

	// Unlock the RWlock
	ready_mutate_unlock( *cltr, last_size );
}

//-----------------------------------------------------------------------

#if !defined(__CFA_NO_STATISTICS__)
void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
	__atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );

	__atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
}
#endif