Changeset 454f478 for libcfa/src/concurrency
- Timestamp:
- Jan 20, 2021, 5:35:39 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 9db2c92
- Parents:
- dafbde8
- Location:
- libcfa/src/concurrency
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io/types.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // io/types.hfa -- 7 // io/types.hfa -- PRIVATE 8 // Types used by the I/O subsystem 8 9 // 9 10 // Author : Thierry Delisle … … 20 21 } 21 22 22 #include " bits/locks.hfa"23 #include "kernel/fwd.hfa" 23 24 24 25 #if defined(CFA_HAVE_LINUX_IO_URING_H) -
libcfa/src/concurrency/kernel.cfa
rdafbde8 r454f478 224 224 } 225 225 226 V( this->terminated );226 post( this->terminated ); 227 227 228 228 if(this == mainProcessor) { … … 665 665 //============================================================================================= 666 666 //----------------------------------------------------------------------------- 667 // Locks668 void ?{}( semaphore & this, int count = 1 ) {669 (this.lock){};670 this.count = count;671 (this.waiting){};672 }673 void ^?{}(semaphore & this) {}674 675 bool P(semaphore & this) with( this ){676 lock( lock __cfaabi_dbg_ctx2 );677 count -= 1;678 if ( count < 0 ) {679 // queue current task680 append( waiting, active_thread() );681 682 // atomically release spin lock and block683 unlock( lock );684 park();685 return true;686 }687 else {688 unlock( lock );689 return false;690 }691 }692 693 bool V(semaphore & this) with( this ) {694 $thread * thrd = 0p;695 lock( lock __cfaabi_dbg_ctx2 );696 count += 1;697 if ( count <= 0 ) {698 // remove task at head of waiting list699 thrd = pop_head( waiting );700 }701 702 unlock( lock );703 704 // make new owner705 unpark( thrd );706 707 return thrd != 0p;708 }709 710 bool V(semaphore & this, unsigned diff) with( this ) {711 $thread * thrd = 0p;712 lock( lock __cfaabi_dbg_ctx2 );713 int release = max(-count, (int)diff);714 count += diff;715 for(release) {716 unpark( pop_head( waiting ) );717 }718 719 unlock( lock );720 721 return thrd != 0p;722 }723 724 //-----------------------------------------------------------------------------725 667 // Debug 726 668 __cfaabi_dbg_debug_do( -
libcfa/src/concurrency/kernel.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // kernel -- 7 // kernel -- Header containing the core of the kernel API 8 8 // 9 9 // Author : Thierry Delisle … … 24 24 extern "C" { 25 25 #include <bits/pthreadtypes.h> 26 #include <pthread.h> 26 27 #include <linux/types.h> 27 28 } 28 29 29 30 //----------------------------------------------------------------------------- 30 // Locks 31 struct semaphore { 32 __spinlock_t lock; 33 int count; 34 __queue_t($thread) waiting; 35 }; 36 37 void ?{}(semaphore & this, int count = 1); 38 void ^?{}(semaphore & this); 39 bool P (semaphore & this); 40 bool V (semaphore & this); 41 bool V (semaphore & this, unsigned count); 31 // Underlying Locks 32 #ifdef __CFA_WITH_VERIFY__ 33 extern bool __cfaabi_dbg_in_kernel(); 34 #endif 35 36 extern "C" { 37 char * strerror(int); 38 } 39 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); } 40 41 struct __bin_sem_t { 42 pthread_mutex_t lock; 43 pthread_cond_t cond; 44 int val; 45 }; 46 47 static inline void ?{}(__bin_sem_t & this) with( this ) { 48 // Create the mutex with error checking 49 pthread_mutexattr_t mattr; 50 pthread_mutexattr_init( &mattr ); 51 pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP); 52 pthread_mutex_init(&lock, &mattr); 53 54 pthread_cond_init (&cond, (const pthread_condattr_t *)0p); // workaround trac#208: cast should not be required 55 val = 0; 56 } 57 58 static inline void ^?{}(__bin_sem_t & this) with( this ) { 59 CHECKED( pthread_mutex_destroy(&lock) ); 60 CHECKED( pthread_cond_destroy (&cond) ); 61 } 62 63 static inline void wait(__bin_sem_t & this) with( this ) { 64 verify(__cfaabi_dbg_in_kernel()); 65 CHECKED( pthread_mutex_lock(&lock) ); 66 while(val < 1) { 67 pthread_cond_wait(&cond, &lock); 68 } 69 val -= 1; 70 CHECKED( pthread_mutex_unlock(&lock) ); 71 } 72 73 static inline bool post(__bin_sem_t & this) with( this ) { 74 bool needs_signal = false; 75 76 CHECKED( pthread_mutex_lock(&lock) ); 77 if(val < 1) { 78 val += 1; 79 pthread_cond_signal(&cond); 80 needs_signal = true; 81 } 82 CHECKED( pthread_mutex_unlock(&lock) ); 83 84 return needs_signal; 85 } 86 87 #undef CHECKED 42 88 43 89 … … 91 137 92 138 // Termination synchronisation (user semaphore) 93 semaphoreterminated;139 oneshot terminated; 94 140 95 141 // pthread Stack -
libcfa/src/concurrency/kernel/fwd.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // kernel/fwd.hfa -- 7 // kernel/fwd.hfa -- PUBLIC 8 // Fundamental code needed to implement threading M.E.S. algorithms. 8 9 // 9 10 // Author : Thierry Delisle … … 134 135 extern uint64_t thread_rand(); 135 136 137 // Semaphore which only supports a single thread 138 struct single_sem { 139 struct $thread * volatile ptr; 140 }; 141 142 static inline { 143 void ?{}(single_sem & this) { 144 this.ptr = 0p; 145 } 146 147 void ^?{}(single_sem &) {} 148 149 bool wait(single_sem & this) { 150 for() { 151 struct $thread * expected = this.ptr; 152 if(expected == 1p) { 153 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 154 return false; 155 } 156 } 157 else { 158 /* paranoid */ verify( expected == 0p ); 159 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 160 park(); 161 return true; 162 } 163 } 164 165 } 166 } 167 168 bool post(single_sem & this) { 169 for() { 170 struct $thread * expected = this.ptr; 171 if(expected == 1p) return false; 172 if(expected == 0p) { 173 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 174 return false; 175 } 176 } 177 else { 178 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 179 unpark( expected ); 180 return true; 181 } 182 } 183 } 184 } 185 } 186 187 // Synchronozation primitive which only supports a single thread and one post 188 // Similar to a binary semaphore with a 'one shot' semantic 189 // is expected to be discarded after each party call their side 190 struct oneshot { 191 // Internal state : 192 // 0p : is initial state (wait will block) 193 // 1p : fulfilled (wait won't block) 194 // any thread : a thread is currently waiting 195 struct $thread * volatile ptr; 196 }; 197 198 static inline { 199 void ?{}(oneshot & this) { 200 this.ptr = 0p; 201 } 202 203 void ^?{}(oneshot &) {} 204 205 // Wait for the post, return immidiately if it already happened. 206 // return true if the thread was parked 207 bool wait(oneshot & this) { 208 for() { 209 struct $thread * expected = this.ptr; 210 if(expected == 1p) return false; 211 /* paranoid */ verify( expected == 0p ); 212 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 213 park(); 214 /* paranoid */ verify( this.ptr == 1p ); 215 return true; 216 } 217 } 218 } 219 220 // Mark as fulfilled, wake thread if needed 221 // return true if a thread was unparked 222 bool post(oneshot & this) { 223 struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 224 if( got == 0p ) return false; 225 unpark( got ); 226 return true; 227 } 228 } 229 230 // base types for future to build upon 231 // It is based on the 'oneshot' type to allow multiple futures 232 // to block on the same instance, permitting users to block a single 233 // thread on "any of" [a given set of] futures. 234 // does not support multiple threads waiting on the same future 235 struct future_t { 236 // Internal state : 237 // 0p : is initial state (wait will block) 238 // 1p : fulfilled (wait won't block) 239 // 2p : in progress () 240 // 3p : abandoned, server should delete 241 // any oneshot : a context has been setup to wait, a thread could wait on it 242 struct oneshot * volatile ptr; 243 }; 244 245 static inline { 246 void ?{}(future_t & this) { 247 this.ptr = 0p; 248 } 249 250 void ^?{}(future_t &) {} 251 252 void reset(future_t & this) { 253 // needs to be in 0p or 1p 254 __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 255 } 256 257 // check if the future is available 258 bool available( future_t & this ) { 259 return this.ptr == 1p; 260 } 261 262 // Prepare the future to be waited on 263 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 264 bool setup( future_t & this, oneshot & wait_ctx ) { 265 /* paranoid */ verify( wait_ctx.ptr == 0p ); 266 // The future needs to set the wait context 267 for() { 268 struct oneshot * expected = this.ptr; 269 // Is the future already fulfilled? 270 if(expected == 1p) return false; // Yes, just return false (didn't block) 271 272 // The future is not fulfilled, try to setup the wait context 273 /* paranoid */ verify( expected == 0p ); 274 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 275 return true; 276 } 277 } 278 } 279 280 // Stop waiting on a future 281 // When multiple futures are waited for together in "any of" pattern 282 // futures that weren't fulfilled before the thread woke up 283 // should retract the wait ctx 284 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 285 void retract( future_t & this, oneshot & wait_ctx ) { 286 // Remove the wait context 287 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 288 289 // got == 0p: future was never actually setup, just return 290 if( got == 0p ) return; 291 292 // got == wait_ctx: since fulfil does an atomic_swap, 293 // if we got back the original then no one else saw context 294 // It is safe to delete (which could happen after the return) 295 if( got == &wait_ctx ) return; 296 297 // got == 1p: the future is ready and the context was fully consumed 298 // the server won't use the pointer again 299 // It is safe to delete (which could happen after the return) 300 if( got == 1p ) return; 301 302 // got == 2p: the future is ready but the context hasn't fully been consumed 303 // spin until it is safe to move on 304 if( got == 2p ) { 305 while( this.ptr != 1p ) Pause(); 306 return; 307 } 308 309 // got == any thing else, something wen't wrong here, abort 310 abort("Future in unexpected state"); 311 } 312 313 // Mark the future as abandoned, meaning it will be deleted by the server 314 bool abandon( future_t & this ) { 315 /* paranoid */ verify( this.ptr != 3p ); 316 317 // Mark the future as abandonned 318 struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST); 319 320 // If the future isn't already fulfilled, let the server delete it 321 if( got == 0p ) return false; 322 323 // got == 2p: the future is ready but the context hasn't fully been consumed 324 // spin until it is safe to move on 325 if( got == 2p ) { 326 while( this.ptr != 1p ) Pause(); 327 got = 1p; 328 } 329 330 // The future is completed delete it now 331 /* paranoid */ verify( this.ptr != 1p ); 332 free( &this ); 333 return true; 334 } 335 336 // from the server side, mark the future as fulfilled 337 // delete it if needed 338 bool fulfil( future_t & this ) { 339 for() { 340 struct oneshot * expected = this.ptr; 341 // was this abandoned? 342 #if defined(__GNUC__) && __GNUC__ >= 7 343 #pragma GCC diagnostic push 344 #pragma GCC diagnostic ignored "-Wfree-nonheap-object" 345 #endif 346 if( expected == 3p ) { free( &this ); return false; } 347 #if defined(__GNUC__) && __GNUC__ >= 7 348 #pragma GCC diagnostic pop 349 #endif 350 351 /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen 352 /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case. 353 354 // If there is a wait context, we need to consume it and mark it as consumed after 355 // If there is no context then we can skip the in progress phase 356 struct oneshot * want = expected == 0p ? 1p : 2p; 357 if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 358 if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; } 359 bool ret = post( *expected ); 360 __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 361 return ret; 362 } 363 } 364 365 } 366 367 // Wait for the future to be fulfilled 368 bool wait( future_t & this ) { 369 oneshot temp; 370 if( !setup(this, temp) ) return false; 371 372 // Wait context is setup, just wait on it 373 bool ret = wait( temp ); 374 375 // Wait for the future to tru 376 while( this.ptr == 2p ) Pause(); 377 // Make sure the state makes sense 378 // Should be fulfilled, could be in progress but it's out of date if so 379 // since if that is the case, the oneshot was fulfilled (unparking this thread) 380 // and the oneshot should not be needed any more 381 __attribute__((unused)) struct oneshot * was = this.ptr; 382 /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was ); 383 384 // Mark the future as fulfilled, to be consistent 385 // with potential calls to avail 386 // this.ptr = 1p; 387 return ret; 388 } 389 } 390 136 391 //----------------------------------------------------------------------- 137 392 // Statics call at the end of each thread to register statistics -
libcfa/src/concurrency/kernel/startup.cfa
rdafbde8 r454f478 199 199 void ?{}(processor & this) with( this ) { 200 200 ( this.idle ){}; 201 ( this.terminated ){ 0};201 ( this.terminated ){}; 202 202 ( this.runner ){}; 203 203 init( this, "Main Processor", *mainCluster ); … … 528 528 void ?{}(processor & this, const char name[], cluster & _cltr) { 529 529 ( this.idle ){}; 530 ( this.terminated ){ 0};530 ( this.terminated ){}; 531 531 ( this.runner ){}; 532 532 … … 549 549 __wake_proc( &this ); 550 550 551 P( terminated );551 wait( terminated ); 552 552 /* paranoid */ verify( active_processor() != &this); 553 553 } -
libcfa/src/concurrency/locks.cfa
rdafbde8 r454f478 356 356 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ) with(this) { WAIT_TIME( info, &l , time ) } 357 357 } 358 359 //----------------------------------------------------------------------------- 360 // Semaphore 361 void ?{}( semaphore & this, int count = 1 ) { 362 (this.lock){}; 363 this.count = count; 364 (this.waiting){}; 365 } 366 void ^?{}(semaphore & this) {} 367 368 bool P(semaphore & this) with( this ){ 369 lock( lock __cfaabi_dbg_ctx2 ); 370 count -= 1; 371 if ( count < 0 ) { 372 // queue current task 373 append( waiting, active_thread() ); 374 375 // atomically release spin lock and block 376 unlock( lock ); 377 park(); 378 return true; 379 } 380 else { 381 unlock( lock ); 382 return false; 383 } 384 } 385 386 bool V(semaphore & this) with( this ) { 387 $thread * thrd = 0p; 388 lock( lock __cfaabi_dbg_ctx2 ); 389 count += 1; 390 if ( count <= 0 ) { 391 // remove task at head of waiting list 392 thrd = pop_head( waiting ); 393 } 394 395 unlock( lock ); 396 397 // make new owner 398 unpark( thrd ); 399 400 return thrd != 0p; 401 } 402 403 bool V(semaphore & this, unsigned diff) with( this ) { 404 $thread * thrd = 0p; 405 lock( lock __cfaabi_dbg_ctx2 ); 406 int release = max(-count, (int)diff); 407 count += diff; 408 for(release) { 409 unpark( pop_head( waiting ) ); 410 } 411 412 unlock( lock ); 413 414 return thrd != 0p; 415 } -
libcfa/src/concurrency/locks.hfa
rdafbde8 r454f478 157 157 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ); 158 158 } 159 160 //----------------------------------------------------------------------------- 161 // Semaphore 162 struct semaphore { 163 __spinlock_t lock; 164 int count; 165 __queue_t($thread) waiting; 166 }; 167 168 void ?{}(semaphore & this, int count = 1); 169 void ^?{}(semaphore & this); 170 bool P (semaphore & this); 171 bool V (semaphore & this); 172 bool V (semaphore & this, unsigned count);
Note:
See TracChangeset
for help on using the changeset viewer.