Changes in / [2dcd80a:d8bdf13]
- Location: libcfa/src/concurrency
- Files: 2 edited
Legend:
- unchanged context lines are shown indented
- lines present only in r2dcd80a are marked with -
- lines present only in rd8bdf13 are marked with +
- … marks elided unchanged code
libcfa/src/concurrency/locks.cfa
(r2dcd80a → rd8bdf13, around line 416)

  #ifdef __CFA_DEBUG__
      if ( lock_used == 0p ) lock_used = &l;
-     else assert(lock_used == &l);
+     else { assert(lock_used == &l); }
  #endif
      info_thread( L ) i = { active_thread(), info, &l };
libcfa/src/concurrency/locks.hfa
(r2dcd80a → rd8bdf13)

  #include "time.hfa"

- #include <fstream.hfa>
-
-
- // futex headers
- #include <linux/futex.h>      /* Definition of FUTEX_* constants */
- #include <sys/syscall.h>      /* Definition of SYS_* constants */
- #include <unistd.h>
-
  //-----------------------------------------------------------------------------
  // Semaphore
  …

  //-----------------------------------------------------------------------------
- // futex_mutex
-
- // - No cond var support
- // - Kernel thd blocking alternative to the spinlock
- // - No ownership (will deadlock on reacq)
- struct futex_mutex {
-     // lock state any state other than UNLOCKED is locked
-     // enum LockState { UNLOCKED = 0, UNCONTENDED = 1, CONTENDED = 2 };
-
-     // stores a lock state
-     int val;
- };
-
- // to use for FUTEX_WAKE and FUTEX_WAIT (other futex calls will need more params)
- static inline int futex(int *uaddr, int futex_op, int val) {
-     return syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
- }
-
- static inline void ?{}( futex_mutex & this ) with(this) { val = 0; }
-
- static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
-     return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
- }
-
- static inline int internal_exchange(futex_mutex & this) with(this) {
-     return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
- }
-
- // if this is called recursively IT WILL DEADLOCK!!!!!
- static inline void lock(futex_mutex & this) with(this) {
-     int state;
-
-     // // linear backoff omitted for now
-     // for( int spin = 4; spin < 1024; spin += spin) {
-     //     state = 0;
-     //     // if unlocked, lock and return
-     //     if (internal_try_lock(this, state)) return;
-     //     if (2 == state) break;
-     //     for (int i = 0; i < spin; i++) Pause();
-     // }
-
-     // no contention try to acquire
-     if (internal_try_lock(this, state)) return;
-
-     // if not in contended state, set to be in contended state
-     if (state != 2) state = internal_exchange(this);
-
-     // block and spin until we win the lock
-     while (state != 0) {
-         futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
-         state = internal_exchange(this);
-     }
- }
-
- static inline void unlock(futex_mutex & this) with(this) {
-     // if uncontended do atomice unlock and then return
-     if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
-
-     // otherwise threads are blocked so we must wake one
-     __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
-     futex((int *)&val, FUTEX_WAKE, 1);
- }
-
- static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); }
- static inline size_t on_wait( futex_mutex & f ) { unlock(f); return 0; }
-
- // to set recursion count after getting signalled;
- static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
-
- //-----------------------------------------------------------------------------
  // CLH Spinlock
  // - No recursive acquisition
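The futex_mutex in the hunk above is the classic three-state futex lock: val is 0 when unlocked, 1 when locked with no waiters, and 2 when locked and contended, with FUTEX_WAIT/FUTEX_WAKE issued only on the contended paths. For reference, a minimal standalone sketch of the same protocol is shown below in plain C11, assuming Linux and glibc's syscall(); the names fmutex, fm_lock and fm_unlock and the use of <stdatomic.h> are illustrative and are not the libcfa API.

    /* Minimal sketch (plain C11 + Linux futex) of the three-state protocol:
       0 = unlocked, 1 = locked/no waiters, 2 = locked and contended. */
    #define _GNU_SOURCE
    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct { atomic_int val; } fmutex;        /* initialise val to 0 */

    static long sys_futex( int * uaddr, int op, int expected ) {
        return syscall( SYS_futex, uaddr, op, expected, NULL, NULL, 0 );
    }

    static void fm_lock( fmutex * m ) {
        int state = 0;
        /* fast path: 0 -> 1, uncontended acquire */
        if ( atomic_compare_exchange_strong( &m->val, &state, 1 ) ) return;
        /* slow path: mark the lock contended and sleep until the exchange sees 0 */
        if ( state != 2 ) state = atomic_exchange( &m->val, 2 );
        while ( state != 0 ) {
            sys_futex( (int *)&m->val, FUTEX_WAIT, 2 );   /* returns early if val != 2 */
            state = atomic_exchange( &m->val, 2 );
        }
    }

    static void fm_unlock( fmutex * m ) {
        /* 1 -> 0 means no waiters, so no syscall is needed */
        if ( atomic_fetch_sub( &m->val, 1 ) == 1 ) return;
        /* otherwise release fully and wake one waiter */
        atomic_store( &m->val, 0 );
        sys_futex( (int *)&m->val, FUTEX_WAKE, 1 );
    }

Compared with the spinlocks in this header, this scheme blocks in the kernel under contention instead of burning CPU, at the price of a syscall whenever a waiter must sleep or be woken.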
  …
  }

- static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
- static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup(clh_lock & this, size_t recursion ) {
-     #ifdef REACQ
-     lock(this);
-     #endif
- }
-
-
  //-----------------------------------------------------------------------------
  // Linear backoff Spinlock
  …
      __spinlock_t spinlock;

+     // Current thread owning the lock
+     struct thread$ * owner;
+
      // List of blocked threads
      dlist( thread$ ) blocked_threads;
  …
      // Used for comparing and exchanging
      volatile size_t lock_value;
- };
-
- static inline void ?{}( linear_backoff_then_block_lock & this ) {
+
+     // used for linear backoff spinning
+     int spin_start;
+     int spin_end;
+     int spin_count;
+
+     // after unsuccessful linear backoff yield this many times
+     int yield_count;
+ };
+
+ static inline void ?{}( linear_backoff_then_block_lock & this, int spin_start, int spin_end, int spin_count, int yield_count ) {
      this.spinlock{};
      this.blocked_threads{};
      this.lock_value = 0;
- }
+     this.spin_start = spin_start;
+     this.spin_end = spin_end;
+     this.spin_count = spin_count;
+     this.yield_count = yield_count;
+ }
+ static inline void ?{}( linear_backoff_then_block_lock & this ) { this{4, 1024, 16, 0}; }
  static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
- //static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
- //static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
+ static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
+ static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;

  static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
      if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
+         owner = active_thread();
          return true;
      }
  …
  static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
      if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
+         owner = active_thread();
          return true;
      }
  …

  static inline bool block(linear_backoff_then_block_lock & this) with(this) {
-     lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
+     lock( spinlock __cfaabi_dbg_ctx2 );
      if (lock_value != 2) {
          unlock( spinlock );
  …

  static inline void lock(linear_backoff_then_block_lock & this) with(this) {
+     // if owner just return
+     if (active_thread() == owner) return;
      size_t compare_val = 0;
-     int spin = 4;
+     int spin = spin_start;
      // linear backoff
      for( ;; ) {
  …
          if (2 == compare_val) break;
          for (int i = 0; i < spin; i++) Pause();
-         if (spin >= 1024) break;
+         if (spin >= spin_end) break;
          spin += spin;
      }
  …

  static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
+     verify(lock_value > 0);
+     owner = 0p;
      if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
      lock( spinlock __cfaabi_dbg_ctx2 );
  …
  static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
  static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
-     #ifdef REACQ
-     lock(this);
-     #endif
- }
+ static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
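On the rd8bdf13 side, linear_backoff_then_block_lock gains an owner field (acquire becomes a no-op for the current holder) and per-lock spin parameters, with the default constructor delegating to this{4, 1024, 16, 0}. The acquire path spins with a doubling delay between spin_start and spin_end, then falls back to blocking on the contended state. A condensed plain-C sketch of that spin-then-block shape follows; backoff_lock, backoff_acquire and the sched_yield()-based block_self are illustrative stand-ins, not libcfa code, which instead parks the thread on blocked_threads under the internal spinlock.

    /* Sketch (plain C11) of the spin-then-block acquire shape; blocking is
       approximated with sched_yield() instead of parking the thread. */
    #include <stdatomic.h>
    #include <stddef.h>
    #include <sched.h>

    typedef struct {
        atomic_size_t lock_value;      /* 0 = free, 1 = held, 2 = held + contended */
        int spin_start, spin_end;      /* e.g. 4 and 1024, as in the default constructor */
    } backoff_lock;

    static void block_self( backoff_lock * l ) { (void)l; sched_yield(); }  /* stand-in for park */

    static void backoff_acquire( backoff_lock * l ) {
        /* phase 1: linear backoff, doubling the pause between attempts */
        for ( int spin = l->spin_start; ; ) {
            size_t expected = 0;
            if ( atomic_compare_exchange_strong( &l->lock_value, &expected, 1 ) ) return;
            if ( expected == 2 ) break;                     /* already contended: stop spinning */
            for ( volatile int i = 0; i < spin; i++ ) {}    /* Pause() stand-in */
            if ( spin >= l->spin_end ) break;
            spin += spin;
        }
        /* phase 2: advertise contention (2) and block until the lock is observed free */
        while ( atomic_exchange( &l->lock_value, 2 ) != 0 ) block_self( l );
    }

    static void backoff_release( backoff_lock * l ) {
        /* the real unlock also wakes one blocked thread when the state was contended */
        atomic_store( &l->lock_value, 0 );
    }

Because the delay doubles each round, the total busy-wait before giving up and blocking is bounded by roughly twice spin_end iterations.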

  //-----------------------------------------------------------------------------
  …
      assert(!(held && owner == active_thread()));
      #endif
-     if ( held) {
+     if (held) {
          insert_last( blocked_threads, *active_thread() );
          unlock( lock );
  …
  }

- static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
-     #ifdef REACQ
-     lock( lock __cfaabi_dbg_ctx2 );
-     insert_last( blocked_threads, *t );
-     unlock( lock );
-     #else
-     unpark(t);
-     #endif
- }
+ static inline void on_notify(fast_block_lock & this, struct thread$ * t ) { unpark(t); }
  static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
  static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
  …
      if ( owner != 0p ) {
          insert_last( blocked_threads, *t );
+         unlock( lock );
      }
      // lock not held
  …
          recursion_count = 1;
          unpark( t );
-     }
-     unlock( lock );
+         unlock( lock );
+     }
  }

  …
  static inline void lock(spin_queue_lock & this) with(this) {
      mcs_spin_node node;
+     #ifdef __CFA_DEBUG__
+     assert(!(held && owner == active_thread()));
+     #endif
      lock( lock, node );
      while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
      __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
      unlock( lock, node );
+     #ifdef __CFA_DEBUG__
+     owner = active_thread();
+     #endif
  }

  static inline void unlock(spin_queue_lock & this) with(this) {
+     #ifdef __CFA_DEBUG__
+     owner = 0p;
+     #endif
      __atomic_store_n(&held, false, __ATOMIC_RELEASE);
  }

- static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
-     unpark(t);
- }
+ static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); }
  static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
-     #ifdef REACQ
-     lock(this);
-     #endif
- }
+ static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { }

  …
      // flag showing if lock is held
      volatile bool held;
+
+     #ifdef __CFA_DEBUG__
+     // for deadlock detection
+     struct thread$ * owner;
+     #endif
  };

  …
  static inline void lock(mcs_block_spin_lock & this) with(this) {
      mcs_node node;
+     #ifdef __CFA_DEBUG__
+     assert(!(held && owner == active_thread()));
+     #endif
      lock( lock, node );
      while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
      __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
      unlock( lock, node );
+     #ifdef __CFA_DEBUG__
+     owner = active_thread();
+     #endif
  }

  static inline void unlock(mcs_block_spin_lock & this) with(this) {
+     #ifdef __CFA_DEBUG__
+     owner = 0p;
+     #endif
      __atomic_store_n(&held, false, __ATOMIC_SEQ_CST);
  }
  …
  static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
  static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
-     #ifdef REACQ
-     lock(this);
-     #endif
- }
+ static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { }

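The rd8bdf13 side of the spin_queue_lock and mcs_block_spin_lock hunks adds owner tracking under __CFA_DEBUG__, so a recursive acquire trips the assert instead of spinning forever. Below is a generic sketch of that pattern in plain C with pthreads; dbg_spinlock, has_owner, and NDEBUG standing in for __CFA_DEBUG__ are all illustrative, and the check is best-effort (it is only meaningful for the calling thread).

    /* Debug-only owner tracking on a test-and-set spinlock (sketch). */
    #include <assert.h>
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    typedef struct {
        atomic_bool held;             /* zero-initialise the whole struct */
    #ifndef NDEBUG
        pthread_t owner;              /* valid only while held, debug builds only */
        atomic_bool has_owner;
    #endif
    } dbg_spinlock;

    static void dbg_spinlock_lock( dbg_spinlock * l ) {
    #ifndef NDEBUG
        /* recursive acquisition would deadlock; fail loudly in debug builds */
        assert( !( atomic_load( &l->has_owner ) && pthread_equal( l->owner, pthread_self() ) ) );
    #endif
        while ( atomic_exchange( &l->held, true ) ) {}   /* test-and-set spin */
    #ifndef NDEBUG
        l->owner = pthread_self();
        atomic_store( &l->has_owner, true );
    #endif
    }

    static void dbg_spinlock_unlock( dbg_spinlock * l ) {
    #ifndef NDEBUG
        atomic_store( &l->has_owner, false );
    #endif
        atomic_store( &l->held, false );
    }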
  //-----------------------------------------------------------------------------
  …
      // flag showing if lock is held
      volatile bool held;
+
+     #ifdef __CFA_DEBUG__
+     // for deadlock detection
+     struct thread$ * owner;
+     #endif
  };

  …
  // if this is called recursively IT WILL DEADLOCK!!!!!
  static inline void lock(block_spin_lock & this) with(this) {
+     #ifdef __CFA_DEBUG__
+     assert(!(held && owner == active_thread()));
+     #endif
      lock( lock );
      while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
      __atomic_store_n(&held, true, __ATOMIC_RELEASE);
      unlock( lock );
+     #ifdef __CFA_DEBUG__
+     owner = active_thread();
+     #endif
  }

  static inline void unlock(block_spin_lock & this) with(this) {
+     #ifdef __CFA_DEBUG__
+     owner = 0p;
+     #endif
      __atomic_store_n(&held, false, __ATOMIC_RELEASE);
  }

- static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
-     #ifdef REACQ
-     // first we acquire internal fast_block_lock
-     lock( lock __cfaabi_dbg_ctx2 );
-     if ( held ) { // if internal fast_block_lock is held
-         insert_last( blocked_threads, *t );
-         unlock( lock );
-         return;
-     }
-     // if internal fast_block_lock is not held
-     held = true;
-     #ifdef __CFA_DEBUG__
-     owner = t;
-     #endif
-     unlock( lock );
-
-     #endif
-     unpark(t);
- }
+ static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); }
  static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
-     #ifdef REACQ
-     // now we acquire the entire block_spin_lock upon waking up
-     while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
-     __atomic_store_n(&held, true, __ATOMIC_RELEASE);
-     unlock( lock ); // Now we release the internal fast_spin_lock
-     #endif
- }
+ static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { }

  //-----------------------------------------------------------------------------
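Every lock touched in this diff supplies the same three hooks used by the blocking primitives in this header: on_wait releases the lock before its caller blocks and returns a recursion count, on_notify hands back a woken thread (a plain unpark on the rd8bdf13 side, versus the REACQ-guarded re-queueing on the r2dcd80a side), and on_wakeup restores lock state after the thread resumes (reacquire, or do nothing). A rough plain-C rendering of that contract is sketched below; lock_ops, cv_wait, and sleep_until_notified are illustrative names, not libcfa types.

    /* Sketch of the on_wait / on_notify / on_wakeup protocol as C function pointers. */
    #include <stddef.h>

    typedef struct waiter waiter;   /* stand-in for libcfa's thread$ */

    typedef struct {
        void   (*lock)(void * l);
        void   (*unlock)(void * l);
        size_t (*on_wait)(void * l);                      /* release before sleeping */
        void   (*on_notify)(void * l, waiter * w);        /* wake (or re-queue) a waiter */
        void   (*on_wakeup)(void * l, size_t recursion);  /* reacquire/restore after waking */
    } lock_ops;

    /* driver sketch: what a condition-variable-style wait does around blocking */
    static void cv_wait( const lock_ops * ops, void * l, void (*sleep_until_notified)(void) ) {
        size_t recursion = ops->on_wait( l );   /* drop the lock, remember recursion depth */
        sleep_until_notified();                 /* park until some thread calls on_notify */
        ops->on_wakeup( l, recursion );         /* e.g. lock(l) again, or nothing for spin locks */
    }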