source: libcfa/src/concurrency/future.hfa@ 822ae48

Last change on this file since 822ae48 was 4a16ddfa, checked in by Peter A. Buhr <pabuhr@…>, 6 days ago

add reference counting futures for use with waituntil

  • Property mode set to 100644
File size: 12.5 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// concurrency/future.hfa --
8//
9// Author : Thierry Delisle & Peiran Hong & Colby Parsons & Peter Buhr
10// Created On : Wed Jan 06 17:33:18 2021
11// Last Modified By : Peter A. Buhr
12// Last Modified On : Mon Nov 17 08:58:38 2025
13// Update Count : 164
14//
15
16#pragma once
17
18#include "bits/locks.hfa"
19#include "monitor.hfa"
20#include "select.hfa"
21#include "locks.hfa"
22
23//--------------------------------------------------------------------------------------------------------
24// future does not use future_t as it needs a lock to support multiple consumers. future_t is lockfree
25// and uses atomics which are not needed.
26//--------------------------------------------------------------------------------------------------------
27
28forall( T ) {
enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };		// values held in future.state

// Lock-based future: carries either a value or an exception to the threads / waituntil clauses queued on it.
struct future {
	int state;							// FUTURE_EMPTY until a fulfil, then FUTURE_FULFILLED
	T result;							// value stored by fulfil( future, T )
	exception_t * except;				// malloc'd copy stored by fulfil( future, exception_t * ), 0p if none
	dlist( select_node ) waiters;		// nodes queued by get and register_select while the future is empty
	futex_mutex lock;					// held while the fields above are updated
};
__CFA_SELECT_GET_TYPE( future(T) );						// macro defined outside this file (presumably select.hfa)
39
// Wait node used by get: a select_node plus the address where the value is deposited for the waiter.
struct future_node {
	inline select_node;					// embedded node, linked onto future.waiters
	T * my_result;						// destination written by fulfil$ before the waiter is woken
};
44
45 static inline {
// Construct a wait node for `blocked_thread`; `my_result` is where a delivered value is to be copied.
void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
	// initialize the embedded select_node part first
	( (select_node &)this ){ blocked_thread };
	// then remember the waiter's result slot
	this.my_result = my_result;
}
50
// Construct an empty future: no waiters, no stored exception, state FUTURE_EMPTY.
void ?{}( future(T) & this ) {
	(this.waiters){};
	this.except = 0p;
	this.state = FUTURE_EMPTY;
	(this.lock){};
}
57
// Destroy the future, releasing the stored exception copy (free of 0p is a no-op).
void ^?{}( future(T) & this ) {
	free( this.except );
}
61
// Put the future back into the empty state so it can be fulfilled again.
// Aborts if any node is still queued on the waiters list; discards a stored exception.
void reset( future(T) & this ) {
	lock( this.lock );
	if ( ! isEmpty( this.waiters ) ) {
		abort("Attempting to reset a future with blocked waiters");
	}
	this.state = FUTURE_EMPTY;
	free( this.except );
	this.except = 0p;									// prevent a second free on the next reset / destructor
	unlock( this.lock );
}
72
// Report whether the future has been fulfilled.
// The state is read with a relaxed atomic load and without taking the lock, so the answer is only a snapshot.
bool available( future(T) & this ) {
	int snapshot = __atomic_load_n( &this.state, __ATOMIC_RELAXED );
	return snapshot != FUTURE_EMPTY;
}

