Changes in / [d8bdf13:2dcd80a]
- Location: libcfa/src/concurrency
- Files: 2 edited
Legend:
- Unmodified: no prefix
- Added: lines prefixed with +
- Removed: lines prefixed with -
libcfa/src/concurrency/locks.cfa
--- libcfa/src/concurrency/locks.cfa	(rd8bdf13)
+++ libcfa/src/concurrency/locks.cfa	(r2dcd80a)
@@ -414,5 +414,5 @@
 	#ifdef __CFA_DEBUG__
 	if ( lock_used == 0p ) lock_used = &l;
-	else { assert(lock_used == &l); }
+	else assert(lock_used == &l);
 	#endif
 	info_thread( L ) i = { active_thread(), info, &l };
libcfa/src/concurrency/locks.hfa
--- libcfa/src/concurrency/locks.hfa	(rd8bdf13)
+++ libcfa/src/concurrency/locks.hfa	(r2dcd80a)
@@ -30,4 +30,12 @@
 #include "time.hfa"
 
+#include <fstream.hfa>
+
+
+// futex headers
+#include <linux/futex.h>      /* Definition of FUTEX_* constants */
+#include <sys/syscall.h>      /* Definition of SYS_* constants */
+#include <unistd.h>
+
 //-----------------------------------------------------------------------------
 // Semaphore
@@ -140,4 +148,75 @@
 
 //-----------------------------------------------------------------------------
+// futex_mutex
+
+// - No cond var support
+// - Kernel thd blocking alternative to the spinlock
+// - No ownership (will deadlock on reacq)
+struct futex_mutex {
+	// lock state any state other than UNLOCKED is locked
+	// enum LockState { UNLOCKED = 0, UNCONTENDED = 1, CONTENDED = 2 };
+
+	// stores a lock state
+	int val;
+};
+
+// to use for FUTEX_WAKE and FUTEX_WAIT (other futex calls will need more params)
+static inline int futex(int *uaddr, int futex_op, int val) {
+	return syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
+}
+
+static inline void ?{}( futex_mutex & this ) with(this) { val = 0; }
+
+static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
+	return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
+}
+
+static inline int internal_exchange(futex_mutex & this) with(this) {
+	return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
+}
+
+// if this is called recursively IT WILL DEADLOCK!!!!!
+static inline void lock(futex_mutex & this) with(this) {
+	int state;
+
+
+	// // linear backoff omitted for now
+	// for( int spin = 4; spin < 1024; spin += spin) {
+	// 	state = 0;
+	// 	// if unlocked, lock and return
+	// 	if (internal_try_lock(this, state)) return;
+	// 	if (2 == state) break;
+	// 	for (int i = 0; i < spin; i++) Pause();
+	// }
+
+	// no contention try to acquire
+	if (internal_try_lock(this, state)) return;
+
+	// if not in contended state, set to be in contended state
+	if (state != 2) state = internal_exchange(this);
+
+	// block and spin until we win the lock
+	while (state != 0) {
+		futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
+		state = internal_exchange(this);
+	}
+}
+
+static inline void unlock(futex_mutex & this) with(this) {
+	// if uncontended do atomice unlock and then return
+	if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
+
+	// otherwise threads are blocked so we must wake one
+	__atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
+	futex((int *)&val, FUTEX_WAKE, 1);
+}
+
+static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); }
+static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;}
+
+// to set recursion count after getting signalled;
+static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
+
+//-----------------------------------------------------------------------------
 // CLH Spinlock
 // - No recursive acquisition
@@ -165,4 +244,13 @@
 }
 
+static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
+static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
+static inline void on_wakeup(clh_lock & this, size_t recursion ) {
+	#ifdef REACQ
+	lock(this);
+	#endif
+}
+
+
 //-----------------------------------------------------------------------------
 // Linear backoff Spinlock
@@ -171,7 +259,4 @@
 	__spinlock_t spinlock;
 
-	// Current thread owning the lock
-	struct thread$ * owner;
-
 	// List of blocked threads
 	dlist( thread$ ) blocked_threads;
@@ -179,31 +264,17 @@
 	// Used for comparing and exchanging
 	volatile size_t lock_value;
-
-	// used for linear backoff spinning
-	int spin_start;
-	int spin_end;
-	int spin_count;
-
-	// after unsuccessful linear backoff yield this many times
-	int yield_count;
-};
-
-static inline void ?{}( linear_backoff_then_block_lock & this, int spin_start, int spin_end, int spin_count, int yield_count ) {
+};
+
+static inline void ?{}( linear_backoff_then_block_lock & this ) {
 	this.spinlock{};
 	this.blocked_threads{};
 	this.lock_value = 0;
-	this.spin_start = spin_start;
-	this.spin_end = spin_end;
-	this.spin_count = spin_count;
-	this.yield_count = yield_count;
-}
-static inline void ?{}( linear_backoff_then_block_lock & this ) { this{4, 1024, 16, 0}; }
+}
 static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
-static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
-static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
+// static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
+// static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
 
 static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
 	if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
-		owner = active_thread();
 		return true;
 	}
@@ -215,5 +286,4 @@
 static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
 	if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
-		owner = active_thread();
 		return true;
 	}
@@ -222,5 +292,5 @@
 
 static inline bool block(linear_backoff_then_block_lock & this) with(this) {
-	lock( spinlock __cfaabi_dbg_ctx2 );
+	lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
 	if (lock_value != 2) {
 		unlock( spinlock );
@@ -234,8 +304,6 @@
 
 static inline void lock(linear_backoff_then_block_lock & this) with(this) {
-	// if owner just return
-	if (active_thread() == owner) return;
 	size_t compare_val = 0;
-	int spin = spin_start;
+	int spin = 4;
 	// linear backoff
 	for( ;; ) {
@@ -244,5 +312,5 @@
 		if (2 == compare_val) break;
 		for (int i = 0; i < spin; i++) Pause();
-		if (spin >= spin_end) break;
+		if (spin >= 1024) break;
 		spin += spin;
 	}
@@ -254,6 +322,4 @@
 
 static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
-	verify(lock_value > 0);
-	owner = 0p;
 	if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
 	lock( spinlock __cfaabi_dbg_ctx2 );
@@ -265,5 +331,9 @@
 static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
+static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
+	#ifdef REACQ
+	lock(this);
+	#endif
+}
 
 //-----------------------------------------------------------------------------
@@ -306,5 +376,5 @@
 	assert(!(held && owner == active_thread()));
 	#endif
-	if ( held) {
+	if ( held ) {
 		insert_last( blocked_threads, *active_thread() );
 		unlock( lock );
@@ -331,5 +401,13 @@
 }
 
-static inline void on_notify(fast_block_lock & this, struct thread$ * t ) { unpark(t); }
+static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
+	#ifdef REACQ
+	lock( lock __cfaabi_dbg_ctx2 );
+	insert_last( blocked_threads, *t );
+	unlock( lock );
+	#else
+	unpark(t);
+	#endif
+}
 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
@@ -412,5 +490,4 @@
 	if ( owner != 0p ) {
 		insert_last( blocked_threads, *t );
-		unlock( lock );
 	}
 	// lock not held
@@ -419,6 +496,6 @@
 		recursion_count = 1;
 		unpark( t );
-		unlock( lock );
-	}
+	}
+	unlock( lock );
 }
 
@@ -474,26 +551,23 @@
 static inline void lock(spin_queue_lock & this) with(this) {
 	mcs_spin_node node;
-	#ifdef __CFA_DEBUG__
-	assert(!(held && owner == active_thread()));
-	#endif
 	lock( lock, node );
 	while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
 	__atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
 	unlock( lock, node );
-	#ifdef __CFA_DEBUG__
-	owner = active_thread();
-	#endif
 }
 
 static inline void unlock(spin_queue_lock & this) with(this) {
-	#ifdef __CFA_DEBUG__
-	owner = 0p;
-	#endif
 	__atomic_store_n(&held, false, __ATOMIC_RELEASE);
 }
 
-static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); }
+static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
+	unpark(t);
+}
 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { }
+static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
+	#ifdef REACQ
+	lock(this);
+	#endif
+}
 
 
@@ -511,9 +585,4 @@
 	// flag showing if lock is held
 	volatile bool held;
-
-	#ifdef __CFA_DEBUG__
-	// for deadlock detection
-	struct thread$ * owner;
-	#endif
 };
 
@@ -529,20 +598,11 @@
 static inline void lock(mcs_block_spin_lock & this) with(this) {
 	mcs_node node;
-	#ifdef __CFA_DEBUG__
-	assert(!(held && owner == active_thread()));
-	#endif
 	lock( lock, node );
 	while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
 	__atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
 	unlock( lock, node );
-	#ifdef __CFA_DEBUG__
-	owner = active_thread();
-	#endif
 }
 
 static inline void unlock(mcs_block_spin_lock & this) with(this) {
-	#ifdef __CFA_DEBUG__
-	owner = 0p;
-	#endif
 	__atomic_store_n(&held, false, __ATOMIC_SEQ_CST);
 }
@@ -550,5 +610,9 @@
 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { }
+static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
+	#ifdef REACQ
+	lock(this);
+	#endif
+}
 
 //-----------------------------------------------------------------------------
@@ -565,9 +629,4 @@
 	// flag showing if lock is held
 	volatile bool held;
-
-	#ifdef __CFA_DEBUG__
-	// for deadlock detection
-	struct thread$ * owner;
-	#endif
 };
 
@@ -582,26 +641,43 @@
 // if this is called recursively IT WILL DEADLOCK!!!!!
 static inline void lock(block_spin_lock & this) with(this) {
-	#ifdef __CFA_DEBUG__
-	assert(!(held && owner == active_thread()));
-	#endif
 	lock( lock );
 	while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
 	__atomic_store_n(&held, true, __ATOMIC_RELEASE);
 	unlock( lock );
+}
+
+static inline void unlock(block_spin_lock & this) with(this) {
+	__atomic_store_n(&held, false, __ATOMIC_RELEASE);
+}
+
+static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
+	#ifdef REACQ
+	// first we acquire internal fast_block_lock
+	lock( lock __cfaabi_dbg_ctx2 );
+	if ( held ) { // if internal fast_block_lock is held
+		insert_last( blocked_threads, *t );
+		unlock( lock );
+		return;
+	}
+	// if internal fast_block_lock is not held
+	held = true;
 	#ifdef __CFA_DEBUG__
-	owner = active_thread();
-	#endif
-}
-
-static inline void unlock(block_spin_lock & this) with(this) {
-	#ifdef __CFA_DEBUG__
-	owner = 0p;
-	#endif
-	__atomic_store_n(&held, false, __ATOMIC_RELEASE);
-}
-
-static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); }
+	owner = t;
+	#endif
+	unlock( lock );
+
+	#endif
+	unpark(t);
+
+}
 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { }
+static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
+	#ifdef REACQ
+	// now we acquire the entire block_spin_lock upon waking up
+	while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
+	__atomic_store_n(&held, true, __ATOMIC_RELEASE);
+	unlock( lock ); // Now we release the internal fast_spin_lock
+	#endif
+}
 
 //-----------------------------------------------------------------------------
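The futex_mutex added above is the classic three-state futex lock: val is 0 when unlocked, 1 when locked with no waiters, and 2 when locked with possible waiters. The stand-alone C sketch below shows the same protocol outside the CFA runtime and may help when reading that hunk. It is an illustration only, assuming Linux, GCC/Clang atomic builtins, and a pthread test driver; the names fmutex, fmutex_lock, and fmutex_unlock are hypothetical and not part of the libcfa API.

	/* futex_mutex_demo.c -- sketch of the three-state futex mutex (not libcfa code)
	 * Build: cc -O2 -pthread futex_mutex_demo.c
	 */
	#include <linux/futex.h>    /* FUTEX_WAIT, FUTEX_WAKE */
	#include <sys/syscall.h>    /* SYS_futex */
	#include <unistd.h>         /* syscall */
	#include <pthread.h>
	#include <stdio.h>

	/* val: 0 = unlocked, 1 = locked/uncontended, 2 = locked/contended */
	typedef struct { int val; } fmutex;

	static long futex(int *uaddr, int op, int v) {
		return syscall(SYS_futex, uaddr, op, v, NULL, NULL, 0);
	}

	static void fmutex_lock(fmutex *m) {
		int state = 0;
		/* fast path: 0 -> 1 when there is no contention */
		if (__atomic_compare_exchange_n(&m->val, &state, 1, 0,
		                                __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) return;
		/* slow path: advertise a waiter (2), then sleep until we win the lock */
		if (state != 2) state = __atomic_exchange_n(&m->val, 2, __ATOMIC_ACQUIRE);
		while (state != 0) {
			futex(&m->val, FUTEX_WAIT, 2);   /* returns at once if val is no longer 2 */
			state = __atomic_exchange_n(&m->val, 2, __ATOMIC_ACQUIRE);
		}
	}

	static void fmutex_unlock(fmutex *m) {
		/* 1 -> 0 means nobody is waiting; otherwise reset to 0 and wake one waiter */
		if (__atomic_fetch_sub(&m->val, 1, __ATOMIC_RELEASE) == 1) return;
		__atomic_store_n(&m->val, 0, __ATOMIC_RELEASE);
		futex(&m->val, FUTEX_WAKE, 1);
	}

	/* two threads hammer a shared counter to exercise the contended path */
	static fmutex m = { 0 };
	static long counter = 0;

	static void *worker(void *arg) {
		(void)arg;
		for (int i = 0; i < 1000000; i++) {
			fmutex_lock(&m);
			counter++;
			fmutex_unlock(&m);
		}
		return NULL;
	}

	int main(void) {
		pthread_t a, b;
		pthread_create(&a, NULL, worker, NULL);
		pthread_create(&b, NULL, worker, NULL);
		pthread_join(a, NULL);
		pthread_join(b, NULL);
		printf("counter = %ld (expected 2000000)\n", counter);
		return 0;
	}

The reason for distinguishing states 1 and 2 is that unlock can skip the FUTEX_WAKE syscall entirely in the uncontended case (the decrement from 1 to 0); only a release that observes state 2 pays for a wake, which is what makes the uncontended path as cheap as a plain atomic exchange.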