source: libcfa/src/concurrency/ready_queue.cfa @ 64a7146

Last change on this file since 64a7146 was 64a7146, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Fixed idle sleep to no-longer use a spinlock, broke registration and gdbtools in the process

//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
// #define __CFA_DEBUG_PRINT_READY_QUEUE__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"
#include "math.hfa"

#include <unistd.h>

static const size_t cache_line_size = 64;

// No overridden function, no environment variable, no define:
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

#define BIAS 64

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}
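// The limit can be raised without recompiling by setting the environment
// variable checked above, e.g. (illustrative invocation only):
//   $ CFA_MAX_PROCESSORS=256 ./a.out
// Values outside [1, 65535] or with trailing characters fall back to
// __CFA_MAX_PROCESSORS__.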

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void  ?{}(__scheduler_RWLock_t & this) {
	this.max   = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.lock  = false;
	this.data  = alloc(this.max);

	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.data);
}

void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
	this.handle = proc;
	this.lock   = false;
	#ifdef __CFA_WITH_VERIFY__
		this.owned  = false;
	#endif
}

//=======================================================================
// Lock-Free registering/unregistering of threads
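// Each processor owns one slot in the data array; a slot whose handle is 0p
// is free. Registration first tries to reuse a free slot among the `ready`
// slots already published, and otherwise claims a fresh slot with a
// fetch-and-add on `alloc`, initialises it, and publishes it by advancing
// `ready` in slot order.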
unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);

	// Step - 1 : check if there is already a free slot in the data array
	uint_fast32_t s = ready;

	// Check among all the published (ready) slots
	for(uint_fast32_t i = 0; i < s; i++) {
		__processor_id_t * null = 0p; // Reset every iteration since a failed compare-exchange overwrites it
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 3 : Mark space as used and then publish it.
	__scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
	(*storage){ proc };
	while(true) {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}

void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
	unsigned id = proc->id;
	/*paranoid*/ verify(id < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);

	__cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
//  queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
	// Step 1 : lock global lock
	// It is needed so that processors registering mid critical-section
	//   cannot simply lock their own lock and enter.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	//   but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		__atomic_acquire( &data[i].lock );
	}

	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	//   threads that were created mid critical section
	//   racing to lock their local locks and having the writer
	//   immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}
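//
// Intended call pattern (sketch only; the actual call sites are outside this file):
//   uint_fast32_t last_s = ready_mutate_lock();
//   ... add or remove lanes, e.g. ready_queue_grow / ready_queue_shrink ...
//   ready_mutate_unlock( last_s );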

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================
// Intrusive lanes which are used by the relaxed ready queue
struct __attribute__((aligned(128))) __intrusive_lane_t {
	// spin lock protecting the queue
	volatile bool lock;

	// anchor for the head and the tail of the queue
	struct __sentinel_t {
		// Linked-list fields
		// intrusive link field for threads
		// must be exactly as in $thread
		__thread_desc_link link;
	} before, after;

	// Optional statistic counters
	#if !defined(__CFA_NO_SCHED_STATS__)
		struct __attribute__((aligned(64))) {
			// difference between the number of pushes and pops
			ssize_t diff;

			// total number of pushes and pops
			size_t  push;
			size_t  pop ;
		} stat;
	#endif
};

void  ?{}(__intrusive_lane_t & this);
void ^?{}(__intrusive_lane_t & this);

// Get the head pointer (one before the first element) from the anchor
static inline $thread * head(const __intrusive_lane_t & this) {
	$thread * rhead = ($thread *)(
		(uintptr_t)( &this.before ) - offsetof( $thread, link )
	);
	/* paranoid */ verify(rhead);
	return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline $thread * tail(const __intrusive_lane_t & this) {
	$thread * rtail = ($thread *)(
		(uintptr_t)( &this.after ) - offsetof( $thread, link )
	);
	/* paranoid */ verify(rtail);
	return rtail;
}
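// Note: head() and tail() return pointers to fake $thread objects laid over
// the `before` and `after` sentinels, so that the sentinels' link fields line
// up with a real thread's link field and the push/pop code can treat the
// anchors as ordinary nodes. The constructor below asserts this layout.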

// Ctor
void ?{}( __intrusive_lane_t & this ) {
	this.lock = false;

	this.before.link.prev = 0p;
	this.before.link.next = tail(this);
	this.before.link.ts   = 0;

	this.after .link.prev = head(this);
	this.after .link.next = 0p;
	this.after .link.ts   = 0;

	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff = 0;
		this.stat.push = 0;
		this.stat.pop  = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
	/* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
	/* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
	/* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
	/* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
	/* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
	/* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
	/* paranoid */ verify(sizeof(this) == 128);
	/* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
	/* paranoid */ verify(__alignof__(this) == 128);
	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
}

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
}

// Push a thread onto this lane
// returns true if the lane was empty before the push, false otherwise
bool push(__intrusive_lane_t & this, $thread * node) {
	#if defined(__CFA_WITH_VERIFY__)
		/* paranoid */ verify(this.lock);
		/* paranoid */ verify(node->link.ts != 0);
		/* paranoid */ verify(node->link.next == 0p);
		/* paranoid */ verify(node->link.prev == 0p);
		/* paranoid */ verify(tail(this)->link.next == 0p);
		/* paranoid */ verify(head(this)->link.prev == 0p);

		if(this.before.link.ts == 0l) {
			/* paranoid */ verify(tail(this)->link.prev == head(this));
			/* paranoid */ verify(head(this)->link.next == tail(this));
		} else {
			/* paranoid */ verify(tail(this)->link.prev != head(this));
			/* paranoid */ verify(head(this)->link.next != tail(this));
		}
	#endif

	// Get the relevant nodes locally
	$thread * tail = tail(this);
	$thread * prev = tail->link.prev;

	// Do the push
	node->link.next = tail;
	node->link.prev = prev;
	prev->link.next = node;
	tail->link.prev = node;

	// Update stats
	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff++;
		this.stat.push++;
	#endif

	verify(node->link.next == tail(this));

	// Check if the queue used to be empty
	if(this.before.link.ts == 0l) {
		this.before.link.ts = node->link.ts;
		/* paranoid */ verify(node->link.prev == head(this));
		return true;
	}
	return false;
}

// Pop a thread from this lane (must be non-empty)
// returns the popped thread,
// and true if the lane became empty as a result of the pop, false otherwise
[$thread *, bool] pop(__intrusive_lane_t & this) {
	/* paranoid */ verify(this.lock);
	/* paranoid */ verify(this.before.link.ts != 0ul);

	// Get anchors locally
	$thread * head = head(this);
	$thread * tail = tail(this);

	// Get the relevant nodes locally
	$thread * node = head->link.next;
	$thread * next = node->link.next;

	/* paranoid */ verify(node != tail);
	/* paranoid */ verify(node);

	// Do the pop
	head->link.next = next;
	next->link.prev = head;
	node->link.[next, prev] = 0p;

	// Update head time stamp
	this.before.link.ts = next->link.ts;

	// Update stats
	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff--;
		this.stat.pop ++;
	#endif

	// Check if we emptied the list and return accordingly
	/* paranoid */ verify(tail(this)->link.next == 0p);
	/* paranoid */ verify(head(this)->link.prev == 0p);
	if(next == tail) {
		/* paranoid */ verify(this.before.link.ts == 0);
		/* paranoid */ verify(tail(this)->link.prev == head(this));
		/* paranoid */ verify(head(this)->link.next == tail(this));
		return [node, true];
	}
	else {
		/* paranoid */ verify(next->link.ts != 0);
		/* paranoid */ verify(tail(this)->link.prev != head(this));
		/* paranoid */ verify(head(this)->link.next != tail(this));
		/* paranoid */ verify(this.before.link.ts != 0);
		return [node, false];
	}
}

// Check whether or not the list is empty
static inline bool is_empty(__intrusive_lane_t & this) {
	// Cannot verify here since it may not be locked
	return this.before.link.ts == 0;
}

// Return the timestamp
static inline unsigned long long ts(__intrusive_lane_t & this) {
	// Cannot verify here since it may not be locked
	return this.before.link.ts;
}

//=======================================================================
// Scalable Non-Zero counter
//=======================================================================

union __snzi_val_t {
	uint64_t _all;
	struct __attribute__((packed)) {
		char cnt;
		uint64_t ver:56;
	};
};
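// _all packs the counter in the low 8 bits (cnt) and a 56-bit version number
// (ver) in the upper bits, so cas() below can update both fields with a single
// 64-bit compare-exchange; the verify in cas() checks this layout.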

bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, char _cnt, uint64_t _ver) {
	__snzi_val_t t;
	t.ver = _ver;
	t.cnt = _cnt;
	/* paranoid */ verify(t._all == ((_ver << 8) | ((unsigned char)_cnt)));
	return __atomic_compare_exchange_n(&self._all, &exp._all, t._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}

bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, const __snzi_val_t & tar) {
	return __atomic_compare_exchange_n(&self._all, &exp._all, tar._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}

void ?{}( __snzi_val_t & this ) { this._all = 0; }
void ?{}( __snzi_val_t & this, const volatile __snzi_val_t & o) { this._all = o._all; }

struct __attribute__((aligned(128))) __snzi_node_t {
	volatile __snzi_val_t value;
	struct __snzi_node_t * parent;
	bool is_root;
};

static inline void arrive( __snzi_node_t & );
static inline void depart( __snzi_node_t & );

#define __snzi_half -1
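// __snzi_half is the intermediate count a node publishes while it transitions
// from zero to non-zero: arrive_h() below installs it, arrives at the parent,
// and only then swings the count to 1.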

//--------------------------------------------------
// Root node
static void arrive_r( __snzi_node_t & this ) {
	/* paranoid */ verify( this.is_root );
	__atomic_fetch_add(&this.value._all, 1, __ATOMIC_SEQ_CST);
}

static void depart_r( __snzi_node_t & this ) {
	/* paranoid */ verify( this.is_root );
	__atomic_fetch_sub(&this.value._all, 1, __ATOMIC_SEQ_CST);
}

//--------------------------------------------------
// Hierarchical node
static void arrive_h( __snzi_node_t & this ) {
	int undoArr = 0;
	bool success = false;
	while(!success) {
		__snzi_val_t x = { this.value };
		/* paranoid */ verify(x.cnt <= 120);
		if( x.cnt >= 1 ) {
			if( cas( this.value, x, x.cnt + 1, x.ver ) ) {
				success = true;
			}
		}
		/* paranoid */ verify(x.cnt <= 120);
		if( x.cnt == 0 ) {
			if( cas( this.value, x, __snzi_half, x.ver + 1) ) {
				success = true;
				x.cnt = __snzi_half;
				x.ver = x.ver + 1;
			}
		}
		/* paranoid */ verify(x.cnt <= 120);
		if( x.cnt == __snzi_half ) {
			/* paranoid */ verify( this.parent);
			arrive( *this.parent );
			if( !cas( this.value, x, 1, x.ver) ) {
				undoArr = undoArr + 1;
			}
		}
	}

	for(int i = 0; i < undoArr; i++) {
		/* paranoid */ verify( this.parent );
		depart( *this.parent );
	}
}

static void depart_h( __snzi_node_t & this ) {
	while(true) {
		const __snzi_val_t x = { this.value };
		/* paranoid */ verifyf(x.cnt >= 1, "%d", x.cnt);
		if( cas( this.value, x, x.cnt - 1, x.ver ) ) {
			if( x.cnt == 1 ) {
				/* paranoid */ verify( this.parent );
				depart( *this.parent );
			}
			return;
		}
	}
}

//--------------------------------------------------
// All nodes
static inline void arrive( __snzi_node_t & this ) {
	if(this.is_root) arrive_r( this );
	else arrive_h( this );
}

static inline void depart( __snzi_node_t & this ) {
	if(this.is_root) depart_r( this );
	else depart_h( this );
}

static inline bool query( __snzi_node_t & this ) {
	/* paranoid */ verify( this.is_root );
	return this.value._all > 0;
}

//--------------------------------------------------
// SNZI object
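// The nodes array stores an implicit tree: the 2^depth leaves occupy indices
// [0, 2^depth), each internal level follows its children, and the root sits at
// index (2^(depth+1)) - 2. A node i's parent is nodes[i/2 + width], where width
// is the number of leaves; `mask` maps an arbitrary index onto a leaf.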
void  ?{}( __snzi_t & this, unsigned depth ) with( this ) {
	mask = (1 << depth) - 1;
	root = (1 << (depth + 1)) - 2;
	nodes = alloc( root + 1 );

	int width = 1 << depth;
	for(int i = 0; i < root; i++) {
		nodes[i].value._all = 0;
		nodes[i].parent = &nodes[(i / 2) + width ];
		nodes[i].is_root = false;
	}

	nodes[ root ].value._all = 0;
	nodes[ root ].parent = 0p;
	nodes[ root ].is_root = true;
}

void ^?{}( __snzi_t & this ) {
	free( this.nodes );
}

static inline void arrive( __snzi_t & this, int idx) {
	idx &= this.mask;
	arrive( this.nodes[idx] );
}

static inline void depart( __snzi_t & this, int idx) {
	idx &= this.mask;
	depart( this.nodes[idx] );
}

static inline bool query( const __snzi_t & this ) {
	return query( this.nodes[ this.root ] );
}

//=======================================================================
// Cforall Ready Queue used by ready queue
//=======================================================================

void ?{}(__ready_queue_t & this) with (this) {

	lanes.data = alloc(4);
	for( i; 4 ) {
		(lanes.data[i]){};
	}
	lanes.count = 4;
	snzi{ log2( lanes.count / 8 ) };
}

void ^?{}(__ready_queue_t & this) with (this) {
	verify( 4  == lanes.count );
	verify( !query( snzi ) );

	^(snzi){};

	for( i; 4 ) {
		^(lanes.data[i]){};
	}
	free(lanes.data);
}

//-----------------------------------------------------------------------
__attribute__((hot)) bool query(struct cluster * cltr) {
	return query(cltr->ready_queue.snzi);
}

//-----------------------------------------------------------------------
__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
	__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

	// write timestamp
	thrd->link.ts = rdtscl();

	// Try to pick a lane and lock it
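	// With BIAS defined, (BIAS - 1) out of every BIAS picks are restricted to
	// the 4 lanes associated with the current processor (index this_processor->id * 4
	// plus a random offset in [0, 4)); the remaining 1-in-BIAS picks, and pushes
	// made outside a processor context, use a uniformly random lane.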
	unsigned i;
	do {
		// Pick the index of a lane
		#if defined(BIAS)
			unsigned r = __tls_rand();
			unsigned rlow  = r % BIAS;
			unsigned rhigh = r / BIAS;
			if(0 != (rlow % BIAS) && kernelTLS.this_processor) {
				// (BIAS - 1) out of BIAS chances
				// Use preferred queues
				i = (kernelTLS.this_processor->id * 4) + (rhigh % 4);
			}
			else {
				// 1 out of BIAS chances
				// Use all queues
				i = rhigh;
			}
		#else
			i = __tls_rand();
		#endif

		i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

		#if !defined(__CFA_NO_STATISTICS__)
			__tls_stats()->ready.pick.push.attempt++;
		#endif

		// If we can't lock it retry
	} while( !__atomic_try_acquire( &lanes.data[i].lock ) );

	bool first = false;

	// Actually push it
	bool lane_first = push(lanes.data[i], thrd);

	// If this lane used to be empty we need to do more
	if(lane_first) {
		// Check if the entire queue used to be empty
		first = !query(snzi);

		// Update the snzi
		arrive( snzi, i );
	}

	// Unlock and return
	__atomic_unlock( &lanes.data[i].lock );

	__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		__tls_stats()->ready.pick.push.success++;
	#endif

	// return whether or not the list was empty before this push
	return first;
}

//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it
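// Popping the older of two randomly chosen lanes spreads contention across
// lanes while still roughly favouring FIFO order across the whole queue.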
static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
	#if !defined(__CFA_NO_STATISTICS__)
		__tls_stats()->ready.pick.pop.attempt++;
	#endif

	// Pick the best list
	int w = i;
	if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
		w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
	}

	// Get relevant elements locally
	__intrusive_lane_t & lane = lanes.data[w];

	// If list looks empty retry
	if( is_empty(lane) ) return 0p;

	// If we can't get the lock retry
	if( !__atomic_try_acquire(&lane.lock) ) return 0p;


	// If list is empty, unlock and retry
	if( is_empty(lane) ) {
		__atomic_unlock(&lane.lock);
		return 0p;
	}

	// Actually pop the list
	struct $thread * thrd;
	bool emptied;
	[thrd, emptied] = pop(lane);

	/* paranoid */ verify(thrd);
	/* paranoid */ verify(lane.lock);

	// If this was the last element in the lane
	if(emptied) {
		depart( snzi, w );
	}

	// Unlock and return
	__atomic_unlock(&lane.lock);

	// Update statistics
	#if !defined(__CFA_NO_STATISTICS__)
		__tls_stats()->ready.pick.pop.success++;
	#endif

	// return the popped thread
	return thrd;
}

// Pop from the ready queue from a given cluster
__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );

	// As long as the list is not empty, try finding a lane that isn't empty and pop from it
	while( query(snzi) ) {
		// Pick two lists at random
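		// As with push, with probability 1/BIAS the candidates are fully random;
		// otherwise both are drawn from this processor's 4 preferred lanes.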
		#if defined(BIAS)
			unsigned i = __tls_rand();
			unsigned j = __tls_rand();

			if(0 == (i % BIAS)) {
				i = i / BIAS;
			}
			else {
				i = ((kernelTLS.this_processor->id * 4) + ((i / BIAS) % 4));
				j = ((kernelTLS.this_processor->id * 4) + ((j / BIAS) % 4));
			}
		#else
			unsigned i = __tls_rand();
			unsigned j = __tls_rand();
		#endif

		i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
		j %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

		// try popping from the 2 picked lists
		struct $thread * thrd = try_pop(cltr, i, j);
		if(thrd) return thrd;
	}

	// All lanes were empty, return 0p
	return 0p;
}

//-----------------------------------------------------------------------

static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
		{
			for( idx ; lanes.count ) {
				__intrusive_lane_t & sl = lanes.data[idx];
				assert(!lanes.data[idx].lock);

				assert(head(sl)->link.prev == 0p );
				assert(head(sl)->link.next->link.prev == head(sl) );
				assert(tail(sl)->link.next == 0p );
				assert(tail(sl)->link.prev->link.next == tail(sl) );

				if(sl.before.link.ts == 0l) {
					assert(tail(sl)->link.prev == head(sl));
					assert(head(sl)->link.next == tail(sl));
				} else {
					assert(tail(sl)->link.prev != head(sl));
					assert(head(sl)->link.next != tail(sl));
				}
			}
		}
	#endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to the anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
	// if the list is not empty then follow the pointers and fix their reverse links
	if(!is_empty(ll)) {
		head(ll)->link.next->link.prev = head(ll);
		tail(ll)->link.prev->link.next = tail(ll);
	}
	// Otherwise just reset the list
	else {
		verify(tail(ll)->link.next == 0p);
		tail(ll)->link.prev = head(ll);
		head(ll)->link.next = tail(ll);
		verify(head(ll)->link.prev == 0p);
	}
}

// Grow the ready queue
void ready_queue_grow  (struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	// grow the ready queue
	with( cltr->ready_queue ) {
		^(snzi){};

		size_t ncount = lanes.count;

		// increase count
		ncount += 4;

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, ncount);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Construct new data
		for( idx; (size_t)lanes.count ~ ncount) {
			(lanes.data[idx]){};
		}

		// Update original
		lanes.count = ncount;

		// Re-create the snzi
		snzi{ log2( lanes.count / 8 ) };
		for( idx; (size_t)lanes.count ) {
			if( !is_empty(lanes.data[idx]) ) {
				arrive(snzi, idx);
			}
		}
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	with( cltr->ready_queue ) {
		^(snzi){};

		size_t ocount = lanes.count;
		// Check that we have some space left
		if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");

		// reduce the actual count so push doesn't use the old queues
		lanes.count -= 4;
		verify(ocount > lanes.count);

		// for printing count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; (size_t)lanes.count ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
			verify(locked);

			// As long as this lane is not empty, pop its threads and push them somewhere else in the queue
			while(!is_empty(lanes.data[idx])) {
				struct $thread * thrd;
				__attribute__((unused)) bool _;
				[thrd, _] = pop(lanes.data[idx]);

				push(cltr, thrd);

				// for printing count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&lanes.data[idx].lock);

			// TODO print the queue statistics here

			^(lanes.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc(lanes.data, lanes.count);

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Re-create the snzi
		snzi{ log2( lanes.count / 8 ) };
		for( idx; (size_t)lanes.count ) {
			if( !is_empty(lanes.data[idx]) ) {
				arrive(snzi, idx);
			}
		}
	}

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}