Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r0e16a2d ra45e21c  
    22
    33#include <locks.hfa>
    4 
    5 struct no_reacq_lock {
    6     inline exp_backoff_then_block_lock;
     4#include <list.hfa>
     5#include <mutex_stmt.hfa>
     6
     7// link field used for threads waiting on channel
     8struct wait_link {
     9    // used to put wait_link on a dl queue
     10    inline dlink(wait_link);
     11
     12    // waiting thread
     13    struct thread$ * t;
     14
     15    // shadow field
     16    void * elem;
    717};
    8 
    9 // have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
    10 static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
    11 static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
    12 static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
    13 static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
    14 static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
    15 static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
    16 // override wakeup so that we don't reacquire the lock if using a condvar
    17 static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
    18 
    19 #define __PREVENTION_CHANNEL
    20 #ifdef __PREVENTION_CHANNEL
     18P9_EMBEDDED( wait_link, dlink(wait_link) )
     19
     20static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
     21    this.t = t;
     22    this.elem = elem;
     23}
     24
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
     30
     31// returns true if woken due to shutdown
     32// blocks thread on list and releases passed lock
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
     36    unlock( lock );
     37    park();
     38    return w.elem == 0p;
     39}
     40
     41// void * used for some fields since exceptions don't work with parametric polymorphism currently
     42exception channel_closed {
     43    // on failed insert elem is a ptr to the element attempting to be inserted
     44    // on failed remove elem ptr is 0p
     45    // on resumption of a failed insert this elem will be inserted
     46    // so a user may modify it in the resumption handler
     47    void * elem;
     48
     49    // pointer to chan that is closed
     50    void * closed_chan;
     51};
     52vtable(channel_closed) channel_closed_vt;
     53
     54// #define CHAN_STATS // define this to get channel stats printed in dtor
     55
    2156forall( T ) {
    22 struct channel {
    23     size_t size;
    24     size_t front, back, count;
     57
     58struct __attribute__((aligned(128))) channel {
     59    size_t size, front, back, count;
    2560    T * buffer;
    26     thread$ * chair;
    27     T * chair_elem;
    28     exp_backoff_then_block_lock c_lock, p_lock;
    29     __spinlock_t mutex_lock;
    30     char __padding[64]; // avoid false sharing in arrays of channels
    31 };
    32 
    33 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    34     size = _size;
    35     front = back = count = 0;
    36     buffer = aalloc( size );
    37     chair = 0p;
    38     mutex_lock{};
    39     c_lock{};
    40     p_lock{};
    41 }
    42 
    43 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    44 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    45 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    46 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    47 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
    48 
    49 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    50     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    51     count += 1;
    52     back++;
    53     if ( back == size ) back = 0;
    54 }
    55 
    56 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    57     lock( p_lock );
    58     lock( mutex_lock __cfaabi_dbg_ctx2 );
    59 
    60     // have to check for the zero size channel case
    61     if ( size == 0 && chair != 0p ) {
    62         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    63         unpark( chair );
    64         chair = 0p;
    65         unlock( mutex_lock );
    66         unlock( p_lock );
    67         unlock( c_lock );
    68         return;
    69     }
    70 
    71     // wait if buffer is full, work will be completed by someone else
    72     if ( count == size ) {
    73         chair = active_thread();
    74         chair_elem = &elem;
    75         unlock( mutex_lock );
    76         park( );
    77         return;
    78     } // if
    79 
    80     if ( chair != 0p ) {
    81         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    82         unpark( chair );
    83         chair = 0p;
    84         unlock( mutex_lock );
    85         unlock( p_lock );
    86         unlock( c_lock );
    87         return;
    88     }
    89     else insert_( chan, elem );
    90 
    91     unlock( mutex_lock );
    92     unlock( p_lock );
    93 }
    94 
    95 static inline T remove( channel(T) & chan ) with(chan) {
    96     lock( c_lock );
    97     lock( mutex_lock __cfaabi_dbg_ctx2 );
    98     T retval;
    99 
    100     // have to check for the zero size channel case
    101     if ( size == 0 && chair != 0p ) {
    102         memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
    103         unpark( chair );
    104         chair = 0p;
    105         unlock( mutex_lock );
    106         unlock( p_lock );
    107         unlock( c_lock );
    108         return retval;
    109     }
    110 
    111     // wait if buffer is empty, work will be completed by someone else
    112     if ( count == 0 ) {
    113         chair = active_thread();
    114         chair_elem = &retval;
    115         unlock( mutex_lock );
    116         park( );
    117         return retval;
    118     }
    119 
    120     // Remove from buffer
    121     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    122     count -= 1;
    123     front = (front + 1) % size;
    124 
    125     if ( chair != 0p ) {
    126         insert_( chan, *chair_elem );  // do waiting producer work
    127         unpark( chair );
    128         chair = 0p;
    129         unlock( mutex_lock );
    130         unlock( p_lock );
    131         unlock( c_lock );
    132         return retval;
    133     }
    134 
    135     unlock( mutex_lock );
    136     unlock( c_lock );
    137     return retval;
    138 }
    139 
    140 } // forall( T )
    141 #endif
    142 
    143 #ifdef __COOP_CHANNEL
    144 forall( T ) {
    145 struct channel {
    146     size_t size;
    147     size_t front, back, count;
    148     T * buffer;
    149     fast_cond_var( no_reacq_lock ) prods, cons;
    150     no_reacq_lock mutex_lock;
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
     62    go_mutex mutex_lock;            // MX lock
     63    bool closed;                    // indicates channel close/open
     64    #ifdef CHAN_STATS
     65    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     66    #endif
    15167};
    15268
     
    15874    cons{};
    15975    mutex_lock{};
     76    closed = false;
     77    #ifdef CHAN_STATS
     78    blocks = 0;
     79    operations = 0;
     80    #endif
    16081}
    16182
    16283static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    163 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     84static inline void ^?{}( channel(T) &c ) with(c) {
     85    #ifdef CHAN_STATS
     86    printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
     87    #endif
     88    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
     89    delete( buffer );
     90}
    16491static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    16592static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    166 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    167 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    168 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    169 
    170 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     93static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
     94static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     95static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
     96
     97// closes the channel and notifies all blocked threads
     98static inline void close( channel(T) & chan ) with(chan) {
     99    lock( mutex_lock );
     100    closed = true;
     101
     102    // flush waiting consumers and producers
     103    while ( has_waiting_consumers( chan ) ) {
     104        cons`first.elem = 0p;
     105        wake_one( cons );
     106    }
     107    while ( has_waiting_producers( chan ) ) {
     108        prods`first.elem = 0p;
     109        wake_one( prods );
     110    }
     111    unlock(mutex_lock);
     112}
     113
     114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
     115
     116static inline void flush( channel(T) & chan, T elem ) with(chan) {
     117    lock( mutex_lock );
     118    while ( count == 0 && !cons`isEmpty ) {
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
     121    }
     122    unlock( mutex_lock );
     123}
     124
     125// handles buffer insert
     126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    171127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    172128    count += 1;
     
    175131}
    176132
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
     141// needed to avoid an extra copy in closed case
     142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     143    lock( mutex_lock );
     144    #ifdef CHAN_STATS
     145    operations++;
     146    #endif
     147    if ( count == size ) { unlock( mutex_lock ); return false; }
     148    __do_insert( chan, elem );
     149    unlock( mutex_lock );
     150    return true;
     151}
     152
     153// attempts a nonblocking insert
     154// returns true if insert was successful, false otherwise
     155static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
     156
     157// handles closed case of insert routine
     158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
     160    throwResume except; // throw closed resumption
     161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     162}
    177163
    178164static inline void insert( channel(T) & chan, T elem ) with(chan) {
    179     lock( mutex_lock );
     165    // check for close before acquire mx
     166    if ( unlikely(closed) ) {
     167        __closed_insert( chan, elem );
     168        return;
     169    }
     170
     171    lock( mutex_lock );
     172
     173    #ifdef CHAN_STATS
     174    if ( !closed ) operations++;
     175    #endif
     176
     177    // if closed handle
     178    if ( unlikely(closed) ) {
     179        unlock( mutex_lock );
     180        __closed_insert( chan, elem );
     181        return;
     182    }
    180183
    181184    // have to check for the zero size channel case
    182     if ( size == 0 && !empty( cons ) ) {
    183         memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
    184         notify_one( cons );
     185    if ( size == 0 && !cons`isEmpty ) {
     186        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
     187        wake_one( cons );
    185188        unlock( mutex_lock );
    186         return;
     189        return true;
    187190    }
    188191
    189192    // wait if buffer is full, work will be completed by someone else
    190     if ( count == size ) {
    191         wait( prods, mutex_lock, (uintptr_t)&elem );
     193    if ( count == size ) {
     194        #ifdef CHAN_STATS
     195        blocks++;
     196        #endif
     197
     198        // check for if woken due to close
     199        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
     200            __closed_insert( chan, elem );
    192201        return;
    193202    } // if
    194203
    195     if ( count == 0 && !empty( cons ) )
    196         // do waiting consumer work
    197         memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
    198     else insert_( chan, elem );
     204    if ( count == 0 && !cons`isEmpty ) {
     205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     206        wake_one( cons );
     207    } else __buf_insert( chan, elem );
    199208   
    200     notify_one( cons );
    201     unlock( mutex_lock );
    202 }
    203 
    204 static inline T remove( channel(T) & chan ) with(chan) {
    205     lock( mutex_lock );
    206     T retval;
    207 
    208     // have to check for the zero size channel case
    209     if ( size == 0 && !empty( prods ) ) {
    210         memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
    211         notify_one( prods );
    212         unlock( mutex_lock );
    213         return retval;
    214     }
    215 
    216     // wait if buffer is empty, work will be completed by someone else
    217     if (count == 0) {
    218         wait( cons, mutex_lock, (uintptr_t)&retval );
    219         return retval;
    220     }
    221 
    222     // Remove from buffer
     209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    223215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    224216    count -= 1;
    225217    front = (front + 1) % size;
    226 
    227     if (count == size - 1 && !empty( prods ) )
    228         insert_( chan, *((T *)front( prods )) );  // do waiting producer work
    229 
    230     notify_one( prods );
    231     unlock( mutex_lock );
     218}
     219
     220// does the buffer remove and potentially does waiting producer work
     221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     222    __buf_remove( chan, retval );
     223    if (count == size - 1 && !prods`isEmpty ) {
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     225        wake_one( prods );
     226    }
     227}
     228
     229// needed to avoid an extra copy in closed case and single return val case
     230static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     231    lock( mutex_lock );
     232    #ifdef CHAN_STATS
     233    operations++;
     234    #endif
     235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     236    __do_remove( chan, retval );
     237    unlock( mutex_lock );
     238    return true;
     239}
     240
     241// attempts a nonblocking remove
     242// returns [T, true] if insert was successful
     243// returns [T, false] if insert was successful (T uninit)
     244static inline [T, bool] try_remove( channel(T) & chan ) {
     245    T retval;
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
     250    T retval;
     251    __internal_try_remove( chan, retval );
    232252    return retval;
    233253}
    234254
     255// handles closed case of insert routine
     256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
     258    throwResume except; // throw resumption
     259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     260}
     261
     262static inline T remove( channel(T) & chan ) with(chan) {
     263    T retval;
     264    if ( unlikely(closed) ) {
     265        __closed_remove( chan, retval );
     266        return retval;
     267    }
     268    lock( mutex_lock );
     269
     270    #ifdef CHAN_STATS
     271    if ( !closed ) operations++;
     272    #endif
     273
     274    if ( unlikely(closed) ) {
     275        unlock( mutex_lock );
     276        __closed_remove( chan, retval );
     277        return retval;
     278    }
     279
     280    // have to check for the zero size channel case
     281    if ( size == 0 && !prods`isEmpty ) {
     282        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
     283        wake_one( prods );
     284        unlock( mutex_lock );
     285        return retval;
     286    }
     287
     288    // wait if buffer is empty, work will be completed by someone else
     289    if (count == 0) {
     290        #ifdef CHAN_STATS
     291        blocks++;
     292        #endif
     293        // check for if woken due to close
     294        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
     295            __closed_remove( chan, retval );
     296        return retval;
     297    }
     298
     299    // Remove from buffer
     300    __do_remove( chan, retval );
     301
     302    unlock( mutex_lock );
     303    return retval;
     304}
    235305} // forall( T )
    236 #endif
    237 
    238 #ifdef __BARGE_CHANNEL
    239 forall( T ) {
    240 struct channel {
    241     size_t size;
    242     size_t front, back, count;
    243     T * buffer;
    244     fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
    245     exp_backoff_then_block_lock mutex_lock;
    246 };
    247 
    248 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    249     size = _size;
    250     front = back = count = 0;
    251     buffer = aalloc( size );
    252     prods{};
    253     cons{};
    254     mutex_lock{};
    255 }
    256 
    257 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    258 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    259 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    260 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    261 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    262 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    263 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    264 
    265 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    266     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    267     count += 1;
    268     back++;
    269     if ( back == size ) back = 0;
    270 }
    271 
    272 
    273 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    274     lock( mutex_lock );
    275 
    276     while ( count == size ) {
    277         wait( prods, mutex_lock );
    278     } // if
    279 
    280     insert_( chan, elem );
    281    
    282     if ( !notify_one( cons ) && count < size )
    283         notify_one( prods );
    284 
    285     unlock( mutex_lock );
    286 }
    287 
    288 static inline T remove( channel(T) & chan ) with(chan) {
    289     lock( mutex_lock );
    290     T retval;
    291 
    292     while (count == 0) {
    293         wait( cons, mutex_lock );
    294     }
    295 
    296     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    297     count -= 1;
    298     front = (front + 1) % size;
    299 
    300     if ( !notify_one( prods ) && count > 0 )
    301         notify_one( cons );
    302 
    303     unlock( mutex_lock );
    304     return retval;
    305 }
    306 
    307 } // forall( T )
    308 #endif
    309 
    310 #ifdef __NO_WAIT_CHANNEL
    311 forall( T ) {
    312 struct channel {
    313     size_t size;
    314     size_t front, back, count;
    315     T * buffer;
    316     thread$ * chair;
    317     T * chair_elem;
    318     exp_backoff_then_block_lock c_lock, p_lock;
    319     __spinlock_t mutex_lock;
    320 };
    321 
    322 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    323     size = _size;
    324     front = back = count = 0;
    325     buffer = aalloc( size );
    326     chair = 0p;
    327     mutex_lock{};
    328     c_lock{};
    329     p_lock{};
    330     lock( c_lock );
    331 }
    332 
    333 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    334 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    335 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    336 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    337 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
    338 
    339 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    340     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    341     count += 1;
    342     back++;
    343     if ( back == size ) back = 0;
    344 }
    345 
    346 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    347     lock( p_lock );
    348     lock( mutex_lock __cfaabi_dbg_ctx2 );
    349 
    350     insert_( chan, elem );
    351 
    352     if ( count != size )
    353         unlock( p_lock );
    354 
    355     if ( count == 1 )
    356         unlock( c_lock );
    357        
    358     unlock( mutex_lock );
    359 }
    360 
    361 static inline T remove( channel(T) & chan ) with(chan) {
    362     lock( c_lock );
    363     lock( mutex_lock __cfaabi_dbg_ctx2 );
    364     T retval;
    365 
    366     // Remove from buffer
    367     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    368     count -= 1;
    369     front = (front + 1) % size;
    370 
    371     if ( count != 0 )
    372         unlock( c_lock );
    373 
    374     if ( count == size - 1 )
    375         unlock( p_lock );
    376        
    377     unlock( mutex_lock );
    378     return retval;
    379 }
    380 
    381 } // forall( T )
    382 #endif
Note: See TracChangeset for help on using the changeset viewer.