source: libcfa/src/bits/locks.hfa@ 930e57e

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 930e57e was a77496cb, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

First draft at core objects of futures and basic tests

  • Property mode set to 100644
File size: 11.7 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// bits/locks.hfa -- Fast internal locks.
8//
9// Author : Thierry Delisle
10// Created On : Tue Oct 31 15:14:38 2017
11// Last Modified By : Peter A. Buhr
12// Last Modified On : Wed Aug 12 14:18:07 2020
13// Update Count : 13
14//
15
16#pragma once
17
18#include "bits/debug.hfa"
19#include "bits/defs.hfa"
20#include <assert.h>
21
22#ifdef __cforall
23 extern "C" {
24 #include <pthread.h>
25 }
26#endif
27
// Pause(): spin-wait hint placed inside busy-wait loops to prevent excess processor bus usage.
// Only x86 (32/64-bit) and ARM are supported; any other target is a compile-time error.
#if defined( __ARM_ARCH )
	#define Pause() __asm__ __volatile__ ( "YIELD" : : : )
#elif defined( __i386 ) || defined( __x86_64 )
	#define Pause() __asm__ __volatile__ ( "pause" : : : )
#else
	#error unsupported architecture
#endif
36
// Spinlock state: a test-and-set flag, plus a record of the previous acquirer
// when the library is built with __CFA_DEBUG__.
struct __spinlock_t {
	// the lock flag: 0 when free, set while the lock is held
	volatile bool lock;
#ifdef __CFA_DEBUG__
	// name of the previous function to acquire the lock
	const char * prev_name;
	// previous thread to acquire the lock
	void * prev_thrd;
#endif
};
47
48#ifdef __cforall
49 extern "C" {
50 extern void disable_interrupts() OPTIONAL_THREAD;
51 extern void enable_interrupts_noPoll() OPTIONAL_THREAD;
52
53 #ifdef __CFA_DEBUG__
54 void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]);
55 #else
56 #define __cfaabi_dbg_record_lock(x, y)
57 #endif
58 }
59
// Constructor: a freshly constructed spinlock is released (flag cleared).
static inline void ?{}( __spinlock_t & this ) {
	this.lock = false;
}
63
// Make a single attempt to acquire the spinlock, never spinning.
// disable_interrupts() is called before the attempt; on failure the matching
// enable_interrupts_noPoll() is issued before returning, on success it is left to unlock().
// Returns true if the lock was acquired, false if it was already held.
static inline bool try_lock ( __spinlock_t & this __cfaabi_dbg_ctx_param2 ) {
	disable_interrupts();
	// plain read first: the atomic test-and-set is only attempted when the flag looks free
	if ( this.lock != 0 || __atomic_test_and_set( &this.lock, __ATOMIC_ACQUIRE ) != 0 ) {
		enable_interrupts_noPoll();
		return false;
	}
	__cfaabi_dbg_record_lock( this, caller );
	return true;
}
75
// Acquire the spinlock, busy-waiting for as long as it is held by someone else.
// disable_interrupts() is called once up front; the matching enable is done by unlock().
// Unless NOEXPBACK is defined, each failed attempt is followed by `spin` Pause() calls:
// spin starts at SPIN_START, doubles every 64th attempt and wraps back to SPIN_START
// once it exceeds SPIN_END. With NOEXPBACK, a failed attempt costs a single Pause().
static inline void lock( __spinlock_t & this __cfaabi_dbg_ctx_param2 ) {
	#ifndef NOEXPBACK
		enum { SPIN_START = 4, SPIN_END = 64 * 1024, };
		unsigned int spin = SPIN_START;
	#endif

	disable_interrupts();
	unsigned int attempts = 1;
	// plain read first: the atomic test-and-set is only attempted when the flag looks free
	while ( this.lock != 0 || __atomic_test_and_set( &this.lock, __ATOMIC_ACQUIRE ) != 0 ) {
		#ifndef NOEXPBACK
			// back off for the current spin count
			for ( volatile unsigned int s = 0; s < spin; s += 1 ) Pause();

			// slowly increase by powers of 2
			if ( attempts % 64 == 0 ) spin *= 2;

			// keep the spin count bounded
			if ( spin > SPIN_END ) spin = SPIN_START;
		#else
			Pause();
		#endif
		attempts += 1;
	}
	__cfaabi_dbg_record_lock( this, caller );
}
101
// Release the spinlock.
// The flag is cleared with release ordering first; only then is
// enable_interrupts_noPoll() called, pairing with the disable_interrupts()
// performed by lock() / a successful try_lock().
static inline void unlock( __spinlock_t & this ) {
	__atomic_clear( &this.lock, __ATOMIC_RELEASE );
	enable_interrupts_noPoll();
}
106
107
108 #ifdef __CFA_WITH_VERIFY__
109 extern bool __cfaabi_dbg_in_kernel();
110 #endif
111
112 extern "C" {
113 char * strerror(int);
114 }
// CHECKED( call ): evaluate `call` exactly once and abort with a diagnostic (error number
// and strerror text) if it returns a non-zero error code.
// The expansion is wrapped in do/while(0) so that `CHECKED( f() );` is a single statement,
// safe in front of `else`; the argument is parenthesized and the temporary has a private
// name so it cannot capture an identifier used inside `call`.
#define CHECKED(x) do { int __checked_err = (x); if( __checked_err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", __checked_err, strerror(__checked_err)); } while( 0 )
116
// Binary semaphore built from a pthread mutex / condition-variable pair.
struct __bin_sem_t {
	// serializes every access to val
	pthread_mutex_t lock;
	// signalled by post when it raises val
	pthread_cond_t cond;
	// semaphore value: wait blocks while it is below 1, post only raises it when it is below 1
	int val;
};
122
// Constructor: initialize the mutex (error-checking type), the condition variable
// (default attributes) and start with val == 0, so the first wait blocks until a post.
// Every pthread call is CHECKED, matching the destructor: a failed initialization aborts
// immediately instead of surfacing later as a failure on an uninitialized object.
static inline void ?{}(__bin_sem_t & this) with( this ) {
	// Create the mutex with error checking
	pthread_mutexattr_t mattr;
	CHECKED( pthread_mutexattr_init( &mattr ) );
	CHECKED( pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP ) );
	CHECKED( pthread_mutex_init( &lock, &mattr ) );
	// the attribute object is not needed once the mutex exists, release it
	CHECKED( pthread_mutexattr_destroy( &mattr ) );

	CHECKED( pthread_cond_init( &cond, (const pthread_condattr_t *)0p ) ); // workaround trac#208: cast should not be required
	val = 0;
}
133
// Destructor: tear down the mutex, then the condition variable.
// Either call returning a non-zero error code aborts through CHECKED.
static inline void ^?{}(__bin_sem_t & this) with( this ) {
	CHECKED( pthread_mutex_destroy( &lock ) );
	CHECKED( pthread_cond_destroy( &cond ) );
}
138
// Block the calling kernel thread until the semaphore has been posted, then consume the post.
// Verified (when verify is enabled) to be called from inside the kernel.
// All pthread calls, including the condition wait, abort through CHECKED on error.
static inline void wait(__bin_sem_t & this) with( this ) {
	verify(__cfaabi_dbg_in_kernel());
	CHECKED( pthread_mutex_lock(&lock) );
		// re-test after every wake-up: only leave once val has actually been raised
		while(val < 1) {
			CHECKED( pthread_cond_wait(&cond, &lock) );
		}
		val -= 1;
	CHECKED( pthread_mutex_unlock(&lock) );
}
148
// Post the semaphore. The value is only raised (and the condition signalled) when it is
// below 1, so repeated posts without an intervening wait collapse into one.
// Returns true if this call raised the value and signalled, false if it was already posted.
// All pthread calls, including the signal, abort through CHECKED on error.
static inline bool post(__bin_sem_t & this) with( this ) {
	bool needs_signal = false;

	CHECKED( pthread_mutex_lock(&lock) );
		if(val < 1) {
			val += 1;
			CHECKED( pthread_cond_signal(&cond) );
			needs_signal = true;
		}
	CHECKED( pthread_mutex_unlock(&lock) );

	return needs_signal;
}
162
163 #undef CHECKED
164
165 struct $thread;
166 extern void park( __cfaabi_dbg_ctx_param );
167 extern void unpark( struct $thread * this __cfaabi_dbg_ctx_param2 );
168 static inline struct $thread * active_thread ();
169
// Semaphore which only supports a single waiting thread.
// The whole state is encoded in ptr:
//   0p         : no pending post and no waiter
//   1p         : a post is pending, the next wait will not block
//   any thread : that thread is parked in wait
struct single_sem {
	struct $thread * volatile ptr;
};
174
175 static inline {
// Constructor: start with no pending post and no waiter.
void ?{}(single_sem & this) {
	this.ptr = 0p;
}
179
// Destructor: nothing to release.
void ^?{}(single_sem & this) {}
181
// Wait for a post, consuming it.
// Returns false if a post was already pending (the caller did not block),
// true if the calling thread had to be parked.
bool wait(single_sem & this) {
	for() {
		struct $thread * seen = this.ptr;
		if(seen != 1p) {
			// No post yet: publish the current thread as the waiter, then park until post wakes it
			/* paranoid */ verify( seen == 0p );
			if(__atomic_compare_exchange_n(&this.ptr, &seen, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
				park( __cfaabi_dbg_ctx );
				return true;
			}
			// lost a race, re-read the state and retry
			continue;
		}

		// A post is pending: consume it by resetting the state
		if(__atomic_compare_exchange_n(&this.ptr, &seen, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			return false;
		}
	}
}
200
// Post the semaphore.
// Returns true if a parked thread was unparked by this call; false if the post was
// merely recorded for a later wait, or if a post was already pending.
bool post(single_sem & this) {
	for() {
		struct $thread * seen = this.ptr;
		// already posted, nothing to add
		if(seen == 1p) return false;

		if(seen != 0p) {
			// A thread is waiting: clear the state and wake it
			if(__atomic_compare_exchange_n(&this.ptr, &seen, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
				unpark( seen __cfaabi_dbg_ctx2 );
				return true;
			}
		}
		else if(__atomic_compare_exchange_n(&this.ptr, &seen, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			// No waiter: the post is now recorded as pending
			return false;
		}
		// a compare-exchange failed, re-read the state and retry
	}
}
218 }
219
// Synchronization primitive which only supports a single waiting thread and a single post.
// Similar to a binary semaphore with 'one shot' semantics:
// it is expected to be discarded once each party has called its side.
struct oneshot {
	// Internal state :
	//   0p         : initial state (wait will block)
	//   1p         : fulfilled (wait won't block)
	//   any thread : that thread is currently waiting
	struct $thread * volatile ptr;
};
230
231 static inline {
// Constructor: start in the initial (not fulfilled, no waiter) state.
void ?{}(oneshot & this) {
	this.ptr = 0p;
}
235
// Destructor: nothing to release.
void ^?{}(oneshot & this) {}
237
// Wait for the post; return immediately if it has already happened.
// Returns true if the calling thread was parked, false otherwise.
bool wait(oneshot & this) {
	for() {
		struct $thread * seen = this.ptr;
		// already fulfilled, no need to block
		if(seen == 1p) return false;

		/* paranoid */ verify( seen == 0p );
		// Publish the current thread as the waiter; on failure re-read the state and retry
		if(!__atomic_compare_exchange_n(&this.ptr, &seen, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) continue;

		park( __cfaabi_dbg_ctx );
		/* paranoid */ verify( this.ptr == 1p );
		return true;
	}
}
252
// Mark the oneshot as fulfilled and wake the waiting thread, if there is one.
// Returns true if a thread was unparked, false if no thread was waiting.
bool post(oneshot & this) {
	// a single exchange both publishes the fulfilled state and retrieves the waiter
	struct $thread * waiter = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
	if( waiter != 0p ) {
		unpark( waiter __cfaabi_dbg_ctx2 );
		return true;
	}
	return false;
}
261 }
262
// Base type for futures to build upon.
// It is based on the 'oneshot' type so that several futures can share one wait context,
// which lets a single thread block on "any of" a given set of futures.
// Multiple threads waiting on the same future is not supported.
struct future_t {
	// Internal state :
	//   0p          : initial state (wait will block)
	//   1p          : fulfilled (wait won't block)
	//   2p          : fulfilment in progress
	//   3p          : abandoned, server should delete
	//   any oneshot : a context has been set up to wait, a thread could wait on it
	struct oneshot * volatile ptr;
};
277
278 static inline {
// Constructor: start in the initial (not fulfilled, no wait context) state.
void ?{}(future_t & this) {
	this.ptr = 0p;
}
282
// Destructor: nothing to release.
void ^?{}(future_t & this) {}
284
// Check whether the future is available, i.e. its state is exactly "fulfilled" (1p).
// Every other state, including "in progress" (2p), reports false.
bool available( future_t & this ) {
	return this.ptr == 1p;
}
289
// Prepare the future to be waited on by installing wait_ctx as its wait context.
// Intended to be used by wait, wait_any, waitfor, etc. rather than called directly.
// Returns false if the future is already fulfilled (nothing installed, no need to block),
// true once wait_ctx has been installed.
bool setup( future_t & this, oneshot & wait_ctx ) {
	/* paranoid */ verify( wait_ctx.ptr == 0p );
	for() {
		struct oneshot * seen = this.ptr;

		// Already fulfilled: the caller should not block
		if(seen == 1p) return false;

		// Not fulfilled: try to install the wait context, retrying if the state changed underneath
		/* paranoid */ verify( seen == 0p );
		if(__atomic_compare_exchange_n(&this.ptr, &seen, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			return true;
		}
	}
}
307
// Stop waiting on a future.
// When multiple futures are waited for together in an "any of" pattern, the futures
// that weren't fulfilled before the thread woke up should retract the wait context.
// Intended to be used by wait, wait_any, waitfor, etc. rather than called directly.
// On return the server no longer uses wait_ctx, so the caller may destroy it.
void retract( future_t & this, oneshot & wait_ctx ) {
	// Remove whatever is installed, leaving 0p in its place
	struct oneshot * previous = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);

	// Nothing further to do when the previous state was:
	//   0p        : the future was never actually set up
	//   &wait_ctx : fulfil never claimed the context, so no one else saw it
	//   1p        : the future is ready and the context was fully consumed,
	//               the server won't use the pointer again
	// NOTE(review): in the 1p case the exchange above has replaced the fulfilled marker
	// with 0p and nothing here restores it - confirm this is intended.
	if( previous == 0p || previous == &wait_ctx || previous == 1p ) return;

	// 2p: the future is ready but the context hasn't been fully consumed yet,
	// spin until the server is done with it
	if( previous == 2p ) {
		while( this.ptr != 1p ) Pause();
		return;
	}

	// Anything else means something went wrong, abort
	abort("Future in unexpected state");
}
340
// Mark the future as abandoned (3p), meaning it is to be deleted by the server.
// If a fulfilment was in progress (previous state 2p), spin until the server has
// finished with the wait context before returning.
// NOTE(review): in that 2p case fulfil's final store of 1p overwrites the 3p marker,
// and when the previous state was already 1p no later fulfil is expected - nothing
// visible in this file frees the future on those paths; confirm ownership with callers.
void abandon( future_t & this ) {
	struct oneshot * previous = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);

	// only an in-progress fulfilment requires waiting
	if( previous != 2p ) return;

	while( this.ptr != 1p ) Pause();
}
352
// Server side: mark the future as fulfilled, waking the waiter if a wait context is installed.
// If the future was abandoned (3p) it is freed here instead.
// Returns true if a thread was unparked, false otherwise (including the abandoned case).
bool fulfil( future_t & this ) {
	for() {
		struct oneshot * seen = this.ptr;

		// Abandoned by the client: the server is responsible for deleting it
		if( seen == 3p ) { free( &this ); return false; }

		/* paranoid */ verify( seen != 1p ); // Future is already fulfilled, should not happen
		/* paranoid */ verify( seen != 2p ); // Future is being fulfilled by someone else, this is even less supported than the previous case.

		// With a wait context installed, go through "in progress" (2p) while the context is consumed;
		// without one, go straight to "fulfilled" (1p)
		struct oneshot * next = seen == 0p ? 1p : 2p;

		// state changed underneath, re-read and retry
		if(!__atomic_compare_exchange_n(&this.ptr, &seen, next, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) continue;

		// No context: nobody to wake
		if( seen == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }

		// Consume the context first, and only then publish "fulfilled":
		// until that store, retract/abandon/wait spin rather than release the context
		bool woke = post( *seen );
		__atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
		return woke;
	}

}
376
// Wait for the future to be fulfilled.
// Returns false if it was already fulfilled when the wait context was being set up;
// otherwise returns the result of waiting on the context (true if the thread was parked).
bool wait( future_t & this ) {
	oneshot ctx;
	if( !setup(this, ctx) ) return false;

	// The wait context is installed, block on it
	bool parked = wait( ctx );

	// The context is posted before the future leaves the "in progress" state:
	// spin until the server has stopped using the context, which is about to go out of scope
	while( this.ptr == 2p ) Pause();

	// At this point the only state that makes sense is "fulfilled"
	__attribute__((unused)) struct oneshot * state = this.ptr;
	/* paranoid */ verifyf( state == 1p, "Expected this.ptr to be 1p, was %p\n", state );

	return parked;
}
399 }
400#endif
Note: See TracBrowser for help on using the repository browser.