source: libcfa/src/concurrency/ready_queue.cfa @ 1b143de

Last change on this file since 1b143de was 1b143de, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Several fixes to relaxed_ready queue

  • Property mode set to 100644
File size: 31.2 KB
1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17// #define __CFA_DEBUG_PRINT_READY_QUEUE__
18
19#include "bits/defs.hfa"
20#include "kernel_private.hfa"
21
22#define _GNU_SOURCE
23#include "stdlib.hfa"
24
25static const size_t cache_line_size = 64;
26
27// No overriding function, no environment variable, no define:
28// fall back to a magic number
29#ifndef __CFA_MAX_PROCESSORS__
30        #define __CFA_MAX_PROCESSORS__ 128
31#endif
32
33// Returns the maximum number of processors the RWLock supports
34__attribute__((weak)) unsigned __max_processors() {
35        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
36        if(!max_cores_s) {
37                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
38                return __CFA_MAX_PROCESSORS__;
39        }
40
41        char * endptr = 0p;
42        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
43        if(max_cores_l < 1 || max_cores_l > 65535) {
44                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
45                return __CFA_MAX_PROCESSORS__;
46        }
47        if('\0' != *endptr) {
48                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
49                return __CFA_MAX_PROCESSORS__;
50        }
51
52        return max_cores_l;
53}
54
55// Picks a random 1 bit in 'mask' according to random number 'rnum'.
56static inline unsigned rand_bit(unsigned rnum, __cfa_readyQ_mask_t mask) {
57#if defined( __i386 )
58        static_assert(sizeof(mask) == 4);
59        unsigned bit = mask ? rnum % __builtin_popcount(mask) : 0;
60        #if !defined(__BMI2__)
61                #error rand_bit not implemented for non __BMI2__ i386
62        #else
63                uint32_t picked = _pdep_u32(1ul << bit, mask);
64                return picked ? __builtin_ctz(picked) : 0;
65        #endif
66#elif defined( __x86_64 )
67        static_assert(sizeof(mask) == 8);
68        unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
69        #if !defined(__BMI2__)
70                uint64_t v = mask;   // Input value to find position with rank r.
71                unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
72                unsigned int s;      // Output: Resulting position of bit with rank r [1-64]
73                uint64_t a, b, c, d; // Intermediate temporaries for bit count.
74                unsigned int t;      // Bit count temporary.
75
76                // Do a normal parallel bit count for a 64-bit integer,
77                // but store all intermediate steps.
78                a =  v - ((v >> 1) & ~0UL/3);
79                b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
80                c = (b + (b >> 4)) & ~0UL/0x11;
81                d = (c + (c >> 8)) & ~0UL/0x101;
82
83
84                t = (d >> 32) + (d >> 48);
85                // Now do branchless select!
86                s  = 64;
87                s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
88                t  = (d >> (s - 16)) & 0xff;
89                s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
90                t  = (c >> (s - 8)) & 0xf;
91                s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
92                t  = (b >> (s - 4)) & 0x7;
93                s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
94                t  = (a >> (s - 2)) & 0x3;
95                s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
96                t  = (v >> (s - 1)) & 0x1;
97                s -= ((t - r) & 256) >> 8;
98                return s - 1;
99        #else
100                uint64_t picked = _pdep_u64(1ul << bit, mask);
101                return picked ? __builtin_ctzl(picked) : 0;
102        #endif
103#elif defined( __ARM_ARCH )
104        #error rand_bit not implemented for arm
105#else
106	#error unknown hardware architecture
107#endif
108}
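// A small worked example of the __BMI2__ path above, assuming a 64-bit mask
// (illustrative values): with mask = 0b10110 (bits 1, 2 and 4 set) and rnum = 7,
// __builtin_popcountl(mask) == 3, so bit = 7 % 3 = 1, i.e. the second lowest set
// bit is wanted. _pdep_u64(1ul << 1, 0b10110) deposits that 1 into the second
// lowest set bit of the mask, giving 0b00100, and __builtin_ctzl(0b00100) == 2,
// which is the selected bit index.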
109
110
111//-----------------------------------------------------------------------------
112// Helpers used by extract
113// (_mask_bitsidx() & X) returns a bit index valid for a __cfa_readyQ_mask_t, where X is any integer
114static inline __cfa_readyQ_mask_t _mask_bitsidx () __attribute__ ((const)) { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }
115
116// (X >> _mask_shiftidx()) returns an index into an array of __cfa_readyQ_mask_t
117static inline __cfa_readyQ_mask_t _mask_shiftidx() __attribute__ ((const)) { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(_mask_bitsidx()); }
118
119
120// Assuming a large bit mask represented as an array of __cfa_readyQ_mask_t
121// Given an index into the large mask, returns the bit index within a word and the index of that word in the array
122static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
123        __cfa_readyQ_mask_t word = idx >> _mask_shiftidx();
124        __cfa_readyQ_mask_t bit  = idx &  _mask_bitsidx();
125        return [bit, word];
126}
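// Worked example for extract(), assuming 64-bit __cfa_readyQ_mask_t:
// _mask_bitsidx() == 63 and _mask_shiftidx() == 6, so for idx = 70
//   word = 70 >> 6 = 1    (second word of the mask array)
//   bit  = 70 & 63 = 6    (seventh bit inside that word)
// i.e. index 70 is tracked by bit 6 of mask word 1.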
127
128//=======================================================================
129// Cluster wide reader-writer lock
130//=======================================================================
131void  ?{}(__clusterRWLock_t & this) {
132        this.max   = __max_processors();
133        this.alloc = 0;
134        this.ready = 0;
135        this.lock  = false;
136        this.data  = alloc(this.max);
137
138        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
139        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
140        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
141        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
142
143}
144void ^?{}(__clusterRWLock_t & this) {
145        free(this.data);
146}
147
148void ?{}( __processor_id & this, struct processor * proc ) {
149        this.handle = proc;
150        this.lock   = false;
151}
152
153//=======================================================================
154// Lock-Free registering/unregistering of threads
155unsigned doregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
156        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p with cluster %p\n", proc, cltr);
157
158        // Step - 1 : check if there is already space in the data
159        uint_fast32_t s = ready;
160
161	// Check among all the already-published slots for a free handle
162        for(uint_fast32_t i = 0; i < s; i++) {
163		processor * null = 0p; // Re-set every iteration since compare_exchange overwrites it on failure
164                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
165                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
166                        /*paranoid*/ verify(i < ready);
167                        /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
168                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
169                        return i;
170                }
171        }
172
173	if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);
174
175        // Step - 2 : F&A to get a new spot in the array.
176        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
177	if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);
178
179        // Step - 3 : Mark space as used and then publish it.
180        __processor_id * storage = (__processor_id *)&data[n];
181        (*storage){ proc };
182        while(true) {
183                unsigned copy = n;
184                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
185                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
186                        break;
187                asm volatile("pause");
188        }
189
190        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
191
192        // Return new spot.
193        /*paranoid*/ verify(n < ready);
194        /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
195        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
196        return n;
197}
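// Illustrative trace of the three steps above, assuming ready == 3 and no free
// handle in data[0..3): step 2's fetch-and-add on alloc returns n == 3, reserving
// slot 3; step 3 constructs data[3] and then spins until ready == 3 before CAS'ing
// ready to 4. Slots are therefore published in order, so any reader that observes
// ready == 4 also observes a fully constructed data[3].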
198
199void unregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
200        unsigned id = proc->id;
201        /*paranoid*/ verify(id < ready);
202        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
203        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
204
205        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
206}
207
208//-----------------------------------------------------------------------
209// Writer side : acquire when changing the ready queue, e.g. adding more
210//  queues or removing them.
211uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
212        // Step 1 : lock global lock
213	// It is needed to prevent processors that register mid critical-section
214	//   from simply locking their own lock and entering.
215        __atomic_acquire( &lock );
216
217        // Step 2 : lock per-proc lock
218        // Processors that are currently being registered aren't counted
219        //   but can't be in read_lock or in the critical section.
220        // All other processors are counted
221        uint_fast32_t s = ready;
222        for(uint_fast32_t i = 0; i < s; i++) {
223                __atomic_acquire( &data[i].lock );
224        }
225
226        return s;
227}
228
229void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
230        // Step 1 : release local locks
231	// This must be done while the global lock is held to prevent
232	//   threads that were created mid critical-section
233	//   from racing to lock their local locks and having the writer
234	//   immediately unlock them
235        // Alternative solution : return s in write_lock and pass it to write_unlock
236        for(uint_fast32_t i = 0; i < last_s; i++) {
237                verify(data[i].lock);
238                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
239        }
240
241        // Step 2 : release global lock
242        /*paranoid*/ assert(true == lock);
243        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
244}
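// Typical writer-side usage, as done by ready_queue_grow/shrink further below
// (sketch only):
//
//   uint_fast32_t s = ready_mutate_lock( *cltr );   // block out all readers
//   /* add or remove lanes in cltr->ready_queue */
//   ready_mutate_unlock( *cltr, s );                // let readers back in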
245
246//=======================================================================
247// Intrusive Queue used by ready queue
248//=======================================================================
249// Get the head pointer (one before the first element) from the anchor
250static inline $thread * head(const __intrusive_lane_t & this) {
251        $thread * rhead = ($thread *)(
252                (uintptr_t)( &this.before ) - offsetof( $thread, link )
253        );
254        /* paranoid */ verify(rhead);
255        return rhead;
256}
257
258// Get the tail pointer (one after the last element) from the anchor
259static inline $thread * tail(const __intrusive_lane_t & this) {
260        $thread * rtail = ($thread *)(
261                (uintptr_t)( &this.after ) - offsetof( $thread, link )
262        );
263        /* paranoid */ verify(rtail);
264        return rtail;
265}
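// Note on the anchor trick above: 'before' and 'after' are not real threads, so
// subtracting offsetof($thread, link) from their addresses yields pseudo $thread
// pointers whose 'link' field aliases the anchor storage exactly. For example,
// head(this)->link.next is the same storage as this.before.link.next, which is
// what the paranoid checks in the constructor below verify.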
266
267// Ctor
268void ?{}( __intrusive_lane_t & this ) {
269        this.lock = false;
270        #if defined(__CFA_WITH_VERIFY__)
271                this.last_id = -1u;
272                this.count = 0u;
273        #endif
274
275        this.before.link.prev = 0p;
276        this.before.link.next = tail(this);
277        this.before.link.ts   = 0;
278
279        this.after .link.prev = head(this);
280        this.after .link.next = 0p;
281        this.after .link.ts   = 0;
282
283        #if !defined(__CFA_NO_SCHED_STATS__)
284                this.stat.diff = 0;
285                this.stat.push = 0;
286                this.stat.pop  = 0;
287        #endif
288
289        // We add a boat-load of assertions here because the anchor code is very fragile
290        /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
291        /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
292        /* paranoid */ verify(head(this)->link.prev == 0p );
293        /* paranoid */ verify(head(this)->link.next == tail(this) );
294        /* paranoid */ verify(tail(this)->link.next == 0p );
295        /* paranoid */ verify(tail(this)->link.prev == head(this) );
296        /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
297        /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
298        /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
299        /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
300        /* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
301        /* paranoid */ verify(sizeof(this) == 128);
302        /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
303        /* paranoid */ verify(__alignof__(this) == 128);
304        /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
305
306        /* paranoid */ verifyf(_mask_shiftidx() == 6 , "%llu", _mask_shiftidx());
307        /* paranoid */ verifyf(_mask_bitsidx () == 63, "%llu", _mask_bitsidx());
308}
309
310// Dtor is trivial
311void ^?{}( __intrusive_lane_t & this ) {
312        // Make sure the list is empty
313        /* paranoid */ verify(head(this)->link.prev == 0p );
314        /* paranoid */ verify(head(this)->link.next == tail(this) );
315        /* paranoid */ verify(tail(this)->link.next == 0p );
316        /* paranoid */ verify(tail(this)->link.prev == head(this) );
317        /* paranoid */ verify(this.count == 0u );
318}
319
320// Push a thread onto this lane
322// returns true if the lane was empty before the push, false otherwise
322bool push(__intrusive_lane_t & this, $thread * node) {
323        #if defined(__CFA_WITH_VERIFY__)
324                /* paranoid */ verify(this.lock);
325                /* paranoid */ verify(node->link.ts != 0);
326                /* paranoid */ verify(node->link.next == 0p);
327                /* paranoid */ verify(node->link.prev == 0p);
328                /* paranoid */ verify(tail(this)->link.next == 0p);
329                /* paranoid */ verify(head(this)->link.prev == 0p);
330
331                this.count++;
332
333                if(this.before.link.ts == 0l) {
334                        /* paranoid */ verify(tail(this)->link.prev == head(this));
335                        /* paranoid */ verify(head(this)->link.next == tail(this));
336                } else {
337                        /* paranoid */ verify(tail(this)->link.prev != head(this));
338                        /* paranoid */ verify(head(this)->link.next != tail(this));
339                }
340        #endif
341
342        // Get the relevant nodes locally
343        $thread * tail = tail(this);
344        $thread * prev = tail->link.prev;
345
346        // Do the push
347        node->link.next = tail;
348        node->link.prev = prev;
349        prev->link.next = node;
350        tail->link.prev = node;
351
352        // Update stats
353        #if !defined(__CFA_NO_SCHED_STATS__)
354                this.stat.diff++;
355                this.stat.push++;
356        #endif
357
358        verify(node->link.next == tail(this));
359
360        // Check if the queue used to be empty
361        if(this.before.link.ts == 0l) {
362                this.before.link.ts = node->link.ts;
363                /* paranoid */ verify(node->link.prev == head(this));
364                return true;
365        }
366        return false;
367}
368
369// Pop a thread from this lane (must be non-empty)
370// returns the popped thread
371// and true if the pop leaves the lane empty, false otherwise
372[$thread *, bool] pop(__intrusive_lane_t & this) {
373        /* paranoid */ verify(this.lock);
374        /* paranoid */ verify(this.before.link.ts != 0ul);
375
376        // Get anchors locally
377        $thread * head = head(this);
378        $thread * tail = tail(this);
379
380        // Get the relevant nodes locally
381        $thread * node = head->link.next;
382        $thread * next = node->link.next;
383
384        #if defined(__CFA_WITH_VERIFY__)
385                this.count--;
386                /* paranoid */ verify(node != tail);
387                /* paranoid */ verify(node);
388        #endif
389
390        // Do the pop
391        head->link.next = next;
392        next->link.prev = head;
393        node->link.[next, prev] = 0p;
394
395        // Update head time stamp
396        this.before.link.ts = next->link.ts;
397
398        // Update stats
399        #ifndef __CFA_NO_SCHED_STATS__
400                this.stat.diff--;
401                this.stat.pop ++;
402        #endif
403
404	// Check if we emptied the list and return accordingly
405        /* paranoid */ verify(tail(this)->link.next == 0p);
406        /* paranoid */ verify(head(this)->link.prev == 0p);
407        if(next == tail) {
408                /* paranoid */ verify(this.before.link.ts == 0);
409                /* paranoid */ verify(tail(this)->link.prev == head(this));
410                /* paranoid */ verify(head(this)->link.next == tail(this));
411                return [node, true];
412        }
413        else {
414                /* paranoid */ verify(next->link.ts != 0);
415                /* paranoid */ verify(tail(this)->link.prev != head(this));
416                /* paranoid */ verify(head(this)->link.next != tail(this));
417                /* paranoid */ verify(this.before.link.ts != 0);
418                return [node, false];
419        }
420}
421
422// Check whether or not list is empty
423static inline bool is_empty(__intrusive_lane_t & this) {
424        // Cannot verify here since it may not be locked
425        return this.before.link.ts == 0;
426}
427
428// Return the timestamp
429static inline unsigned long long ts(__intrusive_lane_t & this) {
430        // Cannot verify here since it may not be locked
431        return this.before.link.ts;
432}
433
434//=======================================================================
435// Cforall Ready Queue used by ready queue
436//=======================================================================
437
438// Thread local mirror of ready queue statistics
439#if !defined(__CFA_NO_STATISTICS__)
440static __attribute__((aligned(128))) thread_local struct {
441        struct {
442                struct {
443                        size_t attempt;
444                        size_t success;
445                } push;
446                struct {
447                        size_t maskrds;
448                        size_t attempt;
449                        size_t success;
450                } pop;
451        } pick;
452        struct {
453                size_t value;
454                size_t count;
455        } used;
456} tls = {
457        /* pick */{
458                /* push */{ 0, 0 },
459                /* pop  */{ 0, 0, 0 },
460        },
461        /* used */{ 0, 0 }
462};
463#endif
464
465//-----------------------------------------------------------------------
466
467void ?{}(__ready_queue_t & this) with (this) {
468        used.count = 0;
469        for( i ; __cfa_lane_mask_size ) {
470                used.mask[i] = 0;
471        }
472
473        lanes.data = alloc(4);
474        for( i; 4 ) {
475                (lanes.data[i]){};
476        }
477        lanes.count = 4;
478
479        #if !defined(__CFA_NO_STATISTICS__)
480                global_stats.pick.push.attempt = 0;
481                global_stats.pick.push.success = 0;
482                global_stats.pick.pop .maskrds = 0;
483                global_stats.pick.pop .attempt = 0;
484                global_stats.pick.pop .success = 0;
485
486                global_stats.used.value = 0;
487                global_stats.used.count = 0;
488        #endif
489}
490
491void ^?{}(__ready_queue_t & this) with (this) {
492        verify( 4  == lanes.count );
493        verify( 0  == used .count );
494
495        for( i; 4 ) {
496                ^(lanes.data[i]){};
497        }
498        free(lanes.data);
499
500
501        #if defined(__CFA_WITH_VERIFY__)
502                for( i ; __cfa_lane_mask_size ) {
503                        assert( 0 == used.mask[i] );
504                }
505        #endif
506}
507
508//-----------------------------------------------------------------------
509enum mask_strictness {
510        STRICT,
511        NOCHECK
512};
513
514// Set a given bit in the bit mask array
515// strictness determines whether the bit must have been clear beforehand
516static inline void mask_set(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
517        // Extract the array and bit indexes
518        __cfa_readyQ_mask_t word;
519        __cfa_readyQ_mask_t bit;
520        [bit, word] = extract(index);
521
522        __cfadbg_print_safe(ready_queue, "Kernel : Ready queue extracted index %u as [bit %llu, word %llu]\n", index, bit, word);
523
524        // Conditional check
525        verifyf(
526                strict != STRICT || // Conditional check if it was expected to be cleared
527                ((mask[word] & (1ull << bit)) == 0),
528                "Before set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
529        );
530
531        // Atomically set the bit
532        __attribute__((unused)) bool ret = __atomic_bts(&mask[word], bit);
533
534        // Conditional check
535        verifyf(
536                strict != STRICT || // Conditional check if it was expected to be cleared
537                !ret,
538		"Bit was expected to be clear but bts returned true"
539        );
540
541        // Unconditional check
542        verifyf(
543                (mask[word] & (1ull << bit)) != 0,
544                "After set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
545        );
546}
547
548static inline void mask_clear(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
549        // Extract the array and bit indexes
550        __cfa_readyQ_mask_t word;
551        __cfa_readyQ_mask_t bit;
552        [bit, word] = extract(index);
553
554        // Conditional check
555        verifyf(
556                strict != STRICT || // Conditional check if it was expected to be set
557                ((mask[word] & (1ull << bit)) != 0),
558                "Before clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
559        );
560
561        // Atomically clear the bit
562        __attribute__((unused)) bool ret = __atomic_btr(&mask[word], bit);
563
564        // Conditional check
565        verifyf(
566                strict != STRICT || // Conditional check if it was expected to be cleared
567                ret,
568		"Bit was expected to be set but btr returned false"
569        );
570
571        // Unconditional check
572        verifyf(
573                (mask[word] & (1ull << bit)) == 0,
574                "After clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
575        );
576}
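// Worked example for the two helpers above, reusing the extract() example and
// assuming 64-bit mask words: mask_set(mask, 70, STRICT) atomically sets bit 6 of
// mask[1] and verifies that the bit was clear beforehand, while
// mask_clear(mask, 70, NOCHECK) clears the same bit without requiring it to have
// been set, which is what the shrink path further below relies on.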
577
578//-----------------------------------------------------------------------
579__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
580        __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p (mask %llu)\n", thrd, cltr, used.mask[0]);
581
582        // write timestamp
583        thrd->link.ts = rdtscl();
584
585        // Try to pick a lane and lock it
586        unsigned i;
587        do {
588                // Pick the index of a lane
589                i = __tls_rand() % lanes.count;
590
591                #if !defined(__CFA_NO_STATISTICS__)
592                        tls.pick.push.attempt++;
593                #endif
594
595                // If we can't lock it retry
596        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
597
598        #if defined(__CFA_WITH_VERIFY__)
599                /* paranoid */ verify(lanes.data[i].last_id == -1u);
600                /* paranoid */ lanes.data[i].last_id = kernelTLS.this_processor->id;
601        #endif
602
603        __attribute__((unused)) size_t num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
604        bool first = false;
605
606        // Actually push it
607        bool lane_first = push(lanes.data[i], thrd);
608
609        // If this lane used to be empty we need to do more
610        if(lane_first) {
611                // Update the bit mask
612                mask_set((__cfa_readyQ_mask_t *)used.mask, i, STRICT);
613
614                // Update the global count
615                size_t ret = __atomic_fetch_add( &used.count, 1z, __ATOMIC_SEQ_CST);
616
617                // Check if the entire queue used to be empty
618                first = (ret == 0);
619        }
620
621        #if defined(__CFA_WITH_VERIFY__)
622                /* paranoid */ verifyf( used.count <= lanes.count, "Non-empty count (%zu) exceeds actual count (%zu)\n", used.count, lanes.count );
623                /* paranoid */ verifyf( lanes.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, lanes.data[i].last_id, kernelTLS.this_processor->id );
624                /* paranoid */ verifyf( lanes.data[i].lock, "List %u is not locked\n", i );
625                /* paranoid */ lanes.data[i].last_id = -1u;
626        #endif
627
628        // Unlock and return
629        __atomic_unlock( &lanes.data[i].lock );
630
631        __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
632
633        // Update statistics
634        #if !defined(__CFA_NO_STATISTICS__)
635                tls.pick.push.success++;
636                tls.used.value += num;
637                tls.used.count += 1;
638        #endif
639
640        // return whether or not the list was empty before this push
641        return first;
642}
643
644//-----------------------------------------------------------------------
645// Given 2 indexes, pick the list with the oldest push and try to pop from it
646static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
647        #if !defined(__CFA_NO_STATISTICS__)
648                tls.pick.pop.attempt++;
649        #endif
650
651	// Pick the best list
652        int w = i;
653        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
654                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
655        }
656
657        // Get relevant elements locally
658        __intrusive_lane_t & lane = lanes.data[w];
659
660        // If list looks empty retry
661        if( is_empty(lane) ) return 0p;
662
663        // If we can't get the lock retry
664        if( !__atomic_try_acquire(&lane.lock) ) return 0p;
665
666        #if defined(__CFA_WITH_VERIFY__)
667                /* paranoid */ verify(lane.last_id == -1u);
668                /* paranoid */ lane.last_id = kernelTLS.this_processor->id;
669        #endif
670
671
672        // If list is empty, unlock and retry
673        if( is_empty(lane) ) {
674                #if defined(__CFA_WITH_VERIFY__)
675                        /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
676                        /* paranoid */ lane.last_id = -1u;
677                #endif
678
679                __atomic_unlock(&lane.lock);
680                return 0p;
681        }
682
683        // Actually pop the list
684        struct $thread * thrd;
685        bool emptied;
686        [thrd, emptied] = pop(lane);
687
688        /* paranoid */ verify(thrd);
689        /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
690        /* paranoid */ verify(lane.lock);
691
692        // If this was the last element in the lane
693        if(emptied) {
694                // Update the global count
695                __atomic_fetch_sub( &used.count, 1z, __ATOMIC_SEQ_CST);
696
697                // Update the bit mask
698                mask_clear((__cfa_readyQ_mask_t *)used.mask, w, STRICT);
699        }
700
701        #if defined(__CFA_WITH_VERIFY__)
702                /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
703                /* paranoid */ lane.last_id = -1u;
704        #endif
705
706        // For statistics, check the count before we release the lock
707        #if !defined(__CFA_NO_STATISTICS__)
708                int num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
709        #endif
710
711        // Unlock and return
712        __atomic_unlock(&lane.lock);
713
714        // Update statistics
715        #if !defined(__CFA_NO_STATISTICS__)
716                tls.pick.pop.success++;
717                tls.used.value += num;
718                tls.used.count += 1;
719        #endif
720
721        // return the popped thread
722        return thrd;
723}
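// Illustrative example of the "oldest of two" heuristic above: if
// ts(lanes.data[i]) == 1000 and ts(lanes.data[j]) == 400, lane j holds the older
// un-popped push, so w = j and the pop is attempted there; if lane j is empty
// (ts == 0), w stays i instead.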
724
725// Pop from the ready queue from a given cluster
726__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
727        /* paranoid */ verify( lanes.count > 0 );
728
729        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
730        while( __atomic_load_n( &used.count, __ATOMIC_RELAXED ) != 0) {
731                #if !defined(__CFA_READQ_NO_BITMASK__)
732                        // If using bit masks
733                        #if !defined(__CFA_NO_SCHED_STATS__)
734                                tls.pick.pop.maskrds++;
735                        #endif
736
737                        // Pick two lists at random
738                        unsigned ri = __tls_rand();
739                        unsigned rj = __tls_rand();
740
741			// Find which __cfa_readyQ_mask_t the two lists belong to
742                        unsigned num = ((__atomic_load_n( &lanes.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
743                        unsigned wdxi = (ri >> 6u) % num;
744                        unsigned wdxj = (rj >> 6u) % num;
745
746                        // Get the actual __cfa_readyQ_mask_t
747                        size_t maski = __atomic_load_n( &used.mask[wdxi], __ATOMIC_RELAXED );
748                        size_t maskj = __atomic_load_n( &used.mask[wdxj], __ATOMIC_RELAXED );
749
750                        // If both of these masks are empty, retry
751                        if(maski == 0 && maskj == 0) continue;
752
753                        // Pick one of the non-zero bits in the masks and get the bit indexes
754                        unsigned bi = rand_bit(ri, maski);
755                        unsigned bj = rand_bit(rj, maskj);
756
757                        // some checks
758                        /* paranoid */ verifyf(bi < 64, "%zu %u", maski, bi);
759                        /* paranoid */ verifyf(bj < 64, "%zu %u", maskj, bj);
760
761                        // get the general list index
762                        unsigned i = bi | (wdxi << 6);
763                        unsigned j = bj | (wdxj << 6);
764
765                        // some more checks
766                        /* paranoid */ verifyf(i < lanes.count, "%u", wdxi << 6);
767                        /* paranoid */ verifyf(j < lanes.count, "%u", wdxj << 6);
768
769                        // try popping from the 2 picked lists
770                        struct $thread * thrd = try_pop(cltr, i, j);
771                        if(thrd) return thrd;
772                #else
773                        // Pick two lists at random
774                        int i = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
775                        int j = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
776
777                        // try popping from the 2 picked lists
778                        struct $thread * thrd = try_pop(cltr, i, j);
779                        if(thrd) return thrd;
780                #endif
781        }
782
783	// All lanes were empty, return 0p
784        return 0p;
785}
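// Worked example of the bitmask path above, assuming 64-bit mask words and
// lanes.count == 70: num = ((70 - 1) >> 6) + 1 == 2 mask words; a random ri maps
// to word wdxi = (ri >> 6) % 2, rand_bit picks a set bit bi in used.mask[wdxi],
// and the lane index is i = bi | (wdxi << 6), e.g. bit 5 of word 1 selects lane
// 5 | (1 << 6) == 69.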
786
787//-----------------------------------------------------------------------
788
789static void check( __ready_queue_t & q ) with (q) {
790        #if defined(__CFA_WITH_VERIFY__)
791                {
792                        int idx = 0;
793                        for( w ; __cfa_lane_mask_size ) {
794                                for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
795                                        bool is_empty = idx < lanes.count ? (ts(lanes.data[idx]) == 0) : true;
796                                        bool should_be_empty = 0 == (used.mask[w] & (1z << b));
797					assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expects %d, actual is %d", idx, should_be_empty, (bool)is_empty);
798                                        assert(__cfa_max_lanes > idx);
799                                        idx++;
800                                }
801                        }
802                }
803
804                {
805                        for( idx ; lanes.count ) {
806                                __intrusive_lane_t & sl = lanes.data[idx];
807                                assert(!lanes.data[idx].lock);
808
809                                assert(head(sl)->link.prev == 0p );
810                                assert(head(sl)->link.next->link.prev == head(sl) );
811                                assert(tail(sl)->link.next == 0p );
812                                assert(tail(sl)->link.prev->link.next == tail(sl) );
813
814                                if(sl.before.link.ts == 0l) {
815                                        assert(tail(sl)->link.prev == head(sl));
816                                        assert(head(sl)->link.next == tail(sl));
817                                } else {
818                                        assert(tail(sl)->link.prev != head(sl));
819                                        assert(head(sl)->link.next != tail(sl));
820                                }
821                        }
822                }
823        #endif
824}
825
826// Call this function if the intrusive list was moved using memcpy
827// fixes the list so that the pointers back to anchors aren't left dangling
828static inline void fix(__intrusive_lane_t & ll) {
829	// if the list is not empty then follow the pointers and fix their reverse links
830        if(!is_empty(ll)) {
831                head(ll)->link.next->link.prev = head(ll);
832                tail(ll)->link.prev->link.next = tail(ll);
833        }
834        // Otherwise just reset the list
835        else {
836                verify(tail(ll)->link.next == 0p);
837                tail(ll)->link.prev = head(ll);
838                head(ll)->link.next = tail(ll);
839                verify(head(ll)->link.prev == 0p);
840        }
841}
842
843// Grow the ready queue
844void ready_queue_grow  (struct cluster * cltr) {
845        // Lock the RWlock so no-one pushes/pops while we are changing the queue
846        uint_fast32_t last_size = ready_mutate_lock( *cltr );
847
848        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
849
850        // Make sure that everything is consistent
851        /* paranoid */ check( cltr->ready_queue );
852
853        // grow the ready queue
854        with( cltr->ready_queue ) {
855                size_t ncount = lanes.count;
856
857                // Check that we have some space left
858                if(ncount + 4 >= __cfa_max_lanes) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_lanes);
859
860                // increase count
861                ncount += 4;
862
863                // Allocate new array (uses realloc and memcpies the data)
864                lanes.data = alloc(lanes.data, ncount);
865
866                // Fix the moved data
867                for( idx; (size_t)lanes.count ) {
868                        fix(lanes.data[idx]);
869                }
870
871                // Construct new data
872                for( idx; (size_t)lanes.count ~ ncount) {
873                        (lanes.data[idx]){};
874                }
875
876                // Update original
877                lanes.count = ncount;
878
879                // fields in 'used' don't need to change when growing
880        }
881
882        // Make sure that everything is consistent
883        /* paranoid */ check( cltr->ready_queue );
884
885        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
886
887        // Unlock the RWlock
888        ready_mutate_unlock( *cltr, last_size );
889}
890
891// Shrink the ready queue
892void ready_queue_shrink(struct cluster * cltr) {
893        // Lock the RWlock so no-one pushes/pops while we are changing the queue
894        uint_fast32_t last_size = ready_mutate_lock( *cltr );
895
896        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
897
898        // Make sure that everything is consistent
899        /* paranoid */ check( cltr->ready_queue );
900
901        with( cltr->ready_queue ) {
902                // Make sure that the total thread count stays the same
903                #if defined(__CFA_WITH_VERIFY__)
904                        size_t nthreads = 0;
905                        for( idx; (size_t)lanes.count ) {
906                                nthreads += lanes.data[idx].count;
907                        }
908                #endif
909
910                size_t ocount = lanes.count;
911		// Check that there are enough queues left to remove some
912                if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
913
914                // reduce the actual count so push doesn't use the old queues
915                lanes.count -= 4;
916                verify(ocount > lanes.count);
917
918		// for printing, count the number of displaced threads
919                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
920                        __attribute__((unused)) size_t displaced = 0;
921                #endif
922
923                // redistribute old data
924                for( idx; (size_t)lanes.count ~ ocount) {
925                        // Lock is not strictly needed but makes checking invariants much easier
926                        __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
927                        verify(locked);
928
929			// As long as this lane is not empty, pop from it and push the threads somewhere else in the queue
930                        while(!is_empty(lanes.data[idx])) {
931                                struct $thread * thrd;
932                                __attribute__((unused)) bool _;
933                                [thrd, _] = pop(lanes.data[idx]);
934
935                                push(cltr, thrd);
936
937				// for printing, count the number of displaced threads
938                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
939                                        displaced++;
940                                #endif
941                        }
942
943                        mask_clear((__cfa_readyQ_mask_t *)used.mask, idx, NOCHECK);
944
945                        // Unlock the lane
946                        __atomic_unlock(&lanes.data[idx].lock);
947
948                        // TODO print the queue statistics here
949
950                        ^(lanes.data[idx]){};
951                }
952
953                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
954
955                // recompute the used.count instead of maintaining it
956                used.count = 0;
957                for( i ; __cfa_lane_mask_size ) {
958                        used.count += __builtin_popcountl(used.mask[i]);
959                }
960
961                // Allocate new array (uses realloc and memcpies the data)
962                lanes.data = alloc(lanes.data, lanes.count);
963
964                // Fix the moved data
965                for( idx; (size_t)lanes.count ) {
966                        fix(lanes.data[idx]);
967                }
968
969                // Make sure that the total thread count stayed the same
970                #if defined(__CFA_WITH_VERIFY__)
971                        for( idx; (size_t)lanes.count ) {
972                                nthreads -= lanes.data[idx].count;
973                        }
974                        verifyf(nthreads == 0, "Shrinking changed number of threads");
975                #endif
976        }
977
978        // Make sure that everything is consistent
979        /* paranoid */ check( cltr->ready_queue );
980
981        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
982
983        // Unlock the RWlock
984        ready_mutate_unlock( *cltr, last_size );
985}
986
987//-----------------------------------------------------------------------
988
989#if !defined(__CFA_NO_STATISTICS__)
990void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
991        __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
992        __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
993        __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
994        __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
995        __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
996
997        __atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
998        __atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
999}
1000#endif