Changeset 8ffee9a
- Timestamp:
- Nov 19, 2025, 10:00:11 AM (18 hours ago)
- Branches:
- master
- Parents:
- b5749f9
- File:
-
- 1 edited
-
libcfa/src/concurrency/future.hfa (modified) (11 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/future.hfa
rb5749f9 r8ffee9a 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Mon Nov 17 08:58:38 202513 // Update Count : 16412 // Last Modified On : Wed Nov 19 09:26:38 2025 13 // Update Count : 204 14 14 // 15 15 … … 27 27 28 28 forall( T ) { 29 enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; 29 // PRIVATE 30 31 struct future_node$ { 32 inline select_node; 33 T * my_result; 34 }; 35 36 static inline { 37 // memcpy wrapper to help copy values 38 void copy_T$( T & to, T & from ) { memcpy( (void *)&to, (void *)&from, sizeof(T) ); } 39 } // distribution 40 41 enum { FUTURE_EMPTY$ = 0, FUTURE_FULFILLED$ = 1 }; 42 43 // PUBLIC 30 44 31 45 struct future { … … 33 47 T result; 34 48 exception_t * except; 49 futex_mutex lock; 35 50 dlist( select_node ) waiters; 36 futex_mutex lock;37 51 }; 38 __CFA_SELECT_GET_TYPE( future(T) ); 39 40 struct future_node { 41 inline select_node; 42 T * my_result; 43 }; 44 45 static inline { 46 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) { 47 ((select_node &)this){ blocked_thread }; 48 this.my_result = my_result; 49 } 50 51 void ?{}( future(T) & this ) with( this ) { 52 waiters{}; 52 __CFA_SELECT_GET_TYPE( future(T) ); // magic 53 54 static inline { 55 // PRIVATE 56 57 bool register_select( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement 58 lock( lock ); 59 60 // check if we can complete operation. If so race to establish winner in special OR case 61 if ( !s.park_counter && state != FUTURE_EMPTY$ ) { 62 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 63 unlock( lock ); 64 return false; 65 } 66 } 67 68 // future not ready -> insert select node and return 69 if ( state == FUTURE_EMPTY$ ) { 70 insert_last( waiters, s ); 71 unlock( lock ); 72 return false; 73 } 74 75 __make_select_node_available( s ); 76 unlock( lock ); 77 return true; 78 } 79 80 bool unregister_select( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement 81 if ( ! isListed( s ) ) return false; 82 lock( lock ); 83 if ( isListed( s ) ) remove( s ); 84 unlock( lock ); 85 return false; 86 } 87 88 bool on_selected( future(T) &, select_node & ) { return true; } // for waituntil statement 89 90 // PUBLIC 91 92 // General 93 94 void ?{}( future_node$(T) & fut, thread$ * blocked_thread, T * my_result ) { 95 ((select_node &)fut){ blocked_thread }; 96 fut.my_result = my_result; 97 } 98 99 void ?{}( future(T) & fut ) with( fut ) { 100 // waiters{}; 53 101 except = 0p; 54 state = FUTURE_EMPTY ;102 state = FUTURE_EMPTY$; 55 103 lock{}; 56 104 } 57 105 58 void ^?{}( future(T) & this ) with( this) {106 void ^?{}( future(T) & fut ) with( fut ) { 59 107 free( except ); 60 108 } 61 109 62 // Reset future back to original state 63 void reset( future(T) & this ) with(this) { 64 lock( lock ); 65 if ( ! isEmpty( waiters ) ) 66 abort("Attempting to reset a future with blocked waiters"); 67 state = FUTURE_EMPTY; 68 free( except ); 69 except = 0p; 70 unlock( lock ); 71 } 72 73 // check if the future is available 74 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected 75 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); } 76 77 78 // memcpy wrapper to help copy values 79 void copy_T$( T & from, T & to ) { 80 memcpy((void *)&to, (void *)&from, sizeof(T)); 81 } 82 83 bool fulfil$( future(T) & this ) with(this) { // helper 84 bool ret_val = ! isEmpty( waiters ); 85 state = FUTURE_FULFILLED; 86 while ( ! isEmpty( waiters ) ) { 87 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 88 break; // if handle_OR returns false then waiters is empty so break 89 select_node &s = remove_first( waiters ); 90 91 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 92 copy_T$( result, *(((future_node(T) &)s).my_result) ); 93 94 wake_one( waiters, s ); 95 } 96 unlock( lock ); 97 return ret_val; 98 } 99 100 // Fulfil the future, returns whether or not someone was unblocked 101 bool fulfil( future(T) & this, T val ) with(this) { 102 lock( lock ); 103 if ( state != FUTURE_EMPTY ) 104 abort("Attempting to fulfil a future that has already been fulfilled"); 105 106 copy_T$( val, result ); 107 return fulfil$( this ); 108 } 109 110 bool ?()( future(T) & this, T val ) { // alternate interface 111 return fulfil( this, val ); 112 } 113 114 // Load an exception to the future, returns whether or not someone was unblocked 115 bool fulfil( future(T) & this, exception_t * ex ) with(this) { 116 lock( lock ); 117 if ( state != FUTURE_EMPTY ) 118 abort("Attempting to fulfil a future that has already been fulfilled"); 119 120 except = ( exception_t * ) malloc( ex->virtual_table->size ); 121 ex->virtual_table->copy( except, ex ); 122 return fulfil$( this ); 123 } 124 125 bool ?()( future(T) & this, exception_t * ex ) { // alternate interface 126 return fulfil( this, ex ); 127 } 128 129 // Wait for the future to be fulfilled 130 // Also return whether the thread had to block or not 131 [T, bool] get( future(T) & this ) with( this ) { 110 // Used by Client 111 112 // PRIVATE 113 114 // Return a value/exception from the future. 115 T get$( future(T) & fut ) with( fut ) { // helper 132 116 void exceptCheck() { // helper 133 117 if ( except ) { … … 138 122 } 139 123 } 140 141 lock( lock );142 124 T ret_val; 143 if ( state == FUTURE_FULFILLED ) { 125 126 // LOCK ACQUIRED IN PUBLIC get 127 if ( state == FUTURE_FULFILLED$ ) { 144 128 exceptCheck(); 145 copy_T$( re sult, ret_val);129 copy_T$( ret_val, result ); 146 130 unlock( lock ); 147 return [ret_val, false];148 } 149 150 future_node (T) node = { active_thread(), &ret_val };131 return ret_val; 132 } 133 134 future_node$(T) node = { active_thread(), &ret_val }; 151 135 insert_last( waiters, ((select_node &)node) ); 152 136 unlock( lock ); 153 137 park( ); 154 138 exceptCheck(); 155 156 return [ret_val, true]; 157 } 158 159 // Wait for the future to be fulfilled 160 T get( future(T) & this ) { 161 [T, bool] tt; 162 tt = get(this); 163 return tt.0; 164 } 165 166 T ?()( future(T) & this ) { // alternate interface 167 return get( this ); 168 } 169 170 // Gets value if it is available and returns [ val, true ] 171 // otherwise returns [ default_val, false] 172 // will not block 173 [T, bool] try_get( future(T) & this ) with(this) { 139 return ret_val; 140 } 141 142 // PUBLIC 143 144 bool available( future( T ) & fut ) { return __atomic_load_n( &fut.state, __ATOMIC_RELAXED ); } // future result available ? 145 146 // Return a value/exception from the future. 147 [T, bool] get( future(T) & fut ) with( fut ) { 148 lock( lock ); 149 bool ret = state == FUTURE_EMPTY$; 150 return [ get$( fut ), ret ]; 151 } 152 153 T get( future(T) & fut ) with( fut ) { 154 lock( lock ); 155 return get$( fut ); 156 } 157 T ?()( future(T) & fut ) { return get( fut ); } // alternate interface 158 159 // Non-blocking get: true => return defined value, false => value return undefined. 160 [T, bool] try_get( future(T) & fut ) with( fut ) { 174 161 lock( lock ); 175 162 T ret_val; 176 if ( state == FUTURE_FULFILLED ) {177 copy_T$( re sult, ret_val);163 if ( state == FUTURE_FULFILLED$ ) { 164 copy_T$( ret_val, result ); 178 165 unlock( lock ); 179 166 return [ret_val, true]; … … 183 170 } 184 171 185 bool register_select( future(T) & this, select_node & s ) with(this) { 186 lock( lock ); 187 188 // check if we can complete operation. If so race to establish winner in special OR case 189 if ( !s.park_counter && state != FUTURE_EMPTY ) { 190 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 191 unlock( lock ); 192 return false; 193 } 194 } 195 196 // future not ready -> insert select node and return 197 if ( state == FUTURE_EMPTY ) { 198 insert_last( waiters, s ); 199 unlock( lock ); 200 return false; 201 } 202 203 __make_select_node_available( s ); 204 unlock( lock ); 205 return true; 206 } 207 208 bool unregister_select( future(T) & this, select_node & s ) with(this) { 209 if ( ! isListed( s ) ) return false; 210 lock( lock ); 211 if ( isListed( s ) ) remove( s ); 212 unlock( lock ); 213 return false; 214 } 215 216 bool on_selected( future(T) &, select_node & ) { return true; } 217 } 218 } 172 // Used by Server 173 174 // PRIVATE 175 176 bool fulfil$( future(T) & fut ) with( fut ) { // helper 177 bool ret_val = ! isEmpty( waiters ); 178 state = FUTURE_FULFILLED$; 179 while ( ! isEmpty( waiters ) ) { 180 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 181 break; // if handle_OR returns false then waiters is empty so break 182 select_node &s = remove_first( waiters ); 183 184 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 185 copy_T$( *(((future_node$(T) &)s).my_result), result ); 186 187 wake_one( waiters, s ); 188 } 189 unlock( lock ); 190 return ret_val; 191 } 192 193 // PUBLIC 194 195 // Load a value/exception into the future, returns whether or not waiting threads. 196 bool fulfil( future(T) & fut, T val ) with( fut ) { 197 lock( lock ); 198 if ( state != FUTURE_EMPTY$ ) abort("Attempting to fulfil a future that has already been fulfilled"); 199 copy_T$( result, val ); 200 return fulfil$( fut ); 201 } 202 bool ?()( future(T) & fut, T val ) { return fulfil( fut, val ); } // alternate interface 203 204 bool fulfil( future(T) & fut, exception_t * ex ) with( fut ) { 205 lock( lock ); 206 if ( state != FUTURE_EMPTY$ ) abort( "Attempting to fulfil a future that has already been fulfilled" ); 207 except = ( exception_t * ) malloc( ex->virtual_table->size ); 208 ex->virtual_table->copy( except, ex ); 209 return fulfil$( fut ); 210 } 211 bool ?()( future(T) & fut, exception_t * ex ) { return fulfil( fut, ex ); } // alternate interface 212 213 void reset( future(T) & fut ) with( fut ) { // mark future as empty (for reuse) 214 lock( lock ); 215 if ( ! isEmpty( waiters ) ) abort( "Attempting to reset a future with blocked waiters" ); 216 state = FUTURE_EMPTY$; 217 free( except ); 218 except = 0p; 219 unlock( lock ); 220 } 221 } // static inline 222 } // forall( T ) 219 223 220 224 //-------------------------------------------------------------------------------------------------------- … … 224 228 225 229 forall( T ) { 230 // PRIVATE 231 226 232 struct future_rc_impl$ { 227 233 futex_mutex lock; // concurrent protection … … 231 237 232 238 static inline { 233 void incRef$( future_rc_impl$( T ) & impl ) with( impl ) { 234 __atomic_fetch_add( &refCnt, 1, __ATOMIC_RELAXED ); 235 // lock( lock ); 236 // refCnt += 1; 237 // unlock( lock ); 239 size_t incRef$( future_rc_impl$( T ) & impl ) with( impl ) { 240 return __atomic_fetch_add( &refCnt, 1, __ATOMIC_SEQ_CST ); 238 241 } // incRef$ 239 242 240 bool decRef$( future_rc_impl$( T ) & impl ) with( impl ) { 241 return __atomic_fetch_add( &refCnt, -1, __ATOMIC_RELAXED ) == 1; 242 // lock( lock ); 243 // refCnt -= 1; 244 // bool ret = refCnt == 0; 245 // unlock( lock ); 246 // return ret; 243 size_t decRef$( future_rc_impl$( T ) & impl ) with( impl ) { 244 return __atomic_fetch_add( &refCnt, -1, __ATOMIC_SEQ_CST ); 247 245 } // decRef$ 248 246 249 247 void ?{}( future_rc_impl$( T ) & frc ) with( frc ) { 250 lock{}; // intialization 251 refCnt = 1; 248 refCnt = 1; // count initial object 252 249 } // ?{} 253 254 void ^?{}( future_rc_impl$( T ) & frc ) with( frc ) {255 decRef$( frc );256 } // ^?{}257 250 } // static inline 258 251 252 // PUBLIC 253 259 254 struct future_rc { 260 255 future_rc_impl$(T) * impl; 261 256 }; // future_rc 262 __CFA_SELECT_GET_TYPE( future_rc(T) ); 257 __CFA_SELECT_GET_TYPE( future_rc(T) ); // magic 263 258 264 259 static inline { 265 void ?{}( future_rc( T ) & frc ) with( frc ) { 260 // PRIVATE 261 262 bool register_select( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement 263 return register_select( frc.impl->fut, s ); 264 } 265 266 bool unregister_select( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement 267 return unregister_select( frc.impl->fut, s ); 268 } 269 270 bool on_selected( future_rc(T) &, select_node & ) { return true; } // for waituntil statement 271 272 // PUBLIC 273 274 // General 275 276 void ?{}( future_rc( T ) & frc ) with( frc ) { // default constructor 266 277 impl = new(); 267 278 } // ?{} 268 279 269 void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) { 280 void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) { // copy constructor 270 281 impl = from.impl; // point at new impl 271 282 incRef$( *impl ); … … 273 284 274 285 void ^?{}( future_rc( T ) & frc ) with( frc ) { 275 if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; }286 if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; } 276 287 } // ^?{} 277 288 278 289 future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) { 279 290 if ( impl == rhs.impl ) return lhs; // self assignment ? 280 if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; } // no references=> delete current impl291 if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; } // no references ? => delete current impl 281 292 impl = rhs.impl; // point at new impl 282 293 incRef$( *impl ); // and increment reference count … … 284 295 } // ?+? 285 296 286 bool register_select( future_rc(T) & this, select_node & s ) with( this ) { 287 return register_select( this.impl->fut, s ); 288 } 289 290 bool unregister_select( future_rc(T) & this, select_node & s ) with( this ) { 291 return unregister_select( this.impl->fut, s ); 292 } 293 294 bool on_selected( future_rc(T) &, select_node & ) { return true; } 295 296 // USED BY CLIENT 297 // Used by Client 297 298 298 299 bool available( future_rc( T ) & frc ) { return available( frc.impl->fut ); } // future result available ? 299 300 300 bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); } 301 // Return a value/exception from the future. 302 [T, bool] get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value 303 T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value 304 T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface 305 [T, bool] try_get( future_rc(T) & frc ) with( frc ) { return try_get( impl->fut ); } 306 307 int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality 308 309 // Used by Server 310 311 // Load a value/exception into the future, returns whether or not waiting threads. 312 bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); } // copy-in future value 301 313 bool ?()( future_rc(T) & frc, T val ) { return fulfil( frc, val ); } // alternate interface 302 314 303 int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality 304 305 // USED BY SERVER 306 307 T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } 308 T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface 309 310 bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); } 315 bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); } // insert future exception 311 316 bool ?()( future_rc(T) & frc, exception_t * ex ) { return fulfil( frc, ex ); } // alternate interface 312 317 … … 357 362 T wait( single_future(T) & this ) { 358 363 [T, bool] tt; 359 tt = wait( this);364 tt = wait( this ); 360 365 return tt.0; 361 366 } … … 419 424 // Wait for the future to be fulfilled 420 425 T wait( multi_future(T) & this ) { 421 return wait( this).0;426 return wait( this ).0; 422 427 } 423 428 }
Note:
See TracChangeset
for help on using the changeset viewer.