source: libcfa/src/concurrency/future.hfa@ 9c8afc7

Last change on this file since 9c8afc7 was 00aa122, checked in by Peter A. Buhr <pabuhr@…>, 3 weeks ago

harmonize single_future with other future types, remove multi_future, mark its test as deprecated, and turn off its test

  • Property mode set to 100644
File size: 11.1 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// concurrency/future.hfa --
8//
9// Author : Thierry Delisle & Peiran Hong & Colby Parsons & Peter Buhr
10// Created On : Wed Jan 06 17:33:18 2021
11// Last Modified By : Peter A. Buhr
12// Last Modified On : Mon Nov 24 16:08:52 2025
13// Update Count : 222
14//
15
16#pragma once
17
18#include "bits/locks.hfa"
19#include "monitor.hfa"
20#include "select.hfa"
21#include "locks.hfa"
22
23//--------------------------------------------------------------------------------------------------------
24// future does not use future_t as it needs a lock to support multiple consumers. future_t is lockfree
25// and uses atomics which are not needed.
26//--------------------------------------------------------------------------------------------------------
27
28forall( T ) {
29 // PRIVATE
30
// Waiter record queued by a blocking get: a select_node extended with the address where fulfil$
// deposits the value for this particular waiter.
struct future_node$ {
	inline select_node;						// embedded base, so the node can be linked on the waiters list
	T * my_result;							// where fulfil$ copies the value for this waiter
};
35
static inline {
	// Raw byte copy of one T onto another: sizeof(T) bytes moved with memcpy, no constructor or
	// assignment operator of T is invoked.
	void copy_T$( T & to, T & from ) {
		memcpy( (void *)&to, (void *)&from, sizeof(T) );
	}
} // distribution
40
// Values stored in future.state.
enum {
	FUTURE_EMPTY$ = 0,						// nothing delivered yet
	FUTURE_FULFILLED$ = 1					// value or exception has been delivered
};
42
43 // PUBLIC
44
// Lock-based future supporting multiple blocked consumers and the waituntil statement.
struct future {
	int state;								// FUTURE_EMPTY$ or FUTURE_FULFILLED$
	T result;								// value stored by fulfil
	exception_t * except;					// heap copy of the exception stored by fulfil, 0p when none
	futex_mutex lock;						// acquired around accesses to the other fields
	dlist( select_node ) waiters;			// nodes of blocked get callers and registered waituntil clauses
};
__CFA_SELECT_GET_TYPE( future(T) ); // magic
53
54 static inline {
55 // PRIVATE
56
// waituntil hook: register select node s with the future.
// Returns true when the future is already fulfilled and s was made available; returns false when s
// was queued on the waiters list (future still empty) or when s lost the race in the special OR case.
bool register_select$( future(T) & fut, select_node & s ) with( fut ) {
	bool selected;
	lock( lock );

	if ( state == FUTURE_EMPTY$ ) {
		// nothing to deliver yet => queue the node so fulfil$ can find it
		insert_last( waiters, s );
		selected = false;
	} else if ( ! s.park_counter && ! __make_select_node_available( s ) ) {
		// special OR case: operation could complete but another clause won the race => give up
		selected = false;
	} else {
		__make_select_node_available( s );
		selected = true;
	}

	unlock( lock );
	return selected;
}
79
// waituntil hook: take s off the waiters list if it is still queued. Always returns false.
bool unregister_select$( future(T) & fut, select_node & s ) with( fut ) {
	if ( isListed( s ) ) {					// cheap unlocked check first
		lock( lock );
		if ( isListed( s ) ) {				// re-check now that the lock is held
			remove( s );
		}
		unlock( lock );
	}
	return false;
}
87
// waituntil hook: no extra work is needed when a future clause is selected; always returns true.
bool on_selected$( future(T) &, select_node & ) {
	return true;
}
89
90 // PUBLIC
91
92 // General
93
// Construct a waiter node for blocked_thread whose value is to be deposited at my_result.
void ?{}( future_node$(T) & fut, thread$ * blocked_thread, T * my_result ) {
	( (select_node &)fut ){ blocked_thread };	// construct the embedded select_node part
	fut.my_result = my_result;					// remember where the value goes
}
98
// Construct an empty future: no value delivered and no stored exception.
void ?{}( future(T) & fut ) with( fut ) {
	state = FUTURE_EMPTY$;
	except = 0p;
}
103
// Destroy a future: release the heap copy of the stored exception (except is 0p when there is none).
void ^?{}( future(T) & fut ) with( fut ) {
	free( except );
}
107
108 // Used by Client
109
110 // PRIVATE
111
112 // Return a value/exception from the future.
// Return the value of the future, blocking the calling thread until the future is fulfilled. If the
// future holds an exception, a stack copy of it is raised with throwResume; should the handler resume,
// the (possibly undefined) value is still returned.
//
// PRECONDITION: lock is held by the caller (acquired in the public get routines).
// POSTCONDITION: lock has been released EXACTLY ONCE, on every path. Releasing it a second time,
// after park or after a resumption handler returns, would release a lock this thread no longer
// owns and break mutual exclusion for whichever thread holds it at that moment.
T get$( future(T) & fut ) with( fut ) { // helper
	T ret_val;
	bool held = state == FUTURE_FULFILLED$;		// fulfilled => no blocking, lock stays held for the copies below

	if ( held ) {
		copy_T$( ret_val, result );
	} else {
		// Not fulfilled: queue a node so fulfil$ pokes the value directly into ret_val, then block.
		future_node$(T) node = { active_thread(), &ret_val };
		insert_last( waiters, ((select_node &)node) );
		unlock( lock );							// the one release on the blocking path
		park( );
		// Woken by fulfil$: the lock is NOT held from here on.
	}

	// Copy the stored exception (if any) onto this stack frame. On the non-blocking path the copy is
	// made while the lock is still held.
	exception_t * ex = 0p;
	if ( except ) {
		ex = ( exception_t * ) alloca( except->virtual_table->size );
		except->virtual_table->copy( ex, except );
	}

	if ( held ) unlock( lock );					// the one release on the non-blocking path
	if ( ex ) throwResume * ex;					// raised with the lock released
	return ret_val;
}
139
140 // PUBLIC
141
// Future result available ? Reads state with a relaxed atomic load, without taking the lock.
bool available( future( T ) & fut ) {
	return __atomic_load_n( &fut.state, __ATOMIC_RELAXED ) != FUTURE_EMPTY$;
}
143
144 // Return a value/exception from the future.
// Return the value/exception from the future, paired with a flag that is true when the future was
// still empty at the moment of the call. The lock acquired here is released inside get$.
[T, bool] get( future(T) & fut ) with( fut ) {
	lock( lock );
	bool was_empty = ( state == FUTURE_EMPTY$ );	// sampled under the lock, before get$ may block
	return [ get$( fut ), was_empty ];
}
150
// Return the value/exception from the future, blocking until it is fulfilled.
// The lock acquired here is released inside get$.
T get( future(T) & fut ) {
	lock( fut.lock );
	return get$( fut );
}

// Alternate interface: calling the future like a function is the same as get.
T ?()( future(T) & fut ) {
	return get( fut );
}
156
157 // Non-blocking get: true => return defined value, false => value return undefined.
// Non-blocking get: second component true => first component is the stored value, false => the first
// component is undefined.
// NOTE(review): a future fulfilled with an exception also reports true, although no value was stored
// in result and the exception is not raised here. Confirm this is intended.
[T, bool] try_get( future(T) & fut ) with( fut ) {
	lock( lock );
	T ret_val;
	bool filled = ( state == FUTURE_FULFILLED$ );
	if ( filled ) {
		copy_T$( ret_val, result );
	}
	unlock( lock );
	return [ret_val, filled];
}
169
170 // Used by Server
171
172 // PRIVATE
173
// Mark the future fulfilled and wake every waiter. Returns whether there were waiters on entry.
// PRECONDITION: lock is held by the caller (public fulfil); it is released here.
bool fulfil$( future(T) & fut ) with( fut ) { // helper
	bool had_waiters = ! isEmpty( waiters );
	state = FUTURE_FULFILLED$;

	// __handle_waituntil_OR deals with the special waituntil OR case; when it returns false the
	// waiters list is empty, so the loop stops.
	while ( ! isEmpty( waiters ) && __handle_waituntil_OR( waiters ) ) {
		select_node & waiter = remove_first( waiters );

		if ( waiter.clause_status == 0p ) {
			// poke in result so that woken threads do not need to reacquire any locks
			future_node$(T) & getter = (future_node$(T) &)waiter;
			copy_T$( *getter.my_result, result );
		}

		wake_one( waiters, waiter );
	}

	unlock( lock );
	return had_waiters;
}
190
191 // PUBLIC
192
193 // Load a value/exception into the future, returns whether or not waiting threads.
// Load a value into the future; returns whether or not there were waiting threads.
// Aborts if the future has already been fulfilled. The lock acquired here is released in fulfil$.
bool fulfil( future(T) & fut, T val ) with( fut ) {
	lock( lock );
	if ( state != FUTURE_EMPTY$ ) {
		abort("Attempting to fulfil a future that has already been fulfilled");
	}
	copy_T$( result, val );
	return fulfil$( fut );
}

// Alternate interface: calling the future with a value is the same as fulfil.
bool ?()( future(T) & fut, T val ) {
	return fulfil( fut, val );
}
201
// Load an exception into the future; returns whether or not there were waiting threads.
// A heap copy of *ex is stored in the future, sized and copied through the exception's virtual table.
// Aborts if the future has already been fulfilled. The lock acquired here is released in fulfil$.
bool fulfil( future(T) & fut, exception_t * ex ) with( fut ) {
	lock( lock );
	if ( state != FUTURE_EMPTY$ ) {
		abort( "Attempting to fulfil a future that has already been fulfilled" );
	}
	except = ( exception_t * ) malloc( ex->virtual_table->size );
	ex->virtual_table->copy( except, ex );
	return fulfil$( fut );
}

// Alternate interface: calling the future with an exception is the same as fulfil.
bool ?()( future(T) & fut, exception_t * ex ) {
	return fulfil( fut, ex );
}
210
// Mark the future as empty so it can be reused, discarding any stored exception.
// Aborts if threads are still queued on the waiters list.
void reset( future(T) & fut ) with( fut ) {
	lock( lock );
	if ( ! isEmpty( waiters ) ) {
		abort( "Attempting to reset a future with blocked waiters" );
	}
	free( except );							// drop the stored exception copy, if any
	except = 0p;
	state = FUTURE_EMPTY$;
	unlock( lock );
}
219 } // static inline
220} // forall( T )
221
222//--------------------------------------------------------------------------------------------------------
223// future_rc uses reference counting to eliminate explicit storage-management and support the waituntil
224// statement.
225//--------------------------------------------------------------------------------------------------------
226
227forall( T ) {
228 // PRIVATE
229
// Shared, reference-counted state behind every future_rc handle.
struct future_rc_impl$ {
	futex_mutex lock;						// concurrent protection
	size_t refCnt;							// number of references to future
	future(T) fut;							// underlying future
}; // future_rc_impl$

static inline {
	// Atomically add one reference; returns the count BEFORE the increment.
	size_t incRef$( future_rc_impl$( T ) & impl ) {
		return __atomic_fetch_add( &impl.refCnt, 1, __ATOMIC_SEQ_CST );
	} // incRef$

	// Atomically drop one reference; returns the count BEFORE the decrement, so a result of 1 means
	// the caller just released the last reference.
	size_t decRef$( future_rc_impl$( T ) & impl ) {
		return __atomic_fetch_sub( &impl.refCnt, 1, __ATOMIC_SEQ_CST );
	} // decRef$

	// A new implementation object starts with a single reference: the handle creating it.
	void ?{}( future_rc_impl$( T ) & frc ) {
		frc.refCnt = 1;
	} // ?{}
} // static inline
249
250 // PUBLIC
251
// Handle to a reference-counted future; copies of a handle share one future_rc_impl$.
struct future_rc {
	future_rc_impl$(T) * impl;				// shared implementation object
}; // future_rc
__CFA_SELECT_GET_TYPE( future_rc(T) ); // magic
256
257 static inline {
258 // PRIVATE
259
// waituntil hook: forward registration to the underlying future.
bool register_select$( future_rc(T) & frc, select_node & s ) with( frc ) {
	return register_select$( impl->fut, s );
}

// waituntil hook: forward unregistration to the underlying future.
bool unregister_select$( future_rc(T) & frc, select_node & s ) with( frc ) {
	return unregister_select$( impl->fut, s );
}

// waituntil hook: no extra work is needed on selection; always returns true.
bool on_selected$( future_rc(T) &, select_node & ) {
	return true;
}
269
270 // PUBLIC
271
272 // General
273
// Default constructor: allocate a fresh implementation object (its constructor sets refCnt to 1).
void ?{}( future_rc( T ) & frc ) with( frc ) {
	impl = new();
} // ?{}
277
// Copy constructor: share from's implementation object and count the new reference.
void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) {
	impl = from.impl;						// both handles now refer to the same future
	incRef$( *impl );
} // ?{}
282
// Destructor: drop this handle's reference and delete the implementation when it was the last one.
void ^?{}( future_rc( T ) & frc ) with( frc ) {
	if ( decRef$( *impl ) == 1 ) {			// previous count of 1 => no references remain
		delete( impl );
		impl = 0p;
	}
} // ^?{}
286
// Assignment: make lhs refer to rhs's future. The reference lhs held before is released, and the old
// implementation is deleted when that was its last reference. Assigning a handle that already shares
// the same implementation is a no-op.
future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) {
	if ( impl != rhs.impl ) {				// not self assignment
		if ( decRef$( *impl ) == 1 ) {		// no references ? => delete current impl
			delete( impl );
			impl = 0p;
		}
		impl = rhs.impl;					// point at new impl
		incRef$( *impl );					// and increment reference count
	}
	return lhs;
} // ?=?
294
295 // Used by Client
296
// Future result available ? Forwards to the underlying future.
bool available( future_rc( T ) & frc ) {
	return available( frc.impl->fut );
}
298
299 // Return a value/exception from the future.
// Return the future value together with the flag produced by the underlying future's get.
[T, bool] get( future_rc(T) & frc ) with( frc ) {
	return get( impl->fut );
}

