Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    ra45e21c r0e16a2d  
    22
    33#include <locks.hfa>
    4 #include <list.hfa>
    5 #include <mutex_stmt.hfa>
    6 
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
    30 
    31 // returns true if woken due to shutdown
    32 // blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
    36     unlock( lock );
    37     park();
    38     return w.elem == 0p;
    39 }
    40 
    41 // void * used for some fields since exceptions don't work with parametric polymorphism currently
    42 exception channel_closed {
    43     // on failed insert elem is a ptr to the element attempting to be inserted
    44     // on failed remove elem ptr is 0p
    45     // on resumption of a failed insert this elem will be inserted
    46     // so a user may modify it in the resumption handler
    47     void * elem;
    48 
    49     // pointer to chan that is closed
    50     void * closed_chan;
    51 };
    52 vtable(channel_closed) channel_closed_vt;
    53 
    54 // #define CHAN_STATS // define this to get channel stats printed in dtor
    55 
     4
     5struct no_reacq_lock {
     6    inline exp_backoff_then_block_lock;
     7};
     8
     9// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
     10static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
     11static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
     12static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
     13static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
     14static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
     15static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
     16// override wakeup so that we don't reacquire the lock if using a condvar
     17static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
     18
     19#define __PREVENTION_CHANNEL
     20#ifdef __PREVENTION_CHANNEL
    5621forall( T ) {
    57 
    58 struct __attribute__((aligned(128))) channel {
    59     size_t size, front, back, count;
     22struct channel {
     23    size_t size;
     24    size_t front, back, count;
    6025    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
    62     go_mutex mutex_lock;            // MX lock
    63     bool closed;                    // indicates channel close/open
    64     #ifdef CHAN_STATS
    65     size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
    66     #endif
     26    thread$ * chair;
     27    T * chair_elem;
     28    exp_backoff_then_block_lock c_lock, p_lock;
     29    __spinlock_t mutex_lock;
     30    char __padding[64]; // avoid false sharing in arrays of channels
     31};
     32
     33static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     34    size = _size;
     35    front = back = count = 0;
     36    buffer = aalloc( size );
     37    chair = 0p;
     38    mutex_lock{};
     39    c_lock{};
     40    p_lock{};
     41}
     42
     43static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     44static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     45static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     46static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     47static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
     48
     49static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     50    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     51    count += 1;
     52    back++;
     53    if ( back == size ) back = 0;
     54}
     55
     56static inline void insert( channel(T) & chan, T elem ) with( chan ) {
     57    lock( p_lock );
     58    lock( mutex_lock __cfaabi_dbg_ctx2 );
     59
     60    // have to check for the zero size channel case
     61    if ( size == 0 && chair != 0p ) {
     62        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
     63        unpark( chair );
     64        chair = 0p;
     65        unlock( mutex_lock );
     66        unlock( p_lock );
     67        unlock( c_lock );
     68        return;
     69    }
     70
     71    // wait if buffer is full, work will be completed by someone else
     72    if ( count == size ) {
     73        chair = active_thread();
     74        chair_elem = &elem;
     75        unlock( mutex_lock );
     76        park( );
     77        return;
     78    } // if
     79
     80    if ( chair != 0p ) {
     81        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
     82        unpark( chair );
     83        chair = 0p;
     84        unlock( mutex_lock );
     85        unlock( p_lock );
     86        unlock( c_lock );
     87        return;
     88    }
     89    else insert_( chan, elem );
     90
     91    unlock( mutex_lock );
     92    unlock( p_lock );
     93}
     94
     95static inline T remove( channel(T) & chan ) with(chan) {
     96    lock( c_lock );
     97    lock( mutex_lock __cfaabi_dbg_ctx2 );
     98    T retval;
     99
     100    // have to check for the zero size channel case
     101    if ( size == 0 && chair != 0p ) {
     102        memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
     103        unpark( chair );
     104        chair = 0p;
     105        unlock( mutex_lock );
     106        unlock( p_lock );
     107        unlock( c_lock );
     108        return retval;
     109    }
     110
     111    // wait if buffer is empty, work will be completed by someone else
     112    if ( count == 0 ) {
     113        chair = active_thread();
     114        chair_elem = &retval;
     115        unlock( mutex_lock );
     116        park( );
     117        return retval;
     118    }
     119
     120    // Remove from buffer
     121    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     122    count -= 1;
     123    front = (front + 1) % size;
     124
     125    if ( chair != 0p ) {
     126        insert_( chan, *chair_elem );  // do waiting producer work
     127        unpark( chair );
     128        chair = 0p;
     129        unlock( mutex_lock );
     130        unlock( p_lock );
     131        unlock( c_lock );
     132        return retval;
     133    }
     134
     135    unlock( mutex_lock );
     136    unlock( c_lock );
     137    return retval;
     138}
     139
     140} // forall( T )
     141#endif
     142
     143#ifdef __COOP_CHANNEL
     144forall( T ) {
     145struct channel {
     146    size_t size;
     147    size_t front, back, count;
     148    T * buffer;
     149    fast_cond_var( no_reacq_lock ) prods, cons;
     150    no_reacq_lock mutex_lock;
    67151};
    68152
     
    74158    cons{};
    75159    mutex_lock{};
    76     closed = false;
    77     #ifdef CHAN_STATS
    78     blocks = 0;
    79     operations = 0;
    80     #endif
    81160}
    82161
    83162static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    84 static inline void ^?{}( channel(T) &c ) with(c) {
    85     #ifdef CHAN_STATS
    86     printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
    87     #endif
    88     verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
    90 }
     163static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    91164static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    92165static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    93 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    94 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
    95 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
    96 
    97 // closes the channel and notifies all blocked threads
    98 static inline void close( channel(T) & chan ) with(chan) {
    99     lock( mutex_lock );
    100     closed = true;
    101 
    102     // flush waiting consumers and producers
    103     while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
    105         wake_one( cons );
    106     }
    107     while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
    109         wake_one( prods );
    110     }
    111     unlock(mutex_lock);
    112 }
    113 
    114 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    115 
    116 static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117     lock( mutex_lock );
    118     while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
    121     }
    122     unlock( mutex_lock );
    123 }
    124 
    125 // handles buffer insert
    126 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
     166static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
     167static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
     168static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
     169
     170static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    127171    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    128172    count += 1;
     
    131175}
    132176
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141 // needed to avoid an extra copy in closed case
    142 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     177
     178static inline void insert( channel(T) & chan, T elem ) with(chan) {
    143179    lock( mutex_lock );
    144     #ifdef CHAN_STATS
    145     operations++;
    146     #endif
    147     if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
    149     unlock( mutex_lock );
    150     return true;
    151 }
    152 
    153 // attempts a nonblocking insert
    154 // returns true if insert was successful, false otherwise
    155 static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
    156 
    157 // handles closed case of insert routine
    158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
    160     throwResume except; // throw closed resumption
    161     if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    162 }
    163 
    164 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    165     // check for close before acquire mx
    166     if ( unlikely(closed) ) {
    167         __closed_insert( chan, elem );
    168         return;
    169     }
    170 
     180
     181    // have to check for the zero size channel case
     182    if ( size == 0 && !empty( cons ) ) {
     183        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
     184        notify_one( cons );
     185        unlock( mutex_lock );
     186        return;
     187    }
     188
     189    // wait if buffer is full, work will be completed by someone else
     190    if ( count == size ) {
     191        wait( prods, mutex_lock, (uintptr_t)&elem );
     192        return;
     193    } // if
     194
     195    if ( count == 0 && !empty( cons ) )
     196        // do waiting consumer work
     197        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
     198    else insert_( chan, elem );
     199   
     200    notify_one( cons );
     201    unlock( mutex_lock );
     202}
     203
     204static inline T remove( channel(T) & chan ) with(chan) {
    171205    lock( mutex_lock );
    172 
    173     #ifdef CHAN_STATS
    174     if ( !closed ) operations++;
    175     #endif
    176 
    177     // if closed handle
    178     if ( unlikely(closed) ) {
    179         unlock( mutex_lock );
    180         __closed_insert( chan, elem );
    181         return;
    182     }
     206    T retval;
    183207
    184208    // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
    190     }
    191 
    192     // wait if buffer is full, work will be completed by someone else
    193     if ( count == size ) {
    194         #ifdef CHAN_STATS
    195         blocks++;
    196         #endif
    197 
    198         // check for if woken due to close
    199         if ( unlikely( block( prods, &elem, mutex_lock ) ) )
    200             __closed_insert( chan, elem );
    201         return;
    202     } // if
    203 
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
    208    
    209     unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
     209    if ( size == 0 && !empty( prods ) ) {
     210        memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
     211        notify_one( prods );
     212        unlock( mutex_lock );
     213        return retval;
     214    }
     215
     216    // wait if buffer is empty, work will be completed by someone else
     217    if (count == 0) {
     218        wait( cons, mutex_lock, (uintptr_t)&retval );
     219        return retval;
     220    }
     221
     222    // Remove from buffer
    215223    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    216224    count -= 1;
    217225    front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223     if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
    225         wake_one( prods );
    226     }
    227 }
    228 
    229 // needed to avoid an extra copy in closed case and single return val case
    230 static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     226
     227    if (count == size - 1 && !empty( prods ) )
     228        insert_( chan, *((T *)front( prods )) );  // do waiting producer work
     229
     230    notify_one( prods );
     231    unlock( mutex_lock );
     232    return retval;
     233}
     234
     235} // forall( T )
     236#endif
     237
     238#ifdef __BARGE_CHANNEL
     239forall( T ) {
     240struct channel {
     241    size_t size;
     242    size_t front, back, count;
     243    T * buffer;
     244    fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
     245    exp_backoff_then_block_lock mutex_lock;
     246};
     247
     248static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     249    size = _size;
     250    front = back = count = 0;
     251    buffer = aalloc( size );
     252    prods{};
     253    cons{};
     254    mutex_lock{};
     255}
     256
     257static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     258static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     259static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     260static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     261static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
     262static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
     263static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
     264
     265static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     266    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     267    count += 1;
     268    back++;
     269    if ( back == size ) back = 0;
     270}
     271
     272
     273static inline void insert( channel(T) & chan, T elem ) with(chan) {
    231274    lock( mutex_lock );
    232     #ifdef CHAN_STATS
    233     operations++;
    234     #endif
    235     if ( count == 0 ) { unlock( mutex_lock ); return false; }
    236     __do_remove( chan, retval );
    237     unlock( mutex_lock );
    238     return true;
    239 }
    240 
    241 // attempts a nonblocking remove
    242 // returns [T, true] if insert was successful
    243 // returns [T, false] if insert was successful (T uninit)
    244 static inline [T, bool] try_remove( channel(T) & chan ) {
     275
     276    while ( count == size ) {
     277        wait( prods, mutex_lock );
     278    } // if
     279
     280    insert_( chan, elem );
     281   
     282    if ( !notify_one( cons ) && count < size )
     283        notify_one( prods );
     284
     285    unlock( mutex_lock );
     286}
     287
     288static inline T remove( channel(T) & chan ) with(chan) {
     289    lock( mutex_lock );
    245290    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
     291
     292    while (count == 0) {
     293        wait( cons, mutex_lock );
     294    }
     295
     296    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     297    count -= 1;
     298    front = (front + 1) % size;
     299
     300    if ( !notify_one( prods ) && count > 0 )
     301        notify_one( cons );
     302
     303    unlock( mutex_lock );
     304    return retval;
     305}
     306
     307} // forall( T )
     308#endif
     309
     310#ifdef __NO_WAIT_CHANNEL
     311forall( T ) {
     312struct channel {
     313    size_t size;
     314    size_t front, back, count;
     315    T * buffer;
     316    thread$ * chair;
     317    T * chair_elem;
     318    exp_backoff_then_block_lock c_lock, p_lock;
     319    __spinlock_t mutex_lock;
     320};
     321
     322static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     323    size = _size;
     324    front = back = count = 0;
     325    buffer = aalloc( size );
     326    chair = 0p;
     327    mutex_lock{};
     328    c_lock{};
     329    p_lock{};
     330    lock( c_lock );
     331}
     332
     333static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     334static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     335static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     336static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     337static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
     338
     339static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     340    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     341    count += 1;
     342    back++;
     343    if ( back == size ) back = 0;
     344}
     345
     346static inline void insert( channel(T) & chan, T elem ) with( chan ) {
     347    lock( p_lock );
     348    lock( mutex_lock __cfaabi_dbg_ctx2 );
     349
     350    insert_( chan, elem );
     351
     352    if ( count != size )
     353        unlock( p_lock );
     354
     355    if ( count == 1 )
     356        unlock( c_lock );
     357       
     358    unlock( mutex_lock );
     359}
     360
     361static inline T remove( channel(T) & chan ) with(chan) {
     362    lock( c_lock );
     363    lock( mutex_lock __cfaabi_dbg_ctx2 );
    250364    T retval;
    251     __internal_try_remove( chan, retval );
     365
     366    // Remove from buffer
     367    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     368    count -= 1;
     369    front = (front + 1) % size;
     370
     371    if ( count != 0 )
     372        unlock( c_lock );
     373
     374    if ( count == size - 1 )
     375        unlock( p_lock );
     376       
     377    unlock( mutex_lock );
    252378    return retval;
    253379}
    254380
    255 // handles closed case of insert routine
    256 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
    258     throwResume except; // throw resumption
    259     if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    260 }
    261 
    262 static inline T remove( channel(T) & chan ) with(chan) {
    263     T retval;
    264     if ( unlikely(closed) ) {
    265         __closed_remove( chan, retval );
    266         return retval;
    267     }
    268     lock( mutex_lock );
    269 
    270     #ifdef CHAN_STATS
    271     if ( !closed ) operations++;
    272     #endif
    273 
    274     if ( unlikely(closed) ) {
    275         unlock( mutex_lock );
    276         __closed_remove( chan, retval );
    277         return retval;
    278     }
    279 
    280     // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
    284         unlock( mutex_lock );
    285         return retval;
    286     }
    287 
    288     // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
    290         #ifdef CHAN_STATS
    291         blocks++;
    292         #endif
    293         // check for if woken due to close
    294         if ( unlikely( block( cons, &retval, mutex_lock ) ) )
    295             __closed_remove( chan, retval );
    296         return retval;
    297     }
    298 
    299     // Remove from buffer
    300     __do_remove( chan, retval );
    301 
    302     unlock( mutex_lock );
    303     return retval;
    304 }
    305381} // forall( T )
     382#endif
Note: See TracChangeset for help on using the changeset viewer.