Changes in / [1633e04:75d874a]


Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r1633e04 r75d874a  
    44#include <list.hfa>
    55
    6 // #define __PREVENTION_CHANNEL
     6#define __COOP_CHANNEL
    77#ifdef __PREVENTION_CHANNEL
    88forall( T ) {
     
    1414    exp_backoff_then_block_lock c_lock, p_lock;
    1515    __spinlock_t mutex_lock;
    16     char __padding[64]; // avoid false sharing in arrays
     16    char __padding[64]; // avoid false sharing in arrays of channels
    1717};
    1818
     
    2020    size = _size;
    2121    front = back = count = 0;
    22     buffer = anew( size );
     22    buffer = aalloc( size );
    2323    chair = 0p;
    2424    mutex_lock{};
     
    128128#endif
    129129
    130 #ifndef __PREVENTION_CHANNEL
     130#ifdef __COOP_CHANNEL
    131131
    132132// link field used for threads waiting on channel
     
    161161    size = _size;
    162162    front = back = count = 0;
    163     buffer = anew( size );
     163    buffer = aalloc( size );
    164164    prods{};
    165165    cons{};
     
    252252} // forall( T )
    253253#endif
     254
     255#ifdef __BARGE_CHANNEL
     256forall( T ) {
     257struct channel {
     258    size_t size;
     259    size_t front, back, count;
     260    T * buffer;
     261    fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
     262    exp_backoff_then_block_lock mutex_lock;
     263};
     264
     265static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     266    size = _size;
     267    front = back = count = 0;
     268    buffer = aalloc( size );
     269    prods{};
     270    cons{};
     271    mutex_lock{};
     272}
     273
     274static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     275static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     276static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     277static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     278static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
     279static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
     280static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
     281
     282static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     283    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     284    count += 1;
     285    back++;
     286    if ( back == size ) back = 0;
     287}
     288
     289
     290static inline void insert( channel(T) & chan, T elem ) with(chan) {
     291    lock( mutex_lock );
     292
     293    while ( count == size ) {
     294        wait( prods, mutex_lock );
     295    } // if
     296
     297    insert_( chan, elem );
     298   
     299    if ( !notify_one( cons ) && count < size )
     300        notify_one( prods );
     301
     302    unlock( mutex_lock );
     303}
     304
     305static inline T remove( channel(T) & chan ) with(chan) {
     306    lock( mutex_lock );
     307    T retval;
     308
     309    while (count == 0) {
     310        wait( cons, mutex_lock );
     311    }
     312
     313    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     314    count -= 1;
     315    front = (front + 1) % size;
     316
     317    if ( !notify_one( prods ) && count > 0 )
     318        notify_one( cons );
     319
     320    unlock( mutex_lock );
     321    return retval;
     322}
     323
     324} // forall( T )
     325#endif
     326
     327#ifdef __NO_WAIT_CHANNEL
     328forall( T ) {
     329struct channel {
     330    size_t size;
     331    size_t front, back, count;
     332    T * buffer;
     333    thread$ * chair;
     334    T * chair_elem;
     335    exp_backoff_then_block_lock c_lock, p_lock;
     336    __spinlock_t mutex_lock;
     337};
     338
     339static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     340    size = _size;
     341    front = back = count = 0;
     342    buffer = aalloc( size );
     343    chair = 0p;
     344    mutex_lock{};
     345    c_lock{};
     346    p_lock{};
     347    lock( c_lock );
     348}
     349
     350static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     351static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     352static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     353static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     354static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
     355
     356static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     357    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     358    count += 1;
     359    back++;
     360    if ( back == size ) back = 0;
     361}
     362
     363static inline void insert( channel(T) & chan, T elem ) with( chan ) {
     364    lock( p_lock );
     365    lock( mutex_lock __cfaabi_dbg_ctx2 );
     366
     367    insert_( chan, elem );
     368
     369    if ( count != size )
     370        unlock( p_lock );
     371
     372    if ( count == 1 )
     373        unlock( c_lock );
     374       
     375    unlock( mutex_lock );
     376}
     377
     378static inline T remove( channel(T) & chan ) with(chan) {
     379    lock( c_lock );
     380    lock( mutex_lock __cfaabi_dbg_ctx2 );
     381    T retval;
     382
     383    // Remove from buffer
     384    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     385    count -= 1;
     386    front = (front + 1) % size;
     387
     388    if ( count != 0 )
     389        unlock( c_lock );
     390
     391    if ( count == size - 1 )
     392        unlock( p_lock );
     393       
     394    unlock( mutex_lock );
     395    return retval;
     396}
     397
     398} // forall( T )
     399#endif
Note: See TracChangeset for help on using the changeset viewer.