source: libcfa/src/concurrency/ready_queue.cfa @ d4f1521

Last change on this file since d4f1521 was 7768b8d, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

First step at adding the new ready queue to Cforall

//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"

static const size_t cache_line_size = 64;

static inline unsigned __max_processors_fallback() {
	#ifdef __CFA_MAX_PROCESSORS__
		return __CFA_MAX_PROCESSORS__;
	#else
		// No overridden function, no environment variable, no define:
		// fall back to a magic number
		return 128;
	#endif
}

__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
		return __max_processors_fallback();
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
		return __max_processors_fallback();
	}
	if('\0' != *endptr) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
		return __max_processors_fallback();
	}

	return max_cores_l;
}
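
// For example, a hypothetical invocation such as `CFA_MAX_PROCESSORS=256 ./a.out`
// caps the processor table at 256 entries; values outside 1..65535, or values with
// trailing non-digit characters, are rejected and __max_processors_fallback() is
// used instead.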

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__clusterRWLock_t & this) {
	this.max   = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.lock  = false;
	this.data  = alloc(this.max);

	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__clusterRWLock_t & this) {
	free(this.data);
}

void ?{}( __processor_id & this, struct processor * proc ) {
	this.handle = proc;
	this.lock   = false;
}

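// Locking scheme (descriptive sketch): each registered processor owns one
// cache-line-aligned slot in `data`, holding a handle back to the processor and a
// per-slot lock. A processor entering the ready queue as a reader only needs to take
// its own slot's lock (the reader side lives outside this file), while a writer
// (ready_mutate_lock below) takes the global `lock` followed by every slot's lock,
// giving it exclusive access to the whole structure.
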
//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		processor * null = 0p; // Re-initialized every iteration since a failed compare-exchange overwrites it
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 3 : Mark space as used and then publish it.
	__processor_id * storage = (__processor_id *)&data[n];
	(*storage){ proc };
	while(true) {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}

void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	unsigned id = proc->id;
	/*paranoid*/ verify(id < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
}
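
// Intended usage (sketch): a processor registers itself with its cluster's
// ready_lock when it starts up and releases its slot when it shuts down, e.g.
//     proc->id = doregister( cltr, proc );   // store the returned slot index
//     ...
//     unregister( cltr, proc );              // publish the slot as free again
// The actual call sites are in the kernel code, outside this file.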

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
	// Step 1 : lock global lock
	// It is needed so that processors that register mid critical-section
	// cannot simply lock their own lock and enter.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc locks
	// Processors that are currently being registered aren't counted,
	// but they can't hold the read lock or be in the critical section.
	// All other processors are counted.
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		__atomic_acquire( &data[i].lock );
	}

	return s;
}

void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	// threads that were created mid critical-section racing to
	// lock their local locks only to have the writer immediately
	// unlock them.
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}
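
// Writer-side usage (sketch): the count returned by ready_mutate_lock must be passed
// back to ready_mutate_unlock so that only the per-processor locks actually acquired
// get released, e.g. with `cltr` a cluster reference:
//     uint_fast32_t last = ready_mutate_lock( cltr );
//     // ... add or remove ready queues ...
//     ready_mutate_unlock( cltr, last );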

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================
static const size_t fields_offset = offsetof( thread_desc, next );

// Get the head pointer (one before the first element) from the anchor
static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
	thread_desc * rhead = (thread_desc *)(
		(uintptr_t)( &this.before ) - fields_offset
	);
	/* paranoid */ verify(rhead);
	return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
	thread_desc * rtail = (thread_desc *)(
		(uintptr_t)( &this.after ) - fields_offset
	);
	/* paranoid */ verify(rtail);
	return rtail;
}
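
// The anchors `before` and `after` are not full thread_desc objects; head() and
// tail() subtract fields_offset so that the anchors' prev/next fields line up
// exactly where a real thread_desc's prev/next fields would be. The constructor
// below asserts this layout, which is why the anchor code is called fragile.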

// Ctor
void ?{}( __intrusive_ready_queue_t & this ) {
	this.before.prev = 0p;
	this.before.next = tail(this);

	this.after .prev = head(this);
	this.after .next = 0p;

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ verify(((uintptr_t)( head(this) ) + fields_offset) == (uintptr_t)(&this.before));
	/* paranoid */ verify(((uintptr_t)( tail(this) ) + fields_offset) == (uintptr_t)(&this.after ));
	/* paranoid */ verify(head(this)->prev == 0p );
	/* paranoid */ verify(head(this)->next == tail(this) );
	/* paranoid */ verify(tail(this)->next == 0p );
	/* paranoid */ verify(tail(this)->prev == head(this) );
	/* paranoid */ verify(&head(this)->prev == &this.before.prev );
	/* paranoid */ verify(&head(this)->next == &this.before.next );
	/* paranoid */ verify(&tail(this)->prev == &this.after .prev );
	/* paranoid */ verify(&tail(this)->next == &this.after .next );
	/* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(sizeof(this) == 128);
	/* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(__alignof__(this) == 128);
	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
}

// Dtor is trivial
void ^?{}( __intrusive_ready_queue_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify(head(this)->prev == 0p );
	/* paranoid */ verify(head(this)->next == tail(this) );
	/* paranoid */ verify(tail(this)->next == 0p );
	/* paranoid */ verify(tail(this)->prev == head(this) );
}

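// push/pop must be called with the queue lock held (hence the verify(this.lock)
// checks). push returns true when the queue was empty before the insertion, and
// pop returns [node, true] when it removed the last remaining element. In both
// cases the anchor timestamp before.ts is kept in sync: it holds the timestamp of
// the first element and is zero when the queue is empty.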
bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
	verify(this.lock);
	verify(node->ts != 0);
	verify(node->next == 0p);
	verify(node->prev == 0p);

	// Get the relevant nodes locally
	thread_desc * tail = tail(this);
	thread_desc * prev = tail->prev;

	// Do the push
	node->next = tail;
	node->prev = prev;
	prev->next = node;
	tail->prev = node;

	// Update stats
	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff++;
		this.stat.push++;
	#endif

	// Check if the queue used to be empty
	if(this.before.ts == 0l) {
		this.before.ts = node->ts;
		verify(node->prev == head(this));
		return true;
	}
	return false;
}

[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
	verify(this.lock);
	thread_desc * head = head(this);
	thread_desc * tail = tail(this);

	thread_desc * node = head->next;
	thread_desc * next = node->next;
	if(node == tail) return [0p, false];

	/* paranoid */ verify(node);

	head->next = next;
	next->prev = head;

	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff--;
		this.stat.pop ++;
	#endif

	if(next == tail) {
		this.before.ts = 0ul;
		node->[next, prev] = 0p;
		return [node, true];
	}
	else {
		verify(next->ts != 0);
		this.before.ts = next->ts;
		verify(this.before.ts != 0);
		node->[next, prev] = 0p;
		return [node, false];
	}
}

static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
	return this.before.ts;
}