Changes in / [2dcd80a:d8bdf13]


Location: libcfa/src/concurrency
Files: 2 edited

Legend:

    unchanged lines show their line number in both revisions
    -   line removed in rd8bdf13
    +   line added in rd8bdf13
    …   unchanged lines elided
  • libcfa/src/concurrency/locks.cfa

    r2dcd80a rd8bdf13
    414 414           #ifdef __CFA_DEBUG__
    415 415                   if ( lock_used == 0p ) lock_used = &l;
    416     -                 else assert(lock_used == &l);
        416 +                 else { assert(lock_used == &l); }
    417 417           #endif
    418 418           info_thread( L ) i = { active_thread(), info, &l };
  • libcfa/src/concurrency/locks.hfa

    r2dcd80a rd8bdf13
    30  30    #include "time.hfa"
    31  31
    32      - #include <fstream.hfa>
    33      -
    34      -
    35      - // futex headers
    36      - #include <linux/futex.h>      /* Definition of FUTEX_* constants */
    37      - #include <sys/syscall.h>      /* Definition of SYS_* constants */
    38      - #include <unistd.h>
    39      -
    40  32    //-----------------------------------------------------------------------------
    41  33    // Semaphore
    …
    148 140
    149 141   //-----------------------------------------------------------------------------
    150     - // futex_mutex
    151     -
    152     - // - No condition-variable support
    153     - // - Kernel-thread-blocking alternative to the spinlock
    154     - // - No ownership (will deadlock on reacquisition)
    155     - struct futex_mutex {
    156     -         // lock state: any state other than UNLOCKED is locked
    157     -         // enum LockState { UNLOCKED = 0, UNCONTENDED = 1, CONTENDED = 2 };
    158     -
    159     -         // stores the lock state
    160     -         int val;
    161     - };
    162     -
    163     - // for FUTEX_WAKE and FUTEX_WAIT (other futex operations need more parameters)
    164     - static inline int futex(int *uaddr, int futex_op, int val) {
    165     -         return syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
    166     - }
    167     -
    168     - static inline void  ?{}( futex_mutex & this ) with(this) { val = 0; }
    169     -
    170     - static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
    171     -         return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
    172     - }
    173     -
    174     - static inline int internal_exchange(futex_mutex & this) with(this) {
    175     -         return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
    176     - }
    177     -
    178     - // if this is called recursively IT WILL DEADLOCK!!!!!
    179     - static inline void lock(futex_mutex & this) with(this) {
    180     -         int state;
    181     -
    182     -
    183     -         // // linear backoff omitted for now
    184     -         // for( int spin = 4; spin < 1024; spin += spin) {
    185     -         //      state = 0;
    186     -         //      // if unlocked, lock and return
    187     -         //      if (internal_try_lock(this, state)) return;
    188     -         //      if (2 == state) break;
    189     -         //      for (int i = 0; i < spin; i++) Pause();
    190     -         // }
    191     -
    192     -         // no contention: try to acquire
    193     -         if (internal_try_lock(this, state)) return;
    194     -
    195     -         // if not already in the contended state, move to the contended state
    196     -         if (state != 2) state = internal_exchange(this);
    197     -
    198     -         // block and spin until we win the lock
    199     -         while (state != 0) {
    200     -                 futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2, this returns immediately with EWOULDBLOCK
    201     -                 state = internal_exchange(this);
    202     -         }
    203     - }
    204     -
    205     - static inline void unlock(futex_mutex & this) with(this) {
    206     -         // if uncontended, do an atomic unlock and then return
    207     -         if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
    208     -
    209     -         // otherwise threads are blocked, so we must wake one
    210     -         __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
    211     -         futex((int *)&val, FUTEX_WAKE, 1);
    212     - }
    213     -
    214     - static inline void on_notify( futex_mutex & f, thread$ * t ) { unpark(t); }
    215     - static inline size_t on_wait( futex_mutex & f ) { unlock(f); return 0; }
    216     -
    217     - // to set the recursion count after being signalled
    218     - static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
    219     -
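
For context on the removed lock: val implements the classic three-state futex protocol (0 = UNLOCKED, 1 = locked/uncontended, 2 = locked/contended), so the uncontended paths never enter the kernel; FUTEX_WAIT/FUTEX_WAKE are issued only when a thread must block or be woken. A minimal usage sketch of the removed API (assuming the removed declarations above are in scope):

        futex_mutex m;          // ?{} initialises val to 0 (UNLOCKED)
        lock( m );              // fast path: one CAS 0 -> 1, no syscall
        // ... critical section ...
        unlock( m );            // fetch_sub sees 1 (no waiters): no syscall
        // under contention, val is driven to 2, lock() blocks in FUTEX_WAIT,
        // and unlock() stores 0 then issues FUTEX_WAKE for one waiter
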
    220     - //-----------------------------------------------------------------------------
    221 142   // CLH Spinlock
    222 143   // - No recursive acquisition
    …
    244 165   }
    245 166
    246     - static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
    247     - static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
    248     - static inline void on_wakeup(clh_lock & this, size_t recursion ) {
    249     -         #ifdef REACQ
    250     -         lock(this);
    251     -         #endif
    252     - }
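
The clh_lock implementation itself is elided above. For readers unfamiliar with the algorithm, here is a minimal, hypothetical CLH sketch (not the locks.hfa code) using the same __atomic builtins: each thread swaps its own node into the tail and spins on its predecessor's flag, so each waiter spins on a different location.

        struct clh_node { volatile bool locked; };
        struct clh_sketch { clh_node * volatile tail; };        // tail initialised to a dummy node with locked == false

        static inline clh_node * clh_acquire( clh_sketch & l, clh_node * mine ) {
                mine->locked = true;
                // enqueue self; the predecessor is whoever was at the tail
                clh_node * pred = __atomic_exchange_n( &l.tail, mine, __ATOMIC_ACQ_REL );
                while ( __atomic_load_n( &pred->locked, __ATOMIC_ACQUIRE ) ) Pause();
                return pred;                                    // recycle: pred becomes this thread's node next time
        }

        static inline void clh_release( clh_node * mine ) {
                __atomic_store_n( &mine->locked, false, __ATOMIC_RELEASE );     // successor stops spinning
        }
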
    253     -
    254     -
    255 167   //-----------------------------------------------------------------------------
    256 168   // Linear backoff Spinlock
    …
    259 171           __spinlock_t spinlock;
    260 172
        173 +         // Current thread owning the lock
        174 +         struct thread$ * owner;
        175 +
    261 176           // List of blocked threads
    262 177           dlist( thread$ ) blocked_threads;
    …
    264 179           // Used for comparing and exchanging
    265 180           volatile size_t lock_value;
    266     - };
    267     -
    268     - static inline void  ?{}( linear_backoff_then_block_lock & this ) {
        181 +
        182 +         // used for linear backoff spinning
        183 +         int spin_start;
        184 +         int spin_end;
        185 +         int spin_count;
        186 +
        187 +         // after unsuccessful linear backoff, yield this many times
        188 +         int yield_count;
        189 + };
        190 +
        191 + static inline void  ?{}( linear_backoff_then_block_lock & this, int spin_start, int spin_end, int spin_count, int yield_count ) {
    269 192           this.spinlock{};
    270 193           this.blocked_threads{};
    271 194           this.lock_value = 0;
    272     - }
        195 +         this.spin_start = spin_start;
        196 +         this.spin_end = spin_end;
        197 +         this.spin_count = spin_count;
        198 +         this.yield_count = yield_count;
        199 + }
        200 + static inline void  ?{}( linear_backoff_then_block_lock & this ) { this{4, 1024, 16, 0}; }
    273 201   static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
    274     - // static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    275     - // static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
        202 + static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
        203 + static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
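
The new four-parameter constructor exposes the backoff tuning that was previously hard-coded, and the zero-argument form now delegates to it with the old defaults. A minimal usage sketch (the values shown are just those defaults):

        linear_backoff_then_block_lock l = { 4, 1024, 16, 0 }; // spin_start, spin_end, spin_count, yield_count
        lock( l );
        // ... critical section ...
        unlock( l );
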
    276 204
    277 205   static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
    278 206           if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
        207 +                 owner = active_thread();
    279 208                   return true;
    280 209           }
    …
    286 215   static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
    287 216           if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
        217 +                 owner = active_thread();
    288 218                   return true;
    289 219           }
    …
    292 222
    293 223   static inline bool block(linear_backoff_then_block_lock & this) with(this) {
    294     -         lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
        224 +         lock( spinlock __cfaabi_dbg_ctx2 );
    295 225           if (lock_value != 2) {
    296 226                   unlock( spinlock );
    …
    304 234
    305 235   static inline void lock(linear_backoff_then_block_lock & this) with(this) {
        236 +         // if this thread already owns the lock, just return
        237 +         if (active_thread() == owner) return;
    306 238           size_t compare_val = 0;
    307     -         int spin = 4;
        239 +         int spin = spin_start;
    308 240           // linear backoff
    309 241           for( ;; ) {
    …
    312 244                   if (2 == compare_val) break;
    313 245                   for (int i = 0; i < spin; i++) Pause();
    314     -                 if (spin >= 1024) break;
        246 +                 if (spin >= spin_end) break;
    315 247                   spin += spin;
    316 248           }
    …
    322 254
    323 255   static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
        256 +         verify(lock_value > 0);
        257 +         owner = 0p;
    324 258           if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
    325 259           lock( spinlock __cfaabi_dbg_ctx2 );
    …
    331 265   static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
    332 266   static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
    333     - static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
    334     -         #ifdef REACQ
    335     -         lock(this);
    336     -         #endif
    337     - }
        267 + static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
    338 268
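
The on_wait/on_notify/on_wakeup hooks are how the library's condition variables drive an arbitrary lock type; this changeset drops the REACQ conditional, so waking from a wait now always reacquires this lock. A sketch of the calling protocol from the waiter's side (hypothetical driver code, not the actual locks.cfa wait implementation):

        // inside a condition-variable wait on lock l (sketch):
        size_t recursion = on_wait( l );        // release l before blocking; returns the saved recursion count
        park();                                 // block until a signaller calls on_notify( l, t ), which unparks t
        on_wakeup( l, recursion );              // for this lock: lock( l ) again; owner locks also restore recursion
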
    339 269   //-----------------------------------------------------------------------------
    …
    376 306           assert(!(held && owner == active_thread()));
    377 307           #endif
    378     -         if ( held ) {
        308 +         if (held) {
    379 309                   insert_last( blocked_threads, *active_thread() );
    380 310                   unlock( lock );
    …
    401 331   }
    402 332
    403     - static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
    404     -         #ifdef REACQ
    405     -                 lock( lock __cfaabi_dbg_ctx2 );
    406     -                 insert_last( blocked_threads, *t );
    407     -                 unlock( lock );
    408     -         #else
    409     -                 unpark(t);
    410     -         #endif
    411     - }
        333 + static inline void on_notify(fast_block_lock & this, struct thread$ * t ) { unpark(t); }
    412 334   static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
    413 335   static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
    …
    490 412           if ( owner != 0p ) {
    491 413                   insert_last( blocked_threads, *t );
        414 +                 unlock( lock );
    492 415           }
    493 416           // lock not held
    …
    496 419                   recursion_count = 1;
    497 420                   unpark( t );
    498     -         }
    499     -         unlock( lock );
        421 +                 unlock( lock );
        422 +         }
    500 423   }
    501 424
    …
    551 474   static inline void lock(spin_queue_lock & this) with(this) {
    552 475           mcs_spin_node node;
        476 +         #ifdef __CFA_DEBUG__
        477 +         assert(!(held && owner == active_thread()));
        478 +         #endif
    553 479           lock( lock, node );
    554 480           while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
    555 481           __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
    556 482           unlock( lock, node );
        483 +         #ifdef __CFA_DEBUG__
        484 +         owner = active_thread();
        485 +         #endif
    557 486   }
    558 487
    559 488   static inline void unlock(spin_queue_lock & this) with(this) {
        489 +         #ifdef __CFA_DEBUG__
        490 +         owner = 0p;
        491 +         #endif
    560 492           __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    561 493   }
    562 494
    563     - static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
    564     -         unpark(t);
    565     - }
        495 + static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); }
    566 496   static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
    567     - static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
    568     -         #ifdef REACQ
    569     -         lock(this);
    570     -         #endif
    571     - }
        497 + static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { }
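
The __CFA_DEBUG__ owner field gives these non-owner locks cheap self-deadlock detection: reacquiring a lock the thread already holds would otherwise spin forever on held. A small sketch of the misuse the added assert catches (shown for spin_queue_lock):

        spin_queue_lock l;
        lock( l );
        lock( l );      // debug build: assert(!(held && owner == active_thread())) fires
                        // non-debug build: this thread spins on held forever
        unlock( l );
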
    572 498
    573 499
    …
    585 511           // flag showing if lock is held
    586 512           volatile bool held;
        513 +
        514 +         #ifdef __CFA_DEBUG__
        515 +         // for deadlock detection
        516 +         struct thread$ * owner;
        517 +         #endif
    587 518   };
    588 519
    …
    598 529   static inline void lock(mcs_block_spin_lock & this) with(this) {
    599 530           mcs_node node;
        531 +         #ifdef __CFA_DEBUG__
        532 +         assert(!(held && owner == active_thread()));
        533 +         #endif
    600 534           lock( lock, node );
    601 535           while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
    602 536           __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
    603 537           unlock( lock, node );
        538 +         #ifdef __CFA_DEBUG__
        539 +         owner = active_thread();
        540 +         #endif
    604 541   }
    605 542
    606 543   static inline void unlock(mcs_block_spin_lock & this) with(this) {
        544 +         #ifdef __CFA_DEBUG__
        545 +         owner = 0p;
        546 +         #endif
    607 547           __atomic_store_n(&held, false, __ATOMIC_SEQ_CST);
    608 548   }
    …
    610 550   static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
    611 551   static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
    612     - static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
    613     -         #ifdef REACQ
    614     -         lock(this);
    615     -         #endif
    616     - }
        552 + static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { }
    617 553
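
mcs_block_spin_lock layers its held flag over an MCS queue lock (the lock( lock, node ) calls above); the MCS implementation itself is elided here. For reference, a minimal, hypothetical MCS sketch (not the locks.hfa code): unlike CLH, each thread spins on a flag in its own node, and the releaser follows an explicit next pointer to hand off.

        struct mcs_node_sketch { struct mcs_node_sketch * volatile next; volatile bool locked; };
        struct mcs_sketch { mcs_node_sketch * volatile tail; }; // tail == 0p when the lock is free

        static inline void mcs_acquire( mcs_sketch & l, mcs_node_sketch & n ) {
                n.next = 0p; n.locked = true;
                mcs_node_sketch * pred = __atomic_exchange_n( &l.tail, &n, __ATOMIC_ACQ_REL );
                if ( pred != 0p ) {                             // lock held: link behind pred and spin locally
                        __atomic_store_n( &pred->next, &n, __ATOMIC_RELEASE );
                        while ( __atomic_load_n( &n.locked, __ATOMIC_ACQUIRE ) ) Pause();
                }
        }

        static inline void mcs_release( mcs_sketch & l, mcs_node_sketch & n ) {
                mcs_node_sketch * succ = __atomic_load_n( &n.next, __ATOMIC_ACQUIRE );
                if ( succ == 0p ) {                             // no visible successor: try to swing tail back to empty
                        mcs_node_sketch * expected = &n;
                        if ( __atomic_compare_exchange_n( &l.tail, &expected, 0p, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE ) ) return;
                        // a successor is mid-enqueue: wait for its next-pointer store
                        while ( ( succ = __atomic_load_n( &n.next, __ATOMIC_ACQUIRE ) ) == 0p ) Pause();
                }
                __atomic_store_n( &succ->locked, false, __ATOMIC_RELEASE );     // hand off to successor
        }
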
    618 554   //-----------------------------------------------------------------------------
    …
    629 565           // flag showing if lock is held
    630 566           volatile bool held;
        567 +
        568 +         #ifdef __CFA_DEBUG__
        569 +         // for deadlock detection
        570 +         struct thread$ * owner;
        571 +         #endif
    631 572   };
    632 573
    …
    641 582   // if this is called recursively IT WILL DEADLOCK!!!!!
    642 583   static inline void lock(block_spin_lock & this) with(this) {
        584 +         #ifdef __CFA_DEBUG__
        585 +         assert(!(held && owner == active_thread()));
        586 +         #endif
    643 587           lock( lock );
    644 588           while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
    645 589           __atomic_store_n(&held, true, __ATOMIC_RELEASE);
    646 590           unlock( lock );
        591 +         #ifdef __CFA_DEBUG__
        592 +         owner = active_thread();
        593 +         #endif
    647 594   }
    648 595
    649 596   static inline void unlock(block_spin_lock & this) with(this) {
        597 +         #ifdef __CFA_DEBUG__
        598 +         owner = 0p;
        599 +         #endif
    650 600           __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    651 601   }
    652 602
    653     - static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    654     -         #ifdef REACQ
    655     -         // first acquire the internal fast_block_lock
    656     -         lock( lock __cfaabi_dbg_ctx2 );
    657     -         if ( held ) { // if the internal fast_block_lock is held
    658     -                 insert_last( blocked_threads, *t );
    659     -                 unlock( lock );
    660     -                 return;
    661     -         }
    662     -         // if the internal fast_block_lock is not held
    663     -         held = true;
    664     -         #ifdef __CFA_DEBUG__
    665     -         owner = t;
    666     -         #endif
    667     -         unlock( lock );
    668     -
    669     -         #endif
    670     -         unpark(t);
    671     -
    672     - }
        603 + static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); }
    673 604   static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
    674     - static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
    675     -         #ifdef REACQ
    676     -         // now acquire the entire block_spin_lock upon waking up
    677     -         while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
    678     -         __atomic_store_n(&held, true, __ATOMIC_RELEASE);
    679     -         unlock( lock ); // now release the internal fast_block_lock
    680     -         #endif
    681     - }
        605 + static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { }
    682 606
    683 607   //-----------------------------------------------------------------------------