Changes in / [63be3387:19a8c40]


File: 1 edited

Legend:

  unchanged lines are shown unprefixed
  - line removed in r19a8c40
  + line added in r19a8c40
  • libcfa/src/concurrency/locks.hfa

--- locks.hfa (r63be3387)
+++ locks.hfa (r19a8c40)
@@ -30 +30 @@
 #include "time.hfa"
 
-#include <fstream.hfa>
-
-
-// futex headers
-#include <linux/futex.h>      /* Definition of FUTEX_* constants */
-#include <sys/syscall.h>      /* Definition of SYS_* constants */
-#include <unistd.h>
-
 //-----------------------------------------------------------------------------
 // Semaphore
     
@@ -148 +140 @@
 
 //-----------------------------------------------------------------------------
-// futex_mutex
-
-// - No cond var support
-// - Kernel thd blocking alternative to the spinlock
-// - No ownership (will deadlock on reacquisition)
-struct futex_mutex {
-    // lock state: any state other than UNLOCKED is locked
-    // enum LockState { UNLOCKED = 0, UNCONTENDED = 1, CONTENDED = 2 };
-
-    // stores a lock state
-    int val;
-};
-
-// to use for FUTEX_WAKE and FUTEX_WAIT (other futex calls will need more params)
-static int futex(int *uaddr, int futex_op, int val) {
-    return syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
-}
-
-static inline void  ?{}( futex_mutex & this ) with(this) { val = 0; }
-
-static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
-    return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
-}
-
-static inline int internal_exchange(futex_mutex & this) with(this) {
-    return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
-}
-
-// if this is called recursively IT WILL DEADLOCK!!!!!
-static inline void lock(futex_mutex & this) with(this) {
-    int state;
-
-    // linear backoff
-    for( int spin = 4; spin < 1024; spin += spin) {
-        state = 0;
-        // if unlocked, lock and return
-        if (internal_try_lock(this, state)) return;
-        if (2 == state) break;
-        for (int i = 0; i < spin; i++) Pause();
-    }
-    // if (internal_try_lock(this, state)) return;
-
-    // if not in contended state, set to be in contended state
-    if (state != 2) state = internal_exchange(this);
-
-    // block and spin until we win the lock
-    while (state != 0) {
-        futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
-        state = internal_exchange(this);
-    }
-}
-
-static inline void unlock(futex_mutex & this) with(this) {
-    // if uncontended, do an atomic unlock and return
-    if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
-
-    // otherwise threads are blocked, so wake one
-    __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
-    futex((int *)&val, FUTEX_WAKE, 1);
-}
-
-static inline void on_notify( futex_mutex & f, thread$ * t ) { unpark(t); }
-static inline size_t on_wait( futex_mutex & f ) { unlock(f); return 0; }
-
-// to set recursion count after getting signalled
-static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
-
-//-----------------------------------------------------------------------------
 // CLH Spinlock
 // - No recursive acquisition
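Note: the removed futex_mutex follows the classic three-state futex protocol (0 = unlocked, 1 = locked, 2 = locked with possible waiters) described in Drepper's "Futexes Are Tricky". For reference, a minimal plain-C sketch of the same scheme; the names and the use of C11 atomics are illustrative, not libcfa API:

    #include <linux/futex.h>      /* FUTEX_WAIT, FUTEX_WAKE */
    #include <sys/syscall.h>      /* SYS_futex */
    #include <unistd.h>           /* syscall */
    #include <stdatomic.h>

    static long sys_futex(int *uaddr, int op, int val) {
        return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
    }

    typedef struct { atomic_int val; } futex_mutex_c;   /* hypothetical name */

    static void fm_lock(futex_mutex_c *m) {
        int expected = 0;
        /* fast path: 0 -> 1 (locked, no waiters) */
        if (atomic_compare_exchange_strong(&m->val, &expected, 1)) return;
        /* slow path: advertise contention (2) and sleep until we win the exchange */
        while (atomic_exchange(&m->val, 2) != 0)
            sys_futex((int *)&m->val, FUTEX_WAIT, 2);   /* returns early if val != 2 */
    }

    static void fm_unlock(futex_mutex_c *m) {
        /* previous value 1 means no waiters; otherwise clear and wake one sleeper */
        if (atomic_fetch_sub(&m->val, 1) != 1) {
            atomic_store(&m->val, 0);
            sys_futex((int *)&m->val, FUTEX_WAKE, 1);
        }
    }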
     
@@ -242 +165 @@
 }
 
-static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
-static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(clh_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
-
-
 //-----------------------------------------------------------------------------
 // Linear backoff Spinlock
     
@@ -257 +171 @@
     __spinlock_t spinlock;
 
+    // Current thread owning the lock
+    struct thread$ * owner;
+
     // List of blocked threads
     dlist( thread$ ) blocked_threads;
     
@@ -262 +179 @@
     // Used for comparing and exchanging
     volatile size_t lock_value;
-};
-
-static inline void  ?{}( linear_backoff_then_block_lock & this ) {
+
+    // used for linear backoff spinning
+    int spin_start;
+    int spin_end;
+    int spin_count;
+
+    // after unsuccessful linear backoff yield this many times
+    int yield_count;
+};
+
+static inline void  ?{}( linear_backoff_then_block_lock & this, int spin_start, int spin_end, int spin_count, int yield_count ) {
     this.spinlock{};
     this.blocked_threads{};
     this.lock_value = 0;
-}
+    this.spin_start = spin_start;
+    this.spin_end = spin_end;
+    this.spin_count = spin_count;
+    this.yield_count = yield_count;
+}
+static inline void  ?{}( linear_backoff_then_block_lock & this ) { this{4, 1024, 16, 0}; }
 static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
-// static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
-// static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
+static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
+static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
 
 static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
     if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
+        owner = active_thread();
         return true;
     }
     
@@ -284 +215 @@
 static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
     if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
+        owner = active_thread();
         return true;
     }
     
@@ -302 +234 @@
 
 static inline void lock(linear_backoff_then_block_lock & this) with(this) {
+    // if already the owner, return immediately
+    if (active_thread() == owner) return;
     size_t compare_val = 0;
-    int spin = 4;
+    int spin = spin_start;
     // linear backoff
     for( ;; ) {
@@ -310 +244 @@
         if (2 == compare_val) break;
         for (int i = 0; i < spin; i++) Pause();
-        if (spin >= 1024) break;
+        if (spin >= spin_end) break;
         spin += spin;
     }
     
@@ -320 +254 @@
 
 static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
+    verify(lock_value > 0);
+    owner = 0p;
     if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
     lock( spinlock __cfaabi_dbg_ctx2 );
     
@@ -329 +265 @@
 static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
+static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
 
 //-----------------------------------------------------------------------------
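Note: the new constructor parameters replace the old hard-coded bounds (spin = 4 doubling up to 1024). A plain-C sketch of the same spin-then-block shape, assuming hypothetical try_once()/block() helpers for the parts libcfa implements with internal_try_lock and the blocked-thread list:

    #include <stdbool.h>

    extern bool try_once(void);   /* hypothetical: one lock attempt */
    extern void block(void);      /* hypothetical: enqueue self and sleep */

    static void backoff_lock(int spin_start, int spin_end) {
        for (int spin = spin_start; ; spin += spin) {      /* doubling window */
            if (try_once()) return;                        /* acquired */
            for (int i = 0; i < spin; i++)
                __asm__ __volatile__("" ::: "memory");     /* stand-in for Pause() */
            if (spin >= spin_end) break;                   /* stop spinning */
        }
        block();                                           /* fall back to blocking */
    }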
     
@@ -374 +306 @@
     assert(!(held && owner == active_thread()));
     #endif
-    if ( held ) {
+    if (held) {
         insert_last( blocked_threads, *active_thread() );
         unlock( lock );
     
@@ -399 +331 @@
 }
 
-static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
-    #ifdef REACQ
-        lock( lock __cfaabi_dbg_ctx2 );
-        insert_last( blocked_threads, *t );
-        unlock( lock );
-    #else
-        unpark(t);
-    #endif
-}
+static inline void on_notify(fast_block_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
     
@@ -488 +412 @@
     if ( owner != 0p ) {
         insert_last( blocked_threads, *t );
+        unlock( lock );
     }
     // lock not held
     
@@ -494 +419 @@
         recursion_count = 1;
         unpark( t );
-    }
-    unlock( lock );
+        unlock( lock );
+    }
 }
 
     
@@ -549 +474 @@
 static inline void lock(spin_queue_lock & this) with(this) {
     mcs_spin_node node;
+    #ifdef __CFA_DEBUG__
+    assert(!(held && owner == active_thread()));
+    #endif
     lock( lock, node );
     while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
     unlock( lock, node );
+    #ifdef __CFA_DEBUG__
+    owner = active_thread();
+    #endif
 }
 
 static inline void unlock(spin_queue_lock & this) with(this) {
+    #ifdef __CFA_DEBUG__
+    owner = 0p;
+    #endif
     __atomic_store_n(&held, false, __ATOMIC_RELEASE);
 }
 
-static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
-    unpark(t);
-}
+static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
+static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { }
 
 
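Note: this hunk and the ones below add the same debug-build pattern: remember the owning thread while the lock is held and assert that the holder does not re-lock. A self-contained C sketch of the pattern using pthreads (illustrative; libcfa guards it with __CFA_DEBUG__ and uses active_thread()):

    #include <assert.h>
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    typedef struct {
        atomic_bool held;
    #ifndef NDEBUG
        pthread_t owner;          /* valid only while held */
    #endif
    } dbg_spinlock;

    static void dbg_lock(dbg_spinlock *l) {
    #ifndef NDEBUG
        /* best-effort self-deadlock check, like assert(!(held && owner == active_thread())) */
        assert(!(atomic_load(&l->held) && pthread_equal(l->owner, pthread_self())));
    #endif
        bool expected = false;
        while (!atomic_compare_exchange_weak(&l->held, &expected, true))
            expected = false;     /* lost the race; try again */
    #ifndef NDEBUG
        l->owner = pthread_self();
    #endif
    }

    static void dbg_unlock(dbg_spinlock *l) {
        atomic_store(&l->held, false);
    }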
     
@@ -583 +511 @@
     // flag showing if lock is held
     volatile bool held;
+
+    #ifdef __CFA_DEBUG__
+    // for deadlock detection
+    struct thread$ * owner;
+    #endif
 };
 
     
@@ -596 +529 @@
 static inline void lock(mcs_block_spin_lock & this) with(this) {
     mcs_node node;
+    #ifdef __CFA_DEBUG__
+    assert(!(held && owner == active_thread()));
+    #endif
     lock( lock, node );
     while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
     unlock( lock, node );
+    #ifdef __CFA_DEBUG__
+    owner = active_thread();
+    #endif
 }
 
 static inline void unlock(mcs_block_spin_lock & this) with(this) {
+    #ifdef __CFA_DEBUG__
+    owner = 0p;
+    #endif
     __atomic_store_n(&held, false, __ATOMIC_SEQ_CST);
 }
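Note: lock( lock, node ) above is an MCS-style acquire through a caller-provided queue node. For reference, a minimal C11 sketch of the standard MCS spinlock algorithm (not the libcfa implementation):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef struct mcs_node {
        _Atomic(struct mcs_node *) next;
        atomic_bool locked;
    } mcs_node;

    typedef struct { _Atomic(mcs_node *) tail; } mcs_lock;

    static void mcs_acquire(mcs_lock *l, mcs_node *n) {
        atomic_store(&n->next, NULL);
        atomic_store(&n->locked, true);
        mcs_node *prev = atomic_exchange(&l->tail, n);  /* append self to queue */
        if (prev) {
            atomic_store(&prev->next, n);               /* link behind predecessor */
            while (atomic_load(&n->locked)) ;           /* spin on own flag only */
        }
    }

    static void mcs_release(mcs_lock *l, mcs_node *n) {
        mcs_node *succ = atomic_load(&n->next);
        if (!succ) {
            mcs_node *expected = n;
            if (atomic_compare_exchange_strong(&l->tail, &expected, NULL))
                return;                                 /* no successor: unlocked */
            while (!(succ = atomic_load(&n->next))) ;   /* successor still linking */
        }
        atomic_store(&succ->locked, false);             /* hand off the lock */
    }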
     
@@ -608 +550 @@
 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
-    #ifdef REACQ
-    lock(this);
-    #endif
-}
+static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { }
 
 //-----------------------------------------------------------------------------
     
@@ -627 +565 @@
     // flag showing if lock is held
     volatile bool held;
+
+    #ifdef __CFA_DEBUG__
+    // for deadlock detection
+    struct thread$ * owner;
+    #endif
 };
 
     
@@ -639 +582 @@
 // if this is called recursively IT WILL DEADLOCK!!!!!
 static inline void lock(block_spin_lock & this) with(this) {
+    #ifdef __CFA_DEBUG__
+    assert(!(held && owner == active_thread()));
+    #endif
     lock( lock );
     while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     __atomic_store_n(&held, true, __ATOMIC_RELEASE);
     unlock( lock );
+    #ifdef __CFA_DEBUG__
+    owner = active_thread();
+    #endif
 }
 
 static inline void unlock(block_spin_lock & this) with(this) {
+    #ifdef __CFA_DEBUG__
+    owner = 0p;
+    #endif
     __atomic_store_n(&held, false, __ATOMIC_RELEASE);
 }
 
-static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
-    #ifdef REACQ
-    // first acquire the internal fast_block_lock
-    lock( lock __cfaabi_dbg_ctx2 );
-    if ( held ) { // if the internal fast_block_lock is held, queue the thread
-        insert_last( blocked_threads, *t );
-        unlock( lock );
-        return;
-    }
-    // the internal fast_block_lock is not held
-    held = true;
-    #ifdef __CFA_DEBUG__
-    owner = t;
-    #endif
-    unlock( lock );
-    #endif
-    unpark(t);
-}
+static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); }
 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
-static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
-    #ifdef REACQ
-    // acquire the entire block_spin_lock upon waking up
-    while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
-    __atomic_store_n(&held, true, __ATOMIC_RELEASE);
-    unlock( lock ); // release the internal fast_block_lock
-    #endif
-}
+static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { }
 
 //-----------------------------------------------------------------------------
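Note: block_spin_lock composes a blocking lock with a spin-held flag: lock() holds the internal fast_block_lock only long enough to win the spin on held, so later arrivals block in a queue instead of all spinning at once. A plain-C sketch of that composition, with a pthread mutex standing in for the internal fast_block_lock (illustrative only):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    typedef struct {
        pthread_mutex_t internal;   /* stands in for fast_block_lock */
        atomic_bool held;           /* the actual lock state */
    } block_spin_lock_c;            /* hypothetical name */

    static void bsl_lock(block_spin_lock_c *l) {
        pthread_mutex_lock(&l->internal);    /* queue behind other acquirers */
        while (atomic_load(&l->held)) ;      /* spin: previous holder finishing */
        atomic_store(&l->held, true);        /* take the outer lock */
        pthread_mutex_unlock(&l->internal);  /* admit the next spinner */
    }

    static void bsl_unlock(block_spin_lock_c *l) {
        atomic_store(&l->held, false);
    }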