// Return the future value.
T get( future_rc(T) & frc ) with( frc ) {
	return get( impl->fut );
}

// Alternate interface: calling the handle like a function is the same as get.
T ?()( future_rc(T) & frc ) with( frc ) {
	return get( frc );
}

// Non-blocking get, forwarded to the underlying future.
[T, bool] try_get( future_rc(T) & frc ) with( frc ) {
	return try_get( impl->fut );
}
304
// Referential equality: two handles are equal when they share the same implementation object.
int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) {
	return lhs.impl == rhs.impl;
}
306
307 // Used by Server
308
309 // Load a value/exception into the future, returns whether or not waiting threads.
// Copy a value into the underlying future; returns whether or not there were waiting threads.
bool fulfil( future_rc(T) & frc, T val ) with( frc ) {
	return fulfil( impl->fut, val );
}

// Alternate interface: calling the handle with a value is the same as fulfil.
bool ?()( future_rc(T) & frc, T val ) {
	return fulfil( frc, val );
}

// Insert an exception into the underlying future; returns whether or not there were waiting threads.
bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) {
	return fulfil( impl->fut, ex );
}

// Alternate interface: calling the handle with an exception is the same as fulfil.
bool ?()( future_rc(T) & frc, exception_t * ex ) {
	return fulfil( frc, ex );
}

