Changeset 4a16ddfa
- Timestamp:
- Nov 17, 2025, 9:13:40 AM (44 hours ago)
- Branches:
- master
- Children:
- f04623f
- Parents:
- b94579a
- File:
-
- 1 edited
-
libcfa/src/concurrency/future.hfa (modified) (10 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/future.hfa
rb94579a r4a16ddfa 7 7 // concurrency/future.hfa -- 8 8 // 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons & Peter Buhr 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Tue Nov 4 22:04:42202513 // Update Count : 2312 // Last Modified On : Mon Nov 17 08:58:38 2025 13 // Update Count : 164 14 14 // 15 15 … … 21 21 #include "locks.hfa" 22 22 23 //---------------------------------------------------------------------------- 24 // future 25 // I don't use future_t here as I need to use a lock for this future since it supports multiple consumers. 26 // future_t is lockfree and uses atomics which aren't needed given we use locks here 23 //-------------------------------------------------------------------------------------------------------- 24 // future does not use future_t as it needs a lock to support multiple consumers. future_t is lockfree 25 // and uses atomics which are not needed. 26 //-------------------------------------------------------------------------------------------------------- 27 27 28 forall( T ) { 28 29 enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; … … 43 44 44 45 static inline { 45 46 46 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) { 47 47 ((select_node &)this){ blocked_thread }; … … 49 49 } 50 50 51 void ?{}( future(T) &this ) {52 this.waiters{};53 this.except = 0p;54 this.state = FUTURE_EMPTY;55 this.lock{};56 }57 58 void ^?{}( future(T) & this ) {59 free( this.except );51 void ?{}( future(T) & this ) with( this ) { 52 waiters{}; 53 except = 0p; 54 state = FUTURE_EMPTY; 55 lock{}; 56 } 57 58 void ^?{}( future(T) & this ) with( this ) { 59 free( except ); 60 60 } 61 61 … … 77 77 78 78 // memcpy wrapper to help copy values 79 void copy_T ( T & from, T & to ) {79 void copy_T$( T & from, T & to ) { 80 80 memcpy((void *)&to, (void *)&from, sizeof(T)); 81 81 } … … 90 90 91 91 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 92 copy_T ( result, *(((future_node(T) &)s).my_result) );92 copy_T$( result, *(((future_node(T) &)s).my_result) ); 93 93 94 94 wake_one( waiters, s ); … … 104 104 abort("Attempting to fulfil a future that has already been fulfilled"); 105 105 106 copy_T ( val, result );106 copy_T$( val, result ); 107 107 return fulfil$( this ); 108 108 } … … 143 143 if ( state == FUTURE_FULFILLED ) { 144 144 exceptCheck(); 145 copy_T ( result, ret_val );145 copy_T$( result, ret_val ); 146 146 unlock( lock ); 147 147 return [ret_val, false]; … … 175 175 T ret_val; 176 176 if ( state == FUTURE_FULFILLED ) { 177 copy_T ( result, ret_val );177 copy_T$( result, ret_val ); 178 178 unlock( lock ); 179 179 return [ret_val, true]; 180 180 } 181 181 unlock( lock ); 182 183 182 return [ret_val, false]; 184 183 } … … 220 219 221 220 //-------------------------------------------------------------------------------------------------------- 222 // These futures below do not support select statements so they may not have as many features as 'future' 221 // future_rc uses reference counting to eliminate explicit storage-management and support the waituntil 222 // statement. 223 //-------------------------------------------------------------------------------------------------------- 224 225 forall( T ) { 226 struct future_rc_impl$ { 227 futex_mutex lock; // concurrent protection 228 size_t refCnt; // number of references to future 229 future(T) fut; // underlying future 230 }; // future_rc_impl$ 231 232 static inline { 233 void incRef$( future_rc_impl$( T ) & impl ) with( impl ) { 234 __atomic_fetch_add( &refCnt, 1, __ATOMIC_RELAXED ); 235 // lock( lock ); 236 // refCnt += 1; 237 // unlock( lock ); 238 } // incRef$ 239 240 bool decRef$( future_rc_impl$( T ) & impl ) with( impl ) { 241 return __atomic_fetch_add( &refCnt, -1, __ATOMIC_RELAXED ) == 1; 242 // lock( lock ); 243 // refCnt -= 1; 244 // bool ret = refCnt == 0; 245 // unlock( lock ); 246 // return ret; 247 } // decRef$ 248 249 void ?{}( future_rc_impl$( T ) & frc ) with( frc ) { 250 lock{}; // intialization 251 refCnt = 1; 252 } // ?{} 253 254 void ^?{}( future_rc_impl$( T ) & frc ) with( frc ) { 255 decRef$( frc ); 256 } // ^?{} 257 } // static inline 258 259 struct future_rc { 260 future_rc_impl$(T) * impl; 261 }; // future_rc 262 __CFA_SELECT_GET_TYPE( future_rc(T) ); 263 264 static inline { 265 void ?{}( future_rc( T ) & frc ) with( frc ) { 266 impl = new(); 267 } // ?{} 268 269 void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) { 270 impl = from.impl; // point at new impl 271 incRef$( *impl ); 272 } // ?{} 273 274 void ^?{}( future_rc( T ) & frc ) with( frc ) { 275 if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; } 276 } // ^?{} 277 278 future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) { 279 if ( impl == rhs.impl ) return lhs; // self assignment ? 280 if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; } // no references => delete current impl 281 impl = rhs.impl; // point at new impl 282 incRef$( *impl ); // and increment reference count 283 return lhs; 284 } // ?+? 285 286 bool register_select( future_rc(T) & this, select_node & s ) with( this ) { 287 return register_select( this.impl->fut, s ); 288 } 289 290 bool unregister_select( future_rc(T) & this, select_node & s ) with( this ) { 291 return unregister_select( this.impl->fut, s ); 292 } 293 294 bool on_selected( future_rc(T) &, select_node & ) { return true; } 295 296 // USED BY CLIENT 297 298 bool available( future_rc( T ) & frc ) { return available( frc.impl->fut ); } // future result available ? 299 300 bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); } 301 bool ?()( future_rc(T) & frc, T val ) { return fulfil( frc, val ); } // alternate interface 302 303 int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality 304 305 // USED BY SERVER 306 307 T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } 308 T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface 309 310 bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); } 311 bool ?()( future_rc(T) & frc, exception_t * ex ) { return fulfil( frc, ex ); } // alternate interface 312 313 void reset( future_rc(T) & frc ) with( frc ) { reset( impl->fut ); } // mark future as empty (for reuse) 314 } // static inline 315 } // forall( T ) 316 317 //-------------------------------------------------------------------------------------------------------- 318 // These futures below do not support waituntil statements so they may not have as many features as 'future' 223 319 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 224 320 // since it uses raw atomics and no locks 225 321 // 226 322 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future' 227 // since it is monitor based and also is not compatible with select statements323 // since it is monitor based and also is not compatible with waituntil statement. 228 324 //-------------------------------------------------------------------------------------------------------- 229 325
Note:
See TracChangeset
for help on using the changeset viewer.