Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r88b49bb ra45e21c  
    1 //
    2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
    3 //
    4 // The contents of this file are covered under the licence agreement in the
    5 // file "LICENCE" distributed with Cforall.
    6 //
    7 // channel.hfa -- LIBCFATHREAD
    8 // Runtime locks that used with the runtime thread system.
    9 //
    10 // Author           : Colby Alexander Parsons
    11 // Created On       : Thu Jan 21 19:46:50 2022
    12 // Last Modified By :
    13 // Last Modified On :
    14 // Update Count     :
    15 //
    16 
    171#pragma once
    182
    193#include <locks.hfa>
    204#include <list.hfa>
    21 #include "select.hfa"
     5#include <mutex_stmt.hfa>
     6
     7// link field used for threads waiting on channel
     8struct wait_link {
     9    // used to put wait_link on a dl queue
     10    inline dlink(wait_link);
     11
     12    // waiting thread
     13    struct thread$ * t;
     14
     15    // shadow field
     16    void * elem;
     17};
     18P9_EMBEDDED( wait_link, dlink(wait_link) )
     19
     20static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
     21    this.t = t;
     22    this.elem = elem;
     23}
     24
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
    2230
    2331// returns true if woken due to shutdown
    2432// blocks thread on list and releases passed lock
    25 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
    26     select_node sn{ active_thread(), elem_ptr };
    27     insert_last( queue, sn );
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
    2836    unlock( lock );
    2937    park();
    30     return sn.extra == 0p;
    31 }
    32 
    33 // Waituntil support (un)register_select helper routine
    34 // Sets select node avail if not special OR case and then unlocks
    35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
    36     if ( node.park_counter ) __make_select_node_available( node );
    37     unlock( mutex_lock );
     38    return w.elem == 0p;
    3839}
    3940
     
    5859    size_t size, front, back, count;
    5960    T * buffer;
    60     dlist( select_node ) prods, cons; // lists of blocked threads
    61     go_mutex mutex_lock;              // MX lock
    62     bool closed;                      // indicates channel close/open
    63     #ifdef CHAN_STATS
    64     size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
     62    go_mutex mutex_lock;            // MX lock
     63    bool closed;                    // indicates channel close/open
     64    #ifdef CHAN_STATS
     65    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
    6566    #endif
    6667};
     
    6970    size = _size;
    7071    front = back = count = 0;
    71     if ( size != 0 ) buffer = aalloc( size );
     72    buffer = aalloc( size );
    7273    prods{};
    7374    cons{};
     
    7576    closed = false;
    7677    #ifdef CHAN_STATS
    77     p_blocks = 0;
    78     p_ops = 0;
    79     c_blocks = 0;
    80     c_ops = 0;
     78    blocks = 0;
     79    operations = 0;
    8180    #endif
    8281}
     
    8584static inline void ^?{}( channel(T) &c ) with(c) {
    8685    #ifdef CHAN_STATS
    87     printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
    88     printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
    89     printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
    90     #endif
    91     verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
    92         "Attempted to delete channel with waiting threads (Deadlock).\n" );
    93     if ( size != 0 ) delete( buffer );
    94 }
    95 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
    96 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
     86    printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
     87    #endif
     88    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
     89    delete( buffer );
     90}
     91static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     92static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    9793static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    9894static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     
    106102    // flush waiting consumers and producers
    107103    while ( has_waiting_consumers( chan ) ) {
    108         if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
    109             break;  // if __handle_waituntil_OR returns false cons is empty so break
    110         cons`first.extra = 0p;
     104        cons`first.elem = 0p;
    111105        wake_one( cons );
    112106    }
    113107    while ( has_waiting_producers( chan ) ) {
    114         if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
    115             break;  // if __handle_waituntil_OR returns false prods is empty so break
    116         prods`first.extra = 0p;
     108        prods`first.elem = 0p;
    117109        wake_one( prods );
    118110    }
     
    122114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    123115
    124 // used to hand an element to a blocked consumer and signal it
    125 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    126     memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
    127     wake_one( cons );
    128 }
    129 
    130 // used to hand an element to a blocked producer and signal it
    131 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    132     memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
    133     wake_one( prods );
    134 }
    135 
    136116static inline void flush( channel(T) & chan, T elem ) with(chan) {
    137117    lock( mutex_lock );
    138118    while ( count == 0 && !cons`isEmpty ) {
    139         __cons_handoff( chan, elem );
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
    140121    }
    141122    unlock( mutex_lock );
     
    144125// handles buffer insert
    145126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    146     memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
     127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    147128    count += 1;
    148129    back++;
     
    150131}
    151132
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
    152141// needed to avoid an extra copy in closed case
    153142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    154143    lock( mutex_lock );
    155144    #ifdef CHAN_STATS
    156     p_ops++;
    157     #endif
    158 
    159     ConsEmpty: if ( !cons`isEmpty ) {
    160         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    161         __cons_handoff( chan, elem );
    162         unlock( mutex_lock );
    163         return true;
    164     }
    165 
     145    operations++;
     146    #endif
    166147    if ( count == size ) { unlock( mutex_lock ); return false; }
    167 
    168     __buf_insert( chan, elem );
     148    __do_insert( chan, elem );
    169149    unlock( mutex_lock );
    170150    return true;
     
    177157// handles closed case of insert routine
    178158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    179     channel_closed except{ &channel_closed_vt, &elem, &chan };
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
    180160    throwResume except; // throw closed resumption
    181161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    192172
    193173    #ifdef CHAN_STATS
    194     if ( !closed ) p_ops++;
     174    if ( !closed ) operations++;
    195175    #endif
    196176
     
    202182    }
    203183
    204     // buffer count must be zero if cons are blocked (also handles zero-size case)
    205     ConsEmpty: if ( !cons`isEmpty ) {
    206         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    207         __cons_handoff( chan, elem );
     184    // have to check for the zero size channel case
     185    if ( size == 0 && !cons`isEmpty ) {
     186        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
     187        wake_one( cons );
    208188        unlock( mutex_lock );
    209         return;
     189        return true;
    210190    }
    211191
     
    213193    if ( count == size ) {
    214194        #ifdef CHAN_STATS
    215         p_blocks++;
     195        blocks++;
    216196        #endif
    217197
     
    222202    } // if
    223203
    224     __buf_insert( chan, elem );
    225     unlock( mutex_lock );
     204    if ( count == 0 && !cons`isEmpty ) {
     205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     206        wake_one( cons );
     207    } else __buf_insert( chan, elem );
     208   
     209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
     215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     216    count -= 1;
     217    front = (front + 1) % size;
    226218}
    227219
    228220// does the buffer remove and potentially does waiting producer work
    229221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    230     memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    231     count -= 1;
    232     front = (front + 1) % size;
     222    __buf_remove( chan, retval );
    233223    if (count == size - 1 && !prods`isEmpty ) {
    234         if ( !__handle_waituntil_OR( prods ) ) return;
    235         __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
    236225        wake_one( prods );
    237226    }
     
    242231    lock( mutex_lock );
    243232    #ifdef CHAN_STATS
    244     c_ops++;
    245     #endif
    246 
    247     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    248         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    249         __prods_handoff( chan, retval );
    250         unlock( mutex_lock );
    251         return true;
    252     }
    253 
     233    operations++;
     234    #endif
    254235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
    255 
    256236    __do_remove( chan, retval );
    257237    unlock( mutex_lock );
     
    264244static inline [T, bool] try_remove( channel(T) & chan ) {
    265245    T retval;
    266     bool success = __internal_try_remove( chan, retval );
    267     return [ retval, success ];
    268 }
    269 
    270 static inline T try_remove( channel(T) & chan ) {
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
    271250    T retval;
    272251    __internal_try_remove( chan, retval );
     
    276255// handles closed case of insert routine
    277256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    278     channel_closed except{ &channel_closed_vt, 0p, &chan };
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
    279258    throwResume except; // throw resumption
    280259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    290269
    291270    #ifdef CHAN_STATS
    292     if ( !closed ) c_ops++;
     271    if ( !closed ) operations++;
    293272    #endif
    294273
     
    300279
    301280    // have to check for the zero size channel case
    302     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    303         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    304         __prods_handoff( chan, retval );
     281    if ( size == 0 && !prods`isEmpty ) {
     282        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
     283        wake_one( prods );
    305284        unlock( mutex_lock );
    306285        return retval;
     
    308287
    309288    // wait if buffer is empty, work will be completed by someone else
    310     if ( count == 0 ) {
     289    if (count == 0) {
    311290        #ifdef CHAN_STATS
    312         c_blocks++;
     291        blocks++;
    313292        #endif
    314293        // check for if woken due to close
     
    320299    // Remove from buffer
    321300    __do_remove( chan, retval );
     301
    322302    unlock( mutex_lock );
    323303    return retval;
    324304}
    325 
    326 ///////////////////////////////////////////////////////////////////////////////////////////
    327 // The following is support for waituntil (select) statements
    328 ///////////////////////////////////////////////////////////////////////////////////////////
    329 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    330     if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
    331     lock( mutex_lock );
    332     if ( node`isListed ) { // op wasn't performed
    333         remove( node );
    334         unlock( mutex_lock );
    335         return false;
    336     }
    337     unlock( mutex_lock );
    338 
    339     // only return true when not special OR case, not exceptional calse and status is SAT
    340     return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
    341 }
    342 
    343 // type used by select statement to capture a chan read as the selected operation
    344 struct chan_read {
    345     T & ret;
    346     channel(T) & chan;
    347 };
    348 
    349 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    350     &cr.chan = &chan;
    351     &cr.ret = &ret;
    352 }
    353 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
    354 
    355 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    356     __closed_remove( chan, ret );
    357     // if we get here then the insert succeeded
    358     __make_select_node_available( node );
    359 }
    360 
    361 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    362     lock( mutex_lock );
    363     node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
    364 
    365     #ifdef CHAN_STATS
    366     if ( !closed ) c_ops++;
    367     #endif
    368 
    369     if ( !node.park_counter ) {
    370         // are we special case OR and front of cons is also special case OR
    371         if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
    372             if ( !__make_select_node_pending( node ) ) {
    373                 unlock( mutex_lock );
    374                 return false;
    375             }
    376            
    377             if ( __handle_waituntil_OR( prods ) ) {
    378                 __prods_handoff( chan, ret );
    379                 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    380                 unlock( mutex_lock );
    381                 return true;
    382             }
    383             __make_select_node_unsat( node );
    384         }
    385         // check if we can complete operation. If so race to establish winner in special OR case
    386         if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
    387             if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    388                 unlock( mutex_lock );
    389                 return false;
    390             }
    391         }
    392     }
    393 
    394     if ( unlikely(closed) ) {
    395         unlock( mutex_lock );
    396         __handle_select_closed_read( this, node );
    397         return true;
    398     }
    399 
    400     // have to check for the zero size channel case
    401     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    402         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    403         __prods_handoff( chan, ret );
    404         __set_avail_then_unlock( node, mutex_lock );
    405         return true;
    406     }
    407 
    408     // wait if buffer is empty, work will be completed by someone else
    409     if ( count == 0 ) {
    410         #ifdef CHAN_STATS
    411         c_blocks++;
    412         #endif
    413        
    414         insert_last( cons, node );
    415         unlock( mutex_lock );
    416         return false;
    417     }
    418 
    419     // Remove from buffer
    420     __do_remove( chan, ret );
    421     __set_avail_then_unlock( node, mutex_lock );
    422     return true;
    423 }
    424 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    425 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
    426     if ( node.extra == 0p ) // check if woken up due to closed channel
    427         __closed_remove( chan, ret );
    428     // This is only reachable if not closed or closed exception was handled
    429     return true;
    430 }
    431 
    432 // type used by select statement to capture a chan write as the selected operation
    433 struct chan_write {
    434     T elem;
    435     channel(T) & chan;
    436 };
    437 
    438 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    439     &cw.chan = &chan;
    440     memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    441 }
    442 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
    443 
    444 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    445     __closed_insert( chan, elem );
    446     // if we get here then the insert succeeded
    447     __make_select_node_available( node );
    448 }
    449 
    450 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    451     lock( mutex_lock );
    452     node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
    453 
    454     #ifdef CHAN_STATS
    455     if ( !closed ) p_ops++;
    456     #endif
    457 
    458     // special OR case handling
    459     if ( !node.park_counter ) {
    460         // are we special case OR and front of cons is also special case OR
    461         if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
    462             if ( !__make_select_node_pending( node ) ) {
    463                 unlock( mutex_lock );
    464                 return false;
    465             }
    466            
    467             if ( __handle_waituntil_OR( cons ) ) {
    468                 __cons_handoff( chan, elem );
    469                 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    470                 unlock( mutex_lock );
    471                 return true;
    472             }
    473             __make_select_node_unsat( node );
    474         }
    475         // check if we can complete operation. If so race to establish winner in special OR case
    476         if ( count != size || !cons`isEmpty || unlikely(closed) ) {
    477             if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    478                 unlock( mutex_lock );
    479                 return false;
    480             }
    481         }
    482     }
    483 
    484     // if closed handle
    485     if ( unlikely(closed) ) {
    486         unlock( mutex_lock );
    487         __handle_select_closed_write( this, node );
    488         return true;
    489     }
    490 
    491     // handle blocked consumer case via handoff (buffer is implicitly empty)
    492     ConsEmpty: if ( !cons`isEmpty ) {
    493         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    494         __cons_handoff( chan, elem );
    495         __set_avail_then_unlock( node, mutex_lock );
    496         return true;
    497     }
    498 
    499     // insert node in list if buffer is full, work will be completed by someone else
    500     if ( count == size ) {
    501         #ifdef CHAN_STATS
    502         p_blocks++;
    503         #endif
    504 
    505         insert_last( prods, node );
    506         unlock( mutex_lock );
    507         return false;
    508     } // if
    509 
    510     // otherwise carry out write either via normal insert
    511     __buf_insert( chan, elem );
    512     __set_avail_then_unlock( node, mutex_lock );
    513     return true;
    514 }
    515 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    516 
    517 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
    518     if ( node.extra == 0p ) // check if woken up due to closed channel
    519         __closed_insert( chan, elem );
    520 
    521     // This is only reachable if not closed or closed exception was handled
    522     return true;
    523 }
    524 
    525305} // forall( T )
    526 
    527 
Note: See TracChangeset for help on using the changeset viewer.