Changeset 8cbe732 for libcfa/src/concurrency
- Timestamp:
- Oct 13, 2023, 7:13:21 PM (2 years ago)
- Branches:
- master
- Children:
- a97b9ed, bab2917
- Parents:
- 85034ed (diff), 0bf0b978 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff)
links above to see all the changes relative to each parent. - Location:
- libcfa/src/concurrency
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r85034ed r8cbe732 130 130 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 131 131 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 132 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 132 133 wake_one( cons ); 133 134 } … … 136 137 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 137 138 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 139 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 138 140 wake_one( prods ); 139 141 } -
libcfa/src/concurrency/cofor.cfa
r85034ed r8cbe732 4 4 // cofor ( uC++ COFOR ) 5 5 6 thread co _runner {6 thread cofor_runner { 7 7 ssize_t low, high; 8 8 __cofor_body_t loop_body; 9 9 }; 10 10 11 static void ?{}( co _runner & this, ssize_t low, ssize_t high, __cofor_body_t loop_body ) {11 static void ?{}( cofor_runner & this, ssize_t low, ssize_t high, __cofor_body_t loop_body ) { 12 12 this.low = low; 13 13 this.high = high; … … 15 15 } 16 16 17 void main( co _runner & this ) with( this ) {17 void main( cofor_runner & this ) with( this ) { 18 18 for ( ssize_t i = low; i < high; i++ ) 19 19 loop_body(i); 20 20 } 21 21 22 void cofor( ssize_t low, ssize_t high, __cofor_body_t loop_body ) libcfa_public {22 void __Cofor__( ssize_t low, ssize_t high, __cofor_body_t loop_body ) libcfa_public { 23 23 ssize_t range = high - low; 24 24 if ( range <= 0 ) return; … … 29 29 ssize_t i = 0; 30 30 ssize_t stride_iter = low; 31 co _runner * runners[ threads ];31 cofor_runner * runners[ threads ]; 32 32 for ( i; threads ) { 33 33 runners[i] = alloc(); … … 45 45 } 46 46 47 //////////////////////////////////////////////////////////////////////////////////////////48 // parallel (COBEGIN/COEND)49 47 50 thread para_runner {51 parallel_stmt_t body;52 void * arg;53 };54 55 static void ?{}( para_runner & this, parallel_stmt_t body, void * arg ) {56 this.body = body;57 this.arg = arg;58 }59 60 void main( para_runner & this ) with( this ) { body( arg ); }61 62 void parallel( parallel_stmt_t * stmts, void ** args, size_t num ) libcfa_public {63 para_runner * runners[ num ];64 for ( i; num )65 (*(runners[i] = malloc())){ stmts[i], args[i] };66 for ( i; num )67 delete( runners[i] );68 }69 -
libcfa/src/concurrency/cofor.hfa
r85034ed r8cbe732 5 5 typedef void (*__cofor_body_t)( ssize_t ); 6 6 7 void cofor( ssize_t low, ssize_t high, __cofor_body_t loop_body );7 void __Cofor__( ssize_t low, ssize_t high, __cofor_body_t loop_body ); 8 8 9 9 #define COFOR( lidname, low, high, loopbody ) \ … … 12 12 loopbody \ 13 13 } \ 14 cofor( low, high, __CFA_loopLambda__ ); \14 __Cofor__( low, high, __CFA_loopLambda__ ); \ 15 15 } 16 16 17 17 ////////////////////////////////////////////////////////////////////////////////////////// 18 // parallel (COBEGIN/COEND) 19 typedef void (*parallel_stmt_t)( void * ); 18 // corun 20 19 21 void parallel( parallel_stmt_t * stmts, void ** args, size_t num ); 20 // 21 typedef void (*__CFA_corun_lambda_t)( void ); 22 23 // used to run a corun statement in parallel 24 thread co_runner { 25 __CFA_corun_lambda_t body; 26 }; 27 28 // wraps a co_runner to provide RAII deallocation 29 struct runner_block { 30 co_runner * runner; 31 }; 32 static inline void ?{}( co_runner & this, __CFA_corun_lambda_t body ) { this.body = body; } 33 34 void main( co_runner & this ) with( this ) { body(); } 35 36 static inline void ?{}( runner_block & this ) {} 37 static inline void ?{}( runner_block & this, __CFA_corun_lambda_t body ) { 38 (*(this.runner = malloc())){ body }; 39 } 40 41 static inline void ^?{}( runner_block & this ) { 42 delete( this.runner ); 43 } 44 -
libcfa/src/concurrency/coroutine.cfa
r85034ed r8cbe732 343 343 344 344 bool poll() libcfa_public { return poll( active_coroutine() ); } 345 void enable_ehm() libcfa_public { active_coroutine()->ehm_state.ehm_enabled = true; } 346 void disable_ehm() libcfa_public { active_coroutine()->ehm_state.ehm_enabled = false; } 347 bool checked_poll() libcfa_public { return active_coroutine()->ehm_state.ehm_enabled ? poll( active_coroutine() ) : false; } 345 348 coroutine$ * resumer() libcfa_public { return active_coroutine()->last; } 346 349 coroutine$ * first_resumer() libcfa_public { return active_coroutine()->starter; } -
libcfa/src/concurrency/coroutine.hfa
r85034ed r8cbe732 224 224 225 225 // non local ehm and coroutine utility routines 226 void enable_ehm(); 227 void disable_ehm(); 226 228 bool poll( coroutine$ * cor ); 227 229 bool poll(); 230 bool checked_poll(); 228 231 coroutine$ * resumer(); 229 232 coroutine$ * first_resumer(); 230 233 231 234 forall(T & | is_coroutine(T)) { 232 void enable_ehm( T & cor ); 233 void disable_ehm( T & cor ); 235 void enable_ehm( T & cor ); // enable checking non-local exceptions for cor via checked_poll 236 void disable_ehm( T & cor ); // disable checking non-local exceptions for cor via checked_poll 234 237 bool poll( T & cor ); 235 bool checked_poll( T & cor ); 238 bool checked_poll( T & cor ); // check for non-local exceptions while respecting enable/disable 236 239 coroutine$ * resumer( T & cor ); 237 240 coroutine$ * first_resumer( T & cor ); -
libcfa/src/concurrency/kernel/fwd.hfa
r85034ed r8cbe732 118 118 // Yield: yield N times 119 119 static inline void yield( size_t times ) { 120 for ( times ) {120 for ( times ) { 121 121 yield(); 122 122 } … … 136 136 137 137 bool wait(single_sem & this) { 138 for () {138 for () { 139 139 struct thread$ * expected = this.ptr; 140 if (expected == 1p) {141 if (__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {140 if (expected == 1p) { 141 if (__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 142 142 return false; 143 143 } … … 145 145 else { 146 146 /* paranoid */ verify( expected == 0p ); 147 if (__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {147 if (__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 148 148 park(); 149 149 return true; … … 155 155 156 156 bool post(single_sem & this) { 157 for () {157 for () { 158 158 struct thread$ * expected = this.ptr; 159 if (expected == 1p) return false;160 if (expected == 0p) {161 if (__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {159 if (expected == 1p) return false; 160 if (expected == 0p) { 161 if (__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 162 162 return false; 163 163 } 164 164 } 165 165 else { 166 if (__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {166 if (__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 167 167 unpark( expected ); 168 168 return true; … … 195 195 // return true if the thread was parked 196 196 bool wait(oneshot & this) { 197 for () {197 for () { 198 198 struct thread$ * expected = this.ptr; 199 if (expected == oneshot_FULFILLED) return false;200 if (__atomic_compare_exchange_n(&this.ptr, &expected, 
active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {199 if (expected == oneshot_FULFILLED) return false; 200 if (__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 201 201 park(); 202 202 /* paranoid */ verify( this.ptr == oneshot_FULFILLED ); … … 210 210 thread$ * post(oneshot & this, bool do_unpark = true) { 211 211 struct thread$ * got = __atomic_exchange_n( &this.ptr, oneshot_FULFILLED, __ATOMIC_SEQ_CST); 212 if ( got == oneshot_ARMED || got == oneshot_FULFILLED ) return 0p;213 if (do_unpark) unpark( got );212 if ( got == oneshot_ARMED || got == oneshot_FULFILLED ) return 0p; 213 if (do_unpark) unpark( got ); 214 214 return got; 215 215 } … … 255 255 /* paranoid */ verify( wait_ctx.ptr == oneshot_ARMED || wait_ctx.ptr == oneshot_FULFILLED ); 256 256 // The future needs to set the wait context 257 for () {257 for () { 258 258 struct oneshot * expected = this.ptr; 259 259 // Is the future already fulfilled? 260 if (expected == future_FULFILLED) return false; // Yes, just return false (didn't block)260 if (expected == future_FULFILLED) return false; // Yes, just return false (didn't block) 261 261 262 262 // The future is not fulfilled, try to setup the wait context 263 if (__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {263 if (__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 264 264 return true; 265 265 } … … 276 276 277 277 // attempt to remove the context so it doesn't get consumed. 
278 if (__atomic_compare_exchange_n( &this.ptr, &expected, future_ARMED, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {278 if (__atomic_compare_exchange_n( &this.ptr, &expected, future_ARMED, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 279 279 // we still have the original context, then no one else saw it 280 280 return false; … … 282 282 283 283 // expected == ARMED: future was never actually setup, just return 284 if ( expected == future_ARMED ) return false;284 if ( expected == future_ARMED ) return false; 285 285 286 286 // expected == FULFILLED: the future is ready and the context was fully consumed 287 287 // the server won't use the pointer again 288 288 // It is safe to delete (which could happen after the return) 289 if ( expected == future_FULFILLED ) return true;289 if ( expected == future_FULFILLED ) return true; 290 290 291 291 // expected == PROGRESS: the future is ready but the context hasn't fully been consumed 292 292 // spin until it is safe to move on 293 if ( expected == future_PROGRESS ) {293 if ( expected == future_PROGRESS ) { 294 294 while( this.ptr != future_FULFILLED ) Pause(); 295 295 /* paranoid */ verify( this.ptr == future_FULFILLED ); … … 310 310 311 311 // If the future isn't already fulfilled, let the server delete it 312 if ( got == future_ARMED ) return false;312 if ( got == future_ARMED ) return false; 313 313 314 314 // got == PROGRESS: the future is ready but the context hasn't fully been consumed 315 315 // spin until it is safe to move on 316 if ( got == future_PROGRESS ) {316 if ( got == future_PROGRESS ) { 317 317 while( this.ptr != future_FULFILLED ) Pause(); 318 318 got = future_FULFILLED; … … 327 327 // from the server side, mark the future as fulfilled 328 328 // delete it if needed 329 329 330 thread$ * fulfil( future_t & this, bool do_unpark = true ) { 330 for () {331 for () { 331 332 struct oneshot * expected = this.ptr; 332 // was this abandoned? 
333 333 334 #if defined(__GNUC__) && __GNUC__ >= 7 334 #pragma GCC diagnostic push 335 #pragma GCC diagnostic ignored "-Wfree-nonheap-object" 335 // SKULLDUGGERY: gcc bug does not handle push/pop for -Wfree-nonheap-object 336 //#pragma GCC diagnostic push 337 #pragma GCC diagnostic ignored "-Wfree-nonheap-object" 336 338 #endif 337 if( expected == future_ABANDONED ) { free( &this ); return 0p; } 339 340 if ( expected == future_ABANDONED ) { free( &this ); return 0p; } 341 338 342 #if defined(__GNUC__) && __GNUC__ >= 7 339 343 //#pragma GCC diagnostic pop 340 344 #endif 341 345 … … 346 350 // If there is no context then we can skip the in progress phase 347 351 struct oneshot * want = expected == future_ARMED ? future_FULFILLED : future_PROGRESS; 348 if (__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {349 if ( expected == future_ARMED ) { return 0p; }352 if (__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 353 if ( expected == future_ARMED ) { return 0p; } 350 354 thread$ * ret = post( *expected, do_unpark ); 351 355 __atomic_store_n( &this.ptr, future_FULFILLED, __ATOMIC_SEQ_CST); … … 359 363 bool wait( future_t & this ) { 360 364 oneshot temp; 361 if ( !setup(this, temp) ) return false;365 if ( !setup(this, temp) ) return false; 362 366 363 367 // Wait context is setup, just wait on it … … 387 391 // if any are already satisfied return 388 392 for ( i; num_futures ) { 389 if ( !setup(futures[i], temp) ) return futures[i];393 if ( !setup(futures[i], temp) ) return futures[i]; 390 394 } 391 395 … … 413 417 414 418 #define __STATS__(in_kernel, ...) 
{ \ 415 if ( !(in_kernel) ) disable_interrupts(); \416 with ( *__tls_stats() ) { \419 if ( !(in_kernel) ) disable_interrupts(); \ 420 with ( *__tls_stats() ) { \ 417 421 __VA_ARGS__ \ 418 422 } \ 419 if ( !(in_kernel) ) enable_interrupts(); \423 if ( !(in_kernel) ) enable_interrupts(); \ 420 424 } 421 425 #if defined(CFA_HAVE_LINUX_IO_URING_H) 422 426 #define __IO_STATS__(in_kernel, ...) { \ 423 if ( !(in_kernel) ) disable_interrupts(); \424 with ( *__tls_stats() ) { \427 if ( !(in_kernel) ) disable_interrupts(); \ 428 with ( *__tls_stats() ) { \ 425 429 __VA_ARGS__ \ 426 430 } \ 427 if ( !(in_kernel) ) enable_interrupts(); \431 if ( !(in_kernel) ) enable_interrupts(); \ 428 432 } 429 433 #else … … 436 440 } 437 441 } 438 #endif 442 #endif // #endif
Note:
See TracChangeset
for help on using the changeset viewer.