source: libcfa/src/concurrency/ready_queue.cfa @ dca5802

Last change on this file since dca5802 was dca5802, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Started doing some of the x86 implementations and some changes after a code review

//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"

static const size_t cache_line_size = 64;

// No overridden function, no environment variable, no define
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 128
#endif

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}
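
// Illustrative usage (assuming a POSIX shell): the cap can be raised at run time
// without recompiling, e.g.
//   $ CFA_MAX_PROCESSORS=256 ./a.out
// Non-numeric values or values outside [1, 65535] fall back to __CFA_MAX_PROCESSORS__.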

// Picks a random 1 bit in 'mask' according to random number 'rnum'.
static inline unsigned rand_bit(unsigned rnum, __cfa_readyQ_mask_t mask) {
#if defined( __i386 )
	static_assert(sizeof(mask) == 4);
	unsigned bit = mask ? rnum % __builtin_popcount(mask) : 0;
	#if !defined(__BMI2__)
		#error rand_bit not implemented for non __BMI2__ i386
	#else
		uint32_t picked = _pdep_u32(1ul << bit, mask);
		return picked ? __builtin_ctz(picked) : 0;
	#endif
#elif defined( __x86_64 )
	static_assert(sizeof(mask) == 8);
	unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
	#if !defined(__BMI2__)
		uint64_t v = mask;   // Input value to find position with rank r.
		unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
		unsigned int s;      // Output: Resulting position of bit with rank r [1-64]
		uint64_t a, b, c, d; // Intermediate temporaries for bit count.
		unsigned int t;      // Bit count temporary.

		// Do a normal parallel bit count for a 64-bit integer,
		// but store all intermediate steps.
		a =  v - ((v >> 1) & ~0UL/3);
		b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
		c = (b + (b >> 4)) & ~0UL/0x11;
		d = (c + (c >> 8)) & ~0UL/0x101;


		t = (d >> 32) + (d >> 48);
		// Now do branchless select!
		s  = 64;
		s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
		t  = (d >> (s - 16)) & 0xff;
		s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
		t  = (c >> (s - 8)) & 0xf;
		s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
		t  = (b >> (s - 4)) & 0x7;
		s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
		t  = (a >> (s - 2)) & 0x3;
		s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
		t  = (v >> (s - 1)) & 0x1;
		s -= ((t - r) & 256) >> 8;
		return s - 1;
	#else
		uint64_t picked = _pdep_u64(1ul << bit, mask);
		return picked ? __builtin_ctzl(picked) : 0;
	#endif
#elif defined( __ARM_ARCH )
	#error rand_bit not implemented for arm
#else
	#error unknown hardware architecture
#endif
}
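
// Illustrative example (assumes the x86_64 BMI2 path): with mask = 0b10100 (bits 2 and 4 set)
// and rnum = 7, popcount(mask) == 2 so bit = 7 % 2 = 1, i.e. the set bit of rank 1 (0-based)
// is wanted.  _pdep_u64(1 << 1, 0b10100) deposits that 1 into the second set position,
// giving 0b10000, and ctz returns 4.  The pick is uniform over set bits only when rnum is.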


//-----------------------------------------------------------------------------
// Helpers used by extract
// (_mask_bitsidx() & X) returns a bit index valid for a __cfa_readyQ_mask_t, where X is any integer
static inline __cfa_readyQ_mask_t _mask_bitsidx () { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }

// (X >> _mask_shiftidx()) returns an index into an array of __cfa_readyQ_mask_t
static inline __cfa_readyQ_mask_t _mask_shiftidx() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(_mask_bitsidx()); }


// Assuming a large bit mask represented as an array of __cfa_readyQ_mask_t
// Given an index into the large mask, returns the bit index and which __cfa_readyQ_mask_t index in the array
static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
	// Per the helpers above: shift to find the word, mask to find the bit within it
	__cfa_readyQ_mask_t word = idx >> _mask_shiftidx();
	__cfa_readyQ_mask_t bit  = idx &  _mask_bitsidx();
	return [bit, word];
}
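
// Illustrative example: with 64-bit masks, extract(70) yields word = 70 >> 6 = 1 and
// bit = 70 & 63 = 6, i.e. lane 70 is tracked by bit 6 of the second mask word.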

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void  ?{}(__clusterRWLock_t & this) {
	this.max   = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.lock  = false;
	this.data  = alloc(this.max);

	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__clusterRWLock_t & this) {
	free(this.data);
}

void ?{}( __processor_id & this, struct processor * proc ) {
	this.handle = proc;
	this.lock   = false;
}

//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready slots for one that is unused
	for(uint_fast32_t i = 0; i < s; i++) {
		processor * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 3 : Mark space as used and then publish it.
	__processor_id * storage = (__processor_id *)&data[n];
	(*storage){ proc };
	while(true) {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}

void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	unsigned id = proc->id;
	/*paranoid*/ verify(id < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
//  queues or removing them.
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
	// Step 1 : lock global lock
	// It is needed to avoid processors that register mid Critical-Section
	//   from simply locking their own lock and entering.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	//   but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		__atomic_acquire( &data[i].lock );
	}

	return s;
}

void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	//   threads that were created mid critical section
	//   racing to lock their local locks and having the writer
	//   immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}
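
// Usage pattern (as used by ready_queue_grow/ready_queue_shrink below): the writer
// holds every reader slot for the duration of a structural change, e.g.
//   uint_fast32_t last_size = ready_mutate_lock( *cltr );
//   /* resize lanes, update masks ... */
//   ready_mutate_unlock( *cltr, last_size );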

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================
// Get the head pointer (one before the first element) from the anchor
static inline thread_desc * head(const __intrusive_lane_t & this) {
	thread_desc * rhead = (thread_desc *)(
		(uintptr_t)( &this.before ) - offsetof( thread_desc, link )
	);
	/* paranoid */ verify(rhead);
	return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline thread_desc * tail(const __intrusive_lane_t & this) {
	thread_desc * rtail = (thread_desc *)(
		(uintptr_t)( &this.after ) - offsetof( thread_desc, link )
	);
	/* paranoid */ verify(rtail);
	return rtail;
}

// Ctor
void ?{}( __intrusive_lane_t & this ) {
	this.lock = false;
	this.last_id = -1u;
	this.count = 0u;

	this.before.link.prev = 0p;
	this.before.link.next = tail(this);
	this.before.link.ts   = 0;

	this.after .link.prev = head(this);
	this.after .link.next = 0p;
	this.after .link.ts   = 0;

	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff = 0;
		this.stat.push = 0;
		this.stat.pop  = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
	/* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
	/* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
	/* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
	/* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
	/* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
	/* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
	/* paranoid */ verify(sizeof(this) == 128);
	/* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
	/* paranoid */ verify(__alignof__(this) == 128);
	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));

	/* paranoid */ verifyf(_mask_shiftidx() == 6 , "%zu", _mask_shiftidx());
	/* paranoid */ verifyf(_mask_bitsidx () == 63, "%zu", _mask_bitsidx());
}
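
// Illustrative layout: an empty lane consists only of the two sentinels embedded in the
// lane itself, linked as  before <-> after.  head()/tail() offset those sentinels so they
// can be treated as thread_desc nodes, which is why the ctor can assert
//   head(lane)->link.next == tail(lane)   and   tail(lane)->link.prev == head(lane)
// Pushing a thread therefore always splices it between tail->prev and tail.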

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
	/* paranoid */ verify(this.count == 0u );
}

// Push a thread onto this lane
// returns true if the lane was empty before the push, false otherwise
bool push(__intrusive_lane_t & this, thread_desc * node) {
	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(this.lock);
		/* paranoid */ verify(node->link.ts != 0);
		/* paranoid */ verify(node->link.next == 0p);
		/* paranoid */ verify(node->link.prev == 0p);

		this.count++;

		if(this.before.link.ts == 0l) {
			/* paranoid */ verify(tail(this)->link.next == 0p);
			/* paranoid */ verify(tail(this)->link.prev == head(this));
			/* paranoid */ verify(head(this)->link.next == tail(this));
			/* paranoid */ verify(head(this)->link.prev == 0p);
		}
	#endif

	// Get the relevant nodes locally
	thread_desc * tail = tail(this);
	thread_desc * prev = tail->link.prev;

	// Do the push
	node->link.next = tail;
	node->link.prev = prev;
	prev->link.next = node;
	tail->link.prev = node;

	// Update stats
	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff++;
		this.stat.push++;
	#endif

	verify(node->link.next == tail(this));

	// Check if the queue used to be empty
	if(this.before.link.ts == 0l) {
		this.before.link.ts = node->link.ts;
		/* paranoid */ verify(node->link.prev == head(this));
		return true;
	}
	return false;
}

// Pop a thread from this lane (must be non-empty)
// returns the popped thread and
// returns true if the lane is empty after the pop, false otherwise
[thread_desc *, bool] pop(__intrusive_lane_t & this) {
	/* paranoid */ verify(this.lock);
	/* paranoid */ verify(this.before.link.ts != 0ul);

	// Get anchors locally
	thread_desc * head = head(this);
	thread_desc * tail = tail(this);

	// Get the relevant nodes locally
	thread_desc * node = head->link.next;
	thread_desc * next = node->link.next;

	#if defined(__CFA_WITH_VERIFY__)
		this.count--;
		/* paranoid */ verify(node != tail);
		/* paranoid */ verify(node);
	#endif

	// Do the pop
	head->link.next = next;
	next->link.prev = head;
	node->link.[next, prev] = 0p;

	// Update head time stamp
	this.before.link.ts = next->link.ts;

	// Update stats
	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff--;
		this.stat.pop ++;
	#endif

	// Check if we emptied list and return accordingly
	if(next == tail) {
		/* paranoid */ verify(this.before.link.ts == 0);
		/* paranoid */ verify(tail(this)->link.next == 0p);
		/* paranoid */ verify(tail(this)->link.prev == head(this));
		/* paranoid */ verify(head(this)->link.next == tail(this));
		/* paranoid */ verify(head(this)->link.prev == 0p);
		return [node, true];
	}
	else {
		/* paranoid */ verify(next->link.ts != 0);
		/* paranoid */ verify(this.before.link.ts != 0);
		return [node, false];
	}
}
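
// Illustrative use of the lane-level API (mirrors try_pop and ready_queue_shrink below):
//   __atomic_acquire(&lane.lock);
//   bool first = push(lane, thrd);          // 'first'  : lane was empty before the push
//   thread_desc * t; bool emptied;
//   [t, emptied] = pop(lane);               // 'emptied': lane is empty after the pop
//   __atomic_unlock(&lane.lock);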

// Check whether or not list is empty
static inline bool is_empty(__intrusive_lane_t & this) {
	verify( (this.before.link.ts == 0) == (this.count == 0) );
	return this.before.link.ts == 0;
}

// Return the timestamp
static inline unsigned long long ts(__intrusive_lane_t & this) {
	verify( this.before.link.ts == this.before.link.next->link.ts );
	return this.before.link.ts;
}

//=======================================================================
// Cforall Ready Queue used by ready queue
//=======================================================================

// Thread local mirror of ready queue statistics
#if !defined(__CFA_NO_STATISTICS__)
static __attribute__((aligned(128))) thread_local struct {
	struct {
		struct {
			size_t attempt;
			size_t success;
		} push;
		struct {
			size_t maskrds;
			size_t attempt;
			size_t success;
		} pop;
	} pick;
	struct {
		size_t value;
		size_t count;
	} used;
} tls = {
	/* pick */{
		/* push */{ 0, 0 },
		/* pop  */{ 0, 0, 0 },
	},
	/* used */{ 0, 0 }
};
#endif

//-----------------------------------------------------------------------

void ?{}(__ready_queue_t & this) with (this) {
	used.count = 0;
	for( i ; __cfa_lane_mask_size ) {
		used.mask[i] = 0;
	}

	lanes.data = alloc(4);
	for( i; 4 ) {
		(lanes.data[i]){};
	}
	lanes.count = 4;

	#if !defined(__CFA_NO_STATISTICS__)
		global_stats.pick.push.attempt = 0;
		global_stats.pick.push.success = 0;
		global_stats.pick.pop .maskrds = 0;
		global_stats.pick.pop .attempt = 0;
		global_stats.pick.pop .success = 0;

		global_stats.used.value = 0;
		global_stats.used.count = 0;
	#endif
}

void ^?{}(__ready_queue_t & this) with (this) {
	verify( 4  == lanes.count );
	verify( 0  == used .count );

	for( i; 4 ) {
		^(lanes.data[i]){};
	}
	free(lanes.data);


	#if defined(__CFA_WITH_VERIFY__)
		for( i ; __cfa_lane_mask_size ) {
			assert( 0 == used.mask[i] );
		}
	#endif
}

//-----------------------------------------------------------------------
enum mask_strictness {
	STRICT,
	NOCHECK
};

// Set a given bit in the bit mask array
// strictness determines whether the bit must have been clear beforehand
static inline void mask_set(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
	// Extract the array and bit indexes
	__cfa_readyQ_mask_t word;
	__cfa_readyQ_mask_t bit;
	[bit, word] = extract(index);

	// Conditional check, only applies in STRICT mode: the bit is expected to be clear
	verifyf(
		strict != STRICT ||
		((mask[word] & (1ull << bit)) == 0),
		"Before set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);

	// Atomically set the bit
	__attribute__((unused)) bool ret = __atomic_bts(&mask[word], bit);

	// Conditional check, only applies in STRICT mode: bts must report the bit was clear
	verifyf(
		strict != STRICT ||
		!ret,
		"Bit was expected to be clear but bts returned true"
	);

	// Unconditional check
	verifyf(
		(mask[word] & (1ull << bit)) != 0,
		"After set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);
}

// Clear a given bit in the bit mask array
// strictness determines whether the bit must have been set beforehand
static inline void mask_clear(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
	// Extract the array and bit indexes
	__cfa_readyQ_mask_t word;
	__cfa_readyQ_mask_t bit;
	[bit, word] = extract(index);

	// Conditional check, only applies in STRICT mode: the bit is expected to be set
	verifyf(
		strict != STRICT ||
		((mask[word] & (1ull << bit)) != 0),
		"Before clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);

	// Atomically clear the bit
	__attribute__((unused)) bool ret = __atomic_btr(&mask[word], bit);

	// Conditional check, only applies in STRICT mode: btr must report the bit was set
	verifyf(
		strict != STRICT ||
		ret,
		"Bit was expected to be set but btr returned false"
	);

	// Unconditional check
	verifyf(
		(mask[word] & (1ull << bit)) == 0,
		"After clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
	);
}
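
// Illustrative example: marking lane 70 non-empty calls mask_set(used.mask, 70, STRICT),
// which (via extract) atomically sets bit 6 of used.mask[1]; when the lane is drained,
// mask_clear(used.mask, 70, STRICT) clears that same bit.  NOCHECK is used when the caller
// cannot know the previous state, e.g. when shrinking the queue.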

//-----------------------------------------------------------------------
__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
	// write timestamp
	thrd->link.ts = rdtscl();

	// Try to pick a lane and lock it
	unsigned i;
	do {
		// Pick the index of a lane
		i = tls_rand() % lanes.count;

		#if !defined(__CFA_NO_STATISTICS__)
			tls.pick.push.attempt++;
		#endif

		// If we can't lock it retry
	} while( !__atomic_try_acquire( &lanes.data[i].lock ) );

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(lanes.data[i].last_id == -1u);
		/* paranoid */ lanes.data[i].last_id = kernelTLS.this_processor->id;
	#endif

	__attribute__((unused)) size_t num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
	bool first = false;

	// Actually push it
	bool lane_first = push(lanes.data[i], thrd);

	// If this lane used to be empty we need to do more
	if(lane_first) {
		// Update the global count
		size_t ret = __atomic_fetch_add( &used.count, 1z, __ATOMIC_SEQ_CST);

		// Check if the entire queue used to be empty
		first = (ret == 0);

		// Update the bit mask
		mask_set((__cfa_readyQ_mask_t *)used.mask, i, STRICT);
	}

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verifyf( used.count <= lanes.count, "Non-empty count (%zu) exceeds actual count (%zu)\n", used.count, lanes.count );
		/* paranoid */ verifyf( lanes.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, lanes.data[i].last_id, kernelTLS.this_processor->id );
		/* paranoid */ verifyf( lanes.data[i].lock, "List %u is not locked\n", i );
		/* paranoid */ lanes.data[i].last_id = -1u;
	#endif

	// Unlock and return
	__atomic_unlock( &lanes.data[i].lock );

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.push.success++;
		tls.used.value += num;
		tls.used.count += 1;
	#endif

	// return whether or not the list was empty before this push
	return first;
}
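
// Illustrative call site (hypothetical caller, names assumed): once a thread becomes ready,
// a scheduler would do something like
//   bool first = push( thrd_cluster, thrd );
// where 'first' reports that the whole cluster queue was empty beforehand; a caller could
// use it, for example, to decide whether an idle processor needs to be woken.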

//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it
static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.attempt++;
	#endif

	// Pick the best list
	int w = i;
	if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
		w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
	}

	// Get relevant elements locally
	__intrusive_lane_t & lane = lanes.data[w];

	// If list looks empty retry
	if( is_empty(lane) ) return 0p;

	// If we can't get the lock retry
	if( !__atomic_try_acquire(&lane.lock) ) return 0p;

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(lane.last_id == -1u);
		/* paranoid */ lane.last_id = kernelTLS.this_processor->id;
	#endif


	// If list is empty, unlock and retry
	if( is_empty(lane) ) {
		#if defined(__CFA_WITH_VERIFY__)
			/* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
			/* paranoid */ lane.last_id = -1u;
		#endif

		__atomic_unlock(&lane.lock);
		return 0p;
	}

	// Actually pop the list
	struct thread_desc * thrd;
	bool emptied;
	[thrd, emptied] = pop(lane);

	/* paranoid */ verify(thrd);
	/* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
	/* paranoid */ verify(lane.lock);

	// If this was the last element in the lane
	if(emptied) {
		// Update the global count
		__atomic_fetch_sub( &used.count, 1z, __ATOMIC_SEQ_CST);

		// Update the bit mask
		mask_clear((__cfa_readyQ_mask_t *)used.mask, w, STRICT);
	}

	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
		/* paranoid */ lane.last_id = -1u;
	#endif

	// For statistics, check the count before we release the lock
	#if !defined(__CFA_NO_STATISTICS__)
		int num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
	#endif

	// Unlock and return
	__atomic_unlock(&lane.lock);

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.success++;
		tls.used.value += num;
		tls.used.count += 1;
	#endif

	// return the popped thread
	return thrd;
}
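
// Illustrative example of the "pick the older of two" heuristic: if lane i's head was pushed
// at timestamp 1000 and lane j's head at timestamp 400, try_pop(cltr, i, j) selects j (the
// smaller, i.e. older, rdtscl() value), which approximates FIFO ordering across lanes
// without any global ordering structure.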

// Pop from the ready queue from a given cluster
__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );

	// As long as the list is not empty, try finding a lane that isn't empty and pop from it
	while( __atomic_load_n( &used.count, __ATOMIC_RELAXED ) != 0) {
		#if !defined(__CFA_READQ_NO_BITMASK__)
			// If using bit masks
			#if !defined(__CFA_NO_SCHED_STATS__)
				tls.pick.pop.maskrds++;
			#endif

			// Pick two lists at random
			unsigned ri = tls_rand();
			unsigned rj = tls_rand();

			// Find which __cfa_readyQ_mask_t the two lists belong to
			unsigned num = ((__atomic_load_n( &lanes.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
			unsigned wdxi = (ri >> 6u) % num;
			unsigned wdxj = (rj >> 6u) % num;

			// Get the actual __cfa_readyQ_mask_t
			size_t maski = __atomic_load_n( &used.mask[wdxi], __ATOMIC_RELAXED );
			size_t maskj = __atomic_load_n( &used.mask[wdxj], __ATOMIC_RELAXED );

			// If both of these masks are empty, retry
			if(maski == 0 && maskj == 0) continue;

			// Pick one of the non-zero bits in the masks and get the bit indexes
			unsigned bi = rand_bit(ri, maski);
			unsigned bj = rand_bit(rj, maskj);

			// some checks
			/* paranoid */ verifyf(bi < 64, "%zu %u", maski, bi);
			/* paranoid */ verifyf(bj < 64, "%zu %u", maskj, bj);

			// get the general list index
			unsigned i = bi | (wdxi << 6);
			unsigned j = bj | (wdxj << 6);

			// some more checks
			/* paranoid */ verifyf(i < lanes.count, "%u", wdxi << 6);
			/* paranoid */ verifyf(j < lanes.count, "%u", wdxj << 6);

			// try popping from the 2 picked lists
			struct thread_desc * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#else
			// Pick two lists at random
			int i = tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
			int j = tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

			// try popping from the 2 picked lists
			struct thread_desc * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#endif
	}

	// All lanes were empty, return 0p
	return 0p;
}
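
// Illustrative walk-through of the masked pick (assuming 64-bit masks and, say, 8 lanes):
// num = ((8 - 1) >> 6) + 1 = 1 mask word, so wdxi = (ri >> 6) % 1 = 0.  If used.mask[0] is
// 0b10100 (lanes 2 and 4 non-empty) and rand_bit maps ri to bit index 4, then
// i = 4 | (0 << 6) = 4, i.e. only lanes currently marked non-empty can be chosen.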

//-----------------------------------------------------------------------

static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
		{
			int idx = 0;
			for( w ; __cfa_lane_mask_size ) {
				for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
					bool is_empty = idx < lanes.count ? (ts(lanes.data[idx]) == 0) : true;
					bool should_be_empty = 0 == (used.mask[w] & (1z << b));
					assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expects %d, actual is %d", idx, should_be_empty, (bool)is_empty);
					assert(__cfa_max_lanes > idx);
					idx++;
				}
			}
		}

		{
			for( idx ; lanes.count ) {
				__intrusive_lane_t & sl = lanes.data[idx];
				assert(!lanes.data[idx].lock);

				assert(head(sl)->link.prev == 0p );
				assert(head(sl)->link.next->link.prev == head(sl) );
				assert(tail(sl)->link.next == 0p );
				assert(tail(sl)->link.prev->link.next == tail(sl) );

				if(sl.before.link.ts == 0l) {
					assert(tail(sl)->link.next == 0p);
					assert(tail(sl)->link.prev == head(sl));
					assert(head(sl)->link.next == tail(sl));
					assert(head(sl)->link.prev == 0p);
				}
			}
		}
	#endif
}

// Call this function if the intrusive list was moved using memcpy
// fixes the list so that the pointers back to anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
	// if the list is not empty then follow the pointers and fix their reverse links
	if(!is_empty(ll)) {
		head(ll)->link.next->link.prev = head(ll);
		tail(ll)->link.prev->link.next = tail(ll);
	}
	// Otherwise just reset the list
	else {
		verify(tail(ll)->link.next == 0p);
		tail(ll)->link.prev = head(ll);
		head(ll)->link.next = tail(ll);
		verify(head(ll)->link.prev == 0p);
	}
}
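
// Illustrative scenario: ready_queue_grow below calls alloc(lanes.data, ncount), which may
// realloc and memcpy each __intrusive_lane_t to a new address.  The copied 'before'/'after'
// sentinels then still have neighbours pointing back at the *old* sentinel addresses, so
// fix() re-aims the first element's prev and the last element's next at the new anchors.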

// Grow the ready queue
void ready_queue_grow  (struct cluster * cltr) {
	// Lock the RWlock so no-one pushes/pops while we are changing the queue
	uint_fast32_t last_size = ready_mutate_lock( *cltr );

	__cfaabi_dbg_print_safe("Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	// grow the ready queue
	with( cltr->ready_queue ) {
		size_t ncount = lanes.count;

		// Check that we have some space left
		if(ncount + 4 >= __cfa_max_lanes) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_lanes);

		// increase count
		ncount += 4;

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, ncount);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Construct new data
		for( idx; (size_t)lanes.count ~ ncount) {
			(lanes.data[idx]){};
		}

		// Update original
		lanes.count = ncount;

		// fields in 'used' don't need to change when growing
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfaabi_dbg_print_safe("Kernel : Growing ready queue done\n");

	// Unlock the RWlock
	ready_mutate_unlock( *cltr, last_size );
}

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	// Lock the RWlock so no-one pushes/pops while we are changing the queue
	uint_fast32_t last_size = ready_mutate_lock( *cltr );

	__cfaabi_dbg_print_safe("Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	with( cltr->ready_queue ) {
		// Make sure that the total thread count stays the same
		#if defined(__CFA_WITH_VERIFY__)
			size_t nthreads = 0;
			for( idx; (size_t)lanes.count ) {
				nthreads += lanes.data[idx].count;
			}
		#endif

		size_t ocount = lanes.count;
		// Check that there are enough queues left to remove some
		if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");

		// reduce the actual count so push doesn't use the old queues
		lanes.count -= 4;
		verify(ocount > lanes.count);

		// for printing count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; (size_t)lanes.count ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
			verify(locked);

			// As long as we can pop from this lane, push the threads somewhere else in the queue
			while(!is_empty(lanes.data[idx])) {
				struct thread_desc * thrd;
				__attribute__((unused)) bool _;
				[thrd, _] = pop(lanes.data[idx]);

				push(cltr, thrd);

				// for printing count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__)
					displaced++;
				#endif
			}

			mask_clear((__cfa_readyQ_mask_t *)used.mask, idx, NOCHECK);

			// Unlock the lane
			__atomic_unlock(&lanes.data[idx].lock);

			// TODO print the queue statistics here

			^(lanes.data[idx]){};
		}

		__cfaabi_dbg_print_safe("Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// recompute the used.count instead of maintaining it
		used.count = 0;
		for( i ; __cfa_lane_mask_size ) {
			used.count += __builtin_popcountl(used.mask[i]);
		}

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, lanes.count);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Make sure that the total thread count stayed the same
		#if defined(__CFA_WITH_VERIFY__)
			for( idx; (size_t)lanes.count ) {
				nthreads -= lanes.data[idx].count;
			}
			verifyf(nthreads == 0, "Shrinking changed number of threads");
		#endif
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfaabi_dbg_print_safe("Kernel : Shrinking ready queue done\n");

	// Unlock the RWlock
	ready_mutate_unlock( *cltr, last_size );
}

//-----------------------------------------------------------------------

#if !defined(__CFA_NO_STATISTICS__)
void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
	__atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );

	__atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
}
#endif