source: libcfa/src/concurrency/future.hfa@ a7b78c3

Last change on this file since a7b78c3 was 6b33e89, checked in by Peter A. Buhr <pabuhr@…>, 6 months ago

change backquote call to regular call

  • Property mode set to 100644
File size: 9.0 KB
RevLine 
[70f8bcd2]1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
[339e30a]7// concurrency/future.hfa --
[70f8bcd2]8//
[339e30a]9// Author : Thierry Delisle & Peiran Hong & Colby Parsons
[70f8bcd2]10// Created On : Wed Jan 06 17:33:18 2021
[3483185]11// Last Modified By : Peter A. Buhr
[6b33e89]12// Last Modified On : Wed Apr 23 22:41:10 2025
13// Update Count : 22
[70f8bcd2]14//
15
[5e4a830]16#pragma once
[70f8bcd2]17
18#include "bits/locks.hfa"
19#include "monitor.hfa"
[339e30a]20#include "select.hfa"
[beeff61e]21#include "locks.hfa"
[70f8bcd2]22
[339e30a]23//----------------------------------------------------------------------------
24// future
[1c0a3a4]25// I don't use future_t here as I need to use a lock for this future since it supports multiple consumers.
26// future_t is lockfree and uses atomics which aren't needed given we use locks here
[fd54fef]27forall( T ) {
// Values held in future.state: the constructor and reset() store FUTURE_EMPTY, fulfil$() stores FUTURE_FULFILLED.
[3483185]28 enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };
[339e30a]29
// Lock-based future that can be fulfilled with either a value or an exception and keeps a list of waiters.
[70f8bcd2]30 struct future {
[339e30a]31 int state; // FUTURE_EMPTY or FUTURE_FULFILLED
32 T result; // value copied in by fulfil( future(T) &, T )
[3483185]33 exception_t * except; // malloc'ed copy made by fulfil( future(T) &, exception_t * ); 0p when none is stored
[339e30a]34 dlist( select_node ) waiters; // nodes queued by get() and register_select()
[3483185]35 futex_mutex lock; // held by the operations below while they modify the other fields
[339e30a]36 };
// NOTE(review): __CFA_SELECT_GET_TYPE is not defined in this file; presumably it comes from select.hfa -- confirm what it declares.
[3483185]37 __CFA_SELECT_GET_TYPE( future(T) );
[339e30a]38
// Waiter node used by get(): a select_node plus the address into which fulfil$() copies the value.
[3483185]39 struct future_node {
40 inline select_node; // embedded select_node, so the node can be linked into future.waiters
41 T * my_result; // destination of the value; get() passes the address of its local return variable
42 };
[339e30a]43
44 static inline {
45
// Construct a waiter node: initialise the embedded select_node for blocked_thread, then record where the
// fulfilled value must be written.
[3483185]46 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
47 ((select_node &)this){ blocked_thread }; // run the select_node constructor on the embedded part
48 this.my_result = my_result;
49 }
[339e30a]50
// Construct an empty future: empty waiter list, no stored exception, state FUTURE_EMPTY, fresh lock.
// The result field is not touched here.
[3483185]51 void ?{}( future(T) & this ) {
[339e30a]52 this.waiters{};
[3483185]53 this.except = 0p;
54 this.state = FUTURE_EMPTY;
55 this.lock{};
56 }
57
// Destroy a future: release the exception copy allocated by fulfil( future(T) &, exception_t * ), if any.
// The lock is not taken and the waiter list is not inspected.
58 void ^?{}( future(T) & this ) {
59 free( this.except );
[339e30a]60 }
61
62 // Reset future back to original state
// Make the future reusable: under the lock, verify that nobody is queued (abort otherwise), mark the
// future empty, and release any stored exception copy.
void reset( future(T) & this ) {
	lock( this.lock );
	if ( ! isEmpty( this.waiters ) ) {
		abort("Attempting to reset a future with blocked waiters");
	}
	this.state = FUTURE_EMPTY;
	free( this.except );
	this.except = 0p;							// avoid a second free from a later reset or the destructor
	unlock( this.lock );
}
[339e30a]72
73 // check if the future is available
[3483185]74 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
// Lock-free peek at the state: true once the future has left FUTURE_EMPTY.
// Uses a relaxed atomic load, so it gives no ordering guarantee with respect to the stored result.
bool available( future(T) & this ) {
	int current = __atomic_load_n( &this.state, __ATOMIC_RELAXED );
	return current != FUTURE_EMPTY;
}
[339e30a]76
77
[3483185]78 // memcpy wrapper to help copy values
// Bitwise copy of one T: sizeof(T) bytes are copied from `from` into `to` (note the argument order:
// source first, destination second). No constructor or assignment operator of T is invoked.
void copy_T( T & from, T & to ) {
	void * dst = (void *)&to;
	void * src = (void *)&from;
	memcpy( dst, src, sizeof(T) );
}
[339e30a]82
// Common tail of both fulfil() overloads.
// Precondition: the caller holds this.lock and has already stored the value or the exception.
// Marks the future fulfilled, drains the waiter list, and RELEASES this.lock before returning.
// Returns true when at least one waiter was queued on entry.
bool fulfil$( future(T) & this ) {
	bool had_waiters = ! isEmpty( this.waiters );
	this.state = FUTURE_FULFILLED;

	// __handle_waituntil_OR deals with the special waituntil OR case; when it returns false the list
	// is empty, which ends the drain.
	while ( ! isEmpty( this.waiters ) && __handle_waituntil_OR( this.waiters ) ) {
		select_node & node = remove_first( this.waiters );

		// Nodes without a clause_status are treated as future_node waiters (presumably those queued by
		// get()): the value is written straight into the waiter's storage so the woken thread does not
		// need to reacquire any lock.
		if ( node.clause_status == 0p ) {
			copy_T( this.result, *(((future_node(T) &)node).my_result) );
		}

		wake_one( this.waiters, node );
	}

	unlock( this.lock );
	return had_waiters;
}
[339e30a]99
100 // Fulfil the future, returns whether or not someone was unblocked
// Store `val` in the future and wake every waiter.
// Aborts if the future is not empty. The lock taken here is released inside fulfil$().
// Returns true when at least one waiter was queued.
bool fulfil( future(T) & this, T val ) {
	lock( this.lock );
	if ( this.state != FUTURE_EMPTY ) {
		abort("Attempting to fulfil a future that has already been fulfilled");
	}

	copy_T( val, this.result );					// bitwise copy: val -> this.result
	return fulfil$( this );
}
109
// Call-operator spelling of fulfil( this, val ): lets a future be fulfilled with `f( val )`.
bool ?()( future(T) & this, T val ) {
	bool unblocked = fulfil( this, val );
	return unblocked;
}
[339e30a]113
[3483185]114 // Load an exception to the future, returns whether or not someone was unblocked
// Fulfil the future with an exception instead of a value.
// A heap copy of *ex is made (size and copy routine come from the exception's virtual table) and kept in
// this.except; it is freed by reset() or by the destructor. Aborts if the future is not empty.
// The lock taken here is released inside fulfil$(). Returns true when at least one waiter was queued.
bool fulfil( future(T) & this, exception_t * ex ) {
	lock( this.lock );
	if ( this.state != FUTURE_EMPTY ) {
		abort("Attempting to fulfil a future that has already been fulfilled");
	}

	this.except = ( exception_t * ) malloc( ex->virtual_table->size );
	ex->virtual_table->copy( this.except, ex );
	return fulfil$( this );
}
[339e30a]124
// Call-operator spelling of fulfil( this, ex ): lets an exception be loaded with `f( ex )`.
bool ?()( future(T) & this, exception_t * ex ) {
	bool unblocked = fulfil( this, ex );
	return unblocked;
}
128
129 // Wait for the future to be fulfilled
130 // Also return whether the thread had to block or not
// Obtain the future's value, blocking until it is fulfilled.
// Returns [value, blocked] where `blocked` is false when the future was already fulfilled on entry and
// true when the caller had to park.
// If the future was fulfilled with an exception, a stack copy of that exception is raised with
// throwResume in the caller; should the handler resume, the returned value is whatever bytes
// this.result / the local return variable hold.
// Locking contract: this.lock is released exactly once on every path, and never by a thread that does
// not hold it (after park() the woken thread does NOT own the lock, because fulfil$() wakes waiters and
// releases the lock itself).
[T, bool] get( future(T) & this ) with( this ) {
	// Helper: raise a copy of the stored exception, if there is one.
	// `held` states whether the caller currently owns `lock`. When it does, the lock is released here:
	// before the raise if an exception is present (so handlers never run with the future locked),
	// otherwise on the normal path. When it does not, the lock is left untouched.
	void exceptCheck( bool held ) {
		if ( except ) {
			exception_t * ex = ( exception_t * ) alloca( except->virtual_table->size );
			except->virtual_table->copy( ex, except );
			if ( held ) unlock( lock );
			throwResume * ex;
		} else if ( held ) {
			unlock( lock );
		}
	}

	lock( lock );
	T ret_val;
	if ( state == FUTURE_FULFILLED ) {
		copy_T( result, ret_val );				// take the value while still protected by the lock
		exceptCheck( true );					// releases the lock, then raises if an exception is stored
		return [ret_val, false];
	}

	// Not fulfilled yet: queue a node that tells fulfil$() where to deposit the value, then block.
	future_node(T) node = { active_thread(), &ret_val };
	insert_last( waiters, ((select_node &)node) );
	unlock( lock );
	park( );
	exceptCheck( false );						// lock is not held after being woken

	return [ret_val, true];
}
158
159 // Wait for the future to be fulfilled
// Value-only form of get(): calls the tuple-returning get() and discards the "had to block" flag.
160 T get( future(T) & this ) {
161 [T, bool] tt;
162 tt = get(this); // the tuple-typed target selects the [T, bool] overload
163 return tt.0; // element 0 is the value
164 }
165
// Call-operator spelling of the value-returning get(): lets the value be read with `f()`.
[3483185]166 T ?()( future(T) & this ) { // alternate interface
167 return get( this );
168 }
169
170 // Gets value if it is available and returns [ val, true ]
171 // otherwise returns [ default_val, false]
172 // will not block
// Non-blocking read. Under the lock, checks whether the future is fulfilled; if so the stored value is
// copied out and the flag is true, otherwise the returned T is just the local's initial value and the
// flag is false. The stored exception pointer is not examined by this function.
[T, bool] try_get( future(T) & this ) {
	lock( this.lock );
	T ret_val;
	bool ready = this.state == FUTURE_FULFILLED;
	if ( ready ) {
		copy_T( this.result, ret_val );
	}
	unlock( this.lock );

	return [ret_val, ready];
}
185
// Select-statement hook: attach select node `s` to this future.
// Returns true when the future is already fulfilled (the node is marked available and is NOT queued).
// Returns false when the node was queued on the waiter list because the future is still empty, or when,
// in the special OR case, the race to make the node available was lost.
// The lock is held for the whole decision and released on every return path.
186 bool register_select( future(T) & this, select_node & s ) with(this) {
187 lock( lock );
188
189 // check if we can complete operation. If so race to establish winner in special OR case
190 if ( !s.park_counter && state != FUTURE_EMPTY ) {
191 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
192 unlock( lock );
193 return false;
194 }
195 }
196
197 // future not ready -> insert select node and return
198 if ( state == FUTURE_EMPTY ) {
199 insert_last( waiters, s );
200 unlock( lock );
201 return false;
202 }
203
// future already fulfilled: mark the node available and report immediate completion
204 __make_select_node_available( s );
205 unlock( lock );
206 return true;
207 }
208
// Select-statement hook: detach select node `s` from this future if it is still queued.
// Always returns false.
bool unregister_select( future(T) & this, select_node & s ) {
	// Unlocked pre-check: a node that is not on any list needs no work and no lock.
	if ( isListed( s ) ) {
		lock( this.lock );
		// Re-check under the lock: the node may have been removed between the first test and the lock.
		if ( isListed( s ) ) remove( s );
		unlock( this.lock );
	}
	return false;
}
216
// Select-statement hook invoked for a selected clause; both parameters are unused and the result is always true.
217 bool on_selected( future(T) &, select_node & ) { return true; }
[339e30a]218 }
219}
220
221//--------------------------------------------------------------------------------------------------------
[70a4ed5]222// These futures below do not support select statements so they may not have as many features as 'future'
[339e30a]223// however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
[beeff61e]224// since it uses raw atomics and no locks
[339e30a]225//
226// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
227// since it is monitor based and also is not compatible with select statements
228//--------------------------------------------------------------------------------------------------------
229
// single_future: a future_t (declared elsewhere) extended with storage for a value of type T.
// Every operation stores or reads `result` here and forwards the synchronisation to the embedded future_t.
230forall( T ) {
231 struct single_future {
[70f8bcd2]232 inline future_t; // embedded future_t; the routines below cast to it to forward calls
233 T result; // value written by fulfil() and read by wait()
234 };
235
236 static inline {
237 // Reset future back to original state by forwarding to reset() on the embedded future_t
[339e30a]238 void reset(single_future(T) & this) { reset( (future_t&)this ); }
[70f8bcd2]239
240 // check if the future is available by forwarding to available() on the embedded future_t
[339e30a]241 bool available( single_future(T) & this ) { return available( (future_t&)this ); }
[70f8bcd2]242
243 // Mark the future as abandoned, meaning it will be deleted by the server
244 // This doesn't work because of the potential need for a destructor
[d923fca]245 // void abandon( single_future(T) & this );
[70f8bcd2]246
247 // Fulfil the future: store the value, then fulfil the embedded future_t and return its thread$ pointer
// (multi_future::fulfil in this file treats a non-0p result as "someone was unblocked")
[339e30a]248 thread$ * fulfil( single_future(T) & this, T result ) {
[70f8bcd2]249 this.result = result; // publish the value before waking anyone
250 return fulfil( (future_t&)this );
251 }
252
253 // Wait for the future to be fulfilled
254 // Also return the flag produced by wait() on the embedded future_t (whether the thread had to block or not)
[339e30a]255 [T, bool] wait( single_future(T) & this ) {
[70f8bcd2]256 bool r = wait( (future_t&)this );
257 return [this.result, r];
258 }
259
260 // Wait for the future to be fulfilled and return only the value (element 0 of the tuple form)
[339e30a]261 T wait( single_future(T) & this ) {
[70f8bcd2]262 [T, bool] tt;
263 tt = wait(this);
264 return tt.0;
265 }
266 }
267}
268
// multi_future: monitor-based future that allows several threads to wait.
// Only the first waiter blocks on the embedded future_t; later waiters block on the monitor condition
// `blocked` and are released by the first waiter once it has been woken.
[fd54fef]269forall( T ) {
[70f8bcd2]270 monitor multi_future {
271 inline future_t; // embedded future_t used by the first waiter
272 condition blocked; // where every waiter after the first one blocks
273 bool has_first; // true while some thread holds the "first waiter" role
274 T result; // value written by fulfil() and read by wait()
275 };
276
277 static inline {
// Construct with no first waiter registered; the other fields are not set here.
278 void ?{}(multi_future(T) & this) {
279 this.has_first = false;
280 }
281
// Claim the "first waiter" role (runs with the monitor lock: `mutex` parameter).
// Returns true if this caller became the first waiter; otherwise blocks on `blocked` and returns false
// after being signalled.
282 bool $first( multi_future(T) & mutex this ) {
[3483185]283 if ( this.has_first ) {
[70f8bcd2]284 wait( this.blocked );
285 return false;
286 }
287
288 this.has_first = true;
289 return true;
290 }
291
// Give up the "first waiter" role and release every thread blocked on the condition.
292 void $first_done( multi_future(T) & mutex this ) {
293 this.has_first = false;
294 signal_all( this.blocked );
295 }
296
297 // Reset future back to original state; aborts if a first waiter is registered or any thread is blocked on the condition
298 void reset(multi_future(T) & mutex this) {
[3483185]299 if ( this.has_first != false ) abort("Attempting to reset a multi_future with at least one blocked threads");
300 if ( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads");
// The embedded future_t is addressed by skipping sizeof(monitor$) bytes from the start of the object.
// NOTE(review): this assumes the monitor$ header is laid out first -- confirm against the monitor layout.
[71ca5b9]301 reset( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) );
[70f8bcd2]302 }
303
304 // Fulfil the future: store the value, fulfil the embedded future_t, and return whether that call yielded a non-0p thread (someone was unblocked)
// Note the parameter is not `mutex`: this routine does not acquire the monitor.
305 bool fulfil( multi_future(T) & this, T result ) {
306 this.result = result;
[71ca5b9]307 return fulfil( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) ) != 0p;
[70f8bcd2]308 }
309
310 // Wait for the future to be fulfilled
311 // Also return whether the thread had to block or not: callers that were not the first waiter always report true,
// the first waiter reports whatever wait() on the embedded future_t returned
312 [T, bool] wait( multi_future(T) & this ) {
313 bool sw = $first( this ); // true only for the thread that became the first waiter
314 bool w = !sw; // non-first callers have already blocked inside $first()
315 if ( sw ) {
[71ca5b9]316 w = wait( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) );
[70f8bcd2]317 $first_done( this ); // wake the threads parked on the condition
318 }
319
320 return [this.result, w];
321 }
322
323 // Wait for the future to be fulfilled and return only the value (element 0 of the tuple form)
324 T wait( multi_future(T) & this ) {
325 return wait(this).0;
326 }
327 }
[db19e1d]328}
Note: See TracBrowser for help on using the repository browser.