source: libcfa/src/concurrency/future.hfa@ 9c8afc7

Last change on this file since 9c8afc7 was 00aa122, checked in by Peter A. Buhr <pabuhr@…>, 3 weeks ago

Harmonize single_future with the other future types, remove multi_future, mark its test as deprecated, and turn off its test.

  • Property mode set to 100644
File size: 11.1 KB
RevLine 
[70f8bcd2]1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
[339e30a]7// concurrency/future.hfa --
[70f8bcd2]8//
[4a16ddfa]9// Author : Thierry Delisle & Peiran Hong & Colby Parsons & Peter Buhr
[70f8bcd2]10// Created On : Wed Jan 06 17:33:18 2021
[3483185]11// Last Modified By : Peter A. Buhr
[00aa122]12// Last Modified On : Mon Nov 24 16:08:52 2025
13// Update Count : 222
[70f8bcd2]14//
15
[5e4a830]16#pragma once
[70f8bcd2]17
18#include "bits/locks.hfa"
19#include "monitor.hfa"
[339e30a]20#include "select.hfa"
[beeff61e]21#include "locks.hfa"
[70f8bcd2]22
[4a16ddfa]23//--------------------------------------------------------------------------------------------------------
24// future does not use future_t because it needs a lock to support multiple consumers; future_t is lock-free
25// and relies on atomics, which future does not need.
26//--------------------------------------------------------------------------------------------------------
27
[fd54fef]28forall( T ) {
[8ffee9a]29 // PRIVATE
30
// Waiter record for a thread blocked in get$: a select_node extended with the address of the waiter's result slot.
struct future_node$ {
	inline select_node;									// embedded select_node, so the node can be queued on a future's waiters list
	T * my_result;										// where fulfil$ deposits the value for this waiter
};
35
static inline {
	// Raw byte copy of one T onto another: exactly sizeof(T) bytes are moved with memcpy.
	void copy_T$( T & to, T & from ) {
		memcpy( (void *)&to, (void *)&from, sizeof(T) );
	}
} // distribution
40
// Values stored in future.state.
enum {
	FUTURE_EMPTY$ = 0,									// nothing delivered yet
	FUTURE_FULFILLED$ = 1								// a value or an exception has been delivered
};
42
43 // PUBLIC
[339e30a]44
// Lock-based future supporting multiple waiting threads and the waituntil statement.
struct future {
	int state;											// FUTURE_EMPTY$ or FUTURE_FULFILLED$
	T result;											// value delivered by fulfil
	exception_t * except;								// heap copy of a delivered exception, 0p when there is none
	futex_mutex lock;									// protects the fields of this future
	dlist( select_node ) waiters;						// nodes waiting for the future to be fulfilled
};
__CFA_SELECT_GET_TYPE( future(T) );						// magic
[339e30a]53
54 static inline {
[8ffee9a]55 // PRIVATE
[339e30a]56
// Register a waituntil clause on this future (for waituntil statement).
// Returns true when the future is already fulfilled and the clause can proceed; returns false when the
// node was queued on the waiters list or when this clause lost the race in the special OR case.
bool register_select$( future(T) & fut, select_node & s ) with( fut ) {
	bool ready = false;

	lock( lock );
	if ( state == FUTURE_EMPTY$ ) {
		// future not ready -> insert select node and report "not ready"
		insert_last( waiters, s );
	} else if ( s.park_counter || __make_select_node_available( s ) ) {
		// Operation can complete. In the special OR case (no park counter) the call in the condition is the
		// race to establish a winner; reaching here means the race was skipped or won.
		__make_select_node_available( s );
		ready = true;
	} // otherwise we didn't win the race, so give up on registering

	unlock( lock );
	return ready;
}
[339e30a]79
// Withdraw a waituntil clause from this future (for waituntil statement). Always returns false.
bool unregister_select$( future(T) & fut, select_node & s ) with( fut ) {
	if ( isListed( s ) ) {								// cheap check without the lock
		lock( lock );
		if ( isListed( s ) ) {							// recheck now that the lock is held
			remove( s );
		}
		unlock( lock );
	}
	return false;
}
[339e30a]87
// Hook called for a selected waituntil clause (for waituntil statement); a future needs no extra work, so
// it unconditionally reports true.
bool on_selected$( future(T) &, select_node & ) {
	return true;
}
[3483185]89
[8ffee9a]90 // PUBLIC
[3483185]91
[8ffee9a]92 // General
93
94 void ?{}( future_node$(T) & fut, thread$ * blocked_thread, T * my_result ) {
95 ((select_node &)fut){ blocked_thread };
96 fut.my_result = my_result;
[3483185]97 }
[339e30a]98
// Default constructor: the future starts empty with no stored exception.
void ?{}( future(T) & fut ) with( fut ) {
	state = FUTURE_EMPTY$;
	except = 0p;
}
[3483185]103
// Destructor: release the heap copy of a stored exception (except is 0p when there is none).
void ^?{}( future(T) & fut ) with( fut ) {
	free( except );
}
[339e30a]107
[8ffee9a]108 // Used by Client
[339e30a]109
[8ffee9a]110 // PRIVATE
111
112 // Return a value/exception from the future.
113 T get$( future(T) & fut ) with( fut ) { // helper
[3483185]114 void exceptCheck() { // helper
115 if ( except ) {
116 exception_t * ex = ( exception_t * ) alloca( except->virtual_table->size );
117 except->virtual_table->copy( ex, except );
118 unlock( lock );
119 throwResume * ex;
120 }
121 }
122 T ret_val;
[8ffee9a]123
124 // LOCK ACQUIRED IN PUBLIC get
125 if ( state == FUTURE_FULFILLED$ ) {
[3483185]126 exceptCheck();
[8ffee9a]127 copy_T$( ret_val, result );
[3483185]128 unlock( lock );
[8ffee9a]129 return ret_val;
[3483185]130 }
131
[8ffee9a]132 future_node$(T) node = { active_thread(), &ret_val };
[3483185]133 insert_last( waiters, ((select_node &)node) );
134 unlock( lock );
135 park( );
136 exceptCheck();
[8ffee9a]137 return ret_val;
[339e30a]138 }
139
[8ffee9a]140 // PUBLIC
141
// Future result available ? Reads state with a relaxed atomic load and without taking the lock.
bool available( future( T ) & fut ) {
	return __atomic_load_n( &fut.state, __ATOMIC_RELAXED ) != FUTURE_EMPTY$;
}
143
144 // Return a value/exception from the future.
// Blocking get returning [value, flag]; the flag is true when the future was still empty at the time of
// the call. The lock taken here is handed to get$.
[T, bool] get( future(T) & fut ) with( fut ) {
	lock( lock );
	bool was_empty = ( state == FUTURE_EMPTY$ );
	return [ get$( fut ), was_empty ];
}
150
// Blocking get returning only the value. The lock taken here is handed to get$.
T get( future(T) & fut ) with( fut ) {
	lock( lock );
	return get$( fut );
}

// Alternate interface: applying the future with no arguments is the same as get.
T ?()( future(T) & fut ) {
	return get( fut );
}
[3483185]156
[8ffee9a]157 // Non-blocking get: true => return defined value, false => value return undefined.
158 [T, bool] try_get( future(T) & fut ) with( fut ) {
[3483185]159 lock( lock );
160 T ret_val;
[8ffee9a]161 if ( state == FUTURE_FULFILLED$ ) {
162 copy_T$( ret_val, result );
[3483185]163 unlock( lock );
164 return [ret_val, true];
165 }
166 unlock( lock );
167 return [ret_val, false];
168 }
169
[8ffee9a]170 // Used by Server
[3483185]171
[8ffee9a]172 // PRIVATE
[3483185]173
// Mark the future fulfilled and wake every waiter. Returns whether there were any waiters on entry.
// PRECONDITION: fut.lock is held by the caller (acquired in the public fulfil routines); it is released here.
bool fulfil$( future(T) & fut ) with( fut ) {			// helper
	bool had_waiters = ! isEmpty( waiters );
	state = FUTURE_FULFILLED$;

	// handle special waituntil OR case: if handle_OR returns false then waiters is empty, so stop
	while ( ! isEmpty( waiters ) && __handle_waituntil_OR( waiters ) ) {
		select_node & node = remove_first( waiters );

		// no clause status => node is treated as a future_node$ queued by get$;
		// poke in result so that woken threads do not need to reacquire any locks
		if ( node.clause_status == 0p ) {
			copy_T$( *(((future_node$(T) &)node).my_result), result );
		}

		wake_one( waiters, node );
	}

	unlock( lock );
	return had_waiters;
}
190
[8ffee9a]191 // PUBLIC
192
193 // Load a value/exception into the future, returns whether or not waiting threads.
// Load a value into the future; returns whether or not there were waiting threads.
// Aborts if the future is not empty. The lock taken here is released by fulfil$.
bool fulfil( future(T) & fut, T val ) with( fut ) {
	lock( lock );
	if ( state != FUTURE_EMPTY$ ) {
		abort("Attempting to fulfil a future that has already been fulfilled");
	}
	copy_T$( result, val );
	return fulfil$( fut );
}

// Alternate interface: applying the future to a value is the same as fulfil.
bool ?()( future(T) & fut, T val ) {
	return fulfil( fut, val );
}
[3483185]201
// Load an exception into the future; returns whether or not there were waiting threads.
// The exception is copied into heap storage owned by the future (sized and copied through its virtual table).
// Aborts if the future is not empty. The lock taken here is released by fulfil$.
bool fulfil( future(T) & fut, exception_t * ex ) with( fut ) {
	lock( lock );
	if ( state != FUTURE_EMPTY$ ) {
		abort( "Attempting to fulfil a future that has already been fulfilled" );
	}
	except = ( exception_t * ) malloc( ex->virtual_table->size );
	ex->virtual_table->copy( except, ex );
	return fulfil$( fut );
}

// Alternate interface: applying the future to an exception pointer is the same as fulfil.
bool ?()( future(T) & fut, exception_t * ex ) {
	return fulfil( fut, ex );
}
210
// Mark future as empty (for reuse) and discard any stored exception. Aborts if threads are still waiting.
void reset( future(T) & fut ) with( fut ) {
	lock( lock );
	if ( ! isEmpty( waiters ) ) {
		abort( "Attempting to reset a future with blocked waiters" );
	}
	state = FUTURE_EMPTY$;
	free( except );										// except is 0p when no exception was stored
	except = 0p;
	unlock( lock );
}
219 } // static inline
220} // forall( T )
[339e30a]221
222//--------------------------------------------------------------------------------------------------------
[4a16ddfa]223// future_rc uses reference counting to eliminate explicit storage-management and support the waituntil
224// statement.
225//--------------------------------------------------------------------------------------------------------
226
227forall( T ) {
[8ffee9a]228 // PRIVATE
229
// Shared, reference-counted state behind one or more future_rc handles.
struct future_rc_impl$ {
	futex_mutex lock;									// concurrent protection; NOTE(review): not referenced by the routines visible in this file
	size_t refCnt;										// number of references to future
	future(T) fut;										// underlying future
}; // future_rc_impl$
235
static inline {
	// Atomically add one reference; returns the count BEFORE the increment.
	size_t incRef$( future_rc_impl$( T ) & impl ) with( impl ) {
		return __atomic_fetch_add( &refCnt, 1, __ATOMIC_SEQ_CST );
	} // incRef$

	// Atomically drop one reference; returns the count BEFORE the decrement, so 1 means the last reference went away.
	size_t decRef$( future_rc_impl$( T ) & impl ) with( impl ) {
		return __atomic_fetch_sub( &refCnt, 1, __ATOMIC_SEQ_CST );
	} // decRef$

	// Constructor: the creating handle is the first reference.
	void ?{}( future_rc_impl$( T ) & frc ) with( frc ) {
		refCnt = 1;										// count initial object
	} // ?{}
} // static inline
249
[8ffee9a]250 // PUBLIC
251
// Handle to a reference-counted future; copies of a handle share the same implementation object.
struct future_rc {
	future_rc_impl$(T) * impl;							// shared implementation
}; // future_rc
__CFA_SELECT_GET_TYPE( future_rc(T) );					// magic
[4a16ddfa]256
257 static inline {
[8ffee9a]258 // PRIVATE
259
// For waituntil statement: forward registration to the underlying future.
bool register_select$( future_rc(T) & frc, select_node & s ) with( frc ) {
	return register_select$( frc.impl->fut, s );
}
263
// For waituntil statement: forward unregistration to the underlying future.
bool unregister_select$( future_rc(T) & frc, select_node & s ) with( frc ) {
	return unregister_select$( frc.impl->fut, s );
}
267
// For waituntil statement: nothing extra to do when selected, so unconditionally report true.
bool on_selected$( future_rc(T) &, select_node & ) {
	return true;
}
[8ffee9a]269
270 // PUBLIC
271
272 // General
273
// Default constructor: allocate a fresh implementation object for this handle.
void ?{}( future_rc( T ) & frc ) with( frc ) {
	impl = new();
} // ?{}
277
// Copy constructor: share the source handle's implementation and count the new reference.
void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) {
	impl = from.impl;									// point at shared impl
	incRef$( *impl );
} // ?{}
282
// Destructor: drop this handle's reference and delete the implementation when it was the last one.
void ^?{}( future_rc( T ) & frc ) with( frc ) {
	if ( decRef$( *impl ) == 1 ) {						// previous count 1 => no references remain
		delete( impl );
		impl = 0p;
	}
} // ^?{}
286
// Assignment: release the current implementation (deleting it if this was the last reference) and share rhs's.
future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) {
	if ( impl != rhs.impl ) {							// not self assignment ?
		if ( decRef$( *impl ) == 1 ) {					// no references ? => delete current impl
			delete( impl );
			impl = 0p;
		}
		impl = rhs.impl;								// point at new impl
		incRef$( *impl );								// and increment reference count
	}
	return lhs;
} // ?=?
294
[8ffee9a]295 // Used by Client
[4a16ddfa]296
// Future result available ? Forwards to the underlying future.
bool available( future_rc( T ) & frc ) {
	return available( frc.impl->fut );
}
298
[8ffee9a]299 // Return a value/exception from the future.
// Return future value together with the flag produced by the underlying future's tuple get.
[T, bool] get( future_rc(T) & frc ) with( frc ) {
	return get( impl->fut );
}

