Ignore:
Timestamp:
Feb 25, 2023, 6:45:40 PM (21 months ago)
Author:
caparson <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
d964c39
Parents:
640b3df
Message:

Thought of new channel implementation while working on the prevention paper. Resulted in 30%+ greater throughput so impl is now switched to that.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r640b3df rce44c5f  
    1515static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
    1616
     17#define __PREVENTION_CHANNEL
     18#ifdef __PREVENTION_CHANNEL
     19forall( T ) {
     20struct channel {
     21    size_t size;
     22    size_t front, back, count;
     23    T * buffer;
     24    thread$ * chair;
     25    T * chair_elem;
     26    exp_backoff_then_block_lock c_lock, p_lock;
     27    __spinlock_t mutex_lock;
     28};
     29
     30static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     31    size = _size;
     32    front = back = count = 0;
     33    buffer = anew( size );
     34    chair = 0p;
     35    mutex_lock{};
     36    c_lock{};
     37    p_lock{};
     38}
     39
     40static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     41static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     42static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     43static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     44static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
     45
     46static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     47    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     48    count += 1;
     49    back++;
     50    if ( back == size ) back = 0;
     51}
     52
     53static inline void insert( channel(T) & chan, T elem ) with( chan ) {
     54    lock( p_lock );
     55    lock( mutex_lock __cfaabi_dbg_ctx2 );
     56
     57    // have to check for the zero size channel case
     58    if ( size == 0 && chair != 0p ) {
     59        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
     60        unpark( chair );
     61        chair = 0p;
     62        unlock( mutex_lock );
     63        unlock( p_lock );
     64        unlock( c_lock );
     65        return;
     66    }
     67
     68    // wait if buffer is full, work will be completed by someone else
     69    if ( count == size ) {
     70        chair = active_thread();
     71        chair_elem = &elem;
     72        unlock( mutex_lock );
     73        park( );
     74        return;
     75    } // if
     76
     77    if ( chair != 0p ) {
     78        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
     79        unpark( chair );
     80        chair = 0p;
     81        unlock( mutex_lock );
     82        unlock( p_lock );
     83        unlock( c_lock );
     84        return;
     85    }
     86    else insert_( chan, elem );
     87
     88    unlock( mutex_lock );
     89    unlock( p_lock );
     90}
     91
     92static inline T remove( channel(T) & chan ) with(chan) {
     93    lock( c_lock );
     94    lock( mutex_lock __cfaabi_dbg_ctx2 );
     95    T retval;
     96
     97    // have to check for the zero size channel case
     98    if ( size == 0 && chair != 0p ) {
     99        memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
     100        unpark( chair );
     101        chair = 0p;
     102        unlock( mutex_lock );
     103        unlock( p_lock );
     104        unlock( c_lock );
     105        return retval;
     106    }
     107
     108    // wait if buffer is empty, work will be completed by someone else
     109    if ( count == 0 ) {
     110        chair = active_thread();
     111        chair_elem = &retval;
     112        unlock( mutex_lock );
     113        park( );
     114        return retval;
     115    }
     116
     117    // Remove from buffer
     118    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     119    count -= 1;
     120    front = (front + 1) % size;
     121
     122    if ( chair != 0p ) {
     123        insert_( chan, *chair_elem );  // do waiting producer work
     124        unpark( chair );
     125        chair = 0p;
     126        unlock( mutex_lock );
     127        unlock( p_lock );
     128        unlock( c_lock );
     129        return retval;
     130    }
     131
     132    unlock( mutex_lock );
     133    unlock( c_lock );
     134    return retval;
     135}
     136
     137} // forall( T )
     138#endif
     139
     140#ifndef __PREVENTION_CHANNEL
    17141forall( T ) {
    18142struct channel {
     
    41165static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    42166
    43 static inline void insert_( channel(T) & chan, T elem ) with(chan) {
     167static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    44168    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    45169    count += 1;
     
    107231
    108232} // forall( T )
     233#endif
Note: See TracChangeset for help on using the changeset viewer.