Changes in / [75d874a:1633e04]


Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r75d874a r1633e04  
    44#include <list.hfa>
    55
    6 #define __COOP_CHANNEL
     6// #define __PREVENTION_CHANNEL
    77#ifdef __PREVENTION_CHANNEL
    88forall( T ) {
     
    1414    exp_backoff_then_block_lock c_lock, p_lock;
    1515    __spinlock_t mutex_lock;
    16     char __padding[64]; // avoid false sharing in arrays of channels
     16    char __padding[64]; // avoid false sharing in arrays
    1717};
    1818
     
    2020    size = _size;
    2121    front = back = count = 0;
    22     buffer = aalloc( size );
     22    buffer = anew( size );
    2323    chair = 0p;
    2424    mutex_lock{};
     
    128128#endif
    129129
    130 #ifdef __COOP_CHANNEL
     130#ifndef __PREVENTION_CHANNEL
    131131
    132132// link field used for threads waiting on channel
     
    161161    size = _size;
    162162    front = back = count = 0;
    163     buffer = aalloc( size );
     163    buffer = anew( size );
    164164    prods{};
    165165    cons{};
     
    252252} // forall( T )
    253253#endif
    254 
    255 #ifdef __BARGE_CHANNEL
    256 forall( T ) {
    257 struct channel {
    258     size_t size;
    259     size_t front, back, count;
    260     T * buffer;
    261     fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
    262     exp_backoff_then_block_lock mutex_lock;
    263 };
    264 
    265 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    266     size = _size;
    267     front = back = count = 0;
    268     buffer = aalloc( size );
    269     prods{};
    270     cons{};
    271     mutex_lock{};
    272 }
    273 
    274 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    275 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    276 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    277 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    278 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    279 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    280 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    281 
    282 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    283     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    284     count += 1;
    285     back++;
    286     if ( back == size ) back = 0;
    287 }
    288 
    289 
    290 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    291     lock( mutex_lock );
    292 
    293     while ( count == size ) {
    294         wait( prods, mutex_lock );
    295     } // if
    296 
    297     insert_( chan, elem );
    298    
    299     if ( !notify_one( cons ) && count < size )
    300         notify_one( prods );
    301 
    302     unlock( mutex_lock );
    303 }
    304 
    305 static inline T remove( channel(T) & chan ) with(chan) {
    306     lock( mutex_lock );
    307     T retval;
    308 
    309     while (count == 0) {
    310         wait( cons, mutex_lock );
    311     }
    312 
    313     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    314     count -= 1;
    315     front = (front + 1) % size;
    316 
    317     if ( !notify_one( prods ) && count > 0 )
    318         notify_one( cons );
    319 
    320     unlock( mutex_lock );
    321     return retval;
    322 }
    323 
    324 } // forall( T )
    325 #endif
    326 
    327 #ifdef __NO_WAIT_CHANNEL
    328 forall( T ) {
    329 struct channel {
    330     size_t size;
    331     size_t front, back, count;
    332     T * buffer;
    333     thread$ * chair;
    334     T * chair_elem;
    335     exp_backoff_then_block_lock c_lock, p_lock;
    336     __spinlock_t mutex_lock;
    337 };
    338 
    339 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    340     size = _size;
    341     front = back = count = 0;
    342     buffer = aalloc( size );
    343     chair = 0p;
    344     mutex_lock{};
    345     c_lock{};
    346     p_lock{};
    347     lock( c_lock );
    348 }
    349 
    350 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    351 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    352 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    353 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    354 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
    355 
    356 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    357     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    358     count += 1;
    359     back++;
    360     if ( back == size ) back = 0;
    361 }
    362 
    363 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    364     lock( p_lock );
    365     lock( mutex_lock __cfaabi_dbg_ctx2 );
    366 
    367     insert_( chan, elem );
    368 
    369     if ( count != size )
    370         unlock( p_lock );
    371 
    372     if ( count == 1 )
    373         unlock( c_lock );
    374        
    375     unlock( mutex_lock );
    376 }
    377 
    378 static inline T remove( channel(T) & chan ) with(chan) {
    379     lock( c_lock );
    380     lock( mutex_lock __cfaabi_dbg_ctx2 );
    381     T retval;
    382 
    383     // Remove from buffer
    384     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    385     count -= 1;
    386     front = (front + 1) % size;
    387 
    388     if ( count != 0 )
    389         unlock( c_lock );
    390 
    391     if ( count == size - 1 )
    392         unlock( p_lock );
    393        
    394     unlock( mutex_lock );
    395     return retval;
    396 }
    397 
    398 } // forall( T )
    399 #endif
Note: See TracChangeset for help on using the changeset viewer.