Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r5908fb4 ra45e21c  
    1 //
    2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
    3 //
    4 // The contents of this file are covered under the licence agreement in the
    5 // file "LICENCE" distributed with Cforall.
    6 //
    7 // channel.hfa -- LIBCFATHREAD
    8 // Runtime locks that used with the runtime thread system.
    9 //
    10 // Author           : Colby Alexander Parsons
    11 // Created On       : Thu Jan 21 19:46:50 2022
    12 // Last Modified By :
    13 // Last Modified On :
    14 // Update Count     :
    15 //
    16 
    171#pragma once
    182
    193#include <locks.hfa>
    204#include <list.hfa>
    21 #include "select.hfa"
     5#include <mutex_stmt.hfa>
     6
     7// link field used for threads waiting on channel
     8struct wait_link {
     9    // used to put wait_link on a dl queue
     10    inline dlink(wait_link);
     11
     12    // waiting thread
     13    struct thread$ * t;
     14
     15    // shadow field
     16    void * elem;
     17};
     18P9_EMBEDDED( wait_link, dlink(wait_link) )
     19
     20static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
     21    this.t = t;
     22    this.elem = elem;
     23}
     24
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
    2230
    2331// returns true if woken due to shutdown
    2432// blocks thread on list and releases passed lock
    25 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
    26     select_node sn{ active_thread(), elem_ptr };
    27     insert_last( queue, sn );
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
    2836    unlock( lock );
    2937    park();
    30     return sn.extra == 0p;
    31 }
    32 
    33 // Waituntil support (un)register_select helper routine
    34 // Sets select node avail if not special OR case and then unlocks
    35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
    36     if ( node.park_counter ) __make_select_node_available( node );
    37     unlock( mutex_lock );
     38    return w.elem == 0p;
    3839}
    3940
     
    5859    size_t size, front, back, count;
    5960    T * buffer;
    60     dlist( select_node ) prods, cons; // lists of blocked threads
    61     go_mutex mutex_lock;              // MX lock
    62     bool closed;                      // indicates channel close/open
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
     62    go_mutex mutex_lock;            // MX lock
     63    bool closed;                    // indicates channel close/open
    6364    #ifdef CHAN_STATS
    6465    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     
    6970    size = _size;
    7071    front = back = count = 0;
    71     if ( size != 0 ) buffer = aalloc( size );
     72    buffer = aalloc( size );
    7273    prods{};
    7374    cons{};
     
    8687    #endif
    8788    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    88     if ( size != 0 ) delete( buffer );
    89 }
    90 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
    91 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
     89    delete( buffer );
     90}
     91static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     92static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    9293static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    9394static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     
    101102    // flush waiting consumers and producers
    102103    while ( has_waiting_consumers( chan ) ) {
    103         if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
    104             break;  // if __handle_waituntil_OR returns false cons is empty so break
    105         cons`first.extra = 0p;
     104        cons`first.elem = 0p;
    106105        wake_one( cons );
    107106    }
    108107    while ( has_waiting_producers( chan ) ) {
    109         if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
    110             break;  // if __handle_waituntil_OR returns false prods is empty so break
    111         prods`first.extra = 0p;
     108        prods`first.elem = 0p;
    112109        wake_one( prods );
    113110    }
     
    117114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    118115
    119 // used to hand an element to a blocked consumer and signal it
    120 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    121     memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
    122     wake_one( cons );
    123 }
    124 
    125 // used to hand an element to a blocked producer and signal it
    126 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    127     memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
    128     wake_one( prods );
    129 }
    130 
    131116static inline void flush( channel(T) & chan, T elem ) with(chan) {
    132117    lock( mutex_lock );
    133118    while ( count == 0 && !cons`isEmpty ) {
    134         __cons_handoff( chan, elem );
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
    135121    }
    136122    unlock( mutex_lock );
     
    139125// handles buffer insert
    140126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    141     memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
     127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    142128    count += 1;
    143129    back++;
     
    145131}
    146132
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
    147141// needed to avoid an extra copy in closed case
    148142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     
    151145    operations++;
    152146    #endif
    153 
    154     ConsEmpty: if ( !cons`isEmpty ) {
    155         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    156         __cons_handoff( chan, elem );
    157         unlock( mutex_lock );
    158         return true;
    159     }
    160 
    161147    if ( count == size ) { unlock( mutex_lock ); return false; }
    162 
    163     __buf_insert( chan, elem );
     148    __do_insert( chan, elem );
    164149    unlock( mutex_lock );
    165150    return true;
     
    172157// handles closed case of insert routine
    173158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    174     channel_closed except{ &channel_closed_vt, &elem, &chan };
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
    175160    throwResume except; // throw closed resumption
    176161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    197182    }
    198183
    199     // buffer count must be zero if cons are blocked (also handles zero-size case)
    200     ConsEmpty: if ( !cons`isEmpty ) {
    201         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    202         __cons_handoff( chan, elem );
     184    // have to check for the zero size channel case
     185    if ( size == 0 && !cons`isEmpty ) {
     186        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
     187        wake_one( cons );
    203188        unlock( mutex_lock );
    204         return;
     189        return true;
    205190    }
    206191
     
    217202    } // if
    218203
    219     __buf_insert( chan, elem );
    220     unlock( mutex_lock );
     204    if ( count == 0 && !cons`isEmpty ) {
     205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     206        wake_one( cons );
     207    } else __buf_insert( chan, elem );
     208   
     209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
     215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     216    count -= 1;
     217    front = (front + 1) % size;
    221218}
    222219
    223220// does the buffer remove and potentially does waiting producer work
    224221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    225     memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    226     count -= 1;
    227     front = (front + 1) % size;
     222    __buf_remove( chan, retval );
    228223    if (count == size - 1 && !prods`isEmpty ) {
    229         if ( !__handle_waituntil_OR( prods ) ) return;
    230         __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
    231225        wake_one( prods );
    232226    }
     
    239233    operations++;
    240234    #endif
    241 
    242     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    243         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    244         __prods_handoff( chan, retval );
    245         unlock( mutex_lock );
    246         return true;
    247     }
    248 
    249235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
    250 
    251236    __do_remove( chan, retval );
    252237    unlock( mutex_lock );
     
    259244static inline [T, bool] try_remove( channel(T) & chan ) {
    260245    T retval;
    261     bool success = __internal_try_remove( chan, retval );
    262     return [ retval, success ];
    263 }
    264 
    265 static inline T try_remove( channel(T) & chan ) {
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
    266250    T retval;
    267251    __internal_try_remove( chan, retval );
     
    271255// handles closed case of insert routine
    272256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    273     channel_closed except{ &channel_closed_vt, 0p, &chan };
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
    274258    throwResume except; // throw resumption
    275259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    295279
    296280    // have to check for the zero size channel case
    297     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    298         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    299         __prods_handoff( chan, retval );
     281    if ( size == 0 && !prods`isEmpty ) {
     282        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
     283        wake_one( prods );
    300284        unlock( mutex_lock );
    301285        return retval;
     
    303287
    304288    // wait if buffer is empty, work will be completed by someone else
    305     if ( count == 0 ) {
     289    if (count == 0) {
    306290        #ifdef CHAN_STATS
    307291        blocks++;
     
    315299    // Remove from buffer
    316300    __do_remove( chan, retval );
     301
    317302    unlock( mutex_lock );
    318303    return retval;
    319304}
    320 
    321 ///////////////////////////////////////////////////////////////////////////////////////////
    322 // The following is support for waituntil (select) statements
    323 ///////////////////////////////////////////////////////////////////////////////////////////
    324 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    325     // if ( !node`isListed && !node.park_counter ) return false; // handle special OR case C_TODO: try adding this back
    326     lock( mutex_lock );
    327     if ( node`isListed ) { // op wasn't performed
    328         #ifdef CHAN_STATS
    329         operations--;
    330         #endif
    331         remove( node );
    332         unlock( mutex_lock );
    333         return false;
    334     }
    335     unlock( mutex_lock );
    336 
    337     // only return true when not special OR case, not exceptional calse and status is SAT
    338     return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
    339 }
    340 
    341 // type used by select statement to capture a chan read as the selected operation
    342 struct chan_read {
    343     T & ret;
    344     channel(T) & chan;
    345 };
    346 
    347 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    348     &cr.chan = &chan;
    349     &cr.ret = &ret;
    350 }
    351 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
    352 
    353 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    354     __closed_remove( chan, ret );
    355     // if we get here then the insert succeeded
    356     __make_select_node_available( node );
    357 }
    358 
    359 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    360     lock( mutex_lock );
    361     node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
    362 
    363     #ifdef CHAN_STATS
    364     if ( !closed ) operations++;
    365     #endif
    366 
    367     if ( !node.park_counter ) {
    368         // are we special case OR and front of cons is also special case OR
    369         if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
    370             if ( !__make_select_node_pending( node ) ) {
    371                 unlock( mutex_lock );
    372                 return false;
    373             }
    374            
    375             if ( __handle_waituntil_OR( prods ) ) {
    376                 __prods_handoff( chan, ret );
    377                 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    378                 unlock( mutex_lock );
    379                 return true;
    380             }
    381             __make_select_node_unsat( node );
    382         }
    383         // check if we can complete operation. If so race to establish winner in special OR case
    384         if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
    385             if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    386                 unlock( mutex_lock );
    387                 return false;
    388             }
    389         }
    390     }
    391 
    392     if ( unlikely(closed) ) {
    393         unlock( mutex_lock );
    394         __handle_select_closed_read( this, node );
    395         return true;
    396     }
    397 
    398     // have to check for the zero size channel case
    399     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    400         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    401         __prods_handoff( chan, ret );
    402         __set_avail_then_unlock( node, mutex_lock );
    403         return true;
    404     }
    405 
    406     // wait if buffer is empty, work will be completed by someone else
    407     if ( count == 0 ) {
    408         #ifdef CHAN_STATS
    409         blocks++;
    410         #endif
    411        
    412         insert_last( cons, node );
    413         unlock( mutex_lock );
    414         return false;
    415     }
    416 
    417     // Remove from buffer
    418     __do_remove( chan, ret );
    419     __set_avail_then_unlock( node, mutex_lock );
    420     return true;
    421 }
    422 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    423 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
    424     if ( node.extra == 0p ) // check if woken up due to closed channel
    425         __closed_remove( chan, ret );
    426     // This is only reachable if not closed or closed exception was handled
    427     return true;
    428 }
    429 
    430 // type used by select statement to capture a chan write as the selected operation
    431 struct chan_write {
    432     T elem;
    433     channel(T) & chan;
    434 };
    435 
    436 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    437     &cw.chan = &chan;
    438     memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    439 }
    440 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
    441 
    442 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    443     __closed_insert( chan, elem );
    444     // if we get here then the insert succeeded
    445     __make_select_node_available( node );
    446 }
    447 
    448 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    449     lock( mutex_lock );
    450     node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
    451 
    452     #ifdef CHAN_STATS
    453     if ( !closed ) operations++;
    454     #endif
    455 
    456     // special OR case handling
    457     if ( !node.park_counter ) {
    458         // are we special case OR and front of cons is also special case OR
    459         if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
    460             if ( !__make_select_node_pending( node ) ) {
    461                 unlock( mutex_lock );
    462                 return false;
    463             }
    464            
    465             if ( __handle_waituntil_OR( cons ) ) {
    466                 __cons_handoff( chan, elem );
    467                 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    468                 unlock( mutex_lock );
    469                 return true;
    470             }
    471             __make_select_node_unsat( node );
    472         }
    473         // check if we can complete operation. If so race to establish winner in special OR case
    474         if ( count != size || !cons`isEmpty || unlikely(closed) ) {
    475             if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    476                 unlock( mutex_lock );
    477                 return false;
    478             }
    479         }
    480     }
    481 
    482     // if closed handle
    483     if ( unlikely(closed) ) {
    484         unlock( mutex_lock );
    485         __handle_select_closed_write( this, node );
    486         return true;
    487     }
    488 
    489     // handle blocked consumer case via handoff (buffer is implicitly empty)
    490     ConsEmpty: if ( !cons`isEmpty ) {
    491         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    492         __cons_handoff( chan, elem );
    493         __set_avail_then_unlock( node, mutex_lock );
    494         return true;
    495     }
    496 
    497     // insert node in list if buffer is full, work will be completed by someone else
    498     if ( count == size ) {
    499         #ifdef CHAN_STATS
    500         blocks++;
    501         #endif
    502 
    503         insert_last( prods, node );
    504         unlock( mutex_lock );
    505         return false;
    506     } // if
    507 
    508     // otherwise carry out write either via normal insert
    509     __buf_insert( chan, elem );
    510     __set_avail_then_unlock( node, mutex_lock );
    511     return true;
    512 }
    513 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    514 
    515 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
    516     if ( node.extra == 0p ) // check if woken up due to closed channel
    517         __closed_insert( chan, elem );
    518 
    519     // This is only reachable if not closed or closed exception was handled
    520     return true;
    521 }
    522 
    523305} // forall( T )
    524 
    525 
Note: See TracChangeset for help on using the changeset viewer.