- Timestamp: Jan 20, 2023, 1:25:37 PM (3 years ago)
- Branches: ADT, ast-experimental, master
- Children: 79a6b17, cd5eb4b
- Parents: 466787a (diff), a0d1f1c (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa
- Files: 2 added, 5 edited
- configure.ac (modified) (2 diffs)
- src/Makefile.am (modified) (1 diff)
- src/concurrency/channel.hfa (added)
- src/concurrency/clib/cfathread.cfa (modified) (2 diffs)
- src/concurrency/future.hfa (modified) (5 diffs)
- src/concurrency/locks.hfa (modified) (13 diffs)
- src/concurrency/select.hfa (added)
libcfa/configure.ac (r466787a → rad861ef)

 AC_PROG_CC
 AM_PROG_AS
-AC_PROG_LIBTOOL
+LT_INIT
 AC_PROG_INSTALL
 AC_PROG_MAKE_SET
…
 AC_CONFIG_HEADERS(prelude/defines.hfa)

-AC_OUTPUT()
+AC_OUTPUT

 # Final text
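Two small build-system modernizations: LT_INIT is the current libtool initialization macro and replaces the long-deprecated AC_PROG_LIBTOOL, and AC_OUTPUT is now invoked without the empty parentheses, which in m4 would otherwise pass a spurious empty argument (modern autoconf expects the output files to be declared earlier, e.g. via AC_CONFIG_HEADERS, and AC_OUTPUT to take no arguments).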
libcfa/src/Makefile.am (r466787a → rad861ef)

 concurrency/once.hfa \
 concurrency/kernel/fwd.hfa \
-concurrency/mutex_stmt.hfa
+concurrency/mutex_stmt.hfa \
+concurrency/select.hfa \
+concurrency/channel.hfa

 inst_thread_headers_src = \
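A brief note on why this edit is needed: the newly added select.hfa and channel.hfa must be appended to this header list so automake installs and distributes them with the library; a header missing from the list would exist in the source tree but never reach an installed tree, so user code could not include it.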
libcfa/src/concurrency/clib/cfathread.cfa (r466787a → rad861ef)

 // Mutex
 struct cfathread_mutex {
-    linear_backoff_then_block_lock impl;
+    exp_backoff_then_block_lock impl;
 };
 int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict) __attribute__((nonnull (1))) { *mut = new(); return 0; }
…
 // Condition
 struct cfathread_condition {
-    condition_variable(linear_backoff_then_block_lock) impl;
+    condition_variable(exp_backoff_then_block_lock) impl;
 };
 int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict) __attribute__((nonnull (1))) { *cond = new(); return 0; }
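Since cfathread is the C-compatible shim over CFA threading, this lock swap is invisible to C clients. A minimal sketch of the path that now lands on exp_backoff_then_block_lock; this is illustrative only, assuming the public header is cfathread.h and that the API mirrors pthread naming (cfathread_mutex_lock/cfathread_mutex_unlock are assumptions; only the _init functions appear in this diff):

    #include "cfathread.h"                  /* assumed public header for the C interface */

    static cfathread_mutex_t m;

    void update_shared( void ) {
        cfathread_mutex_lock( &m );         /* assumed pthread-style name, not shown in this diff */
        /* ... shared state; contention is now handled by exp_backoff_then_block_lock ... */
        cfathread_mutex_unlock( &m );       /* assumed pthread-style name */
    }

    int main( void ) {
        cfathread_mutex_init( &m, NULL );   /* shown above: allocates the lock-backed impl with new() */
        update_shared();
        return 0;
    }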
libcfa/src/concurrency/future.hfa (r466787a → rad861ef)

 // file "LICENCE" distributed with Cforall.
 //
-// io/types.hfa --
-//
-// Author : Thierry Delisle & Peiran Hong
+// concurrency/future.hfa --
+//
+// Author : Thierry Delisle & Peiran Hong & Colby Parsons
 // Created On : Wed Jan 06 17:33:18 2021
 // Last Modified By :
…
 //

 #pragma once

 #include "bits/locks.hfa"
 #include "monitor.hfa"
-
+#include "select.hfa"
+
+//----------------------------------------------------------------------------
+// future
+// I don't use future_t here since I need to use a lock for this future
+// since it supports multiple consumers
+// future_t is lockfree and uses atomics which aren't needed given we use locks here
 forall( T ) {
+    // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
+
+    // temporary enum replacement
+    const int FUTURE_EMPTY = 0;
+    const int FUTURE_FULFILLED = 1;
+
     struct future {
+        int state;
+        T result;
+        dlist( select_node ) waiters;
+        futex_mutex lock;
+    };
+
+    struct future_node {
+        inline select_node;
+        T * my_result;
+    };
+
+    // C_TODO: perhaps allow exceptions to be inserted like uC++?
+
+    static inline {
+
+        void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
+            ((select_node &)this){ blocked_thread };
+            this.my_result = my_result;
+        }
+
+        void ?{}( future(T) & this ) {
+            this.waiters{};
+            this.state = FUTURE_EMPTY;
+            this.lock{};
+        }
+
+        // Reset future back to original state
+        void reset( future(T) & this ) with(this)
+        {
+            lock( lock );
+            if( ! waiters`isEmpty )
+                abort("Attempting to reset a future with blocked waiters");
+            state = FUTURE_EMPTY;
+            unlock( lock );
+        }
+
+        // check if the future is available
+        // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
+        bool available( future(T) & this ) { return this.state; }
+
+
+        // memcpy wrapper to help copy values
+        void copy_T( T & from, T & to ) {
+            memcpy( (void *)&to, (void *)&from, sizeof(T) );
+        }
+
+        // internal helper to signal waiters off of the future
+        void _internal_flush( future(T) & this ) with(this) {
+            while( ! waiters`isEmpty ) {
+                select_node & s = try_pop_front( waiters );
+
+                if ( s.race_flag == 0p )
+                    // poke in result so that woken threads do not need to reacquire any locks
+                    // *(((future_node(T) &)s).my_result) = result;
+                    copy_T( result, *(((future_node(T) &)s).my_result) );
+                else if ( !install_select_winner( s, &this ) ) continue;
+
+                // only unpark if future is not selected
+                // or if it is selected we only unpark if we win the race
+                unpark( s.blocked_thread );
+            }
+        }
+
+        // Fulfil the future, returns whether or not someone was unblocked
+        bool fulfil( future(T) & this, T & val ) with(this) {
+            lock( lock );
+            if( state != FUTURE_EMPTY )
+                abort("Attempting to fulfil a future that has already been fulfilled");
+
+            copy_T( val, result );
+
+            bool ret_val = ! waiters`isEmpty;
+            state = FUTURE_FULFILLED;
+            _internal_flush( this );
+            unlock( lock );
+            return ret_val;
+        }
+
+        // Wait for the future to be fulfilled
+        // Also return whether the thread had to block or not
+        [T, bool] get( future(T) & this ) with( this ) {
+            lock( lock );
+            T ret_val;
+            if( state == FUTURE_FULFILLED ) {
+                copy_T( result, ret_val );
+                unlock( lock );
+                return [ret_val, false];
+            }
+
+            future_node(T) node = { active_thread(), &ret_val };
+            insert_last( waiters, ((select_node &)node) );
+            unlock( lock );
+            park( );
+
+            return [ret_val, true];
+        }
+
+        // Wait for the future to be fulfilled
+        T get( future(T) & this ) {
+            [T, bool] tt;
+            tt = get(this);
+            return tt.0;
+        }
+
+        // Gets value if it is available and returns [ val, true ]
+        // otherwise returns [ default_val, false ]
+        // will not block
+        [T, bool] try_get( future(T) & this ) with(this) {
+            lock( lock );
+            T ret_val;
+            if( state == FUTURE_FULFILLED ) {
+                copy_T( result, ret_val );
+                unlock( lock );
+                return [ret_val, true];
+            }
+            unlock( lock );
+
+            return [ret_val, false];
+        }
+
+        void * register_select( future(T) & this, select_node & s ) with(this) {
+            lock( lock );
+
+            // future not ready -> insert select node and return 0p
+            if( state == FUTURE_EMPTY ) {
+                insert_last( waiters, s );
+                unlock( lock );
+                return 0p;
+            }
+
+            // future ready and we won race to install it as the select winner return 1p
+            if ( install_select_winner( s, &this ) ) {
+                unlock( lock );
+                return 1p;
+            }
+
+            unlock( lock );
+            // future ready and we lost race to install it as the select winner
+            return 2p;
+        }
+
+        void unregister_select( future(T) & this, select_node & s ) with(this) {
+            lock( lock );
+            if ( s`isListed ) remove( s );
+            unlock( lock );
+        }
+
+    }
+}
+
+//--------------------------------------------------------------------------------------------------------
+// These futures below do not support select statements so they may not be as useful as 'future'
+// however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
+// since it uses raw atomics and no locks afaik
+//
+// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
+// since it is monitor based and also is not compatible with select statements
+//--------------------------------------------------------------------------------------------------------
+
+forall( T ) {
+    struct single_future {
         inline future_t;
         T result;
…
     static inline {
         // Reset future back to original state
-        void reset( future(T) & this ) { reset( (future_t&)this ); }
+        void reset( single_future(T) & this ) { reset( (future_t&)this ); }

         // check if the future is available
-        bool available( future(T) & this ) { return available( (future_t&)this ); }
+        bool available( single_future(T) & this ) { return available( (future_t&)this ); }

         // Mark the future as abandoned, meaning it will be deleted by the server
         // This doesn't work beause of the potential need for a destructor
-        void abandon( future(T) & this );
+        void abandon( single_future(T) & this );

         // Fulfil the future, returns whether or not someone was unblocked
-        thread$ * fulfil( future(T) & this, T result ) {
+        thread$ * fulfil( single_future(T) & this, T result ) {
             this.result = result;
             return fulfil( (future_t&)this );
…
         // Wait for the future to be fulfilled
         // Also return whether the thread had to block or not
-        [T, bool] wait( future(T) & this ) {
+        [T, bool] wait( single_future(T) & this ) {
             bool r = wait( (future_t&)this );
             return [this.result, r];
…
         // Wait for the future to be fulfilled
-        T wait( future(T) & this ) {
+        T wait( single_future(T) & this ) {
             [T, bool] tt;
             tt = wait(this);
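For orientation, a minimal usage sketch of the new lock-based, multi-consumer future follows. It is illustrative only: the include paths and the thread-declaration/implicit-join idioms are the usual CFA conventions, not part of this changeset.

    #include <fstream.hfa>
    #include <thread.hfa>
    #include "future.hfa"

    future(int) f;                      // shared, multi-consumer future from this changeset

    thread Consumer {};
    void main( Consumer & this ) {
        // blocks until fulfil() runs; the result is copied in by the
        // fulfilling thread before the waiter is unparked
        int v = get( f );
        sout | "consumer got" | v;
    }

    int main() {
        {
            Consumer consumers[2];      // both consumers wait on the same future
            int val = 42;
            fulfil( f, val );           // copies val in and wakes every waiter
        }                               // implicit join of consumers at block exit
        reset( f );                     // reusable once no waiters remain
    }

Note that get() is safe on either side of fulfil(): a consumer that arrives late simply finds state == FUTURE_FULFILLED and returns without blocking.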
libcfa/src/concurrency/locks.hfa (r466787a → rad861ef)

 #include <unistd.h>

-// undef to make a number of the locks not reacquire upon waking from a condlock
-#define REACQ 1
+// C_TODO: cleanup this and locks.cfa
+// - appropriate separation of interface and impl
+// - clean up unused/unneeded locks
+// - change messy big blocking lock from inheritance to composition to remove need for flags

 //-----------------------------------------------------------------------------
…
 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(clh_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
+static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }


 //-----------------------------------------------------------------------------
-// Linear backoff Spinlock
-struct linear_backoff_then_block_lock {
+// Exponential backoff then block lock
+struct exp_backoff_then_block_lock {
     // Spin lock used for mutual exclusion
     __spinlock_t spinlock;
…
 };

-static inline void ?{}( linear_backoff_then_block_lock & this ) {
+static inline void ?{}( exp_backoff_then_block_lock & this ) {
     this.spinlock{};
     this.blocked_threads{};
     this.lock_value = 0;
 }
-static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
-// static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
-// static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
-
-static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
+static inline void ^?{}( exp_backoff_then_block_lock & this ) {}
+// static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
+// static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
+
+static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
     if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
         return true;
…
 }

-static inline bool try_lock(linear_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
-
-static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
+static inline bool try_lock(exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
+
+static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
     if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
         return true;
…
 }

-static inline bool block(linear_backoff_then_block_lock & this) with(this) {
+static inline bool block(exp_backoff_then_block_lock & this) with(this) {
     lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
     if (lock_value != 2) {
…
 }

-static inline void lock(linear_backoff_then_block_lock & this) with(this) {
+static inline void lock(exp_backoff_then_block_lock & this) with(this) {
     size_t compare_val = 0;
     int spin = 4;
…
 }

-static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
+static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
     if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
     lock( spinlock __cfaabi_dbg_ctx2 );
…
 }

-static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
-static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
+static inline void on_notify(exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
+static inline size_t on_wait(exp_backoff_then_block_lock & this) { unlock(this); return 0; }
+static inline void on_wakeup(exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); }

 //-----------------------------------------------------------------------------
…
 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
-    #ifdef REACQ
-    lock( lock __cfaabi_dbg_ctx2 );
-    insert_last( blocked_threads, *t );
-    unlock( lock );
-    #else
-    unpark(t);
-    #endif
+    lock( lock __cfaabi_dbg_ctx2 );
+    insert_last( blocked_threads, *t );
+    unlock( lock );
 }
 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
…
 }
 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
+static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { lock(this); }

…
 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
+static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { lock(this); }

 //-----------------------------------------------------------------------------
…
 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
-    #ifdef REACQ
     // first we acquire internal fast_block_lock
     lock( lock __cfaabi_dbg_ctx2 );
…
     unlock( lock );
-
-    #endif
     unpark(t);
-
 }
 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
-    #ifdef REACQ
     // now we acquire the entire block_spin_lock upon waking up
     while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     __atomic_store_n(&held, true, __ATOMIC_RELEASE);
     unlock( lock ); // Now we release the internal fast_spin_lock
-    #endif
 }
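The rename appears to correct the name to match the lock's actual backoff behaviour, and the lock's interface is unchanged, so existing call sites only change the type name. A minimal mutual-exclusion sketch, again illustrative only and assuming the standard CFA headers and thread idioms:

    #include <fstream.hfa>
    #include <locks.hfa>
    #include <thread.hfa>

    exp_backoff_then_block_lock m;      // renamed lock; same lock()/unlock()/try_lock() interface
    volatile int counter = 0;

    thread Worker {};
    void main( Worker & this ) {
        for ( int i = 0; i < 1000; i += 1 ) {
            lock( m );                  // spins with exponential backoff, blocks when contended
            counter = counter + 1;      // critical section
            unlock( m );
        }
    }

    int main() {
        {
            Worker workers[4];          // contention exercises the backoff-then-block path
        }                               // implicit join at block exit
        sout | "counter:" | counter;    // expect 4000
    }

The other visible change in this file is the removal of the REACQ conditional: on_wakeup (and fast_block_lock's on_notify) now unconditionally reacquire the lock after a condition-variable wait, rather than leaving that behaviour to a compile-time flag.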