76
77
// Bitwise copy of one T: sizeof(T) bytes are copied from `from` into `to` with memcpy.
void copy_T$( T & from, T & to ) {
	void * dst = (void *)&to;
	void * src = (void *)&from;
	memcpy( dst, src, sizeof(T) );
}
82
// Common tail of both fulfil routines: mark the future fulfilled and drain the waiters list.
// Both callers in this file acquire `lock` before calling; this routine releases it.
// Returns true if at least one node was queued when the routine started.
bool fulfil$( future(T) & this ) with(this) {			// helper
	const bool had_waiters = ! isEmpty( waiters );
	state = FUTURE_FULFILLED;
	for ( ;; ) {
		if ( isEmpty( waiters ) ) break;
		// handle special waituntil OR case; a false return means waiters is now empty
		if ( ! __handle_waituntil_OR( waiters ) ) break;

		select_node & node = remove_first( waiters );

		// a node with no clause_status is treated as a future_node queued by get: poke in the result so the
		// woken thread does not need to reacquire any locks
		if ( node.clause_status == 0p ) {
			copy_T$( result, *(((future_node(T) &)node).my_result) );
		}

		wake_one( waiters, node );
	}
	unlock( lock );
	return had_waiters;
}
99
// Deliver `val` through the future and release everything queued on it.
// Aborts if the future is not empty. Returns the result of fulfil$: true if any node was queued.
bool fulfil( future(T) & this, T val ) {
	lock( this.lock );									// released inside fulfil$
	if ( this.state != FUTURE_EMPTY ) {
		abort("Attempting to fulfil a future that has already been fulfilled");
	}

	copy_T$( val, this.result );
	return fulfil$( this );
}
109
// Call-operator form of fulfil( this, val ).
bool ?()( future(T) & this, T val ) {					// alternate interface
	bool unblocked = fulfil( this, val );
	return unblocked;
}
113
// Deliver an exception through the future instead of a value.
// A private heap copy of `*ex` is made through the exception's virtual table, so the caller keeps ownership of
// `ex`. Aborts if the future is not empty. Returns the result of fulfil$: true if any node was queued.
bool fulfil( future(T) & this, exception_t * ex ) {
	lock( this.lock );									// released inside fulfil$
	if ( this.state != FUTURE_EMPTY ) {
		abort("Attempting to fulfil a future that has already been fulfilled");
	}

	this.except = ( exception_t * ) malloc( ex->virtual_table->size );
	ex->virtual_table->copy( this.except, ex );
	return fulfil$( this );
}
124
// Call-operator form of fulfil( this, ex ).
bool ?()( future(T) & this, exception_t * ex ) {		// alternate interface
	bool unblocked = fulfil( this, ex );
	return unblocked;
}
128
// Wait for the future to be fulfilled.
// Returns [value, blocked], where `blocked` tells whether the calling thread had to park.
// If the future was fulfilled with an exception, a stack copy of that exception is raised with throwResume
// instead of a value being delivered; should a resumption handler return control, the value component of the
// returned tuple is whatever ret_val happens to hold.
[T, bool] get( future(T) & this ) with( this ) {
	// Raise the stored exception, if there is one.
	// `owner` states whether the caller currently holds `lock`: the lock is released before raising, but only
	// when it is actually held. (Previously the lock was released unconditionally, so the path after park()
	// unlocked a lock this thread no longer owned.)
	// Returns false when no exception is stored (lock ownership unchanged), true when an exception was raised
	// and control came back here (lock no longer held).
	bool exceptCheck( bool owner ) {					// helper
		if ( ! except ) return false;
		// the copy is alloca'd, so the raise must happen from this frame
		exception_t * ex = ( exception_t * ) alloca( except->virtual_table->size );
		except->virtual_table->copy( ex, except );
		if ( owner ) unlock( lock );
		throwResume * ex;
		return true;
	}

	lock( lock );
	T ret_val;
	if ( state == FUTURE_FULFILLED ) {
		if ( ! exceptCheck( true ) ) {					// no exception => lock still held here
			copy_T$( result, ret_val );
			unlock( lock );
		}
		return [ret_val, false];
	}

	// not fulfilled yet: queue a node pointing at ret_val so fulfil$ can deposit the value, then block
	future_node(T) node = { active_thread(), &ret_val };
	insert_last( waiters, ((select_node &)node) );
	unlock( lock );
	park( );
	exceptCheck( false );								// lock was released before parking

	return [ret_val, true];
}
158
// Wait for the future to be fulfilled and return only the value, discarding the "had to block" flag.
T get( future(T) & this ) {
	[T, bool] outcome;
	outcome = get( this );								// tuple-returning overload
	return outcome.0;
}
165
// Call-operator form of get( this ).
T ?()( future(T) & this ) {								// alternate interface
	T value = get( this );
	return value;
}
169
// Non-blocking read of the future.
// Returns [value, true] when the future is fulfilled; otherwise [ret_val, false], where ret_val was never
// assigned by this routine.
// NOTE(review): unlike get, this routine never inspects `except`; for a future fulfilled with an exception it
// still reports true and copies whatever `result` holds -- confirm that is intended.
[T, bool] try_get( future(T) & this ) with(this) {
	lock( lock );
	T ret_val;
	const bool ready = state == FUTURE_FULFILLED;
	if ( ready ) {
		copy_T$( result, ret_val );
	}
	unlock( lock );
	return [ret_val, ready];
}
184
// waituntil hook: try to complete select clause `s` against this future.
// Returns true when the future is already fulfilled and the clause was completed here. Returns false when the
// node was queued on waiters (future still empty), or when the race for the special OR case was lost.
bool register_select( future(T) & this, select_node & s ) with(this) {
	lock( lock );

	const bool fulfilled = state != FUTURE_EMPTY;
	bool completed = false;

	if ( ! fulfilled ) {
		// future not ready -> insert select node and return
		insert_last( waiters, s );
	} else if ( s.park_counter || __make_select_node_available( s ) ) {
		// With no park_counter (special OR case) the call in the condition is the race to establish a winner;
		// reaching here means it was won. Losing the race gives up on registering.
		__make_select_node_available( s );
		completed = true;
	}

	unlock( lock );
	return completed;
}
207
// waituntil hook: withdraw select clause `s` from this future. Always returns false.
// The unlocked isListed test is only a fast path; it is repeated under the lock before the node is removed.
bool unregister_select( future(T) & this, select_node & s ) {
	if ( isListed( s ) ) {
		lock( this.lock );
		if ( isListed( s ) ) {
			remove( s );
		}
		unlock( this.lock );
	}
	return false;
}
215
// waituntil hook: nothing to do when a clause on this future is selected; always returns true.
bool on_selected( future(T) &, select_node & ) {
	return true;
}
217 }
218}
219
220//--------------------------------------------------------------------------------------------------------
221// future_rc uses reference counting to eliminate explicit storage-management and support the waituntil
222// statement.
223//--------------------------------------------------------------------------------------------------------
224
225forall( T ) {
// Shared state behind every future_rc handle that refers to the same future.
struct future_rc_impl$ {
	futex_mutex lock;					// constructed but not otherwise used by the routines in this file
	size_t refCnt;						// number of references to future, updated atomically
	future(T) fut;						// underlying future
}; // future_rc_impl$
231
232 static inline {
// Record one more reference to `impl`: atomic, relaxed increment of the reference count.
void incRef$( future_rc_impl$( T ) & impl ) {
	__atomic_add_fetch( &impl.refCnt, 1, __ATOMIC_RELAXED );
} // incRef$
239
// Drop one reference to `impl`.
// Returns true when the caller released the last reference, i.e. the count went from 1 to 0, in which case
// the callers in this file delete the impl.
// The decrement is acquire-release: release makes this owner's earlier accesses to the shared future visible
// before the count drops, and acquire lets the thread that sees the count reach zero observe every other
// owner's accesses before it deletes. A relaxed decrement gives neither guarantee.
bool decRef$( future_rc_impl$( T ) & impl ) with( impl ) {
	return __atomic_fetch_sub( &refCnt, 1, __ATOMIC_ACQ_REL ) == 1;
} // decRef$
248
// Construct the shared state with a reference count of 1, standing for the handle that creates it.
void ?{}( future_rc_impl$( T ) & frc ) {
	(frc.lock){};										// initialization
	frc.refCnt = 1;
} // ?{}
253
// Destroy the shared state: performs one further decrement of the reference count and ignores the outcome.
// NOTE(review): when this runs through delete( impl ) in future_rc, decRef$ has already taken the count to 0,
// so this decrement wraps the unsigned counter -- confirm the extra decrement is intended.
void ^?{}( future_rc_impl$( T ) & frc ) {
	decRef$( frc );
} // ^?{}
257 } // static inline
258
// Reference-counted handle to a heap-allocated future; copies of a handle share one future_rc_impl$.
struct future_rc {
	future_rc_impl$(T) * impl;			// shared state, deleted by the handle that drops the last reference
}; // future_rc
__CFA_SELECT_GET_TYPE( future_rc(T) );					// macro defined outside this file (presumably select.hfa)
263
264 static inline {
// Create a handle to a freshly allocated future; the new impl starts with a reference count of 1.
void ?{}( future_rc( T ) & frc ) {
	frc.impl = new();
} // ?{}
268
// Copy-construct a handle: `to` shares the impl of `from`, whose reference count goes up by one.
void ?{}( future_rc( T ) & to, future_rc( T ) & from ) {
	to.impl = from.impl;								// point at the shared impl
	incRef$( *to.impl );
} // ?{}
273
// Destroy a handle: drop its reference and delete the shared impl when it was the last one.
void ^?{}( future_rc( T ) & frc ) {
	if ( ! decRef$( *frc.impl ) ) return;				// other handles remain
	delete( frc.impl );
	frc.impl = 0p;
} // ^?{}
277
// Assign one handle to another: `lhs` lets go of its current impl (deleting it if that was the last
// reference) and then shares the impl of `rhs`. Assigning handles that already share an impl does nothing.
future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) {
	if ( lhs.impl != rhs.impl ) {						// not self assignment
		if ( decRef$( *lhs.impl ) ) {					// no references => delete current impl
			delete( lhs.impl );
			lhs.impl = 0p;
		}
		lhs.impl = rhs.impl;							// point at new impl
		incRef$( *lhs.impl );							// and increment reference count
	}
	return lhs;
} // ?=?
285
// waituntil hooks for future_rc: each one forwards to the same hook on the underlying future.

// Forward to register_select on the shared future.
bool register_select( future_rc(T) & this, select_node & s ) {
	future(T) & target = this.impl->fut;
	return register_select( target, s );
}

// Forward to unregister_select on the shared future.
bool unregister_select( future_rc(T) & this, select_node & s ) {
	future(T) & target = this.impl->fut;
	return unregister_select( target, s );
}

// Nothing to do on selection; always returns true.
bool on_selected( future_rc(T) &, select_node & ) {
	return true;
}
295
// USED BY CLIENT

// Future result available ? Forwards to available on the shared future.
bool available( future_rc( T ) & frc ) {
	return available( frc.impl->fut );
}

// Deliver `val` through the shared future; forwards to fulfil on it.
bool fulfil( future_rc(T) & frc, T val ) {
	return fulfil( frc.impl->fut, val );
}

// Call-operator form of fulfil( frc, val ).
bool ?()( future_rc(T) & frc, T val ) {					// alternate interface
	return fulfil( frc, val );
}

// Referential equality: non-zero when both handles share the same impl.
int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) {
	return lhs.impl == rhs.impl;
}
304
// USED BY SERVER

