Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    rbeeff61e ra45e21c  
    44#include <list.hfa>
    55#include <mutex_stmt.hfa>
    6 #include "select.hfa"
     6
     7// link field used for threads waiting on channel
     8struct wait_link {
     9    // used to put wait_link on a dl queue
     10    inline dlink(wait_link);
     11
     12    // waiting thread
     13    struct thread$ * t;
     14
     15    // shadow field
     16    void * elem;
     17};
     18P9_EMBEDDED( wait_link, dlink(wait_link) )
     19
     20static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
     21    this.t = t;
     22    this.elem = elem;
     23}
     24
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
    730
    831// returns true if woken due to shutdown
    932// blocks thread on list and releases passed lock
    10 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
    11     select_node sn{ active_thread(), elem_ptr };
    12     insert_last( queue, sn );
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
    1336    unlock( lock );
    1437    park();
    15     return sn.extra == 0p;
    16 }
    17 
    18 // Waituntil support (un)register_select helper routine
    19 // Sets select node avail if not special OR case and then unlocks
    20 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
    21     if ( node.park_counter ) __make_select_node_available( node );
    22     unlock( mutex_lock );
     38    return w.elem == 0p;
    2339}
    2440
     
    4359    size_t size, front, back, count;
    4460    T * buffer;
    45     dlist( select_node ) prods, cons; // lists of blocked threads
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
    4662    go_mutex mutex_lock;            // MX lock
    4763    bool closed;                    // indicates channel close/open
     
    5470    size = _size;
    5571    front = back = count = 0;
    56     if ( size != 0 ) buffer = aalloc( size );
     72    buffer = aalloc( size );
    5773    prods{};
    5874    cons{};
     
    7187    #endif
    7288    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    73     if ( size != 0 ) delete( buffer );
     89    delete( buffer );
    7490}
    7591static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     
    86102    // flush waiting consumers and producers
    87103    while ( has_waiting_consumers( chan ) ) {
    88         if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
    89             break;  // if __handle_waituntil_OR returns false cons is empty so break
    90         cons`first.extra = 0p;
     104        cons`first.elem = 0p;
    91105        wake_one( cons );
    92106    }
    93107    while ( has_waiting_producers( chan ) ) {
    94         if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
    95             break;  // if __handle_waituntil_OR returns false prods is empty so break
    96         prods`first.extra = 0p;
     108        prods`first.elem = 0p;
    97109        wake_one( prods );
    98110    }
     
    102114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    103115
    104 // used to hand an element to a blocked consumer and signal it
    105 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    106     memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
    107     wake_one( cons );
    108 }
    109 
    110 // used to hand an element to a blocked producer and signal it
    111 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    112     memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
    113     wake_one( prods );
    114 }
    115 
    116116static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117117    lock( mutex_lock );
    118118    while ( count == 0 && !cons`isEmpty ) {
    119         __cons_handoff( chan, elem );
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
    120121    }
    121122    unlock( mutex_lock );
     
    124125// handles buffer insert
    125126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    126     memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
     127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    127128    count += 1;
    128129    back++;
     
    130131}
    131132
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
    132141// needed to avoid an extra copy in closed case
    133142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     
    136145    operations++;
    137146    #endif
    138 
    139     ConsEmpty: if ( !cons`isEmpty ) {
    140         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    141         __cons_handoff( chan, elem );
    142         unlock( mutex_lock );
    143         return true;
    144     }
    145 
    146147    if ( count == size ) { unlock( mutex_lock ); return false; }
    147 
    148     __buf_insert( chan, elem );
     148    __do_insert( chan, elem );
    149149    unlock( mutex_lock );
    150150    return true;
     
    157157// handles closed case of insert routine
    158158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{ &channel_closed_vt, &elem, &chan };
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
    160160    throwResume except; // throw closed resumption
    161161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    182182    }
    183183
    184     // buffer count must be zero if cons are blocked (also handles zero-size case)
    185     ConsEmpty: if ( !cons`isEmpty ) {
    186         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    187         __cons_handoff( chan, elem );
     184    // have to check for the zero size channel case
     185    if ( size == 0 && !cons`isEmpty ) {
     186        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
     187        wake_one( cons );
    188188        unlock( mutex_lock );
    189         return;
     189        return true;
    190190    }
    191191
     
    202202    } // if
    203203
    204     __buf_insert( chan, elem );
    205     unlock( mutex_lock );
     204    if ( count == 0 && !cons`isEmpty ) {
     205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     206        wake_one( cons );
     207    } else __buf_insert( chan, elem );
     208   
     209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
     215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     216    count -= 1;
     217    front = (front + 1) % size;
    206218}
    207219
    208220// does the buffer remove and potentially does waiting producer work
    209221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    210     memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    211     count -= 1;
    212     front = (front + 1) % size;
     222    __buf_remove( chan, retval );
    213223    if (count == size - 1 && !prods`isEmpty ) {
    214         if ( !__handle_waituntil_OR( prods ) ) return;
    215         __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
    216225        wake_one( prods );
    217226    }
     
    224233    operations++;
    225234    #endif
    226 
    227     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    228         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    229         __prods_handoff( chan, retval );
    230         unlock( mutex_lock );
    231         return true;
    232     }
    233 
    234235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
    235 
    236236    __do_remove( chan, retval );
    237237    unlock( mutex_lock );
     
    244244static inline [T, bool] try_remove( channel(T) & chan ) {
    245245    T retval;
    246     bool success = __internal_try_remove( chan, retval );
    247     return [ retval, success ];
    248 }
    249 
    250 static inline T try_remove( channel(T) & chan ) {
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
    251250    T retval;
    252251    __internal_try_remove( chan, retval );
     
    256255// handles closed case of insert routine
    257256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    258     channel_closed except{ &channel_closed_vt, 0p, &chan };
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
    259258    throwResume except; // throw resumption
    260259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    280279
    281280    // have to check for the zero size channel case
    282     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    283         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    284         __prods_handoff( chan, retval );
     281    if ( size == 0 && !prods`isEmpty ) {
     282        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
     283        wake_one( prods );
    285284        unlock( mutex_lock );
    286285        return retval;
     
    288287
    289288    // wait if buffer is empty, work will be completed by someone else
    290     if ( count == 0 ) {
     289    if (count == 0) {
    291290        #ifdef CHAN_STATS
    292291        blocks++;
     
    300299    // Remove from buffer
    301300    __do_remove( chan, retval );
     301
    302302    unlock( mutex_lock );
    303303    return retval;
    304304}
    305 
    306 ///////////////////////////////////////////////////////////////////////////////////////////
    307 // The following is support for waituntil (select) statements
    308 ///////////////////////////////////////////////////////////////////////////////////////////
    309 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    310     if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
    311     lock( mutex_lock );
    312     if ( node`isListed ) { // op wasn't performed
    313         #ifdef CHAN_STATS
    314         operations--;
    315         #endif
    316         remove( node );
    317         unlock( mutex_lock );
    318         return false;
    319     }
    320     unlock( mutex_lock );
    321 
    322     // only return true when not special OR case, not exceptional calse and status is SAT
    323     return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
    324 }
    325 
    326 // type used by select statement to capture a chan read as the selected operation
    327 struct chan_read {
    328     channel(T) & chan;
    329     T & ret;
    330 };
    331 
    332 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    333     &cr.chan = &chan;
    334     &cr.ret = &ret;
    335 }
    336 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
    337 
    338 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    339     __closed_remove( chan, ret );
    340     // if we get here then the insert succeeded
    341     __make_select_node_available( node );
    342 }
    343 
    344 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    345     // mutex(sout) sout | "register_read";
    346     lock( mutex_lock );
    347     node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
    348 
    349     #ifdef CHAN_STATS
    350     if ( !closed ) operations++;
    351     #endif
    352 
    353     // check if we can complete operation. If so race to establish winner in special OR case
    354     if ( !node.park_counter && ( count != 0 || !prods`isEmpty || unlikely(closed) ) ) {
    355         if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    356            unlock( mutex_lock );
    357            return false;
    358         }
    359     }
    360 
    361     if ( unlikely(closed) ) {
    362         unlock( mutex_lock );
    363         __handle_select_closed_read( this, node );
    364         return true;
    365     }
    366 
    367     // have to check for the zero size channel case
    368     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    369         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    370         __prods_handoff( chan, ret );
    371         __set_avail_then_unlock( node, mutex_lock );
    372         return true;
    373     }
    374 
    375     // wait if buffer is empty, work will be completed by someone else
    376     if ( count == 0 ) {
    377         #ifdef CHAN_STATS
    378         blocks++;
    379         #endif
    380        
    381         insert_last( cons, node );
    382         unlock( mutex_lock );
    383         return false;
    384     }
    385 
    386     // Remove from buffer
    387     __do_remove( chan, ret );
    388     __set_avail_then_unlock( node, mutex_lock );
    389     return true;
    390 }
    391 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    392 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
    393     if ( node.extra == 0p ) // check if woken up due to closed channel
    394         __closed_remove( chan, ret );
    395     // This is only reachable if not closed or closed exception was handled
    396     return true;
    397 }
    398 
    399 // type used by select statement to capture a chan write as the selected operation
    400 struct chan_write {
    401     channel(T) & chan;
    402     T elem;
    403 };
    404 
    405 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    406     &cw.chan = &chan;
    407     memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    408 }
    409 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
    410 
    411 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    412     __closed_insert( chan, elem );
    413     // if we get here then the insert succeeded
    414     __make_select_node_available( node );
    415 }
    416 
    417 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    418     // mutex(sout) sout | "register_write";
    419     lock( mutex_lock );
    420     node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
    421 
    422     #ifdef CHAN_STATS
    423     if ( !closed ) operations++;
    424     #endif
    425 
    426     // check if we can complete operation. If so race to establish winner in special OR case
    427     if ( !node.park_counter && ( count != size || !cons`isEmpty || unlikely(closed) ) ) {
    428         if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    429            unlock( mutex_lock );
    430            return false;
    431         }
    432     }
    433 
    434     // if closed handle
    435     if ( unlikely(closed) ) {
    436         unlock( mutex_lock );
    437         __handle_select_closed_write( this, node );
    438         return true;
    439     }
    440 
    441     // handle blocked consumer case via handoff (buffer is implicitly empty)
    442     ConsEmpty: if ( !cons`isEmpty ) {
    443         if ( !__handle_waituntil_OR( cons ) ) {
    444             // mutex(sout) sout | "empty";
    445             break ConsEmpty;
    446         }
    447         // mutex(sout) sout | "signal";
    448         __cons_handoff( chan, elem );
    449         __set_avail_then_unlock( node, mutex_lock );
    450         return true;
    451     }
    452 
    453     // insert node in list if buffer is full, work will be completed by someone else
    454     if ( count == size ) {
    455         #ifdef CHAN_STATS
    456         blocks++;
    457         #endif
    458 
    459         insert_last( prods, node );
    460         unlock( mutex_lock );
    461         return false;
    462     } // if
    463 
    464     // otherwise carry out write either via normal insert
    465     __buf_insert( chan, elem );
    466     __set_avail_then_unlock( node, mutex_lock );
    467     return true;
    468 }
    469 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    470 
    471 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
    472     if ( node.extra == 0p ) // check if woken up due to closed channel
    473         __closed_insert( chan, elem );
    474 
    475     // This is only reachable if not closed or closed exception was handled
    476     return true;
    477 }
    478 
    479 
    480305} // forall( T )
    481 
    482 
    483 
Note: See TracChangeset for help on using the changeset viewer.