Changeset b77f0e1
- Timestamp:
- Nov 14, 2022, 11:52:40 AM (2 years ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 63be3387
- Parents:
- bc899d6
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/locks.hfa
rbc899d6 rb77f0e1 30 30 #include "time.hfa" 31 31 32 #include <fstream.hfa> 33 34 35 // futex headers 36 #include <linux/futex.h> /* Definition of FUTEX_* constants */ 37 #include <sys/syscall.h> /* Definition of SYS_* constants */ 38 #include <unistd.h> 39 32 40 //----------------------------------------------------------------------------- 33 41 // Semaphore … … 140 148 141 149 //----------------------------------------------------------------------------- 150 // futex_mutex 151 152 // - No cond var support 153 // - Kernel thd blocking alternative to the spinlock 154 // - No ownership (will deadlock on reacq) 155 struct futex_mutex { 156 // lock state any state other than UNLOCKED is locked 157 // enum LockState { UNLOCKED = 0, UNCONTENDED = 1, CONTENDED = 2 }; 158 159 // stores a lock state 160 int val; 161 }; 162 163 // to use for FUTEX_WAKE and FUTEX_WAIT (other futex calls will need more params) 164 static int futex(int *uaddr, int futex_op, int val) { 165 return syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0); 166 } 167 168 static inline void ?{}( futex_mutex & this ) with(this) { val = 0; } 169 170 static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) { 171 return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE); 172 } 173 174 static inline int internal_exchange(futex_mutex & this) with(this) { 175 return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE); 176 } 177 178 // if this is called recursively IT WILL DEADLOCK!!!!! 179 static inline void lock(futex_mutex & this) with(this) { 180 int state; 181 182 183 // linear backoff 184 for( int spin = 4; spin < 1024; spin += spin) { 185 state = 0; 186 // if unlocked, lock and return 187 if (internal_try_lock(this, state)) return; 188 if (2 == state) break; 189 for (int i = 0; i < spin; i++) Pause(); 190 } 191 // if (internal_try_lock(this, state)) return; 192 193 // if not in contended state, set to be in contended state 194 if (state != 2) state = internal_exchange(this); 195 196 // block and spin until we win the lock 197 while (state != 0) { 198 futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK 199 state = internal_exchange(this); 200 } 201 } 202 203 static inline void unlock(futex_mutex & this) with(this) { 204 // if uncontended do atomice unlock and then return 205 if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel 206 207 // otherwise threads are blocked so we must wake one 208 __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE); 209 futex((int *)&val, FUTEX_WAKE, 1); 210 } 211 212 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); } 213 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;} 214 215 // to set recursion count after getting signalled; 216 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {} 217 218 //----------------------------------------------------------------------------- 142 219 // CLH Spinlock 143 220 // - No recursive acquisition … … 165 242 } 166 243 244 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); } 245 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; } 246 static inline void on_wakeup(clh_lock & this, size_t recursion ) { 247 #ifdef REACQ 248 lock(this); 249 #endif 250 } 251 252 167 253 //----------------------------------------------------------------------------- 168 254 // Linear backoff Spinlock … … 171 257 __spinlock_t spinlock; 172 258 173 // Current thread owning the lock174 struct thread$ * owner;175 176 259 // List of blocked threads 177 260 dlist( thread$ ) blocked_threads; … … 179 262 // Used for comparing and exchanging 180 263 volatile size_t lock_value; 181 182 // used for linear backoff spinning 183 int spin_start; 184 int spin_end; 185 int spin_count; 186 187 // after unsuccessful linear backoff yield this many times 188 int yield_count; 189 }; 190 191 static inline void ?{}( linear_backoff_then_block_lock & this, int spin_start, int spin_end, int spin_count, int yield_count ) { 264 }; 265 266 static inline void ?{}( linear_backoff_then_block_lock & this ) { 192 267 this.spinlock{}; 193 268 this.blocked_threads{}; 194 269 this.lock_value = 0; 195 this.spin_start = spin_start; 196 this.spin_end = spin_end; 197 this.spin_count = spin_count; 198 this.yield_count = yield_count; 199 } 200 static inline void ?{}( linear_backoff_then_block_lock & this ) { this{4, 1024, 16, 0}; } 270 } 201 271 static inline void ^?{}( linear_backoff_then_block_lock & this ) {} 202 static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;203 static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;272 // static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void; 273 // static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void; 204 274 205 275 static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) { 206 276 if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { 207 owner = active_thread();208 277 return true; 209 278 } … … 215 284 static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) { 216 285 if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) { 217 owner = active_thread();218 286 return true; 219 287 } … … 234 302 235 303 static inline void lock(linear_backoff_then_block_lock & this) with(this) { 236 // if owner just return237 if (active_thread() == owner) return;238 304 size_t compare_val = 0; 239 int spin = spin_start;305 int spin = 4; 240 306 // linear backoff 241 307 for( ;; ) { … … 244 310 if (2 == compare_val) break; 245 311 for (int i = 0; i < spin; i++) Pause(); 246 if (spin >= spin_end) break;312 if (spin >= 1024) break; 247 313 spin += spin; 248 314 } … … 254 320 255 321 static inline void unlock(linear_backoff_then_block_lock & this) with(this) { 256 verify(lock_value > 0);257 owner = 0p;258 322 if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return; 259 323 lock( spinlock __cfaabi_dbg_ctx2 ); … … 265 329 static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); } 266 330 static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; } 267 static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { lock(this); } 331 static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { 332 #ifdef REACQ 333 lock(this); 334 #endif 335 } 268 336 269 337 //----------------------------------------------------------------------------- … … 306 374 assert(!(held && owner == active_thread())); 307 375 #endif 308 if ( held) {376 if ( held ) { 309 377 insert_last( blocked_threads, *active_thread() ); 310 378 unlock( lock ); … … 331 399 } 332 400 333 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) { unpark(t); } 401 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) { 402 #ifdef REACQ 403 lock( lock __cfaabi_dbg_ctx2 ); 404 insert_last( blocked_threads, *t ); 405 unlock( lock ); 406 #else 407 unpark(t); 408 #endif 409 } 334 410 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; } 335 411 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { } … … 412 488 if ( owner != 0p ) { 413 489 insert_last( blocked_threads, *t ); 414 unlock( lock );415 490 } 416 491 // lock not held … … 419 494 recursion_count = 1; 420 495 unpark( t ); 421 unlock( lock );422 }496 } 497 unlock( lock ); 423 498 } 424 499 … … 474 549 static inline void lock(spin_queue_lock & this) with(this) { 475 550 mcs_spin_node node; 476 #ifdef __CFA_DEBUG__477 assert(!(held && owner == active_thread()));478 #endif479 551 lock( lock, node ); 480 552 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); 481 553 __atomic_store_n(&held, true, __ATOMIC_SEQ_CST); 482 554 unlock( lock, node ); 483 #ifdef __CFA_DEBUG__484 owner = active_thread();485 #endif486 555 } 487 556 488 557 static inline void unlock(spin_queue_lock & this) with(this) { 489 #ifdef __CFA_DEBUG__490 owner = 0p;491 #endif492 558 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 493 559 } 494 560 495 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); } 561 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { 562 unpark(t); 563 } 496 564 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; } 497 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { } 565 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { 566 #ifdef REACQ 567 lock(this); 568 #endif 569 } 498 570 499 571 … … 511 583 // flag showing if lock is held 512 584 volatile bool held; 513 514 #ifdef __CFA_DEBUG__515 // for deadlock detection516 struct thread$ * owner;517 #endif518 585 }; 519 586 … … 529 596 static inline void lock(mcs_block_spin_lock & this) with(this) { 530 597 mcs_node node; 531 #ifdef __CFA_DEBUG__532 assert(!(held && owner == active_thread()));533 #endif534 598 lock( lock, node ); 535 599 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); 536 600 __atomic_store_n(&held, true, __ATOMIC_SEQ_CST); 537 601 unlock( lock, node ); 538 #ifdef __CFA_DEBUG__539 owner = active_thread();540 #endif541 602 } 542 603 543 604 static inline void unlock(mcs_block_spin_lock & this) with(this) { 544 #ifdef __CFA_DEBUG__545 owner = 0p;546 #endif547 605 __atomic_store_n(&held, false, __ATOMIC_SEQ_CST); 548 606 } … … 550 608 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); } 551 609 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; } 552 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { } 610 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { 611 #ifdef REACQ 612 lock(this); 613 #endif 614 } 553 615 554 616 //----------------------------------------------------------------------------- … … 565 627 // flag showing if lock is held 566 628 volatile bool held; 567 568 #ifdef __CFA_DEBUG__569 // for deadlock detection570 struct thread$ * owner;571 #endif572 629 }; 573 630 … … 582 639 // if this is called recursively IT WILL DEADLOCK!!!!! 583 640 static inline void lock(block_spin_lock & this) with(this) { 584 #ifdef __CFA_DEBUG__585 assert(!(held && owner == active_thread()));586 #endif587 641 lock( lock ); 588 642 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); 589 643 __atomic_store_n(&held, true, __ATOMIC_RELEASE); 590 644 unlock( lock ); 645 } 646 647 static inline void unlock(block_spin_lock & this) with(this) { 648 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 649 } 650 651 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) { 652 #ifdef REACQ 653 // first we acquire internal fast_block_lock 654 lock( lock __cfaabi_dbg_ctx2 ); 655 if ( held ) { // if internal fast_block_lock is held 656 insert_last( blocked_threads, *t ); 657 unlock( lock ); 658 return; 659 } 660 // if internal fast_block_lock is not held 661 held = true; 591 662 #ifdef __CFA_DEBUG__ 592 owner = active_thread(); 593 #endif 594 } 595 596 static inline void unlock(block_spin_lock & this) with(this) { 597 #ifdef __CFA_DEBUG__ 598 owner = 0p; 599 #endif 600 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 601 } 602 603 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); } 663 owner = t; 664 #endif 665 unlock( lock ); 666 667 #endif 668 unpark(t); 669 670 } 604 671 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; } 605 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { } 672 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) { 673 #ifdef REACQ 674 // now we acquire the entire block_spin_lock upon waking up 675 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); 676 __atomic_store_n(&held, true, __ATOMIC_RELEASE); 677 unlock( lock ); // Now we release the internal fast_spin_lock 678 #endif 679 } 606 680 607 681 //-----------------------------------------------------------------------------
Note: See TracChangeset
for help on using the changeset viewer.