Changeset 5e0b6657 for libcfa/src/concurrency/future.hfa
- Timestamp:
- Dec 8, 2025, 11:29:33 AM (2 months ago)
- Branches:
- master
- Children:
- 79ba50c
- Parents:
- 8f448e0 (diff), 79ec8c3 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - File:
-
- 1 edited
-
libcfa/src/concurrency/future.hfa (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/future.hfa
r8f448e0 r5e0b6657 7 7 // concurrency/future.hfa -- 8 8 // 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons & Peter Buhr 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Wed Apr 23 22:41:10202513 // Update Count : 22 12 // Last Modified On : Mon Nov 24 16:08:52 2025 13 // Update Count : 222 14 14 // 15 15 … … 21 21 #include "locks.hfa" 22 22 23 //---------------------------------------------------------------------------- 24 // future 25 // I don't use future_t here as I need to use a lock for this future since it supports multiple consumers. 26 // future_t is lockfree and uses atomics which aren't needed given we use locks here 23 //-------------------------------------------------------------------------------------------------------- 24 // future does not use future_t as it needs a lock to support multiple consumers. future_t is lockfree 25 // and uses atomics which are not needed. 26 //-------------------------------------------------------------------------------------------------------- 27 27 28 forall( T ) { 28 enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; 29 // PRIVATE 30 31 struct future_node$ { 32 inline select_node; 33 T * my_result; 34 }; 35 36 static inline { 37 // memcpy wrapper to help copy values 38 void copy_T$( T & to, T & from ) { memcpy( (void *)&to, (void *)&from, sizeof(T) ); } 39 } // distribution 40 41 enum { FUTURE_EMPTY$ = 0, FUTURE_FULFILLED$ = 1 }; 42 43 // PUBLIC 29 44 30 45 struct future { … … 32 47 T result; 33 48 exception_t * except; 49 futex_mutex lock; 34 50 dlist( select_node ) waiters; 35 futex_mutex lock;36 51 }; 37 __CFA_SELECT_GET_TYPE( future(T) ); 38 39 struct future_node { 40 inline select_node; 41 T * my_result; 42 }; 43 44 static inline { 45 46 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) { 47 ((select_node &)this){ blocked_thread }; 48 this.my_result = my_result; 49 } 50 51 void ?{}( future(T) & this ) { 52 this.waiters{}; 53 this.except = 0p; 54 this.state = FUTURE_EMPTY; 55 this.lock{}; 56 } 57 58 void ^?{}( future(T) & this ) { 59 free( this.except ); 60 } 61 62 // Reset future back to original state 63 void reset( future(T) & this ) with(this) { 64 lock( lock ); 65 if ( ! isEmpty( waiters ) ) 66 abort("Attempting to reset a future with blocked waiters"); 67 state = FUTURE_EMPTY; 52 __CFA_SELECT_GET_TYPE( future(T) ); // magic 53 54 static inline { 55 // PRIVATE 56 57 bool register_select$( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement 58 lock( lock ); 59 60 // check if we can complete operation. If so race to establish winner in special OR case 61 if ( !s.park_counter && state != FUTURE_EMPTY$ ) { 62 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 63 unlock( lock ); 64 return false; 65 } 66 } 67 68 // future not ready -> insert select node and return 69 if ( state == FUTURE_EMPTY$ ) { 70 insert_last( waiters, s ); 71 unlock( lock ); 72 return false; 73 } 74 75 __make_select_node_available( s ); 76 unlock( lock ); 77 return true; 78 } 79 80 bool unregister_select$( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement 81 if ( ! isListed( s ) ) return false; 82 lock( lock ); 83 if ( isListed( s ) ) remove( s ); 84 unlock( lock ); 85 return false; 86 } 87 88 bool on_selected$( future(T) &, select_node & ) { return true; } // for waituntil statement 89 90 // PUBLIC 91 92 // General 93 94 void ?{}( future_node$(T) & fut, thread$ * blocked_thread, T * my_result ) { 95 ((select_node &)fut){ blocked_thread }; 96 fut.my_result = my_result; 97 } 98 99 void ?{}( future(T) & fut ) with( fut ) { 100 except = 0p; 101 state = FUTURE_EMPTY$; 102 } 103 104 void ^?{}( future(T) & fut ) with( fut ) { 68 105 free( except ); 69 except = 0p; 70 unlock( lock ); 71 } 72 73 // check if the future is available 74 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected 75 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); } 76 77 78 // memcpy wrapper to help copy values 79 void copy_T( T & from, T & to ) { 80 memcpy((void *)&to, (void *)&from, sizeof(T)); 81 } 82 83 bool fulfil$( future(T) & this ) with(this) { // helper 84 bool ret_val = ! isEmpty( waiters ); 85 state = FUTURE_FULFILLED; 86 while ( ! isEmpty( waiters ) ) { 87 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 88 break; // if handle_OR returns false then waiters is empty so break 89 select_node &s = remove_first( waiters ); 90 91 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 92 copy_T( result, *(((future_node(T) &)s).my_result) ); 93 94 wake_one( waiters, s ); 95 } 96 unlock( lock ); 97 return ret_val; 98 } 99 100 // Fulfil the future, returns whether or not someone was unblocked 101 bool fulfil( future(T) & this, T val ) with(this) { 102 lock( lock ); 103 if ( state != FUTURE_EMPTY ) 104 abort("Attempting to fulfil a future that has already been fulfilled"); 105 106 copy_T( val, result ); 107 return fulfil$( this ); 108 } 109 110 bool ?()( future(T) & this, T val ) { // alternate interface 111 return fulfil( this, val ); 112 } 113 114 // Load an exception to the future, returns whether or not someone was unblocked 115 bool fulfil( future(T) & this, exception_t * ex ) with(this) { 116 lock( lock ); 117 if ( state != FUTURE_EMPTY ) 118 abort("Attempting to fulfil a future that has already been fulfilled"); 119 120 except = ( exception_t * ) malloc( ex->virtual_table->size ); 121 ex->virtual_table->copy( except, ex ); 122 return fulfil$( this ); 123 } 124 125 bool ?()( future(T) & this, exception_t * ex ) { // alternate interface 126 return fulfil( this, ex ); 127 } 128 129 // Wait for the future to be fulfilled 130 // Also return whether the thread had to block or not 131 [T, bool] get( future(T) & this ) with( this ) { 106 } 107 108 // Used by Client 109 110 // PRIVATE 111 112 // Return a value/exception from the future. 113 T get$( future(T) & fut ) with( fut ) { // helper 132 114 void exceptCheck() { // helper 133 115 if ( except ) { … … 138 120 } 139 121 } 140 141 lock( lock );142 122 T ret_val; 143 if ( state == FUTURE_FULFILLED ) { 123 124 // LOCK ACQUIRED IN PUBLIC get 125 if ( state == FUTURE_FULFILLED$ ) { 144 126 exceptCheck(); 145 copy_T ( result, ret_val);127 copy_T$( ret_val, result ); 146 128 unlock( lock ); 147 return [ret_val, false];148 } 149 150 future_node (T) node = { active_thread(), &ret_val };129 return ret_val; 130 } 131 132 future_node$(T) node = { active_thread(), &ret_val }; 151 133 insert_last( waiters, ((select_node &)node) ); 152 134 unlock( lock ); 153 135 park( ); 154 136 exceptCheck(); 155 156 return [ret_val, true]; 157 } 158 159 // Wait for the future to be fulfilled 160 T get( future(T) & this ) { 161 [T, bool] tt; 162 tt = get(this); 163 return tt.0; 164 } 165 166 T ?()( future(T) & this ) { // alternate interface 167 return get( this ); 168 } 169 170 // Gets value if it is available and returns [ val, true ] 171 // otherwise returns [ default_val, false] 172 // will not block 173 [T, bool] try_get( future(T) & this ) with(this) { 137 return ret_val; 138 } 139 140 // PUBLIC 141 142 bool available( future( T ) & fut ) { return __atomic_load_n( &fut.state, __ATOMIC_RELAXED ); } // future result available ? 143 144 // Return a value/exception from the future. 145 [T, bool] get( future(T) & fut ) with( fut ) { 146 lock( lock ); 147 bool ret = state == FUTURE_EMPTY$; 148 return [ get$( fut ), ret ]; 149 } 150 151 T get( future(T) & fut ) with( fut ) { 152 lock( lock ); 153 return get$( fut ); 154 } 155 T ?()( future(T) & fut ) { return get( fut ); } // alternate interface 156 157 // Non-blocking get: true => return defined value, false => value return undefined. 158 [T, bool] try_get( future(T) & fut ) with( fut ) { 174 159 lock( lock ); 175 160 T ret_val; 176 if ( state == FUTURE_FULFILLED ) {177 copy_T ( result, ret_val);161 if ( state == FUTURE_FULFILLED$ ) { 162 copy_T$( ret_val, result ); 178 163 unlock( lock ); 179 164 return [ret_val, true]; 180 165 } 181 166 unlock( lock ); 182 183 167 return [ret_val, false]; 184 168 } 185 169 186 bool register_select( future(T) & this, select_node & s ) with(this) { 187 lock( lock ); 188 189 // check if we can complete operation. If so race to establish winner in special OR case 190 if ( !s.park_counter && state != FUTURE_EMPTY ) { 191 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 192 unlock( lock ); 193 return false; 194 } 195 } 196 197 // future not ready -> insert select node and return 198 if ( state == FUTURE_EMPTY ) { 199 insert_last( waiters, s ); 200 unlock( lock ); 201 return false; 202 } 203 204 __make_select_node_available( s ); 205 unlock( lock ); 206 return true; 207 } 208 209 bool unregister_select( future(T) & this, select_node & s ) with(this) { 210 if ( ! isListed( s ) ) return false; 211 lock( lock ); 212 if ( isListed( s ) ) remove( s ); 213 unlock( lock ); 214 return false; 215 } 216 217 bool on_selected( future(T) &, select_node & ) { return true; } 218 } 219 } 220 221 //-------------------------------------------------------------------------------------------------------- 222 // These futures below do not support select statements so they may not have as many features as 'future' 223 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 224 // since it uses raw atomics and no locks 225 // 226 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future' 227 // since it is monitor based and also is not compatible with select statements 170 // Used by Server 171 172 // PRIVATE 173 174 bool fulfil$( future(T) & fut ) with( fut ) { // helper 175 bool ret_val = ! isEmpty( waiters ); 176 state = FUTURE_FULFILLED$; 177 while ( ! isEmpty( waiters ) ) { 178 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 179 break; // if handle_OR returns false then waiters is empty so break 180 select_node &s = remove_first( waiters ); 181 182 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 183 copy_T$( *(((future_node$(T) &)s).my_result), result ); 184 185 wake_one( waiters, s ); 186 } 187 unlock( lock ); 188 return ret_val; 189 } 190 191 // PUBLIC 192 193 // Load a value/exception into the future, returns whether or not waiting threads. 194 bool fulfil( future(T) & fut, T val ) with( fut ) { 195 lock( lock ); 196 if ( state != FUTURE_EMPTY$ ) abort("Attempting to fulfil a future that has already been fulfilled"); 197 copy_T$( result, val ); 198 return fulfil$( fut ); 199 } 200 bool ?()( future(T) & fut, T val ) { return fulfil( fut, val ); } // alternate interface 201 202 bool fulfil( future(T) & fut, exception_t * ex ) with( fut ) { 203 lock( lock ); 204 if ( state != FUTURE_EMPTY$ ) abort( "Attempting to fulfil a future that has already been fulfilled" ); 205 except = ( exception_t * ) malloc( ex->virtual_table->size ); 206 ex->virtual_table->copy( except, ex ); 207 return fulfil$( fut ); 208 } 209 bool ?()( future(T) & fut, exception_t * ex ) { return fulfil( fut, ex ); } // alternate interface 210 211 void reset( future(T) & fut ) with( fut ) { // mark future as empty (for reuse) 212 lock( lock ); 213 if ( ! isEmpty( waiters ) ) abort( "Attempting to reset a future with blocked waiters" ); 214 state = FUTURE_EMPTY$; 215 free( except ); 216 except = 0p; 217 unlock( lock ); 218 } 219 } // static inline 220 } // forall( T ) 221 222 //-------------------------------------------------------------------------------------------------------- 223 // future_rc uses reference counting to eliminate explicit storage-management and support the waituntil 224 // statement. 228 225 //-------------------------------------------------------------------------------------------------------- 229 226 230 227 forall( T ) { 228 // PRIVATE 229 230 struct future_rc_impl$ { 231 futex_mutex lock; // concurrent protection 232 size_t refCnt; // number of references to future 233 future(T) fut; // underlying future 234 }; // future_rc_impl$ 235 236 static inline { 237 size_t incRef$( future_rc_impl$( T ) & impl ) with( impl ) { 238 return __atomic_fetch_add( &refCnt, 1, __ATOMIC_SEQ_CST ); 239 } // incRef$ 240 241 size_t decRef$( future_rc_impl$( T ) & impl ) with( impl ) { 242 return __atomic_fetch_add( &refCnt, -1, __ATOMIC_SEQ_CST ); 243 } // decRef$ 244 245 void ?{}( future_rc_impl$( T ) & frc ) with( frc ) { 246 refCnt = 1; // count initial object 247 } // ?{} 248 } // static inline 249 250 // PUBLIC 251 252 struct future_rc { 253 future_rc_impl$(T) * impl; 254 }; // future_rc 255 __CFA_SELECT_GET_TYPE( future_rc(T) ); // magic 256 257 static inline { 258 // PRIVATE 259 260 bool register_select$( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement 261 return register_select$( frc.impl->fut, s ); 262 } 263 264 bool unregister_select$( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement 265 return unregister_select$( frc.impl->fut, s ); 266 } 267 268 bool on_selected$( future_rc(T) &, select_node & ) { return true; } // for waituntil statement 269 270 // PUBLIC 271 272 // General 273 274 void ?{}( future_rc( T ) & frc ) with( frc ) { // default constructor 275 impl = new(); 276 } // ?{} 277 278 void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) { // copy constructor 279 impl = from.impl; // point at new impl 280 incRef$( *impl ); 281 } // ?{} 282 283 void ^?{}( future_rc( T ) & frc ) with( frc ) { 284 if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; } 285 } // ^?{} 286 287 future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) { 288 if ( impl == rhs.impl ) return lhs; // self assignment ? 289 if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; } // no references ? => delete current impl 290 impl = rhs.impl; // point at new impl 291 incRef$( *impl ); // and increment reference count 292 return lhs; 293 } // ?+? 294 295 // Used by Client 296 297 bool available( future_rc( T ) & frc ) { return available( frc.impl->fut ); } // future result available ? 298 299 // Return a value/exception from the future. 300 [T, bool] get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value 301 T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value 302 T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface 303 [T, bool] try_get( future_rc(T) & frc ) with( frc ) { return try_get( impl->fut ); } 304 305 int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality 306 307 // Used by Server 308 309 // Load a value/exception into the future, returns whether or not waiting threads. 310 bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); } // copy-in future value 311 bool ?()( future_rc(T) & frc, T val ) { return fulfil( frc, val ); } // alternate interface 312 313 bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); } // insert future exception 314 bool ?()( future_rc(T) & frc, exception_t * ex ) { return fulfil( frc, ex ); } // alternate interface 315 316 void reset( future_rc(T) & frc ) with( frc ) { reset( impl->fut ); } // mark future as empty (for reuse) 317 } // static inline 318 } // forall( T ) 319 320 //-------------------------------------------------------------------------------------------------------- 321 // This future does not support waituntil statements so it does not have as many features as 'future'. 322 // However, it is cheap and cheerful and is more performant than 'future' since it uses raw atomics 323 // and no locks 324 //-------------------------------------------------------------------------------------------------------- 325 326 forall( T ) { 327 // PUBLIC 328 231 329 struct single_future { 232 330 inline future_t; … … 235 333 236 334 static inline { 237 // Reset future back to original state 238 void reset(single_future(T) & this) { reset( (future_t&)this ); } 239 240 // check if the future is available 241 bool available( single_future(T) & this ) { return available( (future_t&)this ); } 242 243 // Mark the future as abandoned, meaning it will be deleted by the server 244 // This doesn't work beause of the potential need for a destructor 245 // void abandon( single_future(T) & this ); 246 247 // Fulfil the future, returns whether or not someone was unblocked 248 thread$ * fulfil( single_future(T) & this, T result ) { 249 this.result = result; 250 return fulfil( (future_t&)this ); 251 } 252 253 // Wait for the future to be fulfilled 254 // Also return whether the thread had to block or not 255 [T, bool] wait( single_future(T) & this ) { 256 bool r = wait( (future_t&)this ); 257 return [this.result, r]; 258 } 259 260 // Wait for the future to be fulfilled 261 T wait( single_future(T) & this ) { 262 [T, bool] tt; 263 tt = wait(this); 264 return tt.0; 265 } 266 } 267 } 268 269 forall( T ) { 270 monitor multi_future { 271 inline future_t; 272 condition blocked; 273 bool has_first; 274 T result; 275 }; 276 277 static inline { 278 void ?{}(multi_future(T) & this) { 279 this.has_first = false; 280 } 281 282 bool $first( multi_future(T) & mutex this ) { 283 if ( this.has_first ) { 284 wait( this.blocked ); 285 return false; 286 } 287 288 this.has_first = true; 289 return true; 290 } 291 292 void $first_done( multi_future(T) & mutex this ) { 293 this.has_first = false; 294 signal_all( this.blocked ); 295 } 296 297 // Reset future back to original state 298 void reset(multi_future(T) & mutex this) { 299 if ( this.has_first != false ) abort("Attempting to reset a multi_future with at least one blocked threads"); 300 if ( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads"); 301 reset( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) ); 302 } 303 304 // Fulfil the future, returns whether or not someone was unblocked 305 bool fulfil( multi_future(T) & this, T result ) { 306 this.result = result; 307 return fulfil( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) ) != 0p; 308 } 309 310 // Wait for the future to be fulfilled 311 // Also return whether the thread had to block or not 312 [T, bool] wait( multi_future(T) & this ) { 313 bool sw = $first( this ); 314 bool w = !sw; 315 if ( sw ) { 316 w = wait( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) ); 317 $first_done( this ); 318 } 319 320 return [this.result, w]; 321 } 322 323 // Wait for the future to be fulfilled 324 T wait( multi_future(T) & this ) { 325 return wait(this).0; 326 } 327 } 328 } 335 // PUBLIC 336 337 bool available( single_future(T) & fut ) { return available( (future_t &)fut ); } // future result available ? 338 339 // Return a value/exception from the future. 340 [T, bool] get( single_future(T) & fut ) { return [fut.result, wait( fut )]; } 341 T get( single_future(T) & fut ) { wait( fut ); return fut.result; } 342 T ?()( single_future(T) & fut ) { return get( fut ); } // alternate interface 343 344 // Load a value into the future, returns whether or not waiting threads. 345 bool fulfil( single_future(T) & fut, T result ) { 346 fut.result = result; 347 return fulfil( (future_t &)fut ) != 0p; 348 } 349 bool ?()( single_future(T) & fut, T val ) { return fulfil( fut, val ); } // alternate interface 350 351 void reset( single_future(T) & fut ) { reset( (future_t &)fut ); } // mark future as empty (for reuse) 352 } // static inline 353 } // forall( T )
Note:
See TracChangeset
for help on using the changeset viewer.