source: libcfa/src/concurrency/ready_queue.cfa @ c84b4be

arm-ehenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since c84b4be was c84b4be, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

new ready queue seems to work but halting does not, had to be disabled

  • Property mode set to 100644
File size: 24.9 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17
18#include "bits/defs.hfa"
19#include "kernel_private.hfa"
20
21#define _GNU_SOURCE
22#include "stdlib.hfa"
23
24static const size_t cache_line_size = 64;
25
26static inline unsigned __max_processors_fallback() {
27        #ifdef __CFA_MAX_PROCESSORS__
28                return __CFA_MAX_PROCESSORS__;
29        #else
30                // No overriden function, no environment variable, no define
31                // fall back to a magic number
32                return 128;
33        #endif
34}
35
36__attribute__((weak)) unsigned __max_processors() {
37        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
38        if(!max_cores_s) {
39                __cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
40                return __max_processors_fallback();
41        }
42
43        char * endptr = 0p;
44        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
45        if(max_cores_l < 1 || max_cores_l > 65535) {
46                __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
47                return __max_processors_fallback();
48        }
49        if('\0' != *endptr) {
50                __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
51                return __max_processors_fallback();
52        }
53
54        return max_cores_l;
55}
56
57static inline unsigned rand_bit(unsigned rnum, size_t mask) {
58        verify(sizeof(mask) == 8);
59        unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
60#if !defined(__BMI2__)
61        uint64_t v = mask;   // Input value to find position with rank r.
62        unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
63        unsigned int s;      // Output: Resulting position of bit with rank r [1-64]
64        uint64_t a, b, c, d; // Intermediate temporaries for bit count.
65        unsigned int t;      // Bit count temporary.
66
67        // Do a normal parallel bit count for a 64-bit integer,
68        // but store all intermediate steps.
69        a =  v - ((v >> 1) & ~0UL/3);
70        b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
71        c = (b + (b >> 4)) & ~0UL/0x11;
72        d = (c + (c >> 8)) & ~0UL/0x101;
73
74
75        t = (d >> 32) + (d >> 48);
76        // Now do branchless select!
77        s  = 64;
78        s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
79        t  = (d >> (s - 16)) & 0xff;
80        s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
81        t  = (c >> (s - 8)) & 0xf;
82        s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
83        t  = (b >> (s - 4)) & 0x7;
84        s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
85        t  = (a >> (s - 2)) & 0x3;
86        s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
87        t  = (v >> (s - 1)) & 0x1;
88        s -= ((t - r) & 256) >> 8;
89        return s - 1;
90#else
91        uint64_t picked = _pdep_u64(1ul << bit, mask);
92        return picked ? __builtin_ctzl(picked) : 0;
93#endif
94}
95
96static inline __cfa_readyQ_mask_t readyQ_mask_full       () { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }
97static inline __cfa_readyQ_mask_t readyQ_mask_shit_length() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(readyQ_mask_full()); }
98
99static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
100        __cfa_readyQ_mask_t word = idx >> readyQ_mask_shit_length();
101        __cfa_readyQ_mask_t bit  = idx &  readyQ_mask_full();
102        return [bit, word];
103}
104
105//=======================================================================
106// Cluster wide reader-writer lock
107//=======================================================================
108void  ?{}(__clusterRWLock_t & this) {
109        this.max   = __max_processors();
110        this.alloc = 0;
111        this.ready = 0;
112        this.lock  = false;
113        this.data  = alloc(this.max);
114
115        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
116        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
117        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
118        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
119
120}
121void ^?{}(__clusterRWLock_t & this) {
122        free(this.data);
123}
124
125void ?{}( __processor_id & this, struct processor * proc ) {
126        this.handle = proc;
127        this.lock   = false;
128}
129
130//=======================================================================
131// Lock-Free registering/unregistering of threads
132unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
133        // Step - 1 : check if there is already space in the data
134        uint_fast32_t s = ready;
135
136        // Check among all the ready
137        for(uint_fast32_t i = 0; i < s; i++) {
138                processor * null = 0p; // Re-write every loop since compare thrashes it
139                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
140                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
141                        /*paranoid*/ verify(i < ready);
142                        /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
143                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
144                        return i;
145                }
146        }
147
148        if(max <= alloc) abort("Trying to create more than %ud processors", cltr->ready_lock.max);
149
150        // Step - 2 : F&A to get a new spot in the array.
151        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
152        if(max <= n) abort("Trying to create more than %ud processors", cltr->ready_lock.max);
153
154        // Step - 3 : Mark space as used and then publish it.
155        __processor_id * storage = (__processor_id *)&data[n];
156        (*storage){ proc };
157        while(true) {
158                unsigned copy = n;
159                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
160                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
161                        break;
162                asm volatile("pause");
163        }
164
165        // Return new spot.
166        /*paranoid*/ verify(n < ready);
167        /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
168        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
169        return n;
170}
171
172void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
173        unsigned id = proc->id;
174        /*paranoid*/ verify(id < ready);
175        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
176        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
177}
178
179//-----------------------------------------------------------------------
180// Writer side : acquire when changing the ready queue, e.g. adding more
181//  queues or removing them.
182uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
183        // Step 1 : lock global lock
184        // It is needed to avoid processors that register mid Critical-Section
185        //   to simply lock their own lock and enter.
186        __atomic_acquire( &lock );
187
188        // Step 2 : lock per-proc lock
189        // Processors that are currently being registered aren't counted
190        //   but can't be in read_lock or in the critical section.
191        // All other processors are counted
192        uint_fast32_t s = ready;
193        for(uint_fast32_t i = 0; i < s; i++) {
194                __atomic_acquire( &data[i].lock );
195        }
196
197        return s;
198}
199
200void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
201        // Step 1 : release local locks
202        // This must be done while the global lock is held to avoid
203        //   threads that where created mid critical section
204        //   to race to lock their local locks and have the writer
205        //   immidiately unlock them
206        // Alternative solution : return s in write_lock and pass it to write_unlock
207        for(uint_fast32_t i = 0; i < last_s; i++) {
208                verify(data[i].lock);
209                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
210        }
211
212        // Step 2 : release global lock
213        /*paranoid*/ assert(true == lock);
214        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
215}
216
217//=======================================================================
218// Intrusive Queue used by ready queue
219//=======================================================================
220// Get the head pointer (one before the first element) from the anchor
221static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
222        thread_desc * rhead = (thread_desc *)(
223                (uintptr_t)( &this.before ) - offsetof( thread_desc, link )
224        );
225        /* paranoid */ verify(rhead);
226        return rhead;
227}
228
229// Get the tail pointer (one after the last element) from the anchor
230static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
231        thread_desc * rtail = (thread_desc *)(
232                (uintptr_t)( &this.after ) - offsetof( thread_desc, link )
233        );
234        /* paranoid */ verify(rtail);
235        return rtail;
236}
237
238// Ctor
239void ?{}( __intrusive_ready_queue_t & this ) {
240        this.lock = false;
241        this.last_id = -1u;
242        this.count = 0u;
243
244        this.before.link.prev = 0p;
245        this.before.link.next = tail(this);
246        this.before.link.ts   = 0;
247
248        this.after .link.prev = head(this);
249        this.after .link.next = 0p;
250        this.after .link.ts   = 0;
251
252        #if !defined(__CFA_NO_SCHED_STATS__)
253                this.stat.diff = 0;
254                this.stat.push = 0;
255                this.stat.pop  = 0;
256        #endif
257
258        // We add a boat-load of assertions here because the anchor code is very fragile
259        /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
260        /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
261        /* paranoid */ verify(head(this)->link.prev == 0p );
262        /* paranoid */ verify(head(this)->link.next == tail(this) );
263        /* paranoid */ verify(tail(this)->link.next == 0p );
264        /* paranoid */ verify(tail(this)->link.prev == head(this) );
265        /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
266        /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
267        /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
268        /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
269        /* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
270        /* paranoid */ verify(sizeof(this) == 128);
271        /* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
272        /* paranoid */ verify(__alignof__(this) == 128);
273        /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
274
275        /* paranoid */ verifyf(readyQ_mask_shit_length() == 6 , "%zu", readyQ_mask_shit_length());
276        /* paranoid */ verifyf(readyQ_mask_full()        == 63, "%zu", readyQ_mask_full());
277}
278
279// Dtor is trivial
280void ^?{}( __intrusive_ready_queue_t & this ) {
281        // Make sure the list is empty
282        /* paranoid */ verify(head(this)->link.prev == 0p );
283        /* paranoid */ verify(head(this)->link.next == tail(this) );
284        /* paranoid */ verify(tail(this)->link.next == 0p );
285        /* paranoid */ verify(tail(this)->link.prev == head(this) );
286        /* paranoid */ verify(this.count == 0u );
287}
288
289
290
291bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
292        verify(this.lock);
293        verify(node->link.ts != 0);
294        verify(node->link.next == 0p);
295        verify(node->link.prev == 0p);
296
297        this.count++;
298
299        if(this.before.link.ts == 0l) {
300                verify(tail(this)->link.next == 0p);
301                verify(tail(this)->link.prev == head(this));
302                verify(head(this)->link.next == tail(this));
303                verify(head(this)->link.prev == 0p);
304        }
305
306        // Get the relevant nodes locally
307        thread_desc * tail = tail(this);
308        thread_desc * prev = tail->link.prev;
309
310        // Do the push
311        node->link.next = tail;
312        node->link.prev = prev;
313        prev->link.next = node;
314        tail->link.prev = node;
315
316        // Update stats
317        #ifndef __CFA_NO_SCHED_STATS__
318                this.stat.diff++;
319                this.stat.push++;
320        #endif
321
322        verify(node->link.next == tail(this));
323
324        // Check if the queue used to be empty
325        if(this.before.link.ts == 0l) {
326                this.before.link.ts = node->link.ts;
327                verify(node->link.prev == head(this));
328                return true;
329        }
330        return false;
331}
332
333[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
334        verify(this.lock);
335        verify(this.before.link.ts != 0ul);
336        thread_desc * head = head(this);
337        thread_desc * tail = tail(this);
338
339        thread_desc * node = head->link.next;
340        thread_desc * next = node->link.next;
341        if(node == tail) {
342                verify(false);
343                verify(this.before.link.ts == 0ul);
344                verify(tail(this)->link.next == 0p);
345                verify(tail(this)->link.prev == head(this));
346                verify(head(this)->link.next == tail(this));
347                verify(head(this)->link.prev == 0p);
348                return [0p, false];
349        }
350
351        this.count--;
352        /* paranoid */ verify(node);
353
354        head->link.next = next;
355        next->link.prev = head;
356
357        #ifndef __CFA_NO_SCHED_STATS__
358                this.stat.diff--;
359                this.stat.pop ++;
360        #endif
361
362        if(next == tail) {
363                this.before.link.ts = 0ul;
364                verify(tail(this)->link.next == 0p);
365                verify(tail(this)->link.prev == head(this));
366                verify(head(this)->link.next == tail(this));
367                verify(head(this)->link.prev == 0p);
368                node->link.[next, prev] = 0p;
369                return [node, true];
370        }
371        else {
372                verify(next->link.ts != 0);
373                this.before.link.ts = next->link.ts;
374                verify(this.before.link.ts != 0);
375                node->link.[next, prev] = 0p;
376                return [node, false];
377        }
378}
379
380static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
381        return this.before.link.ts;
382}
383
384//=======================================================================
385// Cforall Reqdy Queue used by ready queue
386//=======================================================================
387
388static __attribute__((aligned(128))) thread_local struct {
389        struct {
390                struct {
391                        size_t attempt;
392                        size_t success;
393                } push;
394                struct {
395                        size_t maskrds;
396                        size_t attempt;
397                        size_t success;
398                } pop;
399        } pick;
400        struct {
401                size_t value;
402                size_t count;
403        } full;
404} tls = {
405        /* pick */{
406                /* push */{ 0, 0 },
407                /* pop  */{ 0, 0, 0 },
408        },
409        /* full */{ 0, 0 }
410};
411
412//-----------------------------------------------------------------------
413
414void ?{}(__ready_queue_t & this) with (this) {
415        empty.count = 0;
416        for( i ; __cfa_readyQ_mask_size ) {
417                empty.mask[i] = 0;
418        }
419
420        list.data = alloc(4);
421        for( i; 4 ) {
422                (list.data[i]){};
423        }
424        list.count = 4;
425
426        #if !defined(__CFA_NO_STATISTICS__)
427                global_stats.pick.push.attempt = 0;
428                global_stats.pick.push.success = 0;
429                global_stats.pick.pop .maskrds = 0;
430                global_stats.pick.pop .attempt = 0;
431                global_stats.pick.pop .success = 0;
432
433                global_stats.full.value = 0;
434                global_stats.full.count = 0;
435        #endif
436}
437
438void ^?{}(__ready_queue_t & this) with (this) {
439        verify( 4  == list .count );
440        verify( 0  == empty.count );
441
442        for( i; 4 ) {
443                ^(list.data[i]){};
444        }
445        free(list.data);
446
447
448        #if defined(__CFA_WITH_VERIFY__)
449                for( i ; __cfa_readyQ_mask_size ) {
450                        assert( 0 == empty.mask[i] );
451                }
452        #endif
453}
454
455//-----------------------------------------------------------------------
456
457__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
458        thrd->link.ts = rdtscl();
459
460        while(true) {
461                // Pick a random list
462                unsigned i = tls_rand() % list.count;
463
464                #if !defined(__CFA_NO_STATISTICS__)
465                        tls.pick.push.attempt++;
466                #endif
467
468                // If we can't lock it retry
469                if( !__atomic_try_acquire( &list.data[i].lock ) ) continue;
470                verify(list.data[i].last_id == -1u);
471                list.data[i].last_id = kernelTLS.this_processor->id;
472
473                __attribute__((unused)) size_t num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
474                bool first = false;
475
476                verify( list.data[i].last_id == kernelTLS.this_processor->id );
477                verify( list.data[i].lock );
478                // Actually push it
479                if(push(list.data[i], thrd)) {
480                        size_t ret = __atomic_fetch_add( &empty.count, 1z, __ATOMIC_SEQ_CST);
481                        first = (ret == 0);
482
483                        __cfa_readyQ_mask_t word;
484                        __cfa_readyQ_mask_t bit;
485                        [bit, word] = extract(i);
486                        verifyf((empty.mask[word] & (1ull << bit)) == 0, "Before set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
487                        __attribute__((unused)) bool ret = bts(&empty.mask[word], bit);
488                        verify(!(bool)ret);
489                        verifyf((empty.mask[word] & (1ull << bit)) != 0, "After set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
490                }
491                verifyf( empty.count <= list.count, "Non-empty count (%zu) exceeds actual count (%zu)\n", empty.count, list.count );
492                verifyf( list.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, list.data[i].last_id, kernelTLS.this_processor->id );
493                verifyf( list.data[i].lock, "List %u is not locked\n", i );
494
495                // Unlock and return
496                list.data[i].last_id = -1u;
497                __atomic_unlock( &list.data[i].lock );
498
499                #if !defined(__CFA_NO_STATISTICS__)
500                        tls.pick.push.success++;
501                        tls.full.value += num;
502                        tls.full.count += 1;
503                #endif
504                return first;
505        }
506}
507
508//-----------------------------------------------------------------------
509
510static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
511        #if !defined(__CFA_NO_STATISTICS__)
512                tls.pick.pop.attempt++;
513        #endif
514
515        // Pick the bet list
516        int w = i;
517        if( __builtin_expect(ts(list.data[j]) != 0, true) ) {
518                w = (ts(list.data[i]) < ts(list.data[j])) ? i : j;
519        }
520
521        __intrusive_ready_queue_t & list = list.data[w];
522        // If list looks empty retry
523        if( ts(list) == 0 ) return 0p;
524
525        // If we can't get the lock retry
526        if( !__atomic_try_acquire(&list.lock) ) return 0p;
527        verify(list.last_id == -1u);
528        list.last_id = kernelTLS.this_processor->id;
529
530        verify(list.last_id == kernelTLS.this_processor->id);
531
532        __attribute__((unused)) int num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
533
534
535        // If list is empty, unlock and retry
536        if( ts(list) == 0 ) {
537                list.last_id = -1u;
538                __atomic_unlock(&list.lock);
539                return 0p;
540        }
541        {
542                __cfa_readyQ_mask_t word;
543                __cfa_readyQ_mask_t bit;
544                [bit, word] = extract(w);
545                verify((empty.mask[word] & (1ull << bit)) != 0);
546        }
547
548        verify(list.last_id == kernelTLS.this_processor->id);
549        verify(list.lock);
550
551        // Actually pop the list
552        struct thread_desc * thrd;
553        bool emptied;
554        [thrd, emptied] = pop(list);
555        verify(thrd);
556
557        verify(list.last_id == kernelTLS.this_processor->id);
558        verify(list.lock);
559
560        if(emptied) {
561                __atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);
562
563                __cfa_readyQ_mask_t word;
564                __cfa_readyQ_mask_t bit;
565                [bit, word] = extract(w);
566                verify((empty.mask[word] & (1ull << bit)) != 0);
567                __attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
568                verify(ret);
569                verify((empty.mask[word] & (1ull << bit)) == 0);
570        }
571
572        verify(list.lock);
573
574        // Unlock and return
575        list.last_id = -1u;
576        __atomic_unlock(&list.lock);
577        verify(empty.count >= 0);
578
579        #if !defined(__CFA_NO_STATISTICS__)
580                tls.pick.pop.success++;
581                tls.full.value += num;
582                tls.full.count += 1;
583        #endif
584
585        return thrd;
586}
587
588__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
589        verify( list.count > 0 );
590        while( __atomic_load_n( &empty.count, __ATOMIC_RELAXED ) != 0) {
591                #if !defined(__CFA_READQ_NO_BITMASK__)
592                        tls.pick.pop.maskrds++;
593                        unsigned i, j;
594                        {
595                                #if !defined(__CFA_NO_SCHED_STATS__)
596                                        tls.pick.pop.maskrds++;
597                                #endif
598
599                                // Pick two lists at random
600                                unsigned num = ((__atomic_load_n( &list.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
601
602                                unsigned ri = tls_rand();
603                                unsigned rj = tls_rand();
604
605                                unsigned wdxi = (ri >> 6u) % num;
606                                unsigned wdxj = (rj >> 6u) % num;
607
608                                size_t maski = __atomic_load_n( &empty.mask[wdxi], __ATOMIC_RELAXED );
609                                size_t maskj = __atomic_load_n( &empty.mask[wdxj], __ATOMIC_RELAXED );
610
611                                if(maski == 0 && maskj == 0) continue;
612
613                                unsigned bi = rand_bit(ri, maski);
614                                unsigned bj = rand_bit(rj, maskj);
615
616                                verifyf(bi < 64, "%zu %u", maski, bi);
617                                verifyf(bj < 64, "%zu %u", maskj, bj);
618
619                                i = bi | (wdxi << 6);
620                                j = bj | (wdxj << 6);
621
622                                verifyf(i < list.count, "%u", wdxi << 6);
623                                verifyf(j < list.count, "%u", wdxj << 6);
624                        }
625
626                        struct thread_desc * thrd = try_pop(cltr, i, j);
627                        if(thrd) return thrd;
628                #else
629                        // Pick two lists at random
630                        int i = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
631                        int j = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
632
633                        struct thread_desc * thrd = try_pop(cltr, i, j);
634                        if(thrd) return thrd;
635                #endif
636        }
637
638        return 0p;
639}
640
641//-----------------------------------------------------------------------
642
643static void check( __ready_queue_t & q ) with (q) {
644        #if defined(__CFA_WITH_VERIFY__)
645                {
646                        int idx = 0;
647                        for( w ; __cfa_readyQ_mask_size ) {
648                                for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
649                                        bool is_empty = idx < list.count ? (ts(list.data[idx]) == 0) : true;
650                                        bool should_be_empty = 0 == (empty.mask[w] & (1z << b));
651                                        assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expect : %d, actual is got %d", idx, should_be_empty, (bool)is_empty);
652                                        assert(__cfa_max_readyQs > idx);
653                                        idx++;
654                                }
655                        }
656                }
657
658                {
659                        for( idx ; list.count ) {
660                                __intrusive_ready_queue_t & sl = list.data[idx];
661                                assert(!list.data[idx].lock);
662
663                                assert(head(sl)->link.prev == 0p );
664                                assert(head(sl)->link.next->link.prev == head(sl) );
665                                assert(tail(sl)->link.next == 0p );
666                                assert(tail(sl)->link.prev->link.next == tail(sl) );
667
668                                if(sl.before.link.ts == 0l) {
669                                        assert(tail(sl)->link.next == 0p);
670                                        assert(tail(sl)->link.prev == head(sl));
671                                        assert(head(sl)->link.next == tail(sl));
672                                        assert(head(sl)->link.prev == 0p);
673                                }
674                        }
675                }
676        #endif
677}
678
679// Call this function of the intrusive list was moved using memcpy
680// fixes the list so that the pointers back to anchors aren't left
681// dangling
682static inline void fix(__intrusive_ready_queue_t & ll) {
683        // if the list is not empty then follow he pointer
684        // and fix its reverse
685        if(ll.before.link.ts != 0l) {
686                head(ll)->link.next->link.prev = head(ll);
687                tail(ll)->link.prev->link.next = tail(ll);
688        }
689        // Otherwise just reset the list
690        else {
691                tail(ll)->link.next = 0p;
692                tail(ll)->link.prev = head(ll);
693                head(ll)->link.next = tail(ll);
694                head(ll)->link.prev = 0p;
695        }
696}
697
698void ready_queue_grow  (struct cluster * cltr) {
699        uint_fast32_t last_size = ready_mutate_lock( *cltr );
700        __cfaabi_dbg_print_safe("Kernel : Growing ready queue\n");
701        check( cltr->ready_queue );
702
703        with( cltr->ready_queue ) {
704                size_t ncount = list.count;
705
706                // Check that we have some space left
707                if(ncount + 4 >= __cfa_max_readyQs) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_readyQs);
708
709                ncount += 4;
710
711                // Allocate new array
712                list.data = alloc(list.data, ncount);
713
714                // Fix the moved data
715                for( idx; (size_t)list.count ) {
716                        fix(list.data[idx]);
717                }
718
719                // Construct new data
720                for( idx; (size_t)list.count ~ ncount) {
721                        (list.data[idx]){};
722                }
723
724                // Update original
725                list.count = ncount;
726                // fields in empty don't need to change
727        }
728
729        // Make sure that everything is consistent
730        check( cltr->ready_queue );
731        __cfaabi_dbg_print_safe("Kernel : Growing ready queue done\n");
732        ready_mutate_unlock( *cltr, last_size );
733}
734
735void ready_queue_shrink(struct cluster * cltr) {
736        uint_fast32_t last_size = ready_mutate_lock( *cltr );
737        __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue\n");
738        with( cltr->ready_queue ) {
739                #if defined(__CFA_WITH_VERIFY__)
740                        size_t nthreads = 0;
741                        for( idx; (size_t)list.count ) {
742                                nthreads += list.data[idx].count;
743                        }
744                #endif
745
746                size_t ocount = list.count;
747                // Check that we have some space left
748                if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
749
750                list.count -= 4;
751
752                // redistribute old data
753                verify(ocount > list.count);
754                __attribute__((unused)) size_t displaced = 0;
755                for( idx; (size_t)list.count ~ ocount) {
756                        // This is not strictly needed but makes checking invariants much easier
757                        bool locked = __atomic_try_acquire(&list.data[idx].lock);
758                        verify(locked);
759                        while(0 != ts(list.data[idx])) {
760                                struct thread_desc * thrd;
761                                __attribute__((unused)) bool _;
762                                [thrd, _] = pop(list.data[idx]);
763                                verify(thrd);
764                                push(cltr, thrd);
765                                displaced++;
766                        }
767
768                        __atomic_unlock(&list.data[idx].lock);
769
770                        // TODO print the queue statistics here
771
772                        ^(list.data[idx]){};
773                }
774
775                __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
776
777                // clear the now unused masks
778                {
779                        __cfa_readyQ_mask_t fword, fbit, lword, lbit;
780                        [fbit, fword] = extract(ocount);
781                        [lbit, lword] = extract(list.count);
782
783                        // For now assume that all queues where coverd by the same bitmask
784                        // This is highly probable as long as grow and shrink use groups of 4
785                        // exclusively
786                        verify(fword == lword);
787                        __cfa_readyQ_mask_t clears = ~0;
788
789                        for( b ; lbit ~ fbit ) {
790                                clears ^= 1 << b;
791                        }
792
793                        empty.mask[fword] &= clears;
794
795
796                        empty.count = 0;
797                        for( i ; 0 ~= lword ) {
798                                empty.count += __builtin_popcountl(empty.mask[i]);
799                        }
800                }
801
802                // Allocate new array
803                list.data = alloc(list.data, list.count);
804
805                // Fix the moved data
806                for( idx; (size_t)list.count ) {
807                        fix(list.data[idx]);
808                }
809
810                #if defined(__CFA_WITH_VERIFY__)
811                        for( idx; (size_t)list.count ) {
812                                nthreads -= list.data[idx].count;
813                        }
814                        assertf(nthreads == 0, "Shrinking changed number of threads");
815                #endif
816        }
817
818        // Make sure that everything is consistent
819        check( cltr->ready_queue );
820        __cfaabi_dbg_print_safe("Kernel : Shrinking ready queue done\n");
821        ready_mutate_unlock( *cltr, last_size );
822}
823
824//-----------------------------------------------------------------------
825
826#if !defined(__CFA_NO_STATISTICS__)
827void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
828        __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
829        __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
830        __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
831        __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
832        __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
833
834        __atomic_fetch_add( &global_stats.full.value, tls.full.value, __ATOMIC_SEQ_CST );
835        __atomic_fetch_add( &global_stats.full.count, tls.full.count, __ATOMIC_SEQ_CST );
836}
837#endif
Note: See TracBrowser for help on using the repository browser.