source: libcfa/src/concurrency/ready_queue.cfa @ 7768b8d

Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Last change on this file was 7768b8d, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

First step at adding the new ready queue to Cforall

  • Property mode set to 100644
File size: 9.3 KB
//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"

static const size_t cache_line_size = 64;

static inline unsigned __max_processors_fallback() {
        #ifdef __CFA_MAX_PROCESSORS__
                return __CFA_MAX_PROCESSORS__;
        #else
                // No overriding function, no environment variable, no define:
                // fall back to a magic number
                return 128;
        #endif
}

__attribute__((weak)) unsigned __max_processors() {
        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
        if(!max_cores_s) {
                __cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
                return __max_processors_fallback();
        }

        char * endptr = 0p;
        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
        if(max_cores_l < 1 || max_cores_l > 65535) {
                __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
                return __max_processors_fallback();
        }
        if('\0' != *endptr) {
                __cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
                return __max_processors_fallback();
        }

        return max_cores_l;
}
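
// Note (added comment, not in the original): as implemented above, the effective
// precedence is: a strong definition of __max_processors() (this one is weak), then
// the CFA_MAX_PROCESSORS environment variable, then the __CFA_MAX_PROCESSORS__
// compile-time define, then the magic fallback of 128. For example,
// `CFA_MAX_PROCESSORS=256 ./a.out` would allow up to 256 processors without recompiling.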

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void  ?{}(__clusterRWLock_t & this) {
        this.max   = __max_processors();
        this.alloc = 0;
        this.ready = 0;
        this.lock  = false;
        this.data  = alloc(this.max);

        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__clusterRWLock_t & this) {
        free(this.data);
}

void ?{}( __processor_id & this, struct processor * proc ) {
        this.handle = proc;
        this.lock   = false;
}

//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
        // Step - 1 : check if there is already a free slot in the data array
        uint_fast32_t s = ready;

        // Check among all the already-published slots
        for(uint_fast32_t i = 0; i < s; i++) {
                processor * null = 0p; // Re-write every iteration since the compare-exchange clobbers it
                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                        /*paranoid*/ verify(i < ready);
                        /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
                        return i;
                }
        }

        if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);

        // Step - 2 : F&A to get a new spot in the array.
        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
        if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);

        // Step - 3 : Mark space as used and then publish it.
        __processor_id * storage = (__processor_id *)&data[n];
        (*storage){ proc };
        while(true) {
                unsigned copy = n;
                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
                        break;
                asm volatile("pause");
        }

        // Return new spot.
        /*paranoid*/ verify(n < ready);
        /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
        return n;
}
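
// Usage sketch (added comment, not part of the original file): a processor joining a
// cluster would typically do something like the following; the actual call site lives
// in the kernel code, not here.
//
//     proc->id = doregister( cltr, proc );   // claim a slot, publishing the handle
//     ...                                    // run with the slot owned by this processor
//     unregister( cltr, proc );              // clear the handle so the slot can be reused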

void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
        unsigned id = proc->id;
        /*paranoid*/ verify(id < ready);
        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
//  queues or removing them.
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
        // Step 1 : lock global lock
        // It is needed to prevent processors that register mid critical-section
        //   from simply locking their own lock and entering.
        __atomic_acquire( &lock );

        // Step 2 : lock per-proc locks
        // Processors that are currently being registered aren't counted
        //   but can't be in read_lock or in the critical section.
        // All other processors are counted.
        uint_fast32_t s = ready;
        for(uint_fast32_t i = 0; i < s; i++) {
                __atomic_acquire( &data[i].lock );
        }

        return s;
}

void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
        // Step 1 : release local locks
        // This must be done while the global lock is held to avoid
        //   threads that were created mid critical-section
        //   racing to lock their local locks and having the writer
        //   immediately unlock them
        // Alternative solution : return s in write_lock and pass it to write_unlock
        for(uint_fast32_t i = 0; i < last_s; i++) {
                verify(data[i].lock);
                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
        }

        // Step 2 : release global lock
        /*paranoid*/ assert(true == lock);
        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}
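
// Usage sketch (added comment, not part of the original file): a writer changing the
// cluster's set of ready queues would bracket the change with the pair above.
//
//     uint_fast32_t last_s = ready_mutate_lock( cltr );   // cltr is a `cluster &`
//     /* add or remove ready queues here */
//     ready_mutate_unlock( cltr, last_s );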

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================
static const size_t fields_offset = offsetof( thread_desc, next );

// Get the head pointer (one before the first element) from the anchor
static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
        thread_desc * rhead = (thread_desc *)(
                (uintptr_t)( &this.before ) - fields_offset
        );
        /* paranoid */ verify(rhead);
        return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
        thread_desc * rtail = (thread_desc *)(
                (uintptr_t)( &this.after ) - fields_offset
        );
        /* paranoid */ verify(rtail);
        return rtail;
}
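
// Note (added comment, inferred from the surrounding code): `before` and `after` are
// embedded anchors, not real thread_desc objects. Subtracting `fields_offset` makes
// them look like the link fields of two sentinel nodes, so push and pop never have to
// special-case an empty or single-element list; the ctor assertions below check that
// the resulting fake pointers line up with the anchors' own prev/next fields.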

// Ctor
void ?{}( __intrusive_ready_queue_t & this ) {
        this.before.prev = 0p;
        this.before.next = tail(this);

        this.after .prev = head(this);
        this.after .next = 0p;

        // We add a boat-load of assertions here because the anchor code is very fragile
        /* paranoid */ verify(((uintptr_t)( head(this) ) + fields_offset) == (uintptr_t)(&this.before));
        /* paranoid */ verify(((uintptr_t)( tail(this) ) + fields_offset) == (uintptr_t)(&this.after ));
        /* paranoid */ verify(head(this)->prev == 0p );
        /* paranoid */ verify(head(this)->next == tail(this) );
        /* paranoid */ verify(tail(this)->next == 0p );
        /* paranoid */ verify(tail(this)->prev == head(this) );
        /* paranoid */ verify(&head(this)->prev == &this.before.prev );
        /* paranoid */ verify(&head(this)->next == &this.before.next );
        /* paranoid */ verify(&tail(this)->prev == &this.after .prev );
        /* paranoid */ verify(&tail(this)->next == &this.after .next );
        /* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
        /* paranoid */ verify(sizeof(this) == 128);
        /* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
        /* paranoid */ verify(__alignof__(this) == 128);
        /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
}

// Dtor is trivial
void ^?{}( __intrusive_ready_queue_t & this ) {
        // Make sure the list is empty
        /* paranoid */ verify(head(this)->prev == 0p );
        /* paranoid */ verify(head(this)->next == tail(this) );
        /* paranoid */ verify(tail(this)->next == 0p );
        /* paranoid */ verify(tail(this)->prev == head(this) );
}

bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
        verify(this.lock);
        verify(node->ts != 0);
        verify(node->next == 0p);
        verify(node->prev == 0p);

        // Get the relevant nodes locally
        thread_desc * tail = tail(this);
        thread_desc * prev = tail->prev;

        // Do the push
        node->next = tail;
        node->prev = prev;
        prev->next = node;
        tail->prev = node;

        // Update stats
        #ifndef __CFA_NO_SCHED_STATS__
                this.stat.diff++;
                this.stat.push++;
        #endif

        // Check if the queue used to be empty
        if(this.before.ts == 0l) {
                this.before.ts = node->ts;
                verify(node->prev == head(this));
                return true;
        }
        return false;
}
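
// Note (added comment, inferred from the code): push returns true only when the queue
// was empty beforehand (before.ts == 0), i.e. when this push made the queue non-empty;
// the caller presumably uses that to update whatever bookkeeping tracks non-empty queues.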

[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
        verify(this.lock);
        thread_desc * head = head(this);
        thread_desc * tail = tail(this);

        thread_desc * node = head->next;
        thread_desc * next = node->next;
        if(node == tail) return [0p, false];

        /* paranoid */ verify(node);

        head->next = next;
        next->prev = head;

        #ifndef __CFA_NO_SCHED_STATS__
                this.stat.diff--;
                this.stat.pop ++;
        #endif

        if(next == tail) {
                this.before.ts = 0ul;
                node->[next, prev] = 0p;
                return [node, true];
        }
        else {
                verify(next->ts != 0);
                this.before.ts = next->ts;
                verify(this.before.ts != 0);
                node->[next, prev] = 0p;
                return [node, false];
        }
}
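
// Usage note (added comment, inferred from the code): pop returns [0p, false] on an
// empty queue, and otherwise the popped thread plus a flag that is true when the queue
// just became empty. A caller holding the per-queue lock might use it roughly like this:
//
//     thread_desc * thrd; bool emptied;
//     [thrd, emptied] = pop( queue );
//     if( thrd ) { /* schedule thrd; if( emptied ) update the non-empty bookkeeping */ }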

static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
        return this.before.ts;
}
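
// Note (added comment, inferred from the code): `before.ts` is kept up to date by push
// and pop to hold the timestamp of the oldest element (or 0 when the queue is empty),
// so ts() can report the age of a queue in O(1) without touching any list node.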