Changeset d30e3eb for libcfa/src


Timestamp: Mar 24, 2023, 4:44:46 PM
Author: caparson <caparson@…>
Branches: ADT, ast-experimental, master
Children: 1633e04
Parents: de934c7
Message: cleaned up exp_backoff lock and rewrote parts of channels to improve performance

Location: libcfa/src/concurrency
Files: 2 edited
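
For context, the user-facing operations touched by this changeset are insert() and remove() on channel(T). A minimal usage sketch follows; the constructor's capacity argument and the I/O are illustrative assumptions, not taken from the diff:

    #include <channel.hfa>
    #include <fstream.hfa>

    int main() {
        channel( int ) chan{ 2 };   // bounded channel; capacity argument assumed from the size/buffer fields
        insert( chan, 42 );         // producer side: blocks only while the buffer is full
        int x = remove( chan );     // consumer side: blocks only while the buffer is empty
        sout | x;                   // prints 42
    }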

Legend: ' ' unmodified, '+' added, '-' removed
  • libcfa/src/concurrency/channel.hfa

--- rde934c7
+++ rd30e3eb

 
 #include <locks.hfa>
-
-struct no_reacq_lock {
-    inline exp_backoff_then_block_lock;
-};
-
-// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
-static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
-static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
-static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
-static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
-static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
-static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
-// override wakeup so that we don't reacquire the lock if using a condvar
-static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
-
-#define __PREVENTION_CHANNEL
+#include <list.hfa>
+
+// #define __PREVENTION_CHANNEL
 #ifdef __PREVENTION_CHANNEL
 forall( T ) {
 struct channel {
-    size_t size;
-    size_t front, back, count;
+    size_t size, count, front, back;
     T * buffer;
     thread$ * chair;
…
         return;
     }
-    else insert_( chan, elem );
+    insert_( chan, elem );
 
     unlock( mutex_lock );
…
 
     // wait if buffer is empty, work will be completed by someone else
-    if ( count == 0 ) { 
+    if ( count == 0 ) {
         chair = active_thread();
         chair_elem = &retval;
…
     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     count -= 1;
-    front = (front + 1) % size;
+    front++;
+    if ( front == size ) front = 0;
 
     if ( chair != 0p ) {
…
 
 #ifndef __PREVENTION_CHANNEL
+
+// link field used for threads waiting on channel
+struct wait_link {
+    // used to put wait_link on a dl queue
+    inline dlink(wait_link);
+
+    // waiting thread
+    struct thread$ * t;
+
+    // shadow field
+    void * elem;
+};
+P9_EMBEDDED( wait_link, dlink(wait_link) )
+
+static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
+    this.t = t;
+    this.elem = elem;
+}
+
 forall( T ) {
+
 struct channel {
     size_t size;
     size_t front, back, count;
     T * buffer;
-    fast_cond_var( no_reacq_lock ) prods, cons;
-    no_reacq_lock mutex_lock;
+    dlist( wait_link ) prods, cons;
+    exp_backoff_then_block_lock mutex_lock;
 };
 
…
 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
-static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
-static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
-static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
+static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
+static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
+static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
 
 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
…
 }
 
+static inline void wake_one( dlist( wait_link ) & queue ) {
+    wait_link & popped = try_pop_front( queue );
+    unpark( popped.t );
+}
+
+static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
+    wait_link w{ active_thread(), elem_ptr };
+    insert_last( queue, w );
+    unlock( lock );
+    park();
+}
 
 static inline void insert( channel(T) & chan, T elem ) with(chan) {
…
 
     // have to check for the zero size channel case
-    if ( size == 0 && !empty( cons ) ) {
-        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
-        notify_one( cons );
+    if ( size == 0 && !cons`isEmpty ) {
+        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
+        wake_one( cons );
         unlock( mutex_lock );
         return;
…
 
     // wait if buffer is full, work will be completed by someone else
-    if ( count == size ) { 
-        wait( prods, mutex_lock, (uintptr_t)&elem );
+    if ( count == size ) {
+        block( prods, &elem, mutex_lock );
         return;
     } // if
 
-    if ( count == 0 && !empty( cons ) )
-        // do waiting consumer work
-        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
-    else insert_( chan, elem );
+    if ( count == 0 && !cons`isEmpty ) {
+        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
+        wake_one( cons );
+    } else insert_( chan, elem );
 
-    notify_one( cons );
     unlock( mutex_lock );
 }
…
 
     // have to check for the zero size channel case
-    if ( size == 0 && !empty( prods ) ) {
-        memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
-        notify_one( prods );
+    if ( size == 0 && !prods`isEmpty ) {
+        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
+        wake_one( prods );
         unlock( mutex_lock );
         return retval;
…
 
     // wait if buffer is empty, work will be completed by someone else
-    if (count == 0) { 
-        wait( cons, mutex_lock, (uintptr_t)&retval );
+    if (count == 0) {
+        block( cons, &retval, mutex_lock );
         return retval;
     }
…
     front = (front + 1) % size;
 
-    if (count == size - 1 && !empty( prods ) )
-        insert_( chan, *((T *)front( prods )) );  // do waiting producer work
-
-    notify_one( prods );
+    if (count == size - 1 && !prods`isEmpty ) {
+        insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
+        wake_one( prods );
+    }
+
     unlock( mutex_lock );
     return retval;
 }
-
 } // forall( T )
 #endif
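
The rewritten (non-prevention) channel above replaces the condition-variable scheme with an intrusive wait queue: a blocked thread publishes a pointer to its own stack slot through wait_link.elem, and the peer copies the element directly into that slot before unparking it. A zero-sized channel therefore behaves as a pure rendezvous. Below is a small, hedged usage sketch of that behaviour; the thread structure and values are illustrative and not part of the changeset:

    #include <channel.hfa>
    #include <thread.hfa>
    #include <fstream.hfa>

    channel( int ) rendezvous{ 0 };        // zero-sized channel: no buffer, direct hand-off

    thread Consumer {};
    void main( Consumer & this ) {
        int v = remove( rendezvous );      // parks until a producer arrives, or takes a waiting producer's element
        sout | "received" | v;
    }

    int main() {
        Consumer c;                        // consumer thread starts on construction
        insert( rendezvous, 7 );           // the 7 is memcpy'd straight between the two threads' stacks
    }                                      // c is joined implicitly when it leaves scope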
  • libcfa/src/concurrency/locks.hfa

--- rde934c7
+++ rd30e3eb

 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }
 
-
 //-----------------------------------------------------------------------------
 // Exponential backoff then block lock
…
         this.lock_value = 0;
 }
-static inline void ^?{}( exp_backoff_then_block_lock & this ) {}
-// static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
-// static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
 
 static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
-        if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
-                return true;
-        }
-        return false;
+        return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
 }
 
…
 
 static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
-        if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
-                return true;
-        }
-        return false;
+        return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE);
 }
 
 static inline bool block(exp_backoff_then_block_lock & this) with(this) {
-        lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
-        if (lock_value != 2) {
-                unlock( spinlock );
-                return true;
-        }
-        insert_last( blocked_threads, *active_thread() );
-        unlock( spinlock );
+    lock( spinlock __cfaabi_dbg_ctx2 );
+    if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
+        unlock( spinlock );
+        return true;
+    }
+    insert_last( blocked_threads, *active_thread() );
+    unlock( spinlock );
         park( );
         return true;
…
         size_t compare_val = 0;
         int spin = 4;
+
         // linear backoff
         for( ;; ) {
…
 static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
     if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
-        lock( spinlock __cfaabi_dbg_ctx2 );
-        thread$ * t = &try_pop_front( blocked_threads );
-        unlock( spinlock );
-        unpark( t );
+    lock( spinlock __cfaabi_dbg_ctx2 );
+    thread$ * t = &try_pop_front( blocked_threads );
+    unlock( spinlock );
+    unpark( t );
 }
 
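
The hunk above cuts off inside lock() for this lock: only the local declarations (compare_val, spin = 4) and the backoff-loop header are visible. For orientation, here is a hedged reconstruction of what that acquire path plausibly looks like, assembled only from the helpers shown in this diff (internal_try_lock, try_lock_contention, block); the spin limits, growth policy, and the Pause() helper are assumptions, and this is a sketch rather than the verbatim library code:

    static inline void lock( exp_backoff_then_block_lock & this ) with(this) {
        size_t compare_val = 0;
        int spin = 4;

        // backoff phase: retry the CAS fast path, pausing between attempts
        for ( ;; ) {
            compare_val = 0;
            if ( internal_try_lock( this, compare_val ) ) return;  // acquired uncontended (0 -> 1)
            if ( compare_val == 2 ) break;                         // a waiter is already blocked: go block too
            for ( int i = 0; i < spin; i += 1 ) Pause();           // Pause(): CPU spin hint (assumed helper)
            if ( spin >= 1024 ) break;                             // bound on the spinning phase (assumed)
            spin += spin;                                          // grow the pause between attempts (assumed policy)
        }

        // slow path: mark the lock contended (state 2); if that wins the lock we are done,
        // otherwise park on blocked_threads until unlock() wakes us, then re-contend
        if ( try_lock_contention( this ) ) return;
        while ( block( this ) )
            if ( try_lock_contention( this ) ) return;
    }

This also matches the unlock() shown above: an exchanged-out value of 1 means the lock was uncontended, while anything else indicates state 2, so a blocked thread must be popped and unparked.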