source: libcfa/src/concurrency/ready_queue.cfa @ 61d7bec

Last change on this file since 61d7bec was 61d7bec, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Replaced the bitmask approach for the ready-queue with a SNZI

1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17// #define __CFA_DEBUG_PRINT_READY_QUEUE__
18
19#include "bits/defs.hfa"
20#include "kernel_private.hfa"
21
22#define _GNU_SOURCE
23#include "stdlib.hfa"
24#include "math.hfa"
25
26static const size_t cache_line_size = 64;
27
28// No overridden function, no environment variable, no define
29// fall back to a magic number
30#ifndef __CFA_MAX_PROCESSORS__
31        #define __CFA_MAX_PROCESSORS__ 128
32#endif
33
34// returns the maximum number of processors the RWLock supports
35__attribute__((weak)) unsigned __max_processors() {
36        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
37        if(!max_cores_s) {
38                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
39                return __CFA_MAX_PROCESSORS__;
40        }
41
42        char * endptr = 0p;
43        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
44        if(max_cores_l < 1 || max_cores_l > 65535) {
45                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
46                return __CFA_MAX_PROCESSORS__;
47        }
48        if('\0' != *endptr) {
49                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
50                return __CFA_MAX_PROCESSORS__;
51        }
52
53        return max_cores_l;
54}
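// Illustrative sketch (standalone C, not part of this file): how the CFA_MAX_PROCESSORS override
// above is consumed. A program can raise the RWLock capacity by exporting the variable before the
// runtime starts; out-of-range or non-numeric values fall back to __CFA_MAX_PROCESSORS__. The
// program below is hypothetical and only shows where the variable must be set.
#if 0
#include <stdio.h>
#include <stdlib.h>

int main(void) {
	// Must happen before the Cforall runtime calls __max_processors(),
	// i.e. before any cluster is created.
	setenv("CFA_MAX_PROCESSORS", "256", 1);
	printf("RWLock capacity requested: %s\n", getenv("CFA_MAX_PROCESSORS"));
	return 0;
}
#endif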
55
56//=======================================================================
57// Cluster wide reader-writer lock
58//=======================================================================
59void  ?{}(__clusterRWLock_t & this) {
60        this.max   = __max_processors();
61        this.alloc = 0;
62        this.ready = 0;
63        this.lock  = false;
64        this.data  = alloc(this.max);
65
66        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
67        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
68        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
69        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
70
71}
72void ^?{}(__clusterRWLock_t & this) {
73        free(this.data);
74}
75
76void ?{}( __processor_id & this, struct processor * proc ) {
77        this.handle = proc;
78        this.lock   = false;
79}
80
81//=======================================================================
82// Lock-Free registering/unregistering of threads
83unsigned doregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
84        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p with cluster %p\n", proc, cltr);
85
86        // Step - 1 : check if there is already space in the data
87        uint_fast32_t s = ready;
88
89        // Check among all the ready
90        for(uint_fast32_t i = 0; i < s; i++) {
91                processor * null = 0p; // Reset every iteration since a failed compare-exchange overwrites it
92                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
93                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
94                        /*paranoid*/ verify(i < ready);
95                        /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
96                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
97                        return i;
98                }
99        }
100
101        if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);
102
103        // Step - 2 : F&A to get a new spot in the array.
104        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
105        if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);
106
107        // Step - 3 : Mark space as used and then publish it.
108        __processor_id * storage = (__processor_id *)&data[n];
109        (*storage){ proc };
110        while(true) {
111                unsigned copy = n;
112                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
113                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
114                        break;
115                asm volatile("pause");
116        }
117
118        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
119
120        // Return new spot.
121        /*paranoid*/ verify(n < ready);
122        /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
123        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
124        return n;
125}
126
127void unregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
128        unsigned id = proc->id;
129        /*paranoid*/ verify(id < ready);
130        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
131        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
132
133        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
134}
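// Illustrative sketch (standalone C, hypothetical names; not part of the runtime): the registration
// protocol above in plain form. A previously freed slot is reclaimed with a CAS on its handle;
// otherwise a fresh slot is reserved with fetch-and-add, initialized, and only published by
// advancing the 'ready' counter once all lower slots are visible, so readers never observe an
// uninitialized entry. The real code additionally aborts when the fixed-size array is exhausted.
#if 0
#include <stdatomic.h>
#include <stddef.h>

typedef struct { _Atomic(void *) handle; } slot_t;

static slot_t      slots[128];
static atomic_uint alloc_cnt;   // slots handed out
static atomic_uint ready_cnt;   // slots published, always <= alloc_cnt

static unsigned slot_register(void * owner) {
	// Step 1 : try to reuse a published slot whose handle was cleared by unregister
	unsigned s = atomic_load_explicit(&ready_cnt, memory_order_relaxed);
	for(unsigned i = 0; i < s; i++) {
		void * expected = NULL;  // reset every iteration, CAS overwrites it on failure
		if(atomic_compare_exchange_strong(&slots[i].handle, &expected, owner))
			return i;
	}

	// Step 2 : reserve a brand new slot
	unsigned n = atomic_fetch_add(&alloc_cnt, 1);

	// Step 3 : initialize it, then publish slots strictly in order
	atomic_store(&slots[n].handle, owner);
	for(;;) {
		unsigned copy = n;
		if(atomic_compare_exchange_weak(&ready_cnt, &copy, n + 1)) break;
	}
	return n;
}
#endif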
135
136//-----------------------------------------------------------------------
137// Writer side : acquire when changing the ready queue, e.g. adding more
138//  queues or removing them.
139uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
140        // Step 1 : lock global lock
141        // This is needed to prevent processors that register mid critical-section
142        //   from simply locking their own lock and entering.
143        __atomic_acquire( &lock );
144
145        // Step 2 : lock per-proc lock
146        // Processors that are currently being registered aren't counted
147        //   but can't be in read_lock or in the critical section.
148        // All other processors are counted
149        uint_fast32_t s = ready;
150        for(uint_fast32_t i = 0; i < s; i++) {
151                __atomic_acquire( &data[i].lock );
152        }
153
154        return s;
155}
156
157void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
158        // Step 1 : release local locks
159        // This must be done while the global lock is held to avoid
160        //   threads that were created mid critical section
161        //   from racing to lock their local locks and having the writer
162        //   immediately unlock them
163        // Alternative solution : return s in write_lock and pass it to write_unlock
164        for(uint_fast32_t i = 0; i < last_s; i++) {
165                verify(data[i].lock);
166                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
167        }
168
169        // Step 2 : release global lock
170        /*paranoid*/ assert(true == lock);
171        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
172}
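// Illustrative sketch (standalone C, hypothetical names; not part of the runtime): the writer
// protocol above is one half of a distributed reader-writer lock. The reader half is not in this
// file; a plausible shape of the whole scheme is shown below. Readers only touch their own
// cache-line-aligned flag, so uncontended reads never share a cache line, while the writer pays the
// O(n) cost of acquiring every per-reader flag.
#if 0
#include <stdatomic.h>
#include <stdbool.h>

typedef struct { _Atomic bool lock; char pad[63]; } reader_slot_t;

static _Atomic bool  global_lock;
static reader_slot_t readers[128];
static unsigned      nreaders;

static void spin_acquire(_Atomic bool * l) {
	bool expected = false;
	while(!atomic_compare_exchange_weak(l, &expected, true)) expected = false;
}

// Reader side : lock only our own slot, but back off while a writer holds the global lock.
static void read_lock(unsigned id) {
	for(;;) {
		if(atomic_load(&global_lock)) continue;        // writer active, wait
		spin_acquire(&readers[id].lock);
		if(!atomic_load(&global_lock)) return;         // no writer raced us, we are in
		atomic_store(&readers[id].lock, false);        // writer appeared, back off and retry
	}
}

static void read_unlock(unsigned id) {
	atomic_store(&readers[id].lock, false);
}

// Writer side : mirror of ready_mutate_lock/unlock above.
static void write_lock(void) {
	spin_acquire(&global_lock);
	for(unsigned i = 0; i < nreaders; i++) spin_acquire(&readers[i].lock);
}

static void write_unlock(void) {
	for(unsigned i = 0; i < nreaders; i++) atomic_store(&readers[i].lock, false);
	atomic_store(&global_lock, false);
}
#endif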
173
174//=======================================================================
175// Intrusive Queue used by ready queue
176//=======================================================================
177// Intrusive lanes used by the relaxed ready queue
178struct __attribute__((aligned(128))) __intrusive_lane_t {
179        // spin lock protecting the queue
180        volatile bool lock;
181
182        // anchor for the head and the tail of the queue
183        struct __sentinel_t {
184                // Linked-list fields
185                // intrusive link field for threads
186                // must match the layout in $thread exactly
187                __thread_desc_link link;
188        } before, after;
189
190#if defined(__CFA_WITH_VERIFY__)
191        // id of last processor to acquire the lock
192        // needed only to check for mutual exclusion violations
193        unsigned int last_id;
194
195        // number of items on this list
196        // needed only to check for deadlocks
197        unsigned int count;
198#endif
199
200        // Optional statistic counters
201        #if !defined(__CFA_NO_SCHED_STATS__)
202                struct __attribute__((aligned(64))) {
203                        // difference between number of push and pops
204                        ssize_t diff;
205
206                        // total number of pushes and pops
207                        size_t  push;
208                        size_t  pop ;
209                } stat;
210        #endif
211};
212
213void  ?{}(__intrusive_lane_t & this);
214void ^?{}(__intrusive_lane_t & this);
215
216// Get the head pointer (one before the first element) from the anchor
217static inline $thread * head(const __intrusive_lane_t & this) {
218        $thread * rhead = ($thread *)(
219                (uintptr_t)( &this.before ) - offsetof( $thread, link )
220        );
221        /* paranoid */ verify(rhead);
222        return rhead;
223}
224
225// Get the tail pointer (one after the last element) from the anchor
226static inline $thread * tail(const __intrusive_lane_t & this) {
227        $thread * rtail = ($thread *)(
228                (uintptr_t)( &this.after ) - offsetof( $thread, link )
229        );
230        /* paranoid */ verify(rtail);
231        return rtail;
232}
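// Illustrative sketch (standalone C, hypothetical names; not part of the runtime): the head()/tail()
// trick above in generic form. The anchors store only the link fields; subtracting
// offsetof(node_t, link) from an anchor's address yields a fake node pointer whose ->link
// dereference lands exactly on the anchor, so push/pop never special-case the ends of the list.
#if 0
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct node node_t;
typedef struct { node_t * next, * prev; } link_t;
struct node { int payload; link_t link; };

typedef struct {
	link_t before;   // anchor standing in for a node one-before-the-first
	link_t after;    // anchor standing in for a node one-after-the-last
} lane_t;

static node_t * lane_head(lane_t * l) { return (node_t *)((uintptr_t)&l->before - offsetof(node_t, link)); }
static node_t * lane_tail(lane_t * l) { return (node_t *)((uintptr_t)&l->after  - offsetof(node_t, link)); }

int main(void) {
	lane_t l = { { 0, 0 }, { 0, 0 } };
	// the fake nodes' link fields alias the anchors exactly
	assert(&lane_head(&l)->link == &l.before);
	assert(&lane_tail(&l)->link == &l.after );
	return 0;
}
#endif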
233
234// Ctor
235void ?{}( __intrusive_lane_t & this ) {
236        this.lock = false;
237        #if defined(__CFA_WITH_VERIFY__)
238                this.last_id = -1u;
239                this.count = 0u;
240        #endif
241
242        this.before.link.prev = 0p;
243        this.before.link.next = tail(this);
244        this.before.link.ts   = 0;
245
246        this.after .link.prev = head(this);
247        this.after .link.next = 0p;
248        this.after .link.ts   = 0;
249
250        #if !defined(__CFA_NO_SCHED_STATS__)
251                this.stat.diff = 0;
252                this.stat.push = 0;
253                this.stat.pop  = 0;
254        #endif
255
256        // We add a boat-load of assertions here because the anchor code is very fragile
257        /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
258        /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
259        /* paranoid */ verify(head(this)->link.prev == 0p );
260        /* paranoid */ verify(head(this)->link.next == tail(this) );
261        /* paranoid */ verify(tail(this)->link.next == 0p );
262        /* paranoid */ verify(tail(this)->link.prev == head(this) );
263        /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
264        /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
265        /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
266        /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
267        /* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
268        /* paranoid */ verify(sizeof(this) == 128);
269        /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
270        /* paranoid */ verify(__alignof__(this) == 128);
271        /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
272}
273
274// Dtor is trivial
275void ^?{}( __intrusive_lane_t & this ) {
276        // Make sure the list is empty
277        /* paranoid */ verify(head(this)->link.prev == 0p );
278        /* paranoid */ verify(head(this)->link.next == tail(this) );
279        /* paranoid */ verify(tail(this)->link.next == 0p );
280        /* paranoid */ verify(tail(this)->link.prev == head(this) );
281        /* paranoid */ verify(this.count == 0u );
282}
283
284// Push a thread onto this lane
286// returns true if the lane was empty before the push, false otherwise
286bool push(__intrusive_lane_t & this, $thread * node) {
287        #if defined(__CFA_WITH_VERIFY__)
288                /* paranoid */ verify(this.lock);
289                /* paranoid */ verify(node->link.ts != 0);
290                /* paranoid */ verify(node->link.next == 0p);
291                /* paranoid */ verify(node->link.prev == 0p);
292                /* paranoid */ verify(tail(this)->link.next == 0p);
293                /* paranoid */ verify(head(this)->link.prev == 0p);
294
295                this.count++;
296
297                if(this.before.link.ts == 0l) {
298                        /* paranoid */ verify(tail(this)->link.prev == head(this));
299                        /* paranoid */ verify(head(this)->link.next == tail(this));
300                } else {
301                        /* paranoid */ verify(tail(this)->link.prev != head(this));
302                        /* paranoid */ verify(head(this)->link.next != tail(this));
303                }
304        #endif
305
306        // Get the relevant nodes locally
307        $thread * tail = tail(this);
308        $thread * prev = tail->link.prev;
309
310        // Do the push
311        node->link.next = tail;
312        node->link.prev = prev;
313        prev->link.next = node;
314        tail->link.prev = node;
315
316        // Update stats
317        #if !defined(__CFA_NO_SCHED_STATS__)
318                this.stat.diff++;
319                this.stat.push++;
320        #endif
321
322        verify(node->link.next == tail(this));
323
324        // Check if the queue used to be empty
325        if(this.before.link.ts == 0l) {
326                this.before.link.ts = node->link.ts;
327                /* paranoid */ verify(node->link.prev == head(this));
328                return true;
329        }
330        return false;
331}
332
333// Pop a thread from this lane (must be non-empty)
334// returns the popped thread
335// and true if the lane is empty after the pop, false otherwise
336[$thread *, bool] pop(__intrusive_lane_t & this) {
337        /* paranoid */ verify(this.lock);
338        /* paranoid */ verify(this.before.link.ts != 0ul);
339
340        // Get anchors locally
341        $thread * head = head(this);
342        $thread * tail = tail(this);
343
344        // Get the relevant nodes locally
345        $thread * node = head->link.next;
346        $thread * next = node->link.next;
347
348        #if defined(__CFA_WITH_VERIFY__)
349                this.count--;
350                /* paranoid */ verify(node != tail);
351                /* paranoid */ verify(node);
352        #endif
353
354        // Do the pop
355        head->link.next = next;
356        next->link.prev = head;
357        node->link.[next, prev] = 0p;
358
359        // Update head time stamp
360        this.before.link.ts = next->link.ts;
361
362        // Update stats
363        #ifndef __CFA_NO_SCHED_STATS__
364                this.stat.diff--;
365                this.stat.pop ++;
366        #endif
367
368        // Check if we emptied list and return accordingly
369        /* paranoid */ verify(tail(this)->link.next == 0p);
370        /* paranoid */ verify(head(this)->link.prev == 0p);
371        if(next == tail) {
372                /* paranoid */ verify(this.before.link.ts == 0);
373                /* paranoid */ verify(tail(this)->link.prev == head(this));
374                /* paranoid */ verify(head(this)->link.next == tail(this));
375                return [node, true];
376        }
377        else {
378                /* paranoid */ verify(next->link.ts != 0);
379                /* paranoid */ verify(tail(this)->link.prev != head(this));
380                /* paranoid */ verify(head(this)->link.next != tail(this));
381                /* paranoid */ verify(this.before.link.ts != 0);
382                return [node, false];
383        }
384}
385
386// Check whether or not list is empty
387static inline bool is_empty(__intrusive_lane_t & this) {
388        // Cannot verify here since it may not be locked
389        return this.before.link.ts == 0;
390}
391
392// Return the timestamp
393static inline unsigned long long ts(__intrusive_lane_t & this) {
394        // Cannot verify here since it may not be locked
395        return this.before.link.ts;
396}
397
398//=======================================================================
399// Scalable Non-Zero counter
400//=======================================================================
401
402union __snzi_val_t {
403        uint64_t _all;
404        struct __attribute__((packed)) {
405                char cnt;
406                uint64_t ver:56;
407        };
408};
409
410bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, char _cnt, uint64_t _ver) {
411        __snzi_val_t t;
412        t.ver = _ver;
413        t.cnt = _cnt;
414        /* paranoid */ verify(t._all == ((_ver << 8) | ((unsigned char)_cnt)));
415        return __atomic_compare_exchange_n(&self._all, &exp._all, t._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
416}
417
418bool cas(volatile __snzi_val_t & self, __snzi_val_t & exp, const __snzi_val_t & tar) {
419        return __atomic_compare_exchange_n(&self._all, &exp._all, tar._all, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
420}
421
422void ?{}( __snzi_val_t & this ) { this._all = 0; }
423void ?{}( __snzi_val_t & this, const volatile __snzi_val_t & o) { this._all = o._all; }
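// Illustrative sketch (standalone C, hypothetical helper names; not part of the runtime): the value
// above packs a signed 8-bit count and a 56-bit version into one 64-bit word so both can be updated
// with a single compare-and-swap; the version tags transitions through the "half" state and avoids
// ABA problems. The same packing with explicit shifts:
#if 0
#include <stdatomic.h>
#include <stdint.h>

typedef _Atomic uint64_t snzi_word_t;

static inline uint64_t snzi_pack(signed char cnt, uint64_t ver) {
	return (ver << 8) | (uint8_t)cnt;     // matches the verify() in the cas() above
}
static inline signed char snzi_cnt(uint64_t w) { return (signed char)(uint8_t)(w & 0xff); }
static inline uint64_t    snzi_ver(uint64_t w) { return w >> 8; }

// Try to move the word from '*expected' to (new_cnt, new_ver) in one atomic step.
static inline int snzi_cas(snzi_word_t * w, uint64_t * expected, signed char new_cnt, uint64_t new_ver) {
	return atomic_compare_exchange_strong(w, expected, snzi_pack(new_cnt, new_ver));
}
#endif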
424
425struct __attribute__((aligned(128))) __snzi_node_t {
426        volatile __snzi_val_t value;
427        struct __snzi_node_t * parent;
428        bool is_root;
429};
430
431static inline void arrive( __snzi_node_t & );
432static inline void depart( __snzi_node_t & );
433
434#define __snzi_half -1
435
436//--------------------------------------------------
437// Root node
438static void arrive_r( __snzi_node_t & this ) {
439        /* paranoid */ verify( this.is_root );
440        __atomic_fetch_add(&this.value._all, 1, __ATOMIC_SEQ_CST);
441}
442
443static void depart_r( __snzi_node_t & this ) {
444        /* paranoid */ verify( this.is_root );
445        __atomic_fetch_sub(&this.value._all, 1, __ATOMIC_SEQ_CST);
446}
447
448//--------------------------------------------------
449// Hierarchical node
450static void arrive_h( __snzi_node_t & this ) {
451        int undoArr = 0;
452        bool success = false;
453        while(!success) {
454                __snzi_val_t x = { this.value };
455                /* paranoid */ verify(x.cnt <= 120);
456                if( x.cnt >= 1 ) {
457                        if( cas( this.value, x, x.cnt + 1, x.ver ) ) {
458                                success = true;
459                        }
460                }
461                /* paranoid */ verify(x.cnt <= 120);
462                if( x.cnt == 0 ) {
463                        if( cas( this.value, x, __snzi_half, x.ver + 1) ) {
464                                success = true;
465                                x.cnt = __snzi_half;
466                                x.ver = x.ver + 1;
467                        }
468                }
469                /* paranoid */ verify(x.cnt <= 120);
470                if( x.cnt == __snzi_half ) {
471                        /* paranoid */ verify( this.parent);
472                        arrive( *this.parent );
473                        if( !cas( this.value, x, 1, x.ver) ) {
474                                undoArr = undoArr + 1;
475                        }
476                }
477        }
478
479        for(int i = 0; i < undoArr; i++) {
480                /* paranoid */ verify( this.parent );
481                depart( *this.parent );
482        }
483}
484
485static void depart_h( __snzi_node_t & this ) {
486        while(true) {
487                const __snzi_val_t x = { this.value };
488                /* paranoid */ verifyf(x.cnt >= 1, "%d", x.cnt);
489                if( cas( this.value, x, x.cnt - 1, x.ver ) ) {
490                        if( x.cnt == 1 ) {
491                                /* paranoid */ verify( this.parent );
492                                depart( *this.parent );
493                        }
494                        return;
495                }
496        }
497}
498
499//--------------------------------------------------
500// All nodes
501static inline void arrive( __snzi_node_t & this ) {
502        if(this.is_root) arrive_r( this );
503        else arrive_h( this );
504}
505
506static inline void depart( __snzi_node_t & this ) {
507        if(this.is_root) depart_r( this );
508        else depart_h( this );
509}
510
511static inline bool query( __snzi_node_t & this ) {
512        /* paranoid */ verify( this.is_root );
513        return this.value._all > 0;
514}
515
516//--------------------------------------------------
517// SNZI object
518void  ?{}( __snzi_t & this, unsigned depth ) with( this ) {
519        mask = (1 << depth) - 1;
520        root = (1 << (depth + 1)) - 2;
521        nodes = alloc( root + 1 );
522
523        int width = 1 << depth;
524        for(int i = 0; i < root; i++) {
525                nodes[i].value._all = 0;
526                nodes[i].parent = &nodes[(i / 2) + width ];
527                nodes[i].is_root = false;
528        }
529
530        nodes[ root ].value._all = 0;
531        nodes[ root ].parent = 0p;
532        nodes[ root ].is_root = true;
533}
534
535void ^?{}( __snzi_t & this ) {
536        free( this.nodes );
537}
538
539static inline void arrive( __snzi_t & this, int idx) {
540        idx &= this.mask;
541        arrive( this.nodes[idx] );
542}
543
544static inline void depart( __snzi_t & this, int idx) {
545        idx &= this.mask;
546        depart( this.nodes[idx] );
547}
548
549static inline bool query( const __snzi_t & this ) {
550        return query( this.nodes[ this.root ] );
551}
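// Illustrative sketch (standalone C, hypothetical names; not part of the runtime): the array layout
// built by the SNZI constructor above. For a given depth there are 2^depth leaves at indices
// [0, 2^depth), parent(i) = i/2 + 2^depth, and the root sits at index 2^(depth+1) - 2. Lane indices
// are mapped onto leaves with 'idx & mask'. The check below walks every node up to the root to show
// the arithmetic is consistent.
#if 0
#include <assert.h>

int main(void) {
	const unsigned depth = 3;
	const unsigned width = 1u << depth;             // 8 leaves
	const unsigned root  = (1u << (depth + 1)) - 2; // index 14, array holds root + 1 = 15 nodes

	for(unsigned i = 0; i < root; i++) {
		unsigned parent = i / 2 + width;
		assert(parent > i);        // strictly closer to the root
		assert(parent <= root);    // never past the root
	}
	// every leaf reaches the root in exactly 'depth' hops
	for(unsigned leaf = 0; leaf < width; leaf++) {
		unsigned n = leaf, hops = 0;
		while(n != root) { n = n / 2 + width; hops++; }
		assert(hops == depth);
	}
	return 0;
}
#endif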
552
553//=======================================================================
554// Cforall Ready Queue used by ready queue
555//=======================================================================
556
557// Thread local mirror of ready queue statistics
558#if !defined(__CFA_NO_STATISTICS__)
559static __attribute__((aligned(128))) thread_local struct {
560        struct {
561                struct {
562                        size_t attempt;
563                        size_t success;
564                } push;
565                struct {
566                        size_t maskrds;
567                        size_t attempt;
568                        size_t success;
569                } pop;
570        } pick;
571        struct {
572                size_t value;
573                size_t count;
574        } used;
575} tls = {
576        /* pick */{
577                /* push */{ 0, 0 },
578                /* pop  */{ 0, 0, 0 },
579        },
580        /* used */{ 0, 0 }
581};
582#endif
583
584//-----------------------------------------------------------------------
585
586void ?{}(__ready_queue_t & this) with (this) {
587
588        lanes.data = alloc(4);
589        for( i; 4 ) {
590                (lanes.data[i]){};
591        }
592        lanes.count = 4;
593        snzi{ log2( lanes.count / 8 ) };
594
595        #if !defined(__CFA_NO_STATISTICS__)
596                global_stats.pick.push.attempt = 0;
597                global_stats.pick.push.success = 0;
598                global_stats.pick.pop .maskrds = 0;
599                global_stats.pick.pop .attempt = 0;
600                global_stats.pick.pop .success = 0;
601
602                global_stats.used.value = 0;
603                global_stats.used.count = 0;
604        #endif
605}
606
607void ^?{}(__ready_queue_t & this) with (this) {
608        verify( 4  == lanes.count );
609        verify( !query( snzi ) );
610
611        ^(snzi){};
612
613        for( i; 4 ) {
614                ^(lanes.data[i]){};
615        }
616        free(lanes.data);
617}
618
619//-----------------------------------------------------------------------
620__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
621        __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
622
623        // write timestamp
624        thrd->link.ts = rdtscl();
625
626        // Try to pick a lane and lock it
627        unsigned i;
628        do {
629                // Pick the index of a lane
630                i = __tls_rand() % lanes.count;
631
632                #if !defined(__CFA_NO_STATISTICS__)
633                        tls.pick.push.attempt++;
634                #endif
635
636                // If we can't lock it retry
637        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
638
639        #if defined(__CFA_WITH_VERIFY__)
640                /* paranoid */ verify(lanes.data[i].last_id == -1u);
641                /* paranoid */ lanes.data[i].last_id = kernelTLS.this_processor->id;
642        #endif
643
644        bool first = false;
645
646        // Actually push it
647        bool lane_first = push(lanes.data[i], thrd);
648
649        // If this lane used to be empty we need to do more
650        if(lane_first) {
651                // Check if the entire queue used to be empty
652                first = !query(snzi);
653
654                // Update the snzi
655                arrive( snzi, i );
656        }
657
658        #if defined(__CFA_WITH_VERIFY__)
659                /* paranoid */ verifyf( lanes.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, lanes.data[i].last_id, kernelTLS.this_processor->id );
660                /* paranoid */ verifyf( lanes.data[i].lock, "List %u is not locked\n", i );
661                /* paranoid */ lanes.data[i].last_id = -1u;
662        #endif
663
664        // Unlock and return
665        __atomic_unlock( &lanes.data[i].lock );
666
667        __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, first %d)\n", thrd, cltr, i, lane_first);
668
669        // Update statistics
670        #if !defined(__CFA_NO_STATISTICS__)
671                tls.pick.push.success++;
672        #endif
673
674        // return whether or not the list was empty before this push
675        return first;
676}
677
678//-----------------------------------------------------------------------
679// Given 2 indexes, pick the list with the oldest push and try to pop from it
680static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
681        #if !defined(__CFA_NO_STATISTICS__)
682                tls.pick.pop.attempt++;
683        #endif
684
685        // Pick the better of the two lists
686        int w = i;
687        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
688                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
689        }
690
691        // Get relevant elements locally
692        __intrusive_lane_t & lane = lanes.data[w];
693
694        // If list looks empty retry
695        if( is_empty(lane) ) return 0p;
696
697        // If we can't get the lock retry
698        if( !__atomic_try_acquire(&lane.lock) ) return 0p;
699
700        #if defined(__CFA_WITH_VERIFY__)
701                /* paranoid */ verify(lane.last_id == -1u);
702                /* paranoid */ lane.last_id = kernelTLS.this_processor->id;
703        #endif
704
705
706        // If list is empty, unlock and retry
707        if( is_empty(lane) ) {
708                #if defined(__CFA_WITH_VERIFY__)
709                        /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
710                        /* paranoid */ lane.last_id = -1u;
711                #endif
712
713                __atomic_unlock(&lane.lock);
714                return 0p;
715        }
716
717        // Actually pop the list
718        struct $thread * thrd;
719        bool emptied;
720        [thrd, emptied] = pop(lane);
721
722        /* paranoid */ verify(thrd);
723        /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
724        /* paranoid */ verify(lane.lock);
725
726        // If this was the last element in the lane
727        if(emptied) {
728                depart( snzi, w );
729        }
730
731        #if defined(__CFA_WITH_VERIFY__)
732                /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
733                /* paranoid */ lane.last_id = -1u;
734        #endif
735
736        // Unlock and return
737        __atomic_unlock(&lane.lock);
738
739        // Update statistics
740        #if !defined(__CFA_NO_STATISTICS__)
741                tls.pick.pop.success++;
742        #endif
743
744        // return the popped thread
745        return thrd;
746}
747
748// Pop from the ready queue of a given cluster
749__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
750        /* paranoid */ verify( lanes.count > 0 );
751
752        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
753        while( query(snzi) ) {
754                // Pick two lists at random
755                int i = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
756                int j = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
757
758                // try popping from the 2 picked lists
759                struct $thread * thrd = try_pop(cltr, i, j);
760                if(thrd) return thrd;
761        }
762
763        // All lanes were empty, return 0p
764        return 0p;
765}
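// Illustrative sketch (standalone C, hypothetical names and trivial lanes; not part of the runtime):
// the pop path above is an instance of the classic "power of two choices" heuristic. Sampling two
// random lanes and taking the one whose head carries the oldest timestamp keeps per-lane queue
// lengths and latencies well balanced without any global coordination. The skeleton below isolates
// just that selection step.
#if 0
#include <stdint.h>
#include <stdlib.h>

typedef struct {
	uint64_t head_ts;   // timestamp of the oldest element, 0 when the lane is empty
} toy_lane_t;

// Return the index to pop from, preferring the lane with the oldest (smallest, non-zero) head.
static unsigned pick_two(const toy_lane_t * lanes, unsigned count) {
	unsigned i = rand() % count;
	unsigned j = rand() % count;
	if(lanes[j].head_ts == 0) return i;                       // j looks empty, keep i
	if(lanes[i].head_ts == 0) return j;                       // i looks empty, keep j
	return (lanes[i].head_ts < lanes[j].head_ts) ? i : j;     // otherwise take the older head
}
#endif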
766
767//-----------------------------------------------------------------------
768
769static void check( __ready_queue_t & q ) with (q) {
770        #if defined(__CFA_WITH_VERIFY__)
771                {
772                        for( idx ; lanes.count ) {
773                                __intrusive_lane_t & sl = lanes.data[idx];
774                                assert(!lanes.data[idx].lock);
775
776                                assert(head(sl)->link.prev == 0p );
777                                assert(head(sl)->link.next->link.prev == head(sl) );
778                                assert(tail(sl)->link.next == 0p );
779                                assert(tail(sl)->link.prev->link.next == tail(sl) );
780
781                                if(sl.before.link.ts == 0l) {
782                                        assert(tail(sl)->link.prev == head(sl));
783                                        assert(head(sl)->link.next == tail(sl));
784                                } else {
785                                        assert(tail(sl)->link.prev != head(sl));
786                                        assert(head(sl)->link.next != tail(sl));
787                                }
788                        }
789                }
790        #endif
791}
792
793// Call this function if the intrusive list was moved using memcpy
794// fixes the list so that the pointers back to anchors aren't left dangling
795static inline void fix(__intrusive_lane_t & ll) {
796        // if the list is not empty then follow the pointers and fix their reverses
797        if(!is_empty(ll)) {
798                head(ll)->link.next->link.prev = head(ll);
799                tail(ll)->link.prev->link.next = tail(ll);
800        }
801        // Otherwise just reset the list
802        else {
803                verify(tail(ll)->link.next == 0p);
804                tail(ll)->link.prev = head(ll);
805                head(ll)->link.next = tail(ll);
806                verify(head(ll)->link.prev == 0p);
807        }
808}
809
810// Grow the ready queue
811void ready_queue_grow  (struct cluster * cltr) {
812        // Lock the RWlock so no-one pushes/pops while we are changing the queue
813        uint_fast32_t last_size = ready_mutate_lock( *cltr );
814
815        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
816
817        // Make sure that everything is consistent
818        /* paranoid */ check( cltr->ready_queue );
819
820        // grow the ready queue
821        with( cltr->ready_queue ) {
822                ^(snzi){};
823
824                size_t ncount = lanes.count;
825
826                // increase count
827                ncount += 4;
828
829                // Allocate new array (uses realloc and memcpies the data)
830                lanes.data = alloc(lanes.data, ncount);
831
832                // Fix the moved data
833                for( idx; (size_t)lanes.count ) {
834                        fix(lanes.data[idx]);
835                }
836
837                // Construct new data
838                for( idx; (size_t)lanes.count ~ ncount) {
839                        (lanes.data[idx]){};
840                }
841
842                // Update original
843                lanes.count = ncount;
844
845                // Re-create the snzi
846                snzi{ log2( lanes.count / 8 ) };
847                for( idx; (size_t)lanes.count ) {
848                        if( !is_empty(lanes.data[idx]) ) {
849                                arrive(snzi, idx);
850                        }
851                }
852        }
853
854        // Make sure that everything is consistent
855        /* paranoid */ check( cltr->ready_queue );
856
857        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
858
859        // Unlock the RWlock
860        ready_mutate_unlock( *cltr, last_size );
861}
862
863// Shrink the ready queue
864void ready_queue_shrink(struct cluster * cltr) {
865        // Lock the RWlock so no-one pushes/pops while we are changing the queue
866        uint_fast32_t last_size = ready_mutate_lock( *cltr );
867
868        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
869
870        // Make sure that everything is consistent
871        /* paranoid */ check( cltr->ready_queue );
872
873        with( cltr->ready_queue ) {
874                ^(snzi){};
875
876                // Make sure that the total thread count stays the same
877                #if defined(__CFA_WITH_VERIFY__)
878                        size_t nthreads = 0;
879                        for( idx; (size_t)lanes.count ) {
880                                nthreads += lanes.data[idx].count;
881                        }
882                #endif
883
884                size_t ocount = lanes.count;
885                // Check that we have some space left
886                if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
887
888                // reduce the actual count so push doesn't use the old queues
889                lanes.count -= 4;
890                verify(ocount > lanes.count);
891
892                // for printing count the number of displaced threads
893                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
894                        __attribute__((unused)) size_t displaced = 0;
895                #endif
896
897                // redistribute old data
898                for( idx; (size_t)lanes.count ~ ocount) {
899                        // Lock is not strictly needed but makes checking invariants much easier
900                        __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
901                        verify(locked);
902
903                        // Pop from this lane and push the threads somewhere else in the queue until it is empty
904                        while(!is_empty(lanes.data[idx])) {
905                                struct $thread * thrd;
906                                __attribute__((unused)) bool _;
907                                [thrd, _] = pop(lanes.data[idx]);
908
909                                push(cltr, thrd);
910
911                                // for printing count the number of displaced threads
912                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
913                                        displaced++;
914                                #endif
915                        }
916
917                        // Unlock the lane
918                        __atomic_unlock(&lanes.data[idx].lock);
919
920                        // TODO print the queue statistics here
921
922                        ^(lanes.data[idx]){};
923                }
924
925                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
926
927                // Allocate new array (uses realloc and memcpies the data)
928                lanes.data = alloc(lanes.data, lanes.count);
929
930                // Fix the moved data
931                for( idx; (size_t)lanes.count ) {
932                        fix(lanes.data[idx]);
933                }
934
935                // Re-create the snzi
936                snzi{ log2( lanes.count / 8 ) };
937                for( idx; (size_t)lanes.count ) {
938                        if( !is_empty(lanes.data[idx]) ) {
939                                arrive(snzi, idx);
940                        }
941                }
942
943                // Make sure that the total thread count stayed the same
944                #if defined(__CFA_WITH_VERIFY__)
945                        for( idx; (size_t)lanes.count ) {
946                                nthreads -= lanes.data[idx].count;
947                        }
948                        verifyf(nthreads == 0, "Shrinking changed number of threads");
949                #endif
950        }
951
952        // Make sure that everything is consistent
953        /* paranoid */ check( cltr->ready_queue );
954
955        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
956
957        // Unlock the RWlock
958        ready_mutate_unlock( *cltr, last_size );
959}
960
961//-----------------------------------------------------------------------
962
963#if !defined(__CFA_NO_STATISTICS__)
964void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
965        __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
966        __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
967        __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
968        __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
969        __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
970
971        __atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
972        __atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
973}
974#endif