Ignore:
Timestamp:
May 1, 2023, 4:00:06 PM (16 months ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
73bf7ddc
Parents:
bb7422a
Message:

some cleanup and a bunch of changes to support waituntil statement

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    rbb7422a rbeeff61e  
    44#include <list.hfa>
    55#include <mutex_stmt.hfa>
    6 
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
     6#include "select.hfa"
    307
    318// returns true if woken due to shutdown
    329// blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
     10static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
     11    select_node sn{ active_thread(), elem_ptr };
     12    insert_last( queue, sn );
    3613    unlock( lock );
    3714    park();
    38     return w.elem == 0p;
     15    return sn.extra == 0p;
     16}
     17
     18// Waituntil support (un)register_select helper routine
     19// Sets select node avail if not special OR case and then unlocks
     20static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
     21    if ( node.park_counter ) __make_select_node_available( node );
     22    unlock( mutex_lock );
    3923}
    4024
     
    5943    size_t size, front, back, count;
    6044    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
     45    dlist( select_node ) prods, cons; // lists of blocked threads
    6246    go_mutex mutex_lock;            // MX lock
    6347    bool closed;                    // indicates channel close/open
     
    7054    size = _size;
    7155    front = back = count = 0;
    72     buffer = aalloc( size );
     56    if ( size != 0 ) buffer = aalloc( size );
    7357    prods{};
    7458    cons{};
     
    8771    #endif
    8872    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
     73    if ( size != 0 ) delete( buffer );
    9074}
    9175static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     
    10286    // flush waiting consumers and producers
    10387    while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
     88        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     89            break;  // if __handle_waituntil_OR returns false cons is empty so break
     90        cons`first.extra = 0p;
    10591        wake_one( cons );
    10692    }
    10793    while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
     94        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     95            break;  // if __handle_waituntil_OR returns false prods is empty so break
     96        prods`first.extra = 0p;
    10997        wake_one( prods );
    11098    }
     
    114102static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    115103
     104// used to hand an element to a blocked consumer and signal it
     105static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     106    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     107    wake_one( cons );
     108}
     109
     110// used to hand an element to a blocked producer and signal it
     111static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     112    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     113    wake_one( prods );
     114}
     115
    116116static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117117    lock( mutex_lock );
    118118    while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
     119        __cons_handoff( chan, elem );
    121120    }
    122121    unlock( mutex_lock );
     
    125124// handles buffer insert
    126125static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    127     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     126    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    128127    count += 1;
    129128    back++;
     
    131130}
    132131
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141132// needed to avoid an extra copy in closed case
    142133static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     
    145136    operations++;
    146137    #endif
     138
     139    ConsEmpty: if ( !cons`isEmpty ) {
     140        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     141        __cons_handoff( chan, elem );
     142        unlock( mutex_lock );
     143        return true;
     144    }
     145
    147146    if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
     147
     148    __buf_insert( chan, elem );
    149149    unlock( mutex_lock );
    150150    return true;
     
    157157// handles closed case of insert routine
    158158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
     159    channel_closed except{ &channel_closed_vt, &elem, &chan };
    160160    throwResume except; // throw closed resumption
    161161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    182182    }
    183183
    184     // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
     184    // buffer count must be zero if cons are blocked (also handles zero-size case)
     185    ConsEmpty: if ( !cons`isEmpty ) {
     186        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     187        __cons_handoff( chan, elem );
     188        unlock( mutex_lock );
     189        return;
    190190    }
    191191
     
    202202    } // if
    203203
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
    208    
    209     unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     204    __buf_insert( chan, elem );
     205    unlock( mutex_lock );
     206}
     207
     208// does the buffer remove and potentially does waiting producer work
     209static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     210    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    216211    count -= 1;
    217212    front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223213    if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     214        if ( !__handle_waituntil_OR( prods ) ) return;
     215        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    225216        wake_one( prods );
    226217    }
     
    233224    operations++;
    234225    #endif
     226
     227    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     228        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     229        __prods_handoff( chan, retval );
     230        unlock( mutex_lock );
     231        return true;
     232    }
     233
    235234    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     235
    236236    __do_remove( chan, retval );
    237237    unlock( mutex_lock );
     
    244244static inline [T, bool] try_remove( channel(T) & chan ) {
    245245    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
     246    bool success = __internal_try_remove( chan, retval );
     247    return [ retval, success ];
     248}
     249
     250static inline T try_remove( channel(T) & chan ) {
    250251    T retval;
    251252    __internal_try_remove( chan, retval );
     
    255256// handles closed case of insert routine
    256257static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
     258    channel_closed except{ &channel_closed_vt, 0p, &chan };
    258259    throwResume except; // throw resumption
    259260    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    279280
    280281    // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
     282    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     283        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     284        __prods_handoff( chan, retval );
    284285        unlock( mutex_lock );
    285286        return retval;
     
    287288
    288289    // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
     290    if ( count == 0 ) {
    290291        #ifdef CHAN_STATS
    291292        blocks++;
     
    299300    // Remove from buffer
    300301    __do_remove( chan, retval );
    301 
    302302    unlock( mutex_lock );
    303303    return retval;
    304304}
     305
     306///////////////////////////////////////////////////////////////////////////////////////////
     307// The following is support for waituntil (select) statements
     308///////////////////////////////////////////////////////////////////////////////////////////
     309static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     310    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
     311    lock( mutex_lock );
     312    if ( node`isListed ) { // op wasn't performed
     313        #ifdef CHAN_STATS
     314        operations--;
     315        #endif
     316        remove( node );
     317        unlock( mutex_lock );
     318        return false;
     319    }
     320    unlock( mutex_lock );
     321
     322    // only return true when not special OR case, not exceptional calse and status is SAT
     323    return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
     324}
     325
     326// type used by select statement to capture a chan read as the selected operation
     327struct chan_read {
     328    channel(T) & chan;
     329    T & ret;
     330};
     331
     332static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
     333    &cr.chan = &chan;
     334    &cr.ret = &ret;
     335}
     336static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
     337
     338static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     339    __closed_remove( chan, ret );
     340    // if we get here then the insert succeeded
     341    __make_select_node_available( node );
     342}
     343
     344static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     345    // mutex(sout) sout | "register_read";
     346    lock( mutex_lock );
     347    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     348
     349    #ifdef CHAN_STATS
     350    if ( !closed ) operations++;
     351    #endif
     352
     353    // check if we can complete operation. If so race to establish winner in special OR case
     354    if ( !node.park_counter && ( count != 0 || !prods`isEmpty || unlikely(closed) ) ) {
     355        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     356           unlock( mutex_lock );
     357           return false;
     358        }
     359    }
     360
     361    if ( unlikely(closed) ) {
     362        unlock( mutex_lock );
     363        __handle_select_closed_read( this, node );
     364        return true;
     365    }
     366
     367    // have to check for the zero size channel case
     368    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     369        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     370        __prods_handoff( chan, ret );
     371        __set_avail_then_unlock( node, mutex_lock );
     372        return true;
     373    }
     374
     375    // wait if buffer is empty, work will be completed by someone else
     376    if ( count == 0 ) {
     377        #ifdef CHAN_STATS
     378        blocks++;
     379        #endif
     380       
     381        insert_last( cons, node );
     382        unlock( mutex_lock );
     383        return false;
     384    }
     385
     386    // Remove from buffer
     387    __do_remove( chan, ret );
     388    __set_avail_then_unlock( node, mutex_lock );
     389    return true;
     390}
     391static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     392static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     393    if ( node.extra == 0p ) // check if woken up due to closed channel
     394        __closed_remove( chan, ret );
     395    // This is only reachable if not closed or closed exception was handled
     396    return true;
     397}
     398
     399// type used by select statement to capture a chan write as the selected operation
     400struct chan_write {
     401    channel(T) & chan;
     402    T elem;
     403};
     404
     405static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
     406    &cw.chan = &chan;
     407    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     408}
     409static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
     410
     411static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     412    __closed_insert( chan, elem );
     413    // if we get here then the insert succeeded
     414    __make_select_node_available( node );
     415}
     416
     417static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     418    // mutex(sout) sout | "register_write";
     419    lock( mutex_lock );
     420    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     421
     422    #ifdef CHAN_STATS
     423    if ( !closed ) operations++;
     424    #endif
     425
     426    // check if we can complete operation. If so race to establish winner in special OR case
     427    if ( !node.park_counter && ( count != size || !cons`isEmpty || unlikely(closed) ) ) {
     428        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     429           unlock( mutex_lock );
     430           return false;
     431        }
     432    }
     433
     434    // if closed handle
     435    if ( unlikely(closed) ) {
     436        unlock( mutex_lock );
     437        __handle_select_closed_write( this, node );
     438        return true;
     439    }
     440
     441    // handle blocked consumer case via handoff (buffer is implicitly empty)
     442    ConsEmpty: if ( !cons`isEmpty ) {
     443        if ( !__handle_waituntil_OR( cons ) ) {
     444            // mutex(sout) sout | "empty";
     445            break ConsEmpty;
     446        }
     447        // mutex(sout) sout | "signal";
     448        __cons_handoff( chan, elem );
     449        __set_avail_then_unlock( node, mutex_lock );
     450        return true;
     451    }
     452
     453    // insert node in list if buffer is full, work will be completed by someone else
     454    if ( count == size ) {
     455        #ifdef CHAN_STATS
     456        blocks++;
     457        #endif
     458
     459        insert_last( prods, node );
     460        unlock( mutex_lock );
     461        return false;
     462    } // if
     463
     464    // otherwise carry out write either via normal insert
     465    __buf_insert( chan, elem );
     466    __set_avail_then_unlock( node, mutex_lock );
     467    return true;
     468}
     469static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     470
     471static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     472    if ( node.extra == 0p ) // check if woken up due to closed channel
     473        __closed_insert( chan, elem );
     474
     475    // This is only reachable if not closed or closed exception was handled
     476    return true;
     477}
     478
     479
    305480} // forall( T )
     481
     482
     483
Note: See TracChangeset for help on using the changeset viewer.