// Mark the underlying future as empty (for reuse).
void reset( future_rc(T) & frc ) with( frc ) {
	reset( impl->fut );
}
317 } // static inline
318} // forall( T )
319
320//--------------------------------------------------------------------------------------------------------
321// This future does not support waituntil statements so it does not have as many features as 'future'.
322// However, it is cheap and cheerful and is more performant than 'future' since it uses raw atomics
323// and no locks
324//--------------------------------------------------------------------------------------------------------
325
326forall( T ) {
327 // PUBLIC
328
// Future built on future_t that additionally carries a value of type T.
struct single_future {
	inline future_t;						// embedded base providing the wait/fulfil/reset/available operations used below
	T result;								// value assigned by fulfil and read by get
};
333
334 static inline {
335 // PUBLIC
336
// Future result available ? Forwards to the embedded future_t.
bool available( single_future(T) & fut ) {
	return available( (future_t &)fut );
}
338
339 // Return a value/exception from the future.
// Return the value from the future together with the flag returned by wait.
// The wait is performed in its own statement BEFORE result is read: written as a single tuple
// expression, result is named ahead of the wait call, so it could be read before the future has
// been fulfilled.
[T, bool] get( single_future(T) & fut ) {
	bool waited = wait( fut );				// returns only once the future has been fulfilled
	return [fut.result, waited];
}

// Return the value from the future, waiting for it to be fulfilled first.
T get( single_future(T) & fut ) {
	wait( fut );
	return fut.result;
}

// Alternate interface: calling the future like a function is the same as get.
T ?()( single_future(T) & fut ) {
	return get( fut );
}
343
344 // Load a value into the future, returns whether or not waiting threads.
// Load a value into the future. The value is stored before the embedded future_t is fulfilled, and
// the routine returns true exactly when that fulfil call returns a non-null pointer.
bool fulfil( single_future(T) & fut, T result ) {
	fut.result = result;
	return 0p != fulfil( (future_t &)fut );
}

// Alternate interface: calling the future with a value is the same as fulfil.
bool ?()( single_future(T) & fut, T val ) {
	return fulfil( fut, val );
}
350
// Mark the future as empty (for reuse) by resetting the embedded future_t.
void reset( single_future(T) & fut ) {
	reset( (future_t &)fut );
}
352 } // static inline
353} // forall( T )
Note: See TracBrowser for help on using the repository browser.