Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    ra45e21c r42b739d7  
    1 #pragma once
     1#include <locks.hfa>
    22
    3 #include <locks.hfa>
    4 #include <list.hfa>
    5 #include <mutex_stmt.hfa>
     3struct no_reacq_lock {
     4    inline exp_backoff_then_block_lock;
     5};
    66
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
    30 
    31 // returns true if woken due to shutdown
    32 // blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
    36     unlock( lock );
    37     park();
    38     return w.elem == 0p;
    39 }
    40 
    41 // void * used for some fields since exceptions don't work with parametric polymorphism currently
    42 exception channel_closed {
    43     // on failed insert elem is a ptr to the element attempting to be inserted
    44     // on failed remove elem ptr is 0p
    45     // on resumption of a failed insert this elem will be inserted
    46     // so a user may modify it in the resumption handler
    47     void * elem;
    48 
    49     // pointer to chan that is closed
    50     void * closed_chan;
    51 };
    52 vtable(channel_closed) channel_closed_vt;
    53 
    54 // #define CHAN_STATS // define this to get channel stats printed in dtor
     7// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
     8static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
     9static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
     10static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
     11static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
     12static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
     13static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
     14// override wakeup so that we don't reacquire the lock if using a condvar
     15static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
    5516
    5617forall( T ) {
    57 
    58 struct __attribute__((aligned(128))) channel {
    59     size_t size, front, back, count;
     18struct channel {
     19    size_t size;
     20    size_t front, back, count;
    6021    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
    62     go_mutex mutex_lock;            // MX lock
    63     bool closed;                    // indicates channel close/open
    64     #ifdef CHAN_STATS
    65     size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
    66     #endif
     22    fast_cond_var( no_reacq_lock ) prods, cons;
     23    no_reacq_lock mutex_lock;
    6724};
    6825
     
    7027    size = _size;
    7128    front = back = count = 0;
    72     buffer = aalloc( size );
     29    buffer = anew( size );
    7330    prods{};
    7431    cons{};
    7532    mutex_lock{};
    76     closed = false;
    77     #ifdef CHAN_STATS
    78     blocks = 0;
    79     operations = 0;
    80     #endif
    8133}
    8234
    8335static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    84 static inline void ^?{}( channel(T) &c ) with(c) {
    85     #ifdef CHAN_STATS
    86     printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
    87     #endif
    88     verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
    90 }
     36static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    9137static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    9238static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    93 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    94 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
    95 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
     39static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
     40static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
     41static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    9642
    97 // closes the channel and notifies all blocked threads
    98 static inline void close( channel(T) & chan ) with(chan) {
    99     lock( mutex_lock );
    100     closed = true;
    101 
    102     // flush waiting consumers and producers
    103     while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
    105         wake_one( cons );
    106     }
    107     while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
    109         wake_one( prods );
    110     }
    111     unlock(mutex_lock);
    112 }
    113 
    114 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    115 
    116 static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117     lock( mutex_lock );
    118     while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
    121     }
    122     unlock( mutex_lock );
    123 }
    124 
    125 // handles buffer insert
    126 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
     43static inline void insert_( channel(T) & chan, T elem ) with(chan) {
    12744    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    12845    count += 1;
     
    13148}
    13249
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141 // needed to avoid an extra copy in closed case
    142 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    143     lock( mutex_lock );
    144     #ifdef CHAN_STATS
    145     operations++;
    146     #endif
    147     if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
    149     unlock( mutex_lock );
    150     return true;
    151 }
    152 
    153 // attempts a nonblocking insert
    154 // returns true if insert was successful, false otherwise
    155 static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
    156 
    157 // handles closed case of insert routine
    158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
    160     throwResume except; // throw closed resumption
    161     if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    162 }
    16350
    16451static inline void insert( channel(T) & chan, T elem ) with(chan) {
    165     // check for close before acquire mx
    166     if ( unlikely(closed) ) {
    167         __closed_insert( chan, elem );
    168         return;
    169     }
    170 
    17152    lock( mutex_lock );
    17253
    173     #ifdef CHAN_STATS
    174     if ( !closed ) operations++;
    175     #endif
    176 
    177     // if closed handle
    178     if ( unlikely(closed) ) {
     54    // have to check for the zero size channel case
     55    if ( size == 0 && !empty( cons ) ) {
     56        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
     57        notify_one( cons );
    17958        unlock( mutex_lock );
    180         __closed_insert( chan, elem );
    18159        return;
    18260    }
    18361
    184     // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
    190     }
    191 
    19262    // wait if buffer is full, work will be completed by someone else
    193     if ( count == size ) {
    194         #ifdef CHAN_STATS
    195         blocks++;
    196         #endif
    197 
    198         // check for if woken due to close
    199         if ( unlikely( block( prods, &elem, mutex_lock ) ) )
    200             __closed_insert( chan, elem );
     63    if ( count == size ) {
     64        wait( prods, mutex_lock, (uintptr_t)&elem );
    20165        return;
    20266    } // if
    20367
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
     68    if ( count == 0 && !empty( cons ) )
     69        // do waiting consumer work
     70        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
     71    else insert_( chan, elem );
    20872   
     73    notify_one( cons );
    20974    unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    216     count -= 1;
    217     front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223     if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
    225         wake_one( prods );
    226     }
    227 }
    228 
    229 // needed to avoid an extra copy in closed case and single return val case
    230 static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    231     lock( mutex_lock );
    232     #ifdef CHAN_STATS
    233     operations++;
    234     #endif
    235     if ( count == 0 ) { unlock( mutex_lock ); return false; }
    236     __do_remove( chan, retval );
    237     unlock( mutex_lock );
    238     return true;
    239 }
    240 
    241 // attempts a nonblocking remove
    242 // returns [T, true] if insert was successful
    243 // returns [T, false] if insert was successful (T uninit)
    244 static inline [T, bool] try_remove( channel(T) & chan ) {
    245     T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
    250     T retval;
    251     __internal_try_remove( chan, retval );
    252     return retval;
    253 }
    254 
    255 // handles closed case of insert routine
    256 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
    258     throwResume except; // throw resumption
    259     if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    26075}
    26176
    26277static inline T remove( channel(T) & chan ) with(chan) {
     78    lock( mutex_lock );
    26379    T retval;
    264     if ( unlikely(closed) ) {
    265         __closed_remove( chan, retval );
    266         return retval;
    267     }
    268     lock( mutex_lock );
    269 
    270     #ifdef CHAN_STATS
    271     if ( !closed ) operations++;
    272     #endif
    273 
    274     if ( unlikely(closed) ) {
    275         unlock( mutex_lock );
    276         __closed_remove( chan, retval );
    277         return retval;
    278     }
    27980
    28081    // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
     82    if ( size == 0 && !empty( prods ) ) {
     83        memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
     84        notify_one( prods );
    28485        unlock( mutex_lock );
    28586        return retval;
     
    28788
    28889    // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
    290         #ifdef CHAN_STATS
    291         blocks++;
    292         #endif
    293         // check for if woken due to close
    294         if ( unlikely( block( cons, &retval, mutex_lock ) ) )
    295             __closed_remove( chan, retval );
     90    if (count == 0) {
     91        wait( cons, mutex_lock, (uintptr_t)&retval );
    29692        return retval;
    29793    }
    29894
    29995    // Remove from buffer
    300     __do_remove( chan, retval );
     96    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     97    count -= 1;
     98    front = (front + 1) % size;
    30199
     100    if (count == size - 1 && !empty( prods ) )
     101        insert_( chan, *((T *)front( prods )) );  // do waiting producer work
     102
     103    notify_one( prods );
    302104    unlock( mutex_lock );
    303105    return retval;
    304106}
     107
    305108} // forall( T )
Note: See TracChangeset for help on using the changeset viewer.