Changes in / [1633e04:056bee8]


Ignore:
Location:
libcfa/src/concurrency
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r1633e04 r056bee8  
    22
    33#include <locks.hfa>
    4 #include <list.hfa>
    5 
    6 // #define __PREVENTION_CHANNEL
     4
     5struct no_reacq_lock {
     6    inline exp_backoff_then_block_lock;
     7};
     8
     9// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
     10static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
     11static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
     12static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
     13static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
     14static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
     15static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
     16// override wakeup so that we don't reacquire the lock if using a condvar
     17static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
     18
     19#define __PREVENTION_CHANNEL
    720#ifdef __PREVENTION_CHANNEL
    821forall( T ) {
    922struct channel {
    10     size_t size, count, front, back;
     23    size_t size;
     24    size_t front, back, count;
    1125    T * buffer;
    1226    thread$ * chair;
     
    7387        return;
    7488    }
    75     insert_( chan, elem );
     89    else insert_( chan, elem );
    7690
    7791    unlock( mutex_lock );
     
    96110
    97111    // wait if buffer is empty, work will be completed by someone else
    98     if ( count == 0 ) {
     112    if ( count == 0 ) { 
    99113        chair = active_thread();
    100114        chair_elem = &retval;
     
    107121    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    108122    count -= 1;
    109     front++;
    110     if ( front == size ) front = 0;
     123    front = (front + 1) % size;
    111124
    112125    if ( chair != 0p ) {
     
    129142
    130143#ifndef __PREVENTION_CHANNEL
    131 
    132 // link field used for threads waiting on channel
    133 struct wait_link {
    134     // used to put wait_link on a dl queue
    135     inline dlink(wait_link);
    136 
    137     // waiting thread
    138     struct thread$ * t;
    139 
    140     // shadow field
    141     void * elem;
    142 };
    143 P9_EMBEDDED( wait_link, dlink(wait_link) )
    144 
    145 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    146     this.t = t;
    147     this.elem = elem;
    148 }
    149 
    150144forall( T ) {
    151 
    152145struct channel {
    153146    size_t size;
    154147    size_t front, back, count;
    155148    T * buffer;
    156     dlist( wait_link ) prods, cons;
    157     exp_backoff_then_block_lock mutex_lock;
     149    fast_cond_var( no_reacq_lock ) prods, cons;
     150    no_reacq_lock mutex_lock;
    158151};
    159152
     
    171164static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    172165static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    173 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    174 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
    175 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
     166static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
     167static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
     168static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    176169
    177170static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     
    182175}
    183176
    184 static inline void wake_one( dlist( wait_link ) & queue ) {
    185     wait_link & popped = try_pop_front( queue );
    186     unpark( popped.t );
    187 }
    188 
    189 static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
    190     wait_link w{ active_thread(), elem_ptr };
    191     insert_last( queue, w );
    192     unlock( lock );
    193     park();
    194 }
    195177
    196178static inline void insert( channel(T) & chan, T elem ) with(chan) {
     
    198180
    199181    // have to check for the zero size channel case
    200     if ( size == 0 && !cons`isEmpty ) {
    201         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    202         wake_one( cons );
     182    if ( size == 0 && !empty( cons ) ) {
     183        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
     184        notify_one( cons );
    203185        unlock( mutex_lock );
    204186        return;
     
    206188
    207189    // wait if buffer is full, work will be completed by someone else
    208     if ( count == size ) {
    209         block( prods, &elem, mutex_lock );
     190    if ( count == size ) { 
     191        wait( prods, mutex_lock, (uintptr_t)&elem );
    210192        return;
    211193    } // if
    212194
    213     if ( count == 0 && !cons`isEmpty ) {
    214         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    215         wake_one( cons );
    216     } else insert_( chan, elem );
     195    if ( count == 0 && !empty( cons ) )
     196        // do waiting consumer work
     197        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
     198    else insert_( chan, elem );
    217199   
     200    notify_one( cons );
    218201    unlock( mutex_lock );
    219202}
     
    224207
    225208    // have to check for the zero size channel case
    226     if ( size == 0 && !prods`isEmpty ) {
    227         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    228         wake_one( prods );
     209    if ( size == 0 && !empty( prods ) ) {
     210        memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
     211        notify_one( prods );
    229212        unlock( mutex_lock );
    230213        return retval;
     
    232215
    233216    // wait if buffer is empty, work will be completed by someone else
    234     if (count == 0) {
    235         block( cons, &retval, mutex_lock );
     217    if (count == 0) { 
     218        wait( cons, mutex_lock, (uintptr_t)&retval );
    236219        return retval;
    237220    }
     
    242225    front = (front + 1) % size;
    243226
    244     if (count == size - 1 && !prods`isEmpty ) {
    245         insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
    246         wake_one( prods );
    247     }
    248 
     227    if (count == size - 1 && !empty( prods ) )
     228        insert_( chan, *((T *)front( prods )) );  // do waiting producer work
     229
     230    notify_one( prods );
    249231    unlock( mutex_lock );
    250232    return retval;
    251233}
     234
    252235} // forall( T )
    253236#endif
  • libcfa/src/concurrency/locks.hfa

    r1633e04 r056bee8  
    253253static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }
    254254
     255
    255256//-----------------------------------------------------------------------------
    256257// Exponential backoff then block lock
     
    271272        this.lock_value = 0;
    272273}
     274static inline void ^?{}( exp_backoff_then_block_lock & this ) {}
     275// static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
     276// static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
    273277
    274278static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
    275         return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
     279        if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
     280                return true;
     281        }
     282        return false;
    276283}
    277284
     
    279286
    280287static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
    281         return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE);
     288        if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
     289                return true;
     290        }
     291        return false;
    282292}
    283293
    284294static inline bool block(exp_backoff_then_block_lock & this) with(this) {
    285     lock( spinlock __cfaabi_dbg_ctx2 );
    286     if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
    287         unlock( spinlock );
    288         return true;
    289     }
    290     insert_last( blocked_threads, *active_thread() );
    291     unlock( spinlock );
     295        lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
     296        if (lock_value != 2) {
     297                unlock( spinlock );
     298                return true;
     299        }
     300        insert_last( blocked_threads, *active_thread() );
     301        unlock( spinlock );
    292302        park( );
    293303        return true;
     
    297307        size_t compare_val = 0;
    298308        int spin = 4;
    299 
    300309        // linear backoff
    301310        for( ;; ) {
     
    315324static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
    316325    if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
    317     lock( spinlock __cfaabi_dbg_ctx2 );
    318     thread$ * t = &try_pop_front( blocked_threads );
    319     unlock( spinlock );
    320     unpark( t );
     326        lock( spinlock __cfaabi_dbg_ctx2 );
     327        thread$ * t = &try_pop_front( blocked_threads );
     328        unlock( spinlock );
     329        unpark( t );
    321330}
    322331
Note: See TracChangeset for help on using the changeset viewer.