source: libcfa/src/concurrency/ready_queue.cfa @ 2073d207

Last change on this file since 2073d207 was 8834751, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Moved statistics to stats.cfa to combine ready Q stats and IO stats

  • Property mode set to 100644
File size: 24.6 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17// #define __CFA_DEBUG_PRINT_READY_QUEUE__
18
19#include "bits/defs.hfa"
20#include "kernel_private.hfa"
21
22#define _GNU_SOURCE
23#include "stdlib.hfa"
24#include "math.hfa"
25
26static const size_t cache_line_size = 64;
27
28// No overriding function, no environment variable, no define
29// fall back to a magic number
30#ifndef __CFA_MAX_PROCESSORS__
31        #define __CFA_MAX_PROCESSORS__ 1024
32#endif
33
34// returns the maximum number of processors the RWLock supports
35__attribute__((weak)) unsigned __max_processors() {
36        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
37        if(!max_cores_s) {
38                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
39                return __CFA_MAX_PROCESSORS__;
40        }
41
42        char * endptr = 0p;
43        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
44        if(max_cores_l < 1 || max_cores_l > 65535) {
45                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
46                return __CFA_MAX_PROCESSORS__;
47        }
48        if('\0' != *endptr) {
49                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
50                return __CFA_MAX_PROCESSORS__;
51        }
52
53        return max_cores_l;
54}
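// Examples of how CFA_MAX_PROCESSORS is interpreted (derived from the checks above):
//   CFA_MAX_PROCESSORS=256  -> returns 256
//   CFA_MAX_PROCESSORS=0    -> out of range, falls back to __CFA_MAX_PROCESSORS__
//   CFA_MAX_PROCESSORS=64k  -> trailing characters, falls back to __CFA_MAX_PROCESSORS__
//   variable unset          -> falls back to __CFA_MAX_PROCESSORS__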
55
56//=======================================================================
57// Cluster wide reader-writer lock
58//=======================================================================
59void  ?{}(__scheduler_RWLock_t & this) {
60        this.max   = __max_processors();
61        this.alloc = 0;
62        this.ready = 0;
63        this.lock  = false;
64        this.data  = alloc(this.max);
65
66        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
67        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
68        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
69        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
70
71}
72void ^?{}(__scheduler_RWLock_t & this) {
73        free(this.data);
74}
75
76void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
77        this.handle = proc;
78        this.lock   = false;
79}
80
81//=======================================================================
82// Lock-Free registering/unregistering of threads
83unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
84        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
85
86        // Step - 1 : check if there is already space in the data
87        uint_fast32_t s = ready;
88
89	// Check among all the ready slots for a free one
90        for(uint_fast32_t i = 0; i < s; i++) {
91		__processor_id_t * null = 0p; // Re-set every iteration since a failed compare-exchange overwrites it
92                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
93                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
94                        /*paranoid*/ verify(i < ready);
95                        /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
96                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
97                        return i;
98                }
99        }
100
101	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);
102
103        // Step - 2 : F&A to get a new spot in the array.
104        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
105	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);
106
107        // Step - 3 : Mark space as used and then publish it.
108        __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
109        (*storage){ proc };
110        while(true) {
111                unsigned copy = n;
112                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
113                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
114                        break;
115                asm volatile("pause");
116        }
117
118        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
119
120        // Return new spot.
121        /*paranoid*/ verify(n < ready);
122        /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
123        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
124        return n;
125}
126
127void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
128        unsigned id = proc->id;
129        /*paranoid*/ verify(id < ready);
130        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
131        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
132
133        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
134}
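// Typical registration lifecycle (sketch only; the real call sites are in the kernel
// startup/shutdown code, not in this file):
//   id.id = doregister( &id );  // at processor startup: claim a slot in data[] and remember its index
//   ...                         // the slot participates in the reader side of the RW-lock
//   unregister( &id );          // at shutdown: clear the slot so a later doregister can reuse it
// Cleared slots are found again by the Step-1 scan in doregister, so slot indices are recycled.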
135
136//-----------------------------------------------------------------------
137// Writer side : acquire when changing the ready queue, e.g. adding more
138//  queues or removing them.
139uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
140        // Step 1 : lock global lock
141	// It is needed to prevent processors that register mid critical-section
142	//   from simply locking their own lock and entering.
143        __atomic_acquire( &lock );
144
145        // Step 2 : lock per-proc lock
146        // Processors that are currently being registered aren't counted
147        //   but can't be in read_lock or in the critical section.
148        // All other processors are counted
149        uint_fast32_t s = ready;
150        for(uint_fast32_t i = 0; i < s; i++) {
151                __atomic_acquire( &data[i].lock );
152        }
153
154        return s;
155}
156
157void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
158        // Step 1 : release local locks
159	// This must be done while the global lock is held to prevent
160	//   threads that were created mid critical-section
161	//   from racing to lock their local locks and having the writer
162	//   immediately unlock them
163        // Alternative solution : return s in write_lock and pass it to write_unlock
164        for(uint_fast32_t i = 0; i < last_s; i++) {
165                verify(data[i].lock);
166                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
167        }
168
169        // Step 2 : release global lock
170        /*paranoid*/ assert(true == lock);
171        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
172}
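// Writer-side usage (ready_queue_grow and ready_queue_shrink below follow exactly this pattern):
//   uint_fast32_t s = ready_mutate_lock();
//   ... add or remove lanes ...
//   ready_mutate_unlock( s );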
173
174//=======================================================================
175// Intrusive Queue used by ready queue
176//=======================================================================
177// Intrusive lanes used by the relaxed ready queue
178struct __attribute__((aligned(128))) __intrusive_lane_t {
179        // spin lock protecting the queue
180        volatile bool lock;
181
182        // anchor for the head and the tail of the queue
183        struct __sentinel_t {
184		// Linked-list fields
185		// intrusive link field for threads
186		// must be exactly as in $thread
187                __thread_desc_link link;
188        } before, after;
189
190        // Optional statistic counters
191        #if !defined(__CFA_NO_SCHED_STATS__)
192                struct __attribute__((aligned(64))) {
193                        // difference between number of push and pops
194                        ssize_t diff;
195
196                        // total number of pushes and pops
197                        size_t  push;
198                        size_t  pop ;
199                } stat;
200        #endif
201};
202
203void  ?{}(__intrusive_lane_t & this);
204void ^?{}(__intrusive_lane_t & this);
205
206// Get the head pointer (one before the first element) from the anchor
207static inline $thread * head(const __intrusive_lane_t & this) {
208        $thread * rhead = ($thread *)(
209                (uintptr_t)( &this.before ) - offsetof( $thread, link )
210        );
211        /* paranoid */ verify(rhead);
212        return rhead;
213}
214
215// Get the tail pointer (one after the last element) from the anchor
216static inline $thread * tail(const __intrusive_lane_t & this) {
217        $thread * rtail = ($thread *)(
218                (uintptr_t)( &this.after ) - offsetof( $thread, link )
219        );
220        /* paranoid */ verify(rtail);
221        return rtail;
222}
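// Note: head() and tail() return fake $thread pointers whose link fields alias this.before.link and
// this.after.link, i.e. (uintptr_t)head(this) + offsetof($thread, link) == (uintptr_t)&this.before,
// so the sentinels can be linked exactly like real threads (see the assertions in the ctor below).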
223
224// Ctor
225void ?{}( __intrusive_lane_t & this ) {
226        this.lock = false;
227
228        this.before.link.prev = 0p;
229        this.before.link.next = tail(this);
230        this.before.link.ts   = 0;
231
232        this.after .link.prev = head(this);
233        this.after .link.next = 0p;
234        this.after .link.ts   = 0;
235
236        #if !defined(__CFA_NO_SCHED_STATS__)
237                this.stat.diff = 0;
238                this.stat.push = 0;
239                this.stat.pop  = 0;
240        #endif
241
242        // We add a boat-load of assertions here because the anchor code is very fragile
243        /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
244        /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
245        /* paranoid */ verify(head(this)->link.prev == 0p );
246        /* paranoid */ verify(head(this)->link.next == tail(this) );
247        /* paranoid */ verify(tail(this)->link.next == 0p );
248        /* paranoid */ verify(tail(this)->link.prev == head(this) );
249        /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
250        /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
251        /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
252        /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
253        /* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
254        /* paranoid */ verify(sizeof(this) == 128);
255        /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
256        /* paranoid */ verify(__alignof__(this) == 128);
257        /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
258}
259
260// Dtor is trivial
261void ^?{}( __intrusive_lane_t & this ) {
262        // Make sure the list is empty
263        /* paranoid */ verify(head(this)->link.prev == 0p );
264        /* paranoid */ verify(head(this)->link.next == tail(this) );
265        /* paranoid */ verify(tail(this)->link.next == 0p );
266        /* paranoid */ verify(tail(this)->link.prev == head(this) );
267}
268
269// Push a thread onto this lane
270// returns true if the lane was empty before the push, false otherwise
271bool push(__intrusive_lane_t & this, $thread * node) {
272        #if defined(__CFA_WITH_VERIFY__)
273                /* paranoid */ verify(this.lock);
274                /* paranoid */ verify(node->link.ts != 0);
275                /* paranoid */ verify(node->link.next == 0p);
276                /* paranoid */ verify(node->link.prev == 0p);
277                /* paranoid */ verify(tail(this)->link.next == 0p);
278                /* paranoid */ verify(head(this)->link.prev == 0p);
279
280                if(this.before.link.ts == 0l) {
281                        /* paranoid */ verify(tail(this)->link.prev == head(this));
282                        /* paranoid */ verify(head(this)->link.next == tail(this));
283                } else {
284                        /* paranoid */ verify(tail(this)->link.prev != head(this));
285                        /* paranoid */ verify(head(this)->link.next != tail(this));
286                }
287        #endif
288
289        // Get the relevant nodes locally
290        $thread * tail = tail(this);
291        $thread * prev = tail->link.prev;
292
293        // Do the push
294        node->link.next = tail;
295        node->link.prev = prev;
296        prev->link.next = node;
297        tail->link.prev = node;
298
299        // Update stats
300        #if !defined(__CFA_NO_SCHED_STATS__)
301                this.stat.diff++;
302                this.stat.push++;
303        #endif
304
305        verify(node->link.next == tail(this));
306
307        // Check if the queue used to be empty
308        if(this.before.link.ts == 0l) {
309                this.before.link.ts = node->link.ts;
310                /* paranoid */ verify(node->link.prev == head(this));
311                return true;
312        }
313        return false;
314}
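// Caller contract (from the assertions above): the lane lock must be held, node->link.ts must be set,
// and node->link.next/prev must be null.  A typical call sequence, as done by the cluster-level push
// further down:
//   while( !__atomic_try_acquire( &lane.lock ) ) { /* pick another lane and retry */ }
//   bool lane_first = push( lane, thrd );
//   __atomic_unlock( &lane.lock );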
315
316// Pop a thread from this lane (must be non-empty)
317// returns the popped thread
318// and true if the lane is empty after the pop, false otherwise
319[$thread *, bool] pop(__intrusive_lane_t & this) {
320        /* paranoid */ verify(this.lock);
321        /* paranoid */ verify(this.before.link.ts != 0ul);
322
323        // Get anchors locally
324        $thread * head = head(this);
325        $thread * tail = tail(this);
326
327        // Get the relevant nodes locally
328        $thread * node = head->link.next;
329        $thread * next = node->link.next;
330
331        /* paranoid */ verify(node != tail);
332        /* paranoid */ verify(node);
333
334        // Do the pop
335        head->link.next = next;
336        next->link.prev = head;
337        node->link.[next, prev] = 0p;
338
339        // Update head time stamp
340        this.before.link.ts = next->link.ts;
341
342        // Update stats
343        #ifndef __CFA_NO_SCHED_STATS__
344                this.stat.diff--;
345                this.stat.pop ++;
346        #endif
347
348        // Check if we emptied list and return accordingly
349        /* paranoid */ verify(tail(this)->link.next == 0p);
350        /* paranoid */ verify(head(this)->link.prev == 0p);
351        if(next == tail) {
352                /* paranoid */ verify(this.before.link.ts == 0);
353                /* paranoid */ verify(tail(this)->link.prev == head(this));
354                /* paranoid */ verify(head(this)->link.next == tail(this));
355                return [node, true];
356        }
357        else {
358                /* paranoid */ verify(next->link.ts != 0);
359                /* paranoid */ verify(tail(this)->link.prev != head(this));
360                /* paranoid */ verify(head(this)->link.next != tail(this));
361                /* paranoid */ verify(this.before.link.ts != 0);
362                return [node, false];
363        }
364}
365
366// Check whether or not list is empty
367static inline bool is_empty(__intrusive_lane_t & this) {
368        // Cannot verify here since it may not be locked
369        return this.before.link.ts == 0;
370}
371
372// Return the timestamp
373static inline unsigned long long ts(__intrusive_lane_t & this) {
374        // Cannot verify here since it may not be locked
375        return this.before.link.ts;
376}
377
378//=======================================================================
379// Scalable Non-Zero counter
380//=======================================================================
381
382union __snzi_val_t {
383        uint64_t _all;
384        struct __attribute__((packed)) {
385                char cnt;
386                uint64_t ver:56;
387        };
388};
389
390bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, char _cnt, uint64_t _ver) {
391        __snzi_val_t t;
392        t.ver = _ver;
393        t.cnt = _cnt;
394        /* paranoid */ verify(t._all == ((_ver << 8) | ((unsigned char)_cnt)));
395        return __atomic_compare_exchange_n(&self._all, &exp._all, t._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
396}
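// Packed layout (assuming the little-endian layout used on x86): cnt is the low 8 bits and ver the
// upper 56 bits, so _all == (ver << 8) | (unsigned char)cnt.  For example cnt = 2, ver = 3 gives
// _all == 0x302, which is exactly what the verify above checks.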
397
398bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, const __snzi_val_t & tar) {
399        return __atomic_compare_exchange_n(&self._all, &exp._all, tar._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
400}
401
402void ?{}( __snzi_val_t & this ) { this._all = 0; }
403void ?{}( __snzi_val_t & this, const volatile __snzi_val_t & o) { this._all = o._all; }
404
405struct __attribute__((aligned(128))) __snzi_node_t {
406        volatile __snzi_val_t value;
407        struct __snzi_node_t * parent;
408        bool is_root;
409};
410
411static inline void arrive( __snzi_node_t & );
412static inline void depart( __snzi_node_t & );
413
414#define __snzi_half -1
415
416//--------------------------------------------------
417// Root node
418static void arrive_r( __snzi_node_t & this ) {
419        /* paranoid */ verify( this.is_root );
420        __atomic_fetch_add(&this.value._all, 1, __ATOMIC_SEQ_CST);
421}
422
423static void depart_r( __snzi_node_t & this ) {
424        /* paranoid */ verify( this.is_root );
425        __atomic_fetch_sub(&this.value._all, 1, __ATOMIC_SEQ_CST);
426}
427
428//--------------------------------------------------
429// Hierarchical node
430static void arrive_h( __snzi_node_t & this ) {
431        int undoArr = 0;
432        bool success = false;
433        while(!success) {
434                __snzi_val_t x = { this.value };
435                /* paranoid */ verify(x.cnt <= 120);
436                if( x.cnt >= 1 ) {
437                        if( cas( this.value, x, x.cnt + 1, x.ver ) ) {
438                                success = true;
439                        }
440                }
441                /* paranoid */ verify(x.cnt <= 120);
442                if( x.cnt == 0 ) {
443                        if( cas( this.value, x, __snzi_half, x.ver + 1) ) {
444                                success = true;
445                                x.cnt = __snzi_half;
446                                x.ver = x.ver + 1;
447                        }
448                }
449                /* paranoid */ verify(x.cnt <= 120);
450                if( x.cnt == __snzi_half ) {
451                        /* paranoid */ verify( this.parent);
452                        arrive( *this.parent );
453                        if( !cas( this.value, x, 1, x.ver) ) {
454                                undoArr = undoArr + 1;
455                        }
456                }
457        }
458
459        for(int i = 0; i < undoArr; i++) {
460                /* paranoid */ verify( this.parent );
461                depart( *this.parent );
462        }
463}
464
465static void depart_h( __snzi_node_t & this ) {
466        while(true) {
467                const __snzi_val_t x = { this.value };
468                /* paranoid */ verifyf(x.cnt >= 1, "%d", x.cnt);
469                if( cas( this.value, x, x.cnt - 1, x.ver ) ) {
470                        if( x.cnt == 1 ) {
471                                /* paranoid */ verify( this.parent );
472                                depart( *this.parent );
473                        }
474                        return;
475                }
476        }
477}
478
479//--------------------------------------------------
480// All nodes
481static inline void arrive( __snzi_node_t & this ) {
482        if(this.is_root) arrive_r( this );
483        else arrive_h( this );
484}
485
486static inline void depart( __snzi_node_t & this ) {
487        if(this.is_root) depart_r( this );
488        else depart_h( this );
489}
490
491static inline bool query( __snzi_node_t & this ) {
492        /* paranoid */ verify( this.is_root );
493        return this.value._all > 0;
494}
495
496//--------------------------------------------------
497// SNZI object
498void  ?{}( __snzi_t & this, unsigned depth ) with( this ) {
499        mask = (1 << depth) - 1;
500        root = (1 << (depth + 1)) - 2;
501        nodes = alloc( root + 1 );
502
503        int width = 1 << depth;
504        for(int i = 0; i < root; i++) {
505                nodes[i].value._all = 0;
506                nodes[i].parent = &nodes[(i / 2) + width ];
507                nodes[i].is_root = false;
508        }
509
510        nodes[ root ].value._all = 0;
511        nodes[ root ].parent = 0p;
512        nodes[ root ].is_root = true;
513}
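// Worked example for depth == 2: mask == 3, root == 6, width == 4, and root + 1 == 7 nodes are
// allocated.  Leaves 0..3 get parents nodes[4] and nodes[5] (index i/2 + width), which in turn get
// parent nodes[6], the root; an arrive()/depart() on a leaf therefore climbs at most two levels.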
514
515void ^?{}( __snzi_t & this ) {
516        free( this.nodes );
517}
518
519static inline void arrive( __snzi_t & this, int idx) {
520        idx &= this.mask;
521        arrive( this.nodes[idx] );
522}
523
524static inline void depart( __snzi_t & this, int idx) {
525        idx &= this.mask;
526        depart( this.nodes[idx] );
527}
528
529static inline bool query( const __snzi_t & this ) {
530        return query( this.nodes[ this.root ] );
531}
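// How the ready queue below uses this SNZI:
//   arrive( snzi, i );  // in push, when a push makes lane i non-empty
//   depart( snzi, w );  // in try_pop, when a pop empties lane w
//   query ( snzi );     // in pop, as a cheap "is any lane non-empty?" check before picking lanes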
532
533//=======================================================================
534// Cforall Ready Queue used by the ready queue
535//=======================================================================
536
537void ?{}(__ready_queue_t & this) with (this) {
538
539        lanes.data = alloc(4);
540        for( i; 4 ) {
541                (lanes.data[i]){};
542        }
543        lanes.count = 4;
544        snzi{ log2( lanes.count / 8 ) };
545}
546
547void ^?{}(__ready_queue_t & this) with (this) {
548        verify( 4  == lanes.count );
549        verify( !query( snzi ) );
550
551        ^(snzi){};
552
553        for( i; 4 ) {
554                ^(lanes.data[i]){};
555        }
556        free(lanes.data);
557}
558
559//-----------------------------------------------------------------------
560__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
561        __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
562
563        // write timestamp
564        thrd->link.ts = rdtscl();
565
566        // Try to pick a lane and lock it
567        unsigned i;
568        do {
569                // Pick the index of a lane
570                i = __tls_rand() % lanes.count;
571
572                #if !defined(__CFA_NO_STATISTICS__)
573                        __tls_stats()->ready.pick.push.attempt++;
574                #endif
575
576                // If we can't lock it retry
577        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
578
579        bool first = false;
580
581        // Actually push it
582        bool lane_first = push(lanes.data[i], thrd);
583
584        // If this lane used to be empty we need to do more
585        if(lane_first) {
586                // Check if the entire queue used to be empty
587                first = !query(snzi);
588
589                // Update the snzi
590                arrive( snzi, i );
591        }
592
593        // Unlock and return
594        __atomic_unlock( &lanes.data[i].lock );
595
596        __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
597
598        // Update statistics
599        #if !defined(__CFA_NO_STATISTICS__)
600                __tls_stats()->ready.pick.push.success++;
601        #endif
602
603        // return whether or not the list was empty before this push
604        return first;
605}
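// Illustrative use of the return value (the actual caller lives in the scheduler code, not here):
//   if( push( cltr, thrd ) ) {
//           // the cluster went from empty to non-empty; the caller may want to wake an idle processor
//   }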
606
607//-----------------------------------------------------------------------
608// Given 2 indexes, pick the list with the oldest push and try to pop from it
609static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
610        #if !defined(__CFA_NO_STATISTICS__)
611                __tls_stats()->ready.pick.pop.attempt++;
612        #endif
613
614	// Pick the best of the two lists
615        int w = i;
616        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
617                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
618        }
619
620        // Get relevant elements locally
621        __intrusive_lane_t & lane = lanes.data[w];
622
623        // If list looks empty retry
624        if( is_empty(lane) ) return 0p;
625
626        // If we can't get the lock retry
627        if( !__atomic_try_acquire(&lane.lock) ) return 0p;
628
629
630        // If list is empty, unlock and retry
631        if( is_empty(lane) ) {
632                __atomic_unlock(&lane.lock);
633                return 0p;
634        }
635
636        // Actually pop the list
637        struct $thread * thrd;
638        bool emptied;
639        [thrd, emptied] = pop(lane);
640
641        /* paranoid */ verify(thrd);
642        /* paranoid */ verify(lane.lock);
643
644        // If this was the last element in the lane
645        if(emptied) {
646                depart( snzi, w );
647        }
648
649        // Unlock and return
650        __atomic_unlock(&lane.lock);
651
652        // Update statistics
653        #if !defined(__CFA_NO_STATISTICS__)
654                __tls_stats()->ready.pick.pop.success++;
655        #endif
656
657        // return the popped thread
658        return thrd;
659}
660
661// Pop from the ready queue from a given cluster
662__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
663        /* paranoid */ verify( lanes.count > 0 );
664
665        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
666        while( query(snzi) ) {
667                // Pick two lists at random
668                int i = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
669                int j = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
670
671                // try popping from the 2 picked lists
672                struct $thread * thrd = try_pop(cltr, i, j);
673                if(thrd) return thrd;
674        }
675
676	// All lanes were empty, return 0p
677        return 0p;
678}
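// This is the "power of two random choices" load-balancing heuristic: sampling two random lanes and
// popping from the one with the oldest head keeps work spread across lanes at constant cost per pop,
// without any global ordering between lanes.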
679
680//-----------------------------------------------------------------------
681
682static void check( __ready_queue_t & q ) with (q) {
683        #if defined(__CFA_WITH_VERIFY__)
684                {
685                        for( idx ; lanes.count ) {
686                                __intrusive_lane_t & sl = lanes.data[idx];
687                                assert(!lanes.data[idx].lock);
688
689                                assert(head(sl)->link.prev == 0p );
690                                assert(head(sl)->link.next->link.prev == head(sl) );
691                                assert(tail(sl)->link.next == 0p );
692                                assert(tail(sl)->link.prev->link.next == tail(sl) );
693
694                                if(sl.before.link.ts == 0l) {
695                                        assert(tail(sl)->link.prev == head(sl));
696                                        assert(head(sl)->link.next == tail(sl));
697                                } else {
698                                        assert(tail(sl)->link.prev != head(sl));
699                                        assert(head(sl)->link.next != tail(sl));
700                                }
701                        }
702                }
703        #endif
704}
705
706// Call this function if the intrusive lane was moved using memcpy
707// fixes the list so that the pointers back to anchors aren't left dangling
708static inline void fix(__intrusive_lane_t & ll) {
709	// if the list is not empty then follow the pointers and fix their reverse links
710        if(!is_empty(ll)) {
711                head(ll)->link.next->link.prev = head(ll);
712                tail(ll)->link.prev->link.next = tail(ll);
713        }
714        // Otherwise just reset the list
715        else {
716                verify(tail(ll)->link.next == 0p);
717                tail(ll)->link.prev = head(ll);
718                head(ll)->link.next = tail(ll);
719                verify(head(ll)->link.prev == 0p);
720        }
721}
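// Why fix() is needed: the grow/shrink code below resizes lanes.data with a realloc that memcpys the
// lanes, so this.before and this.after move to new addresses while the first/last element (or the
// opposite sentinel, when the lane is empty) still points at the old ones; the assignments above
// repair exactly those back-pointers.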
722
723// Grow the ready queue
724void ready_queue_grow  (struct cluster * cltr) {
725        // Lock the RWlock so no-one pushes/pops while we are changing the queue
726        uint_fast32_t last_size = ready_mutate_lock();
727
728        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
729
730        // Make sure that everything is consistent
731        /* paranoid */ check( cltr->ready_queue );
732
733        // grow the ready queue
734        with( cltr->ready_queue ) {
735                ^(snzi){};
736
737                size_t ncount = lanes.count;
738
739                // increase count
740                ncount += 4;
741
742                // Allocate new array (uses realloc and memcpies the data)
743                lanes.data = alloc(lanes.data, ncount);
744
745                // Fix the moved data
746                for( idx; (size_t)lanes.count ) {
747                        fix(lanes.data[idx]);
748                }
749
750                // Construct new data
751                for( idx; (size_t)lanes.count ~ ncount) {
752                        (lanes.data[idx]){};
753                }
754
755                // Update original
756                lanes.count = ncount;
757
758                // Re-create the snzi
759                snzi{ log2( lanes.count / 8 ) };
760                for( idx; (size_t)lanes.count ) {
761                        if( !is_empty(lanes.data[idx]) ) {
762                                arrive(snzi, idx);
763                        }
764                }
765        }
766
767        // Make sure that everything is consistent
768        /* paranoid */ check( cltr->ready_queue );
769
770        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
771
772        // Unlock the RWlock
773        ready_mutate_unlock( last_size );
774}
775
776// Shrink the ready queue
777void ready_queue_shrink(struct cluster * cltr) {
778        // Lock the RWlock so no-one pushes/pops while we are changing the queue
779        uint_fast32_t last_size = ready_mutate_lock();
780
781        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
782
783        // Make sure that everything is consistent
784        /* paranoid */ check( cltr->ready_queue );
785
786        with( cltr->ready_queue ) {
787                ^(snzi){};
788
789                size_t ocount = lanes.count;
790		// Check that enough lanes remain to remove 4 more
791                if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
792
793                // reduce the actual count so push doesn't use the old queues
794                lanes.count -= 4;
795                verify(ocount > lanes.count);
796
797                // for printing count the number of displaced threads
798                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
799                        __attribute__((unused)) size_t displaced = 0;
800                #endif
801
802                // redistribute old data
803                for( idx; (size_t)lanes.count ~ ocount) {
804                        // Lock is not strictly needed but makes checking invariants much easier
805                        __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
806                        verify(locked);
807
808                        // As long as we can pop from this lane to push the threads somewhere else in the queue
809                        while(!is_empty(lanes.data[idx])) {
810                                struct $thread * thrd;
811                                __attribute__((unused)) bool _;
812                                [thrd, _] = pop(lanes.data[idx]);
813
814                                push(cltr, thrd);
815
816                                // for printing count the number of displaced threads
817                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
818                                        displaced++;
819                                #endif
820                        }
821
822                        // Unlock the lane
823                        __atomic_unlock(&lanes.data[idx].lock);
824
825                        // TODO print the queue statistics here
826
827                        ^(lanes.data[idx]){};
828                }
829
830                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
831
832                // Allocate new array (uses realloc and memcpies the data)
833                lanes.data = alloc(lanes.data, lanes.count);
834
835                // Fix the moved data
836                for( idx; (size_t)lanes.count ) {
837                        fix(lanes.data[idx]);
838                }
839
840                // Re-create the snzi
841                snzi{ log2( lanes.count / 8 ) };
842                for( idx; (size_t)lanes.count ) {
843                        if( !is_empty(lanes.data[idx]) ) {
844                                arrive(snzi, idx);
845                        }
846                }
847        }
848
849        // Make sure that everything is consistent
850        /* paranoid */ check( cltr->ready_queue );
851
852        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
853
854        // Unlock the RWlock
855        ready_mutate_unlock( last_size );
856}