// Return future value.
T get( future_rc(T) & frc ) with( frc ) {
	return get( impl->fut );
}

// Alternate interface: applying the handle with no arguments is the same as get.
T ?()( future_rc(T) & frc ) with( frc ) {
	return get( frc );
}

// Non-blocking get; forwards to the underlying future's try_get.
[T, bool] try_get( future_rc(T) & frc ) with( frc ) {
	return try_get( impl->fut );
}
[4a16ddfa]304
// Referential equality: two handles are equal when they share the same implementation object.
int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) {
	return lhs.impl == rhs.impl;
}
306
[8ffee9a]307 // Used by Server
[4a16ddfa]308
[8ffee9a]309 // Load a value/exception into the future, returns whether or not waiting threads.
// Copy-in future value; returns whether or not there were waiting threads.
bool fulfil( future_rc(T) & frc, T val ) with( frc ) {
	return fulfil( impl->fut, val );
}

// Alternate interface: applying the handle to a value is the same as fulfil.
bool ?()( future_rc(T) & frc, T val ) {
	return fulfil( frc, val );
}

// Insert future exception; returns whether or not there were waiting threads.
bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) {
	return fulfil( impl->fut, ex );
}

// Alternate interface: applying the handle to an exception pointer is the same as fulfil.
bool ?()( future_rc(T) & frc, exception_t * ex ) {
	return fulfil( frc, ex );
}
315
// Mark future as empty (for reuse); forwards to the underlying future.
void reset( future_rc(T) & frc ) with( frc ) {
	reset( impl->fut );
}
317 } // static inline
318} // forall( T )
319
320//--------------------------------------------------------------------------------------------------------
[00aa122]321// This future does not support waituntil statements so it does not have as many features as 'future'.
322// However, it is cheap and cheerful and is more performant than 'future' since it uses raw atomics
323// and no locks
[339e30a]324//--------------------------------------------------------------------------------------------------------
325
forall( T ) {
	// PUBLIC

	// A future_t extended with storage for the delivered value.
	struct single_future {
		inline future_t;								// embedded future_t provides the waiting/fulfilment protocol
		T result;										// value stored by fulfil
	};

	static inline {
		// PUBLIC

		// Future result available ? Forwards to the embedded future_t.
		bool available( single_future(T) & fut ) { return available( (future_t &)fut ); }

		// Return a value from the future together with the flag reported by wait.
		// The wait must complete BEFORE result is read, so it is a separate statement rather than a tuple
		// element evaluated alongside the read of fut.result.
		[T, bool] get( single_future(T) & fut ) {
			bool ret = wait( fut );
			return [fut.result, ret];
		}

		// Return a value from the future, waiting until it has been fulfilled.
		T get( single_future(T) & fut ) { wait( fut ); return fut.result; }
		T ?()( single_future(T) & fut ) { return get( fut ); } // alternate interface

		// Load a value into the future, returns whether or not waiting threads.
		bool fulfil( single_future(T) & fut, T result ) {
			fut.result = result;						// store the value before signalling fulfilment
			return fulfil( (future_t &)fut ) != 0p;
		}
		bool ?()( single_future(T) & fut, T val ) { return fulfil( fut, val ); } // alternate interface

		// Mark future as empty (for reuse); forwards to the embedded future_t.
		void reset( single_future(T) & fut ) { reset( (future_t &)fut ); }
	} // static inline
} // forall( T )
Note: See TracBrowser for help on using the repository browser.