// Wait for the shared future to be fulfilled and return its value; forwards to get on it.
T get( future_rc(T) & frc ) {
	return get( frc.impl->fut );
}

// Call-operator form of get( frc ).
T ?()( future_rc(T) & frc ) {							// alternate interface
	return get( frc );
}

// Deliver an exception through the shared future; forwards to fulfil on it.
bool fulfil( future_rc(T) & frc, exception_t * ex ) {
	return fulfil( frc.impl->fut, ex );
}

// Call-operator form of fulfil( frc, ex ).
bool ?()( future_rc(T) & frc, exception_t * ex ) {		// alternate interface
	return fulfil( frc, ex );
}

// Mark the shared future as empty (for reuse); forwards to reset on it.
void reset( future_rc(T) & frc ) {
	reset( frc.impl->fut );
}
314 } // static inline
315} // forall( T )
316
317//--------------------------------------------------------------------------------------------------------
318// These futures below do not support waituntil statements so they may not have as many features as 'future'
319// however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
320// since it uses raw atomics and no locks
321//
322// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
323// since it is monitor based and also is not compatible with waituntil statement.
324//--------------------------------------------------------------------------------------------------------
325
forall( T ) {
	// A future_t (declared outside this file) extended with storage for one result of type T.
	struct single_future {
		inline future_t;
		T result;						// value stored by fulfil, read by wait
	};

	static inline {
		// Reset future back to original state: forwards to reset on the embedded future_t.
		void reset( single_future(T) & this ) {
			future_t & base = (future_t &)this;
			reset( base );
		}

		// Check if the future is available: forwards to available on the embedded future_t.
		bool available( single_future(T) & this ) {
			future_t & base = (future_t &)this;
			return available( base );
		}

		// Mark the future as abandoned, meaning it will be deleted by the server
		// This doesn't work because of the potential need for a destructor
		// void abandon( single_future(T) & this );

		// Store `result` in the future, then fulfil the embedded future_t.
		// Returns whatever fulfil on future_t returns, a thread$ * (presumably the unblocked thread, 0p if none;
		// multi_future in this file compares the same call against 0p).
		thread$ * fulfil( single_future(T) & this, T result ) {
			this.result = result;
			future_t & base = (future_t &)this;
			return fulfil( base );
		}

		// Wait for the future to be fulfilled.
		// Returns [stored result, r] where r is the value returned by wait on the embedded future_t
		// (described by the original author as whether the thread had to block).
		[T, bool] wait( single_future(T) & this ) {
			future_t & base = (future_t &)this;
			bool r = wait( base );
			return [this.result, r];
		}

		// Wait for the future to be fulfilled and return only the stored result.
		T wait( single_future(T) & this ) {
			[T, bool] outcome;
			outcome = wait( this );						// tuple-returning overload
			return outcome.0;
		}
	}
}
364
forall( T ) {
	// Monitor-based future for several waiters: one waiter at a time waits on the embedded future_t while any
	// others wait on the `blocked` condition.
	monitor multi_future {
		inline future_t;
		condition blocked;				// waiters that arrived while another thread was "first"
		bool has_first;					// true while some thread holds the "first" role
		T result;						// value stored by fulfil, read by wait
	};

	static inline {
		// Construct with nobody holding the "first" role.
		void ?{}( multi_future(T) & this ) {
			this.has_first = false;
		}

		// Try to take the "first" role.
		// Returns true if the caller took it. Otherwise waits on `blocked` and returns false once signalled.
		bool $first( multi_future(T) & mutex this ) {
			if ( ! this.has_first ) {
				this.has_first = true;
				return true;
			}

			wait( this.blocked );
			return false;
		}

		// Give up the "first" role and signal every thread waiting on `blocked`.
		void $first_done( multi_future(T) & mutex this ) {
			this.has_first = false;
			signal_all( this.blocked );
		}

		// Reset future back to original state.
		// Aborts if a thread holds the "first" role or threads are queued on `blocked`.
		void reset( multi_future(T) & mutex this ) {
			if ( this.has_first != false ) abort("Attempting to reset a multi_future with at least one blocked threads");
			if ( ! empty( this.blocked ) ) abort("Attempting to reset a multi_future with multiple blocked threads");
			// the embedded future_t is addressed as the bytes following a monitor$ at the start of the object
			future_t & base = *(future_t *)((uintptr_t)&this + sizeof(monitor$));
			reset( base );
		}

		// Store `result`, then fulfil the embedded future_t.
		// Returns true when fulfil on the future_t returns something other than 0p.
		bool fulfil( multi_future(T) & this, T result ) {
			this.result = result;
			future_t & base = *(future_t *)((uintptr_t)&this + sizeof(monitor$));
			return fulfil( base ) != 0p;
		}

		// Wait for the future to be fulfilled.
		// Returns [stored result, w]: for the thread holding the "first" role w is the value returned by wait
		// on the embedded future_t; for every other thread w is true.
		[T, bool] wait( multi_future(T) & this ) {
			bool w;
			if ( $first( this ) ) {
				future_t & base = *(future_t *)((uintptr_t)&this + sizeof(monitor$));
				w = wait( base );
				$first_done( this );
			} else {
				w = true;								// waited on `blocked` inside $first
			}

			return [this.result, w];
		}

		// Wait for the future to be fulfilled and return only the stored result.
		T wait( multi_future(T) & this ) {
			[T, bool] outcome;
			outcome = wait( this );						// tuple-returning overload
			return outcome.0;
		}
	}
}
Note: See TracBrowser for help on using the repository browser.