Changes in / [d964c39:2d028003]


Ignore:
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    rd964c39 r2d028003  
    1515static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
    1616
    17 #define __PREVENTION_CHANNEL
    18 #ifdef __PREVENTION_CHANNEL
    19 forall( T ) {
    20 struct channel {
    21     size_t size;
    22     size_t front, back, count;
    23     T * buffer;
    24     thread$ * chair;
    25     T * chair_elem;
    26     exp_backoff_then_block_lock c_lock, p_lock;
    27     __spinlock_t mutex_lock;
    28 };
    29 
    30 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    31     size = _size;
    32     front = back = count = 0;
    33     buffer = anew( size );
    34     chair = 0p;
    35     mutex_lock{};
    36     c_lock{};
    37     p_lock{};
    38 }
    39 
    40 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    41 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    42 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    43 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    44 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
    45 
    46 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    47     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    48     count += 1;
    49     back++;
    50     if ( back == size ) back = 0;
    51 }
    52 
    53 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    54     lock( p_lock );
    55     lock( mutex_lock __cfaabi_dbg_ctx2 );
    56 
    57     // have to check for the zero size channel case
    58     if ( size == 0 && chair != 0p ) {
    59         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    60         unpark( chair );
    61         chair = 0p;
    62         unlock( mutex_lock );
    63         unlock( p_lock );
    64         unlock( c_lock );
    65         return;
    66     }
    67 
    68     // wait if buffer is full, work will be completed by someone else
    69     if ( count == size ) {
    70         chair = active_thread();
    71         chair_elem = &elem;
    72         unlock( mutex_lock );
    73         park( );
    74         return;
    75     } // if
    76 
    77     if ( chair != 0p ) {
    78         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    79         unpark( chair );
    80         chair = 0p;
    81         unlock( mutex_lock );
    82         unlock( p_lock );
    83         unlock( c_lock );
    84         return;
    85     }
    86     else insert_( chan, elem );
    87 
    88     unlock( mutex_lock );
    89     unlock( p_lock );
    90 }
    91 
    92 static inline T remove( channel(T) & chan ) with(chan) {
    93     lock( c_lock );
    94     lock( mutex_lock __cfaabi_dbg_ctx2 );
    95     T retval;
    96 
    97     // have to check for the zero size channel case
    98     if ( size == 0 && chair != 0p ) {
    99         memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
    100         unpark( chair );
    101         chair = 0p;
    102         unlock( mutex_lock );
    103         unlock( p_lock );
    104         unlock( c_lock );
    105         return retval;
    106     }
    107 
    108     // wait if buffer is empty, work will be completed by someone else
    109     if ( count == 0 ) {
    110         chair = active_thread();
    111         chair_elem = &retval;
    112         unlock( mutex_lock );
    113         park( );
    114         return retval;
    115     }
    116 
    117     // Remove from buffer
    118     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    119     count -= 1;
    120     front = (front + 1) % size;
    121 
    122     if ( chair != 0p ) {
    123         insert_( chan, *chair_elem );  // do waiting producer work
    124         unpark( chair );
    125         chair = 0p;
    126         unlock( mutex_lock );
    127         unlock( p_lock );
    128         unlock( c_lock );
    129         return retval;
    130     }
    131 
    132     unlock( mutex_lock );
    133     unlock( c_lock );
    134     return retval;
    135 }
    136 
    137 } // forall( T )
    138 #endif
    139 
    140 #ifndef __PREVENTION_CHANNEL
    14117forall( T ) {
    14218struct channel {
     
    16541static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    16642
    167 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     43static inline void insert_( channel(T) & chan, T elem ) with(chan) {
    16844    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    16945    count += 1;
     
    231107
    232108} // forall( T )
    233 #endif
  • tests/concurrent/channels/parallel_harness.hfa

    rd964c39 r2d028003  
    139139    while( cons_done_count != Consumers * Channels ) {
    140140        for ( i; Channels ) {
    141             if ( has_waiters( channels[i] ) ){
     141            if ( has_waiting_consumers( channels[i] ) ){
    142142                #ifdef BIG
    143143                bigObject b{0};
Note: See TracChangeset for help on using the changeset viewer.