Changes in / [d8bdf13:2dcd80a]


Location: libcfa/src/concurrency
Files: 2 edited
  • libcfa/src/concurrency/locks.cfa

    rd8bdf13 → r2dcd80a
    @@ -414,5 +414,5 @@
         #ifdef __CFA_DEBUG__
             if ( lock_used == 0p ) lock_used = &l;
    -        else { assert(lock_used == &l); }
    +        else assert(lock_used == &l);
         #endif
         info_thread( L ) i = { active_thread(), info, &l };
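
    The surrounding context is the condition-variable wait path: under __CFA_DEBUG__ it
    records the first lock a condition variable is waited on with, and asserts that every
    later wait uses the same lock. A standalone C sketch of that debug pattern (lock_t and
    check_same_lock are illustrative stand-ins, not CFA names; lock_used mirrors the field
    in the diff):

        #include <assert.h>
        #include <stddef.h>

        typedef struct { int _; } lock_t;          /* illustrative stand-in */

        static lock_t *lock_used = NULL;           /* first lock seen by this condvar */

        static void check_same_lock(lock_t *l) {
            if (lock_used == NULL) lock_used = l;  /* first wait: remember the lock */
            else assert(lock_used == l);           /* later waits must pass the same lock */
        }
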
  • libcfa/src/concurrency/locks.hfa

    rd8bdf13 → r2dcd80a
    @@ -30,4 +30,12 @@
     #include "time.hfa"
     
    +#include <fstream.hfa>
    +
    +
    +// futex headers
    +#include <linux/futex.h>      /* Definition of FUTEX_* constants */
    +#include <sys/syscall.h>      /* Definition of SYS_* constants */
    +#include <unistd.h>
    +
     //-----------------------------------------------------------------------------
     // Semaphore
     
    @@ -140,4 +148,75 @@
     
     //-----------------------------------------------------------------------------
    +// futex_mutex
    +
    +// - No cond var support
    +// - Kernel thd blocking alternative to the spinlock
    +// - No ownership (will deadlock on reacq)
    +struct futex_mutex {
    +    // lock state: any state other than UNLOCKED is locked
    +    // enum LockState { UNLOCKED = 0, UNCONTENDED = 1, CONTENDED = 2 };
    +
    +    // stores a lock state
    +    int val;
    +};
    +
    +// to use for FUTEX_WAKE and FUTEX_WAIT (other futex calls will need more params)
    +static inline int futex(int *uaddr, int futex_op, int val) {
    +    return syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
    +}
    +
    +static inline void ?{}( futex_mutex & this ) with(this) { val = 0; }
    +
    +static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
    +    return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
    +}
    +
    +static inline int internal_exchange(futex_mutex & this) with(this) {
    +    return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
    +}
    +
    +// if this is called recursively IT WILL DEADLOCK!!!!!
    +static inline void lock(futex_mutex & this) with(this) {
    +    int state;
    +
    +
    +    // // linear backoff omitted for now
    +    // for( int spin = 4; spin < 1024; spin += spin) {
    +    //     state = 0;
    +    //     // if unlocked, lock and return
    +    //     if (internal_try_lock(this, state)) return;
    +    //     if (2 == state) break;
    +    //     for (int i = 0; i < spin; i++) Pause();
    +    // }
    +
    +    // no contention: try to acquire
    +    if (internal_try_lock(this, state)) return;
    +
    +    // if not in contended state, set to be in contended state
    +    if (state != 2) state = internal_exchange(this);
    +
    +    // block and spin until we win the lock
    +    while (state != 0) {
    +        futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
    +        state = internal_exchange(this);
    +    }
    +}
    +
    +static inline void unlock(futex_mutex & this) with(this) {
    +    // if uncontended, do atomic unlock and then return
    +    if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
    +
    +    // otherwise threads are blocked so we must wake one
    +    __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
    +    futex((int *)&val, FUTEX_WAKE, 1);
    +}
    +
    +static inline void on_notify( futex_mutex & f, thread$ * t ) { unpark(t); }
    +static inline size_t on_wait( futex_mutex & f ) { unlock(f); return 0; }
    +
    +// to set recursion count after getting signalled
    +static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
    +
    +//-----------------------------------------------------------------------------
     // CLH Spinlock
     // - No recursive acquisition
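
    This is the classic three-state futex mutex (cf. Drepper, "Futexes Are Tricky"):
    0 = unlocked, 1 = locked/uncontended, 2 = locked/contended, with FUTEX_WAKE issued
    only when the unlocker saw state 2. glibc ships no futex() wrapper, hence the
    syscall(2) shim and the three new headers above. A minimal standalone C sketch of
    the same algorithm (futex_mutex_t, fm_lock, fm_unlock are illustrative names):

        #include <linux/futex.h>
        #include <sys/syscall.h>
        #include <unistd.h>
        #include <stddef.h>

        typedef struct { int val; } futex_mutex_t;  /* 0 free, 1 locked, 2 locked+waiters */

        static long futex(int *uaddr, int op, int val) {
            return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
        }

        static void fm_lock(futex_mutex_t *m) {
            int zero = 0;
            /* fast path: 0 -> 1, uncontended acquire */
            if (__atomic_compare_exchange_n(&m->val, &zero, 1, 0,
                                            __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE))
                return;
            /* slow path: mark contended; the exchange returning 0 means we won */
            while (__atomic_exchange_n(&m->val, 2, __ATOMIC_ACQUIRE) != 0)
                futex(&m->val, FUTEX_WAIT, 2);  /* returns EWOULDBLOCK if val != 2 */
        }

        static void fm_unlock(futex_mutex_t *m) {
            /* was 1: nobody waiting, val is now 0, done */
            if (__atomic_fetch_sub(&m->val, 1, __ATOMIC_RELEASE) == 1) return;
            /* was 2: clear the lock and wake one waiter */
            __atomic_store_n(&m->val, 0, __ATOMIC_RELEASE);
            futex(&m->val, FUTEX_WAKE, 1);
        }

    The unconditional exchange-to-2 slow path can issue one unnecessary FUTEX_WAKE after
    contention ends; that is the usual price of this compact variant.
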
     
    @@ -165,4 +244,13 @@
     }
     
    +static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
    +static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
    +static inline void on_wakeup(clh_lock & this, size_t recursion ) {
    +    #ifdef REACQ
    +    lock(this);
    +    #endif
    +}
    +
    +
     //-----------------------------------------------------------------------------
     // Linear backoff Spinlock
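
    The on_wait/on_notify/on_wakeup triple added for each lock follows the usual
    condition-variable contract: on_wait releases the lock before the waiter parks,
    on_notify runs on the signaller side, and on_wakeup runs in the woken thread,
    reacquiring the lock only when REACQ is defined. The same release/park/reacquire
    shape, shown with plain pthreads as an analogy only (not the CFA implementation):

        #include <pthread.h>
        #include <stdio.h>

        static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
        static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
        static int ready = 0;

        static void *waiter(void *arg) {
            (void)arg;
            pthread_mutex_lock(&m);
            while (!ready)
                pthread_cond_wait(&c, &m); /* on_wait: unlock + park; on_wakeup: re-lock */
            pthread_mutex_unlock(&m);      /* wait returned holding the lock */
            puts("woken holding the lock");
            return NULL;
        }

        int main(void) {
            pthread_t t;
            pthread_create(&t, NULL, waiter, NULL);
            pthread_mutex_lock(&m);
            ready = 1;
            pthread_cond_signal(&c);       /* on_notify: wake one parked waiter */
            pthread_mutex_unlock(&m);
            pthread_join(t, NULL);
            return 0;
        }
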
     
    @@ -171,7 +259,4 @@
         __spinlock_t spinlock;
     
    -    // Current thread owning the lock
    -    struct thread$ * owner;
    -
         // List of blocked threads
         dlist( thread$ ) blocked_threads;
     
    @@ -179,31 +264,17 @@
         // Used for comparing and exchanging
         volatile size_t lock_value;
    -
    -    // used for linear backoff spinning
    -    int spin_start;
    -    int spin_end;
    -    int spin_count;
    -
    -    // after unsuccessful linear backoff yield this many times
    -    int yield_count;
    -};
    -
    -static inline void ?{}( linear_backoff_then_block_lock & this, int spin_start, int spin_end, int spin_count, int yield_count ) {
    +};
    +
    +static inline void ?{}( linear_backoff_then_block_lock & this ) {
         this.spinlock{};
         this.blocked_threads{};
         this.lock_value = 0;
    -    this.spin_start = spin_start;
    -    this.spin_end = spin_end;
    -    this.spin_count = spin_count;
    -    this.yield_count = yield_count;
    -}
    -static inline void ?{}( linear_backoff_then_block_lock & this ) { this{4, 1024, 16, 0}; }
    +}
     static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
    -static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    -static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    +// static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    +// static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
     
     static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
         if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
    -        owner = active_thread();
             return true;
         }
     
    @@ -215,5 +286,4 @@
     static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
         if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
    -        owner = active_thread();
             return true;
         }
     
    @@ -222,5 +292,5 @@
     
     static inline bool block(linear_backoff_then_block_lock & this) with(this) {
    -    lock( spinlock __cfaabi_dbg_ctx2 );
    +    lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
         if (lock_value != 2) {
             unlock( spinlock );
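
    The TODO above asks for a lock-free MPSC queue: many contending threads would push
    themselves onto blocked_threads while the single unlocker pops, removing the spinlock
    from the block path. One well-known fit is Vyukov's intrusive MPSC queue; a hedged
    C11 sketch of what that could look like (the mpsc_* names are illustrative, not a
    CFA API):

        #include <stdatomic.h>
        #include <stddef.h>

        typedef struct mpsc_node { _Atomic(struct mpsc_node *) next; } mpsc_node;

        typedef struct {
            _Atomic(mpsc_node *) head;   /* producers push here */
            mpsc_node *tail;             /* the single consumer pops here */
            mpsc_node stub;              /* dummy node so the queue is never empty */
        } mpsc_queue;

        static void mpsc_init(mpsc_queue *q) {
            atomic_store(&q->stub.next, NULL);
            atomic_store(&q->head, &q->stub);
            q->tail = &q->stub;
        }

        static void mpsc_push(mpsc_queue *q, mpsc_node *n) {   /* any thread */
            atomic_store(&n->next, NULL);
            mpsc_node *prev = atomic_exchange(&q->head, n);
            atomic_store(&prev->next, n);                      /* link in, wait-free */
        }

        static mpsc_node *mpsc_pop(mpsc_queue *q) {            /* consumer only */
            mpsc_node *tail = q->tail;
            mpsc_node *next = atomic_load(&tail->next);
            if (tail == &q->stub) {                /* skip over the stub */
                if (!next) return NULL;            /* queue empty */
                q->tail = next;
                tail = next;
                next = atomic_load(&tail->next);
            }
            if (next) { q->tail = next; return tail; }
            if (tail != atomic_load(&q->head)) return NULL;  /* producer mid-push */
            mpsc_push(q, &q->stub);                /* requeue stub behind last node */
            next = atomic_load(&tail->next);
            if (next) { q->tail = next; return tail; }
            return NULL;
        }
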
     
    @@ -234,8 +304,6 @@
     
     static inline void lock(linear_backoff_then_block_lock & this) with(this) {
    -    // if owner just return
    -    if (active_thread() == owner) return;
         size_t compare_val = 0;
    -    int spin = spin_start;
    +    int spin = 4;
         // linear backoff
         for( ;; ) {
     
    @@ -244,5 +312,5 @@
             if (2 == compare_val) break;
             for (int i = 0; i < spin; i++) Pause();
    -        if (spin >= spin_end) break;
    +        if (spin >= 1024) break;
             spin += spin;
         }
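
    With the tuning fields gone, the backoff bounds are fixed: start at 4 pauses and stop
    at 1024. Since spin += spin doubles the delay each round, the backoff is geometric
    (roughly 4+8+...+1024 ≈ 2K pauses worst case) before the lock falls back to blocking.
    Note also that removing the owner field removes the old early return, so recursive
    acquisition of this lock now deadlocks. The loop shape in standalone C (try_acquire
    and the x86 _mm_pause are stand-ins for the lock's internals):

        #include <stdatomic.h>
        #include <stdbool.h>
        #include <immintrin.h>                     /* _mm_pause: x86 analogue of Pause() */

        static atomic_int lock_value;              /* 0 = free, 1 = held (sketch only) */

        static bool try_acquire(void) {
            int expected = 0;
            return atomic_compare_exchange_strong(&lock_value, &expected, 1);
        }

        static bool spin_with_backoff(void) {
            for (int spin = 4; ; spin += spin) {   /* 4, 8, 16, ..., 1024 */
                if (try_acquire()) return true;    /* acquired while spinning */
                for (int i = 0; i < spin; i++) _mm_pause();
                if (spin >= 1024) return false;    /* give up; caller blocks instead */
            }
        }
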
     
    @@ -254,6 +322,4 @@
     
     static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
    -    verify(lock_value > 0);
    -    owner = 0p;
         if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
         lock( spinlock __cfaabi_dbg_ctx2 );
     
    @@ -265,5 +331,9 @@
     static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
     static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
    -static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
    +static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
    +    #ifdef REACQ
    +    lock(this);
    +    #endif
    +}
     
     //-----------------------------------------------------------------------------
     
    @@ -306,5 +376,5 @@
         assert(!(held && owner == active_thread()));
         #endif
    -    if (held) {
    +    if ( held ) {
             insert_last( blocked_threads, *active_thread() );
             unlock( lock );
     
    @@ -331,5 +401,13 @@
     }
     
    -static inline void on_notify(fast_block_lock & this, struct thread$ * t ) { unpark(t); }
    +static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
    +    #ifdef REACQ
    +        lock( lock __cfaabi_dbg_ctx2 );
    +        insert_last( blocked_threads, *t );
    +        unlock( lock );
    +    #else
    +        unpark(t);
    +    #endif
    +}
     static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
     static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
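
    This is the interesting REACQ case: instead of unparking the signalled thread,
    on_notify re-inserts it into the lock's own blocked_threads, so the thread is resumed
    by the lock's normal hand-off on a later unlock() rather than immediately (which is
    why on_wakeup stays empty here). A toy C model of the two strategies (every name
    below is an illustrative stand-in):

        #include <stdio.h>

        typedef struct { int id; } thread_t;
        typedef struct { thread_t *items[64]; int n; } queue_t;

        static void unpark(thread_t *t)              { printf("T%d resumes now\n", t->id); }
        static void requeue(queue_t *q, thread_t *t) { q->items[q->n++] = t; }

        /* signaller side, mirroring fast_block_lock's two on_notify strategies */
        static void toy_on_notify(queue_t *blocked, thread_t *t, int reacq) {
            if (reacq) requeue(blocked, t); /* REACQ: woken later by the lock's unlock() */
            else       unpark(t);           /* default: resumes now, without the lock */
        }

        int main(void) {
            queue_t blocked = { {0}, 0 };
            thread_t t1 = {1}, t2 = {2};
            toy_on_notify(&blocked, &t1, 0);   /* prints: T1 resumes now */
            toy_on_notify(&blocked, &t2, 1);   /* queued instead */
            printf("queued waiters: %d\n", blocked.n);
            return 0;
        }
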
     
    @@ -412,5 +490,4 @@
         if ( owner != 0p ) {
             insert_last( blocked_threads, *t );
    -        unlock( lock );
         }
         // lock not held
     
    @@ -419,6 +496,6 @@
             recursion_count = 1;
             unpark( t );
    -        unlock( lock );
    -    }
    +    }
    +    unlock( lock );
     }
     
     
    @@ -474,26 +551,23 @@
     static inline void lock(spin_queue_lock & this) with(this) {
         mcs_spin_node node;
    -    #ifdef __CFA_DEBUG__
    -    assert(!(held && owner == active_thread()));
    -    #endif
         lock( lock, node );
         while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
         __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
         unlock( lock, node );
    -    #ifdef __CFA_DEBUG__
    -    owner = active_thread();
    -    #endif
     }
     
     static inline void unlock(spin_queue_lock & this) with(this) {
    -    #ifdef __CFA_DEBUG__
    -    owner = 0p;
    -    #endif
         __atomic_store_n(&held, false, __ATOMIC_RELEASE);
     }
     
    -static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); }
    +static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
    +    unpark(t);
    +}
     static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
    -static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { }
    +static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
    +    #ifdef REACQ
    +    lock(this);
    +    #endif
    +}
     
     
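
    spin_queue_lock's lock( lock, node ) above is an MCS-style queue acquire: each waiter
    enqueues a stack-allocated node and spins locally until its predecessor hands over.
    For readers unfamiliar with the shape, a compact C11 MCS lock (illustrative only, not
    CFA's mcs_spin_lock):

        #include <stdatomic.h>
        #include <stdbool.h>
        #include <stddef.h>

        typedef struct mcs_node {
            _Atomic(struct mcs_node *) next;
            atomic_bool locked;
        } mcs_node;

        typedef struct { _Atomic(mcs_node *) tail; } mcs_lock;

        static void mcs_acquire(mcs_lock *l, mcs_node *me) {
            atomic_store(&me->next, NULL);
            atomic_store(&me->locked, true);
            mcs_node *prev = atomic_exchange(&l->tail, me);
            if (prev) {                              /* someone ahead: link in and spin */
                atomic_store(&prev->next, me);
                while (atomic_load(&me->locked))     /* spin on our own node only */
                    ;
            }
        }

        static void mcs_release(mcs_lock *l, mcs_node *me) {
            mcs_node *succ = atomic_load(&me->next);
            if (!succ) {
                mcs_node *expect = me;               /* no visible successor: try to close */
                if (atomic_compare_exchange_strong(&l->tail, &expect, NULL)) return;
                while (!(succ = atomic_load(&me->next)))  /* successor is mid-enqueue */
                    ;
            }
            atomic_store(&succ->locked, false);      /* hand the lock to the successor */
        }
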
     
    @@ -511,9 +585,4 @@
         // flag showing if lock is held
         volatile bool held;
    -
    -    #ifdef __CFA_DEBUG__
    -    // for deadlock detection
    -    struct thread$ * owner;
    -    #endif
     };
     
     
    @@ -529,20 +598,11 @@
     static inline void lock(mcs_block_spin_lock & this) with(this) {
         mcs_node node;
    -    #ifdef __CFA_DEBUG__
    -    assert(!(held && owner == active_thread()));
    -    #endif
         lock( lock, node );
         while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
         __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
         unlock( lock, node );
    -    #ifdef __CFA_DEBUG__
    -    owner = active_thread();
    -    #endif
     }
     
     static inline void unlock(mcs_block_spin_lock & this) with(this) {
    -    #ifdef __CFA_DEBUG__
    -    owner = 0p;
    -    #endif
         __atomic_store_n(&held, false, __ATOMIC_SEQ_CST);
     }
     
    @@ -550,5 +610,9 @@
     static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
     static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
    -static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { }
    +static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
    +    #ifdef REACQ
    +    lock(this);
    +    #endif
    +}
     
     //-----------------------------------------------------------------------------
     
    @@ -565,9 +629,4 @@
         // flag showing if lock is held
         volatile bool held;
    -
    -    #ifdef __CFA_DEBUG__
    -    // for deadlock detection
    -    struct thread$ * owner;
    -    #endif
     };
     
     
    @@ -582,26 +641,41 @@
     // if this is called recursively IT WILL DEADLOCK!!!!!
     static inline void lock(block_spin_lock & this) with(this) {
    -    #ifdef __CFA_DEBUG__
    -    assert(!(held && owner == active_thread()));
    -    #endif
         lock( lock );
         while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
         __atomic_store_n(&held, true, __ATOMIC_RELEASE);
         unlock( lock );
    +}
    +
    +static inline void unlock(block_spin_lock & this) with(this) {
    +    __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    +}
    +
    +static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    +    #ifdef REACQ
    +    // first we acquire internal fast_block_lock
    +    lock( lock __cfaabi_dbg_ctx2 );
    +    if ( held ) { // if internal fast_block_lock is held
    +        insert_last( blocked_threads, *t );
    +        unlock( lock );
    +        return;
    +    }
    +    // if internal fast_block_lock is not held
    +    held = true;
         #ifdef __CFA_DEBUG__
    -    owner = active_thread();
    -    #endif
    -}
    -
    -static inline void unlock(block_spin_lock & this) with(this) {
    -    #ifdef __CFA_DEBUG__
    -    owner = 0p;
    -    #endif
    -    __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    -}
    -
    -static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); }
    +    owner = t;
    +    #endif
    +    unlock( lock );
    +    #endif
    +    unpark(t);
    +}
     static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
    -static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { }
    +static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
    +    #ifdef REACQ
    +    // now we acquire the entire block_spin_lock upon waking up
    +    while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
    +    __atomic_store_n(&held, true, __ATOMIC_RELEASE);
    +    unlock( lock ); // now we release the internal fast_block_lock
    +    #endif
    +}
     
     //-----------------------------------------------------------------------------
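
    block_spin_lock (like spin_queue_lock and mcs_block_spin_lock above) layers a
    spin-acquired held flag over an internal queue lock: the internal lock orders
    waiters, while held is what actually guards the critical section and can stay set
    across blocking. The REACQ hooks then split the two halves of this acquire between
    the signaller (on_notify, which claims or queues on the internal lock) and the woken
    thread (on_wakeup, which spins for held and then releases the internal lock). The
    base acquire shape in self-contained C (a pthread mutex stands in for the internal
    fast_block_lock; the names are illustrative):

        #include <stdatomic.h>
        #include <stdbool.h>
        #include <pthread.h>

        typedef struct {
            pthread_mutex_t inner;  /* stand-in for the internal fast_block_lock */
            atomic_bool held;       /* the flag that actually guards the critical section */
        } flag_lock;

        static void flag_lock_acquire(flag_lock *l) {
            pthread_mutex_lock(&l->inner);      /* queue up behind earlier waiters */
            while (atomic_load(&l->held))       /* previous holder may still be running */
                ;                               /* Pause() in the real code */
            atomic_store(&l->held, true);       /* take the lock proper */
            pthread_mutex_unlock(&l->inner);    /* admit the next waiter to spin */
        }

        static void flag_lock_release(flag_lock *l) {
            atomic_store(&l->held, false);      /* the spinning waiter proceeds */
        }
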