source: libcfa/src/concurrency/ready_queue.cfa @ 2a3d446

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 2a3d446 was b798713, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Working ready queue

  • Property mode set to 100644
File size: 23.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17
18#include "bits/defs.hfa"
19#include "kernel_private.hfa"
20
21#define _GNU_SOURCE
22#include "stdlib.hfa"
23
24static const size_t cache_line_size = 64;
25
26static inline unsigned __max_processors_fallback() {
27        #ifdef __CFA_MAX_PROCESSORS__
28                return __CFA_MAX_PROCESSORS__;
29        #else
30                // No overriden function, no environment variable, no define
31                // fall back to a magic number
32                return 128;
33        #endif
34}
35
36__attribute__((weak)) unsigned __max_processors() {
37        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
38        if(!max_cores_s) {
39                __cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
40                return __max_processors_fallback();
41        }
42
43        char * endptr = 0p;
44        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
45        if(max_cores_l < 1 || max_cores_l > 65535) {
46                __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
47                return __max_processors_fallback();
48        }
49        if('\0' != *endptr) {
50                __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
51                return __max_processors_fallback();
52        }
53
54        return max_cores_l;
55}
56
57static inline unsigned rand_bit(unsigned rnum, size_t mask) {
58        verify(sizeof(mask) == 8);
59        unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
60#if !defined(__BMI2__)
61        uint64_t v = mask;   // Input value to find position with rank r.
62        unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
63        unsigned int s;      // Output: Resulting position of bit with rank r [1-64]
64        uint64_t a, b, c, d; // Intermediate temporaries for bit count.
65        unsigned int t;      // Bit count temporary.
66
67        // Do a normal parallel bit count for a 64-bit integer,
68        // but store all intermediate steps.
69        a =  v - ((v >> 1) & ~0UL/3);
70        b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
71        c = (b + (b >> 4)) & ~0UL/0x11;
72        d = (c + (c >> 8)) & ~0UL/0x101;
73
74
75        t = (d >> 32) + (d >> 48);
76        // Now do branchless select!
77        s  = 64;
78        s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
79        t  = (d >> (s - 16)) & 0xff;
80        s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
81        t  = (c >> (s - 8)) & 0xf;
82        s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
83        t  = (b >> (s - 4)) & 0x7;
84        s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
85        t  = (a >> (s - 2)) & 0x3;
86        s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
87        t  = (v >> (s - 1)) & 0x1;
88        s -= ((t - r) & 256) >> 8;
89        return s - 1;
90#else
91        uint64_t picked = _pdep_u64(1ul << bit, mask);
92        return picked ? __builtin_ctzl(picked) : 0;
93#endif
94}
95
96static inline __cfa_readyQ_mask_t readyQ_mask_full       () { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }
97static inline __cfa_readyQ_mask_t readyQ_mask_shit_length() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(readyQ_mask_full()); }
98
99static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
100        __cfa_readyQ_mask_t word = idx >> readyQ_mask_shit_length();
101        __cfa_readyQ_mask_t bit  = idx &  readyQ_mask_full();
102        return [bit, word];
103}
104
105//=======================================================================
106// Cluster wide reader-writer lock
107//=======================================================================
108void  ?{}(__clusterRWLock_t & this) {
109        this.max   = __max_processors();
110        this.alloc = 0;
111        this.ready = 0;
112        this.lock  = false;
113        this.data  = alloc(this.max);
114
115        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
116        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
117        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
118        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
119
120}
121void ^?{}(__clusterRWLock_t & this) {
122        free(this.data);
123}
124
125void ?{}( __processor_id & this, struct processor * proc ) {
126        this.handle = proc;
127        this.lock   = false;
128}
129
130//=======================================================================
131// Lock-Free registering/unregistering of threads
132unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
133        // Step - 1 : check if there is already space in the data
134        uint_fast32_t s = ready;
135
136        // Check among all the ready
137        for(uint_fast32_t i = 0; i < s; i++) {
138                processor * null = 0p; // Re-write every loop since compare thrashes it
139                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
140                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
141                        /*paranoid*/ verify(i < ready);
142                        /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
143                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
144                        return i;
145                }
146        }
147
148        if(max <= alloc) abort("Trying to create more than %ud processors", cltr->ready_lock.max);
149
150        // Step - 2 : F&A to get a new spot in the array.
151        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
152        if(max <= n) abort("Trying to create more than %ud processors", cltr->ready_lock.max);
153
154        // Step - 3 : Mark space as used and then publish it.
155        __processor_id * storage = (__processor_id *)&data[n];
156        (*storage){ proc };
157        while(true) {
158                unsigned copy = n;
159                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
160                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
161                        break;
162                asm volatile("pause");
163        }
164
165        // Return new spot.
166        /*paranoid*/ verify(n < ready);
167        /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
168        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
169        return n;
170}
171
172void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
173        unsigned id = proc->id;
174        /*paranoid*/ verify(id < ready);
175        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
176        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
177}
178
179//-----------------------------------------------------------------------
180// Writer side : acquire when changing the ready queue, e.g. adding more
181//  queues or removing them.
182uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
183        // Step 1 : lock global lock
184        // It is needed to avoid processors that register mid Critical-Section
185        //   to simply lock their own lock and enter.
186        __atomic_acquire( &lock );
187
188        // Step 2 : lock per-proc lock
189        // Processors that are currently being registered aren't counted
190        //   but can't be in read_lock or in the critical section.
191        // All other processors are counted
192        uint_fast32_t s = ready;
193        for(uint_fast32_t i = 0; i < s; i++) {
194                __atomic_acquire( &data[i].lock );
195        }
196
197        return s;
198}
199
200void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
201        // Step 1 : release local locks
202        // This must be done while the global lock is held to avoid
203        //   threads that where created mid critical section
204        //   to race to lock their local locks and have the writer
205        //   immidiately unlock them
206        // Alternative solution : return s in write_lock and pass it to write_unlock
207        for(uint_fast32_t i = 0; i < last_s; i++) {
208                verify(data[i].lock);
209                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
210        }
211
212        // Step 2 : release global lock
213        /*paranoid*/ assert(true == lock);
214        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
215}
216
217//=======================================================================
218// Intrusive Queue used by ready queue
219//=======================================================================
220// Get the head pointer (one before the first element) from the anchor
221static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
222        thread_desc * rhead = (thread_desc *)(
223                (uintptr_t)( &this.before ) - offsetof( thread_desc, link )
224        );
225        /* paranoid */ verify(rhead);
226        return rhead;
227}
228
229// Get the tail pointer (one after the last element) from the anchor
230static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
231        thread_desc * rtail = (thread_desc *)(
232                (uintptr_t)( &this.after ) - offsetof( thread_desc, link )
233        );
234        /* paranoid */ verify(rtail);
235        return rtail;
236}
237
238// Ctor
239void ?{}( __intrusive_ready_queue_t & this ) {
240        this.lock = false;
241        this.last_id = -1u;
242
243        this.before.link.prev = 0p;
244        this.before.link.next = tail(this);
245        this.before.link.ts   = 0;
246
247        this.after .link.prev = head(this);
248        this.after .link.next = 0p;
249        this.after .link.ts   = 0;
250
251        #if !defined(__CFA_NO_SCHED_STATS__)
252                this.stat.diff = 0;
253                this.stat.push = 0;
254                this.stat.pop  = 0;
255        #endif
256
257        // We add a boat-load of assertions here because the anchor code is very fragile
258        /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
259        /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
260        /* paranoid */ verify(head(this)->link.prev == 0p );
261        /* paranoid */ verify(head(this)->link.next == tail(this) );
262        /* paranoid */ verify(tail(this)->link.next == 0p );
263        /* paranoid */ verify(tail(this)->link.prev == head(this) );
264        /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
265        /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
266        /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
267        /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
268        /* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
269        /* paranoid */ verify(sizeof(this) == 128);
270        /* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
271        /* paranoid */ verify(__alignof__(this) == 128);
272        /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
273
274        /* paranoid */ verifyf(readyQ_mask_shit_length() == 6 , "%zu", readyQ_mask_shit_length());
275        /* paranoid */ verifyf(readyQ_mask_full()        == 63, "%zu", readyQ_mask_full());
276}
277
278// Dtor is trivial
279void ^?{}( __intrusive_ready_queue_t & this ) {
280        // Make sure the list is empty
281        /* paranoid */ verify(head(this)->link.prev == 0p );
282        /* paranoid */ verify(head(this)->link.next == tail(this) );
283        /* paranoid */ verify(tail(this)->link.next == 0p );
284        /* paranoid */ verify(tail(this)->link.prev == head(this) );
285}
286
287
288
289bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
290        verify(this.lock);
291        verify(node->link.ts != 0);
292        verify(node->link.next == 0p);
293        verify(node->link.prev == 0p);
294
295        if(this.before.link.ts == 0l) {
296                verify(tail(this)->link.next == 0p);
297                verify(tail(this)->link.prev == head(this));
298                verify(head(this)->link.next == tail(this));
299                verify(head(this)->link.prev == 0p);
300        }
301
302        // Get the relevant nodes locally
303        thread_desc * tail = tail(this);
304        thread_desc * prev = tail->link.prev;
305
306        // Do the push
307        node->link.next = tail;
308        node->link.prev = prev;
309        prev->link.next = node;
310        tail->link.prev = node;
311
312        // Update stats
313        #ifndef __CFA_NO_SCHED_STATS__
314                this.stat.diff++;
315                this.stat.push++;
316        #endif
317
318        verify(node->link.next == tail(this));
319
320        // Check if the queue used to be empty
321        if(this.before.link.ts == 0l) {
322                this.before.link.ts = node->link.ts;
323                verify(node->link.prev == head(this));
324                return true;
325        }
326        return false;
327}
328
329[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
330        verify(this.lock);
331        verify(this.before.link.ts != 0ul);
332        thread_desc * head = head(this);
333        thread_desc * tail = tail(this);
334
335        thread_desc * node = head->link.next;
336        thread_desc * next = node->link.next;
337        if(node == tail) {
338                verify(false);
339                verify(this.before.link.ts == 0ul);
340                verify(tail(this)->link.next == 0p);
341                verify(tail(this)->link.prev == head(this));
342                verify(head(this)->link.next == tail(this));
343                verify(head(this)->link.prev == 0p);
344                return [0p, false];
345        }
346
347        /* paranoid */ verify(node);
348
349        head->link.next = next;
350        next->link.prev = head;
351
352        #ifndef __CFA_NO_SCHED_STATS__
353                this.stat.diff--;
354                this.stat.pop ++;
355        #endif
356
357        if(next == tail) {
358                this.before.link.ts = 0ul;
359                verify(tail(this)->link.next == 0p);
360                verify(tail(this)->link.prev == head(this));
361                verify(head(this)->link.next == tail(this));
362                verify(head(this)->link.prev == 0p);
363                node->link.[next, prev] = 0p;
364                return [node, true];
365        }
366        else {
367                verify(next->link.ts != 0);
368                this.before.link.ts = next->link.ts;
369                verify(this.before.link.ts != 0);
370                node->link.[next, prev] = 0p;
371                return [node, false];
372        }
373}
374
375static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
376        return this.before.link.ts;
377}
378
379//=======================================================================
380// Cforall Reqdy Queue used by ready queue
381//=======================================================================
382
383static __attribute__((aligned(128))) thread_local struct {
384        struct {
385                struct {
386                        size_t attempt;
387                        size_t success;
388                } push;
389                struct {
390                        size_t maskrds;
391                        size_t attempt;
392                        size_t success;
393                } pop;
394        } pick;
395        struct {
396                size_t value;
397                size_t count;
398        } full;
399} tls = {
400        /* pick */{
401                /* push */{ 0, 0 },
402                /* pop  */{ 0, 0, 0 },
403        },
404        /* full */{ 0, 0 }
405};
406
407//-----------------------------------------------------------------------
408
409void ?{}(__ready_queue_t & this) with (this) {
410        empty.count = 0;
411        for( i ; __cfa_readyQ_mask_size ) {
412                empty.mask[i] = 0;
413        }
414
415        list.data = alloc(4);
416        for( i; 4 ) {
417                (list.data[i]){};
418        }
419        list.count = 4;
420
421        #if !defined(__CFA_NO_STATISTICS__)
422                global_stats.pick.push.attempt = 0;
423                global_stats.pick.push.success = 0;
424                global_stats.pick.pop .maskrds = 0;
425                global_stats.pick.pop .attempt = 0;
426                global_stats.pick.pop .success = 0;
427
428                global_stats.full.value = 0;
429                global_stats.full.count = 0;
430        #endif
431}
432
433void ^?{}(__ready_queue_t & this) with (this) {
434        verify( 4  == list .count );
435        verify( 0  == empty.count );
436
437        for( i; 4 ) {
438                ^(list.data[i]){};
439        }
440        free(list.data);
441
442
443        #if defined(__CFA_WITH_VERIFY__)
444                for( i ; __cfa_readyQ_mask_size ) {
445                        assert( 0 == empty.mask[i] );
446                }
447        #endif
448}
449
450//-----------------------------------------------------------------------
451
452__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
453        thrd->link.ts = rdtscl();
454
455        while(true) {
456                // Pick a random list
457                unsigned i = tls_rand() % list.count;
458
459                #if !defined(__CFA_NO_STATISTICS__)
460                        tls.pick.push.attempt++;
461                #endif
462
463                // If we can't lock it retry
464                if( !__atomic_try_acquire( &list.data[i].lock ) ) continue;
465                verify(list.data[i].last_id == -1u);
466                list.data[i].last_id = kernelTLS.this_processor->id;
467
468                __attribute__((unused)) size_t num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
469                bool first = false;
470
471                verify( list.data[i].last_id == kernelTLS.this_processor->id );
472                verify( list.data[i].lock );
473                // Actually push it
474                if(push(list.data[i], thrd)) {
475                        size_t ret = __atomic_fetch_add( &empty.count, 1z, __ATOMIC_SEQ_CST);
476                        first = (ret == 0);
477
478                        __cfa_readyQ_mask_t word;
479                        __cfa_readyQ_mask_t bit;
480                        [bit, word] = extract(i);
481                        verifyf((empty.mask[word] & (1ull << bit)) == 0, "Before set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
482                        __attribute__((unused)) bool ret = bts(&empty.mask[word], bit);
483                        verify(!(bool)ret);
484                        verifyf((empty.mask[word] & (1ull << bit)) != 0, "After set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
485                }
486                verify(empty.count <= (int)list.count);
487                verify( list.data[i].last_id == kernelTLS.this_processor->id );
488                verify( list.data[i].lock );
489
490                // Unlock and return
491                list.data[i].last_id = -1u;
492                __atomic_unlock( &list.data[i].lock );
493
494                #if !defined(__CFA_NO_STATISTICS__)
495                        tls.pick.push.success++;
496                        tls.full.value += num;
497                        tls.full.count += 1;
498                #endif
499                return first;
500        }
501}
502
503//-----------------------------------------------------------------------
504
505static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
506        #if !defined(__CFA_NO_STATISTICS__)
507                tls.pick.pop.attempt++;
508        #endif
509
510        // Pick the bet list
511        int w = i;
512        if( __builtin_expect(ts(list.data[j]) != 0, true) ) {
513                w = (ts(list.data[i]) < ts(list.data[j])) ? i : j;
514        }
515
516        __intrusive_ready_queue_t & list = list.data[w];
517        // If list looks empty retry
518        if( ts(list) == 0 ) return 0p;
519
520        // If we can't get the lock retry
521        if( !__atomic_try_acquire(&list.lock) ) return 0p;
522        verify(list.last_id == -1u);
523        list.last_id = kernelTLS.this_processor->id;
524
525        verify(list.last_id == kernelTLS.this_processor->id);
526
527        __attribute__((unused)) int num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
528
529
530        // If list is empty, unlock and retry
531        if( ts(list) == 0 ) {
532                list.last_id = -1u;
533                __atomic_unlock(&list.lock);
534                return 0p;
535        }
536        {
537                __cfa_readyQ_mask_t word;
538                __cfa_readyQ_mask_t bit;
539                [bit, word] = extract(w);
540                verify((empty.mask[word] & (1ull << bit)) != 0);
541        }
542
543        verify(list.last_id == kernelTLS.this_processor->id);
544        verify(list.lock);
545
546        // Actually pop the list
547        struct thread_desc * thrd;
548        bool emptied;
549        [thrd, emptied] = pop(list);
550        verify(thrd);
551
552        verify(list.last_id == kernelTLS.this_processor->id);
553        verify(list.lock);
554
555        if(emptied) {
556                __atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);
557
558                __cfa_readyQ_mask_t word;
559                __cfa_readyQ_mask_t bit;
560                [bit, word] = extract(w);
561                verify((empty.mask[word] & (1ull << bit)) != 0);
562                __attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
563                verify(ret);
564                verify((empty.mask[word] & (1ull << bit)) == 0);
565        }
566
567        verify(list.lock);
568
569        // Unlock and return
570        list.last_id = -1u;
571        __atomic_unlock(&list.lock);
572        verify(empty.count >= 0);
573
574        #if !defined(__CFA_NO_STATISTICS__)
575                tls.pick.pop.success++;
576                tls.full.value += num;
577                tls.full.count += 1;
578        #endif
579
580        return thrd;
581}
582
583__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
584        verify( list.count > 0 );
585        while( __atomic_load_n( &empty.count, __ATOMIC_RELAXED ) != 0) {
586                #if !defined(__CFA_READQ_NO_BITMASK__)
587                        tls.pick.pop.maskrds++;
588                        unsigned i, j;
589                        {
590                                #if !defined(__CFA_NO_SCHED_STATS__)
591                                        tls.pick.pop.maskrds++;
592                                #endif
593
594                                // Pick two lists at random
595                                unsigned num = ((__atomic_load_n( &list.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
596
597                                unsigned ri = tls_rand();
598                                unsigned rj = tls_rand();
599
600                                unsigned wdxi = (ri >> 6u) % num;
601                                unsigned wdxj = (rj >> 6u) % num;
602
603                                size_t maski = __atomic_load_n( &empty.mask[wdxi], __ATOMIC_RELAXED );
604                                size_t maskj = __atomic_load_n( &empty.mask[wdxj], __ATOMIC_RELAXED );
605
606                                if(maski == 0 && maskj == 0) continue;
607
608                                unsigned bi = rand_bit(ri, maski);
609                                unsigned bj = rand_bit(rj, maskj);
610
611                                verifyf(bi < 64, "%zu %u", maski, bi);
612                                verifyf(bj < 64, "%zu %u", maskj, bj);
613
614                                i = bi | (wdxi << 6);
615                                j = bj | (wdxj << 6);
616
617                                verifyf(i < list.count, "%u", wdxi << 6);
618                                verifyf(j < list.count, "%u", wdxj << 6);
619                        }
620
621                        struct thread_desc * thrd = try_pop(cltr, i, j);
622                        if(thrd) return thrd;
623                #else
624                        // Pick two lists at random
625                        int i = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
626                        int j = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
627
628                        struct thread_desc * thrd = try_pop(cltr, i, j);
629                        if(thrd) return thrd;
630                #endif
631        }
632
633        return 0p;
634}
635
636//-----------------------------------------------------------------------
637
638static void check( __ready_queue_t & q ) with (q) {
639        #if defined(__CFA_WITH_VERIFY__)
640                {
641                        int idx = 0;
642                        for( w ; __cfa_readyQ_mask_size ) {
643                                for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
644                                        bool is_empty = idx < list.count ? (ts(list.data[idx]) == 0) : true;
645                                        bool should_be_empty = 0 == (empty.mask[w] & (1z << b));
646                                        assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expect : %d, actual is got %d", idx, should_be_empty, (bool)is_empty);
647                                        assert(__cfa_max_readyQs > idx);
648                                        idx++;
649                                }
650                        }
651                }
652
653                {
654                        for( idx ; list.count ) {
655                                __intrusive_ready_queue_t & sl = list.data[idx];
656                                assert(!list.data[idx].lock);
657
658                                assert(head(sl)->link.prev == 0p );
659                                assert(head(sl)->link.next->link.prev == head(sl) );
660                                assert(tail(sl)->link.next == 0p );
661                                assert(tail(sl)->link.prev->link.next == tail(sl) );
662
663                                if(sl.before.link.ts == 0l) {
664                                        assert(tail(sl)->link.next == 0p);
665                                        assert(tail(sl)->link.prev == head(sl));
666                                        assert(head(sl)->link.next == tail(sl));
667                                        assert(head(sl)->link.prev == 0p);
668                                }
669                        }
670                }
671        #endif
672}
673
674// Call this function of the intrusive list was moved using memcpy
675// fixes the list so that the pointers back to anchors aren't left
676// dangling
677static inline void fix(__intrusive_ready_queue_t & ll) {
678        // if the list is not empty then follow he pointer
679        // and fix its reverse
680        if(ll.before.link.ts != 0l) {
681                head(ll)->link.next->link.prev = head(ll);
682                tail(ll)->link.prev->link.next = tail(ll);
683        }
684        // Otherwise just reset the list
685        else {
686                tail(ll)->link.next = 0p;
687                tail(ll)->link.prev = head(ll);
688                head(ll)->link.next = tail(ll);
689                head(ll)->link.prev = 0p;
690        }
691}
692
693void ready_queue_grow  (struct cluster * cltr) {
694        uint_fast32_t last_size = ready_mutate_lock( *cltr );
695        check( cltr->ready_queue );
696
697        with( cltr->ready_queue ) {
698                size_t ncount = list.count;
699
700                // Check that we have some space left
701                if(ncount + 4 >= __cfa_max_readyQs) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_readyQs);
702
703                ncount += 4;
704
705                // Allocate new array
706                list.data = alloc(list.data, ncount);
707
708                // Fix the moved data
709                for( idx; (size_t)list.count ) {
710                        fix(list.data[idx]);
711                }
712
713                // Construct new data
714                for( idx; (size_t)list.count ~ ncount) {
715                        (list.data[idx]){};
716                }
717
718                // Update original
719                list.count = ncount;
720                // fields in empty don't need to change
721        }
722
723        // Make sure that everything is consistent
724        check( cltr->ready_queue );
725        ready_mutate_unlock( *cltr, last_size );
726}
727
728void ready_queue_shrink(struct cluster * cltr) {
729        uint_fast32_t last_size = ready_mutate_lock( *cltr );
730        with( cltr->ready_queue ) {
731                size_t ocount = list.count;
732                // Check that we have some space left
733                if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
734
735                list.count -= 4;
736
737                // redistribute old data
738                verify(ocount > list.count);
739                for( idx; (size_t)list.count ~ ocount) {
740                        // This is not strictly needed but makes checking invariants much easier
741                        bool locked = __atomic_try_acquire(&list.data[idx].lock);
742                        verify(locked);
743                        while(0 != ts(list.data[idx])) {
744                                struct thread_desc * thrd;
745                                __attribute__((unused)) bool _;
746                                [thrd, _] = pop(list.data[idx]);
747                                verify(thrd);
748                                push(cltr, thrd);
749                        }
750
751                        __atomic_unlock(&list.data[idx].lock);
752
753                        // TODO print the queue statistics here
754
755                        ^(list.data[idx]){};
756                }
757
758                // clear the now unused masks
759                {
760                        __cfa_readyQ_mask_t fword, fbit, lword, lbit;
761                        [fbit, fword] = extract(ocount);
762                        [lbit, lword] = extract(list.count);
763
764                        // For now assume that all queues where coverd by the same bitmask
765                        // This is highly probable as long as grow and shrink use groups of 4
766                        // exclusively
767                        verify(fword == lword);
768                        __cfa_readyQ_mask_t clears = ~0;
769
770                        for( b ; fbit ~ lbit ) {
771                                clears ^= 1 << b;
772                        }
773
774                        empty.mask[fword] &= clears;
775                }
776
777                // Allocate new array
778                list.data = alloc(list.data, list.count);
779
780                // Fix the moved data
781                for( idx; (size_t)list.count ) {
782                        fix(list.data[idx]);
783                }
784        }
785
786        // Make sure that everything is consistent
787        check( cltr->ready_queue );
788        ready_mutate_unlock( *cltr, last_size );
789}
790
791//-----------------------------------------------------------------------
792
793#if !defined(__CFA_NO_STATISTICS__)
794void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
795        __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
796        __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
797        __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
798        __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
799        __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
800
801        __atomic_fetch_add( &global_stats.full.value, tls.full.value, __ATOMIC_SEQ_CST );
802        __atomic_fetch_add( &global_stats.full.count, tls.full.count, __ATOMIC_SEQ_CST );
803}
804#endif
Note: See TracBrowser for help on using the repository browser.