Ignore:
Timestamp:
May 17, 2023, 1:35:09 AM (2 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT, master
Children:
f11010e
Parents:
6e4c44d (diff), 8db4708 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r6e4c44d r3982384  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// channel.hfa -- LIBCFATHREAD
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2022
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include <locks.hfa>
    420#include <list.hfa>
    5 #include <mutex_stmt.hfa>
    6 
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
     21#include "select.hfa"
    3022
    3123// returns true if woken due to shutdown
    3224// blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
     25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
     26    select_node sn{ active_thread(), elem_ptr };
     27    insert_last( queue, sn );
    3628    unlock( lock );
    3729    park();
    38     return w.elem == 0p;
     30    return sn.extra == 0p;
     31}
     32
     33// Waituntil support (un)register_select helper routine
     34// Sets select node avail if not special OR case and then unlocks
     35static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
     36    if ( node.park_counter ) __make_select_node_available( node );
     37    unlock( mutex_lock );
    3938}
    4039
     
    5958    size_t size, front, back, count;
    6059    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
    62     go_mutex mutex_lock;            // MX lock
    63     bool closed;                    // indicates channel close/open
     60    dlist( select_node ) prods, cons; // lists of blocked threads
     61    go_mutex mutex_lock;              // MX lock
     62    bool closed;                      // indicates channel close/open
    6463    #ifdef CHAN_STATS
    6564    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     
    7069    size = _size;
    7170    front = back = count = 0;
    72     buffer = aalloc( size );
     71    if ( size != 0 ) buffer = aalloc( size );
    7372    prods{};
    7473    cons{};
     
    8786    #endif
    8887    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
    90 }
    91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     88    if ( size != 0 ) delete( buffer );
     89}
     90static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
     91static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
    9392static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    9493static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     
    102101    // flush waiting consumers and producers
    103102    while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
     103        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     104            break;  // if __handle_waituntil_OR returns false cons is empty so break
     105        cons`first.extra = 0p;
    105106        wake_one( cons );
    106107    }
    107108    while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
     109        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     110            break;  // if __handle_waituntil_OR returns false prods is empty so break
     111        prods`first.extra = 0p;
    109112        wake_one( prods );
    110113    }
     
    114117static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    115118
     119// used to hand an element to a blocked consumer and signal it
     120static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     121    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     122    wake_one( cons );
     123}
     124
     125// used to hand an element to a blocked producer and signal it
     126static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     127    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     128    wake_one( prods );
     129}
     130
    116131static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117132    lock( mutex_lock );
    118133    while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
     134        __cons_handoff( chan, elem );
    121135    }
    122136    unlock( mutex_lock );
     
    125139// handles buffer insert
    126140static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    127     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     141    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    128142    count += 1;
    129143    back++;
     
    131145}
    132146
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141147// needed to avoid an extra copy in closed case
    142148static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     
    145151    operations++;
    146152    #endif
     153
     154    ConsEmpty: if ( !cons`isEmpty ) {
     155        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     156        __cons_handoff( chan, elem );
     157        unlock( mutex_lock );
     158        return true;
     159    }
     160
    147161    if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
     162
     163    __buf_insert( chan, elem );
    149164    unlock( mutex_lock );
    150165    return true;
     
    157172// handles closed case of insert routine
    158173static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
     174    channel_closed except{ &channel_closed_vt, &elem, &chan };
    160175    throwResume except; // throw closed resumption
    161176    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    182197    }
    183198
    184     // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
     199    // buffer count must be zero if cons are blocked (also handles zero-size case)
     200    ConsEmpty: if ( !cons`isEmpty ) {
     201        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     202        __cons_handoff( chan, elem );
     203        unlock( mutex_lock );
     204        return;
    190205    }
    191206
     
    202217    } // if
    203218
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
    208    
    209     unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     219    __buf_insert( chan, elem );
     220    unlock( mutex_lock );
     221}
     222
     223// does the buffer remove and potentially does waiting producer work
     224static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     225    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    216226    count -= 1;
    217227    front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223228    if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     229        if ( !__handle_waituntil_OR( prods ) ) return;
     230        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    225231        wake_one( prods );
    226232    }
     
    233239    operations++;
    234240    #endif
     241
     242    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     243        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     244        __prods_handoff( chan, retval );
     245        unlock( mutex_lock );
     246        return true;
     247    }
     248
    235249    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     250
    236251    __do_remove( chan, retval );
    237252    unlock( mutex_lock );
     
    244259static inline [T, bool] try_remove( channel(T) & chan ) {
    245260    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
     261    bool success = __internal_try_remove( chan, retval );
     262    return [ retval, success ];
     263}
     264
     265static inline T try_remove( channel(T) & chan ) {
    250266    T retval;
    251267    __internal_try_remove( chan, retval );
     
    255271// handles closed case of insert routine
    256272static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
     273    channel_closed except{ &channel_closed_vt, 0p, &chan };
    258274    throwResume except; // throw resumption
    259275    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    279295
    280296    // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
     297    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     298        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     299        __prods_handoff( chan, retval );
    284300        unlock( mutex_lock );
    285301        return retval;
     
    287303
    288304    // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
     305    if ( count == 0 ) {
    290306        #ifdef CHAN_STATS
    291307        blocks++;
     
    299315    // Remove from buffer
    300316    __do_remove( chan, retval );
    301 
    302317    unlock( mutex_lock );
    303318    return retval;
    304319}
     320
     321///////////////////////////////////////////////////////////////////////////////////////////
     322// The following is support for waituntil (select) statements
     323///////////////////////////////////////////////////////////////////////////////////////////
     324static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     325    // if ( !node`isListed && !node.park_counter ) return false; // handle special OR case C_TODO: try adding this back
     326    lock( mutex_lock );
     327    if ( node`isListed ) { // op wasn't performed
     328        #ifdef CHAN_STATS
     329        operations--;
     330        #endif
     331        remove( node );
     332        unlock( mutex_lock );
     333        return false;
     334    }
     335    unlock( mutex_lock );
     336
     337    // only return true when not special OR case, not exceptional calse and status is SAT
     338    return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
     339}
     340
     341// type used by select statement to capture a chan read as the selected operation
     342struct chan_read {
     343    T & ret;
     344    channel(T) & chan;
     345};
     346
     347static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
     348    &cr.chan = &chan;
     349    &cr.ret = &ret;
     350}
     351static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
     352
     353static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     354    __closed_remove( chan, ret );
     355    // if we get here then the insert succeeded
     356    __make_select_node_available( node );
     357}
     358
     359static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     360    lock( mutex_lock );
     361    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     362
     363    #ifdef CHAN_STATS
     364    if ( !closed ) operations++;
     365    #endif
     366
     367    if ( !node.park_counter ) {
     368        // are we special case OR and front of cons is also special case OR
     369        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
     370            if ( !__make_select_node_pending( node ) ) {
     371                unlock( mutex_lock );
     372                return false;
     373            }
     374           
     375            if ( __handle_waituntil_OR( prods ) ) {
     376                __prods_handoff( chan, ret );
     377                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     378                unlock( mutex_lock );
     379                return true;
     380            }
     381            __make_select_node_unsat( node );
     382        }
     383        // check if we can complete operation. If so race to establish winner in special OR case
     384        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
     385            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     386                unlock( mutex_lock );
     387                return false;
     388            }
     389        }
     390    }
     391
     392    if ( unlikely(closed) ) {
     393        unlock( mutex_lock );
     394        __handle_select_closed_read( this, node );
     395        return true;
     396    }
     397
     398    // have to check for the zero size channel case
     399    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     400        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     401        __prods_handoff( chan, ret );
     402        __set_avail_then_unlock( node, mutex_lock );
     403        return true;
     404    }
     405
     406    // wait if buffer is empty, work will be completed by someone else
     407    if ( count == 0 ) {
     408        #ifdef CHAN_STATS
     409        blocks++;
     410        #endif
     411       
     412        insert_last( cons, node );
     413        unlock( mutex_lock );
     414        return false;
     415    }
     416
     417    // Remove from buffer
     418    __do_remove( chan, ret );
     419    __set_avail_then_unlock( node, mutex_lock );
     420    return true;
     421}
     422static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     423static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     424    if ( node.extra == 0p ) // check if woken up due to closed channel
     425        __closed_remove( chan, ret );
     426    // This is only reachable if not closed or closed exception was handled
     427    return true;
     428}
     429
     430// type used by select statement to capture a chan write as the selected operation
     431struct chan_write {
     432    T elem;
     433    channel(T) & chan;
     434};
     435
     436static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
     437    &cw.chan = &chan;
     438    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     439}
     440static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
     441
     442static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     443    __closed_insert( chan, elem );
     444    // if we get here then the insert succeeded
     445    __make_select_node_available( node );
     446}
     447
     448static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     449    lock( mutex_lock );
     450    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     451
     452    #ifdef CHAN_STATS
     453    if ( !closed ) operations++;
     454    #endif
     455
     456    // special OR case handling
     457    if ( !node.park_counter ) {
     458        // are we special case OR and front of cons is also special case OR
     459        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
     460            if ( !__make_select_node_pending( node ) ) {
     461                unlock( mutex_lock );
     462                return false;
     463            }
     464           
     465            if ( __handle_waituntil_OR( cons ) ) {
     466                __cons_handoff( chan, elem );
     467                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     468                unlock( mutex_lock );
     469                return true;
     470            }
     471            __make_select_node_unsat( node );
     472        }
     473        // check if we can complete operation. If so race to establish winner in special OR case
     474        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
     475            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     476                unlock( mutex_lock );
     477                return false;
     478            }
     479        }
     480    }
     481
     482    // if closed handle
     483    if ( unlikely(closed) ) {
     484        unlock( mutex_lock );
     485        __handle_select_closed_write( this, node );
     486        return true;
     487    }
     488
     489    // handle blocked consumer case via handoff (buffer is implicitly empty)
     490    ConsEmpty: if ( !cons`isEmpty ) {
     491        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     492        __cons_handoff( chan, elem );
     493        __set_avail_then_unlock( node, mutex_lock );
     494        return true;
     495    }
     496
     497    // insert node in list if buffer is full, work will be completed by someone else
     498    if ( count == size ) {
     499        #ifdef CHAN_STATS
     500        blocks++;
     501        #endif
     502
     503        insert_last( prods, node );
     504        unlock( mutex_lock );
     505        return false;
     506    } // if
     507
     508    // otherwise carry out write either via normal insert
     509    __buf_insert( chan, elem );
     510    __set_avail_then_unlock( node, mutex_lock );
     511    return true;
     512}
     513static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     514
     515static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     516    if ( node.extra == 0p ) // check if woken up due to closed channel
     517        __closed_insert( chan, elem );
     518
     519    // This is only reachable if not closed or closed exception was handled
     520    return true;
     521}
     522
    305523} // forall( T )
     524
     525
Note: See TracChangeset for help on using the changeset viewer.