Changeset a45e21c


Ignore:
Timestamp:
Mar 30, 2023, 3:52:00 PM (13 months ago)
Author:
caparson <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
76a8400
Parents:
efdd18c
Message:

cleaned up channel, added safety/productivity features to channels. added go style spin/block lock. small cleanup in locks.hfa

Location:
libcfa/src/concurrency
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    refdd18c ra45e21c  
    33#include <locks.hfa>
    44#include <list.hfa>
    5 
    6 #define __COOP_CHANNEL
    7 #ifdef __PREVENTION_CHANNEL
    8 forall( T ) {
    9 struct channel {
    10     size_t size, count, front, back;
    11     T * buffer;
    12     thread$ * chair;
    13     T * chair_elem;
    14     exp_backoff_then_block_lock c_lock, p_lock;
    15     __spinlock_t mutex_lock;
    16     char __padding[64]; // avoid false sharing in arrays of channels
    17 };
    18 
    19 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    20     size = _size;
    21     front = back = count = 0;
    22     buffer = aalloc( size );
    23     chair = 0p;
    24     mutex_lock{};
    25     c_lock{};
    26     p_lock{};
    27 }
    28 
    29 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    30 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    31 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    32 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    33 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
    34 
    35 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    36     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    37     count += 1;
    38     back++;
    39     if ( back == size ) back = 0;
    40 }
    41 
    42 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    43     lock( p_lock );
    44     lock( mutex_lock __cfaabi_dbg_ctx2 );
    45 
    46     // have to check for the zero size channel case
    47     if ( size == 0 && chair != 0p ) {
    48         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    49         unpark( chair );
    50         chair = 0p;
    51         unlock( mutex_lock );
    52         unlock( p_lock );
    53         unlock( c_lock );
    54         return;
    55     }
    56 
    57     // wait if buffer is full, work will be completed by someone else
    58     if ( count == size ) {
    59         chair = active_thread();
    60         chair_elem = &elem;
    61         unlock( mutex_lock );
    62         park( );
    63         return;
    64     } // if
    65 
    66     if ( chair != 0p ) {
    67         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    68         unpark( chair );
    69         chair = 0p;
    70         unlock( mutex_lock );
    71         unlock( p_lock );
    72         unlock( c_lock );
    73         return;
    74     }
    75     insert_( chan, elem );
    76 
    77     unlock( mutex_lock );
    78     unlock( p_lock );
    79 }
    80 
    81 static inline T remove( channel(T) & chan ) with(chan) {
    82     lock( c_lock );
    83     lock( mutex_lock __cfaabi_dbg_ctx2 );
    84     T retval;
    85 
    86     // have to check for the zero size channel case
    87     if ( size == 0 && chair != 0p ) {
    88         memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
    89         unpark( chair );
    90         chair = 0p;
    91         unlock( mutex_lock );
    92         unlock( p_lock );
    93         unlock( c_lock );
    94         return retval;
    95     }
    96 
    97     // wait if buffer is empty, work will be completed by someone else
    98     if ( count == 0 ) {
    99         chair = active_thread();
    100         chair_elem = &retval;
    101         unlock( mutex_lock );
    102         park( );
    103         return retval;
    104     }
    105 
    106     // Remove from buffer
    107     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    108     count -= 1;
    109     front++;
    110     if ( front == size ) front = 0;
    111 
    112     if ( chair != 0p ) {
    113         insert_( chan, *chair_elem );  // do waiting producer work
    114         unpark( chair );
    115         chair = 0p;
    116         unlock( mutex_lock );
    117         unlock( p_lock );
    118         unlock( c_lock );
    119         return retval;
    120     }
    121 
    122     unlock( mutex_lock );
    123     unlock( c_lock );
    124     return retval;
    125 }
    126 
    127 } // forall( T )
    128 #endif
    129 
    130 #ifdef __COOP_CHANNEL
     5#include <mutex_stmt.hfa>
    1316
    1327// link field used for threads waiting on channel
     
    14823}
    14924
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
     30
     31// returns true if woken due to shutdown
     32// blocks thread on list and releases passed lock
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
     36    unlock( lock );
     37    park();
     38    return w.elem == 0p;
     39}
     40
     41// void * used for some fields since exceptions don't work with parametric polymorphism currently
     42exception channel_closed {
     43    // on failed insert elem is a ptr to the element attempting to be inserted
     44    // on failed remove elem ptr is 0p
     45    // on resumption of a failed insert this elem will be inserted
     46    // so a user may modify it in the resumption handler
     47    void * elem;
     48
     49    // pointer to chan that is closed
     50    void * closed_chan;
     51};
     52vtable(channel_closed) channel_closed_vt;
     53
     54// #define CHAN_STATS // define this to get channel stats printed in dtor
     55
    15056forall( T ) {
    15157
    152 struct channel {
    153     size_t size;
    154     size_t front, back, count;
     58struct __attribute__((aligned(128))) channel {
     59    size_t size, front, back, count;
    15560    T * buffer;
    156     dlist( wait_link ) prods, cons;
    157     exp_backoff_then_block_lock mutex_lock;
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
     62    go_mutex mutex_lock;            // MX lock
     63    bool closed;                    // indicates channel close/open
     64    #ifdef CHAN_STATS
     65    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     66    #endif
    15867};
    15968
     
    16574    cons{};
    16675    mutex_lock{};
     76    closed = false;
     77    #ifdef CHAN_STATS
     78    blocks = 0;
     79    operations = 0;
     80    #endif
    16781}
    16882
    16983static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    170 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     84static inline void ^?{}( channel(T) &c ) with(c) {
     85    #ifdef CHAN_STATS
     86    printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
     87    #endif
     88    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
     89    delete( buffer );
     90}
    17191static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    17292static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     
    17595static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
    17696
    177 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     97// closes the channel and notifies all blocked threads
     98static inline void close( channel(T) & chan ) with(chan) {
     99    lock( mutex_lock );
     100    closed = true;
     101
     102    // flush waiting consumers and producers
     103    while ( has_waiting_consumers( chan ) ) {
     104        cons`first.elem = 0p;
     105        wake_one( cons );
     106    }
     107    while ( has_waiting_producers( chan ) ) {
     108        prods`first.elem = 0p;
     109        wake_one( prods );
     110    }
     111    unlock(mutex_lock);
     112}
     113
     114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
     115
     116static inline void flush( channel(T) & chan, T elem ) with(chan) {
     117    lock( mutex_lock );
     118    while ( count == 0 && !cons`isEmpty ) {
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
     121    }
     122    unlock( mutex_lock );
     123}
     124
     125// handles buffer insert
     126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    178127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    179128    count += 1;
     
    182131}
    183132
    184 static inline void wake_one( dlist( wait_link ) & queue ) {
    185     wait_link & popped = try_pop_front( queue );
    186     unpark( popped.t );
    187 }
    188 
    189 static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
    190     wait_link w{ active_thread(), elem_ptr };
    191     insert_last( queue, w );
    192     unlock( lock );
    193     park();
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
     141// needed to avoid an extra copy in closed case
     142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     143    lock( mutex_lock );
     144    #ifdef CHAN_STATS
     145    operations++;
     146    #endif
     147    if ( count == size ) { unlock( mutex_lock ); return false; }
     148    __do_insert( chan, elem );
     149    unlock( mutex_lock );
     150    return true;
     151}
     152
     153// attempts a nonblocking insert
     154// returns true if insert was successful, false otherwise
     155static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
     156
     157// handles closed case of insert routine
     158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
     160    throwResume except; // throw closed resumption
     161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    194162}
    195163
    196164static inline void insert( channel(T) & chan, T elem ) with(chan) {
    197     lock( mutex_lock );
     165    // check for close before acquire mx
     166    if ( unlikely(closed) ) {
     167        __closed_insert( chan, elem );
     168        return;
     169    }
     170
     171    lock( mutex_lock );
     172
     173    #ifdef CHAN_STATS
     174    if ( !closed ) operations++;
     175    #endif
     176
     177    // if closed handle
     178    if ( unlikely(closed) ) {
     179        unlock( mutex_lock );
     180        __closed_insert( chan, elem );
     181        return;
     182    }
    198183
    199184    // have to check for the zero size channel case
     
    202187        wake_one( cons );
    203188        unlock( mutex_lock );
    204         return;
     189        return true;
    205190    }
    206191
    207192    // wait if buffer is full, work will be completed by someone else
    208193    if ( count == size ) {
    209         block( prods, &elem, mutex_lock );
     194        #ifdef CHAN_STATS
     195        blocks++;
     196        #endif
     197
     198        // check for if woken due to close
     199        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
     200            __closed_insert( chan, elem );
    210201        return;
    211202    } // if
     
    214205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    215206        wake_one( cons );
    216     } else insert_( chan, elem );
     207    } else __buf_insert( chan, elem );
    217208   
    218209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
     215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     216    count -= 1;
     217    front = (front + 1) % size;
     218}
     219
     220// does the buffer remove and potentially does waiting producer work
     221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     222    __buf_remove( chan, retval );
     223    if (count == size - 1 && !prods`isEmpty ) {
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     225        wake_one( prods );
     226    }
     227}
     228
     229// needed to avoid an extra copy in closed case and single return val case
     230static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     231    lock( mutex_lock );
     232    #ifdef CHAN_STATS
     233    operations++;
     234    #endif
     235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     236    __do_remove( chan, retval );
     237    unlock( mutex_lock );
     238    return true;
     239}
     240
     241// attempts a nonblocking remove
     242// returns [T, true] if insert was successful
     243// returns [T, false] if insert was successful (T uninit)
     244static inline [T, bool] try_remove( channel(T) & chan ) {
     245    T retval;
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
     250    T retval;
     251    __internal_try_remove( chan, retval );
     252    return retval;
     253}
     254
     255// handles closed case of insert routine
     256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
     258    throwResume except; // throw resumption
     259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    219260}
    220261
    221262static inline T remove( channel(T) & chan ) with(chan) {
    222     lock( mutex_lock );
    223263    T retval;
     264    if ( unlikely(closed) ) {
     265        __closed_remove( chan, retval );
     266        return retval;
     267    }
     268    lock( mutex_lock );
     269
     270    #ifdef CHAN_STATS
     271    if ( !closed ) operations++;
     272    #endif
     273
     274    if ( unlikely(closed) ) {
     275        unlock( mutex_lock );
     276        __closed_remove( chan, retval );
     277        return retval;
     278    }
    224279
    225280    // have to check for the zero size channel case
     
    233288    // wait if buffer is empty, work will be completed by someone else
    234289    if (count == 0) {
    235         block( cons, &retval, mutex_lock );
     290        #ifdef CHAN_STATS
     291        blocks++;
     292        #endif
     293        // check for if woken due to close
     294        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
     295            __closed_remove( chan, retval );
    236296        return retval;
    237297    }
    238298
    239299    // Remove from buffer
    240     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    241     count -= 1;
    242     front = (front + 1) % size;
    243 
    244     if (count == size - 1 && !prods`isEmpty ) {
    245         insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
    246         wake_one( prods );
    247     }
     300    __do_remove( chan, retval );
    248301
    249302    unlock( mutex_lock );
     
    251304}
    252305} // forall( T )
    253 #endif
    254 
    255 #ifdef __BARGE_CHANNEL
    256 forall( T ) {
    257 struct channel {
    258     size_t size;
    259     size_t front, back, count;
    260     T * buffer;
    261     fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
    262     exp_backoff_then_block_lock mutex_lock;
    263 };
    264 
    265 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    266     size = _size;
    267     front = back = count = 0;
    268     buffer = aalloc( size );
    269     prods{};
    270     cons{};
    271     mutex_lock{};
    272 }
    273 
    274 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    275 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    276 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    277 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    278 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    279 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    280 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    281 
    282 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    283     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    284     count += 1;
    285     back++;
    286     if ( back == size ) back = 0;
    287 }
    288 
    289 
    290 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    291     lock( mutex_lock );
    292 
    293     while ( count == size ) {
    294         wait( prods, mutex_lock );
    295     } // if
    296 
    297     insert_( chan, elem );
    298    
    299     if ( !notify_one( cons ) && count < size )
    300         notify_one( prods );
    301 
    302     unlock( mutex_lock );
    303 }
    304 
    305 static inline T remove( channel(T) & chan ) with(chan) {
    306     lock( mutex_lock );
    307     T retval;
    308 
    309     while (count == 0) {
    310         wait( cons, mutex_lock );
    311     }
    312 
    313     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    314     count -= 1;
    315     front = (front + 1) % size;
    316 
    317     if ( !notify_one( prods ) && count > 0 )
    318         notify_one( cons );
    319 
    320     unlock( mutex_lock );
    321     return retval;
    322 }
    323 
    324 } // forall( T )
    325 #endif
    326 
    327 #ifdef __NO_WAIT_CHANNEL
    328 forall( T ) {
    329 struct channel {
    330     size_t size;
    331     size_t front, back, count;
    332     T * buffer;
    333     thread$ * chair;
    334     T * chair_elem;
    335     exp_backoff_then_block_lock c_lock, p_lock;
    336     __spinlock_t mutex_lock;
    337 };
    338 
    339 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    340     size = _size;
    341     front = back = count = 0;
    342     buffer = aalloc( size );
    343     chair = 0p;
    344     mutex_lock{};
    345     c_lock{};
    346     p_lock{};
    347     lock( c_lock );
    348 }
    349 
    350 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    351 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    352 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    353 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    354 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
    355 
    356 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    357     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    358     count += 1;
    359     back++;
    360     if ( back == size ) back = 0;
    361 }
    362 
    363 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    364     lock( p_lock );
    365     lock( mutex_lock __cfaabi_dbg_ctx2 );
    366 
    367     insert_( chan, elem );
    368 
    369     if ( count != size )
    370         unlock( p_lock );
    371 
    372     if ( count == 1 )
    373         unlock( c_lock );
    374        
    375     unlock( mutex_lock );
    376 }
    377 
    378 static inline T remove( channel(T) & chan ) with(chan) {
    379     lock( c_lock );
    380     lock( mutex_lock __cfaabi_dbg_ctx2 );
    381     T retval;
    382 
    383     // Remove from buffer
    384     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    385     count -= 1;
    386     front = (front + 1) % size;
    387 
    388     if ( count != 0 )
    389         unlock( c_lock );
    390 
    391     if ( count == size - 1 )
    392         unlock( p_lock );
    393        
    394     unlock( mutex_lock );
    395     return retval;
    396 }
    397 
    398 } // forall( T )
    399 #endif
  • libcfa/src/concurrency/locks.hfa

    refdd18c ra45e21c  
    3232#include <fstream.hfa>
    3333
    34 
    3534// futex headers
    3635#include <linux/futex.h>      /* Definition of FUTEX_* constants */
     
    155154// futex_mutex
    156155
    157 // - No cond var support
    158156// - Kernel thd blocking alternative to the spinlock
    159157// - No ownership (will deadlock on reacq)
     
    185183        int state;
    186184
    187        
    188         // // linear backoff omitted for now
    189         // for( int spin = 4; spin < 1024; spin += spin) {
    190         //      state = 0;
    191         //      // if unlocked, lock and return
    192         //      if (internal_try_lock(this, state)) return;
    193         //      if (2 == state) break;
    194         //      for (int i = 0; i < spin; i++) Pause();
    195         // }
    196 
    197         // no contention try to acquire
    198         if (internal_try_lock(this, state)) return;
     185        for( int spin = 4; spin < 1024; spin += spin) {
     186                state = 0;
     187                // if unlocked, lock and return
     188                if (internal_try_lock(this, state)) return;
     189                if (2 == state) break;
     190                for (int i = 0; i < spin; i++) Pause();
     191        }
     192
     193        // // no contention try to acquire
     194        // if (internal_try_lock(this, state)) return;
    199195       
    200196        // if not in contended state, set to be in contended state
     
    209205
    210206static inline void unlock(futex_mutex & this) with(this) {
    211         // if uncontended do atomice unlock and then return
    212         if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
     207        // if uncontended do atomic unlock and then return
     208    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
    213209       
    214210        // otherwise threads are blocked so we must wake one
    215         __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
    216211        futex((int *)&val, FUTEX_WAKE, 1);
    217212}
     
    222217// to set recursion count after getting signalled;
    223218static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
     219
     220//-----------------------------------------------------------------------------
     221// go_mutex
     222
     223// - Kernel thd blocking alternative to the spinlock
     224// - No ownership (will deadlock on reacq)
     225// - Golang's flavour of mutex
     226// - Impl taken from Golang: src/runtime/lock_futex.go
     227struct go_mutex {
     228        // lock state any state other than UNLOCKED is locked
     229        // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 };
     230       
     231        // stores a lock state
     232        int val;
     233};
     234
     235static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
     236
     237static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
     238        return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
     239}
     240
     241static inline int internal_exchange(go_mutex & this, int swap ) with(this) {
     242        return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE);
     243}
     244
     245const int __go_mtx_spins = 4;
     246const int __go_mtx_pauses = 30;
     247// if this is called recursively IT WILL DEADLOCK!!!!!
     248static inline void lock(go_mutex & this) with(this) {
     249        int state, init_state;
     250
     251    // speculative grab
     252    state = internal_exchange(this, 1);
     253    if ( !state ) return; // state == 0
     254    init_state = state;
     255    for (;;) {
     256        for( int i = 0; i < __go_mtx_spins; i++ ) {
     257            while( !val ) { // lock unlocked
     258                state = 0;
     259                if (internal_try_lock(this, state, init_state)) return;
     260            }
     261            for (int i = 0; i < __go_mtx_pauses; i++) Pause();
     262        }
     263
     264        while( !val ) { // lock unlocked
     265            state = 0;
     266            if (internal_try_lock(this, state, init_state)) return;
     267        }
     268        sched_yield();
     269       
     270        // if not in contended state, set to be in contended state
     271        state = internal_exchange(this, 2);
     272        if ( !state ) return; // state == 0
     273        init_state = 2;
     274        futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
     275    }
     276}
     277
     278static inline void unlock( go_mutex & this ) with(this) {
     279        // if uncontended do atomic unlock and then return
     280    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
     281       
     282        // otherwise threads are blocked so we must wake one
     283        futex((int *)&val, FUTEX_WAKE, 1);
     284}
     285
     286static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
     287static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
     288static inline void on_wakeup( go_mutex & f, size_t recursion ) {}
    224289
    225290//-----------------------------------------------------------------------------
     
    271336        this.lock_value = 0;
    272337}
     338
     339static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
    273340
    274341static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
Note: See TracChangeset for help on using the changeset viewer.