Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    ra45e21c r88b49bb  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// channel.hfa -- LIBCFATHREAD
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2022
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include <locks.hfa>
    420#include <list.hfa>
    5 #include <mutex_stmt.hfa>
    6 
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
     21#include "select.hfa"
    3022
    3123// returns true if woken due to shutdown
    3224// blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
     25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
     26    select_node sn{ active_thread(), elem_ptr };
     27    insert_last( queue, sn );
    3628    unlock( lock );
    3729    park();
    38     return w.elem == 0p;
     30    return sn.extra == 0p;
     31}
     32
     33// Waituntil support (un)register_select helper routine
     34// Sets select node avail if not special OR case and then unlocks
     35static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
     36    if ( node.park_counter ) __make_select_node_available( node );
     37    unlock( mutex_lock );
    3938}
    4039
     
    5958    size_t size, front, back, count;
    6059    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
    62     go_mutex mutex_lock;            // MX lock
    63     bool closed;                    // indicates channel close/open
    64     #ifdef CHAN_STATS
    65     size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     60    dlist( select_node ) prods, cons; // lists of blocked threads
     61    go_mutex mutex_lock;              // MX lock
     62    bool closed;                      // indicates channel close/open
     63    #ifdef CHAN_STATS
     64    size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
    6665    #endif
    6766};
     
    7069    size = _size;
    7170    front = back = count = 0;
    72     buffer = aalloc( size );
     71    if ( size != 0 ) buffer = aalloc( size );
    7372    prods{};
    7473    cons{};
     
    7675    closed = false;
    7776    #ifdef CHAN_STATS
    78     blocks = 0;
    79     operations = 0;
     77    p_blocks = 0;
     78    p_ops = 0;
     79    c_blocks = 0;
     80    c_ops = 0;
    8081    #endif
    8182}
     
    8485static inline void ^?{}( channel(T) &c ) with(c) {
    8586    #ifdef CHAN_STATS
    86     printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
    87     #endif
    88     verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
    90 }
    91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     87    printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
     88    printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
     89    printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
     90    #endif
     91    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
     92        "Attempted to delete channel with waiting threads (Deadlock).\n" );
     93    if ( size != 0 ) delete( buffer );
     94}
     95static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
     96static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
    9397static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    9498static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     
    102106    // flush waiting consumers and producers
    103107    while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
     108        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     109            break;  // if __handle_waituntil_OR returns false cons is empty so break
     110        cons`first.extra = 0p;
    105111        wake_one( cons );
    106112    }
    107113    while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
     114        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     115            break;  // if __handle_waituntil_OR returns false prods is empty so break
     116        prods`first.extra = 0p;
    109117        wake_one( prods );
    110118    }
     
    114122static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    115123
     124// used to hand an element to a blocked consumer and signal it
     125static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     126    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     127    wake_one( cons );
     128}
     129
     130// used to hand an element to a blocked producer and signal it
     131static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     132    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     133    wake_one( prods );
     134}
     135
    116136static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117137    lock( mutex_lock );
    118138    while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
     139        __cons_handoff( chan, elem );
    121140    }
    122141    unlock( mutex_lock );
     
    125144// handles buffer insert
    126145static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    127     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     146    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    128147    count += 1;
    129148    back++;
     
    131150}
    132151
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141152// needed to avoid an extra copy in closed case
    142153static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    143154    lock( mutex_lock );
    144155    #ifdef CHAN_STATS
    145     operations++;
    146     #endif
     156    p_ops++;
     157    #endif
     158
     159    ConsEmpty: if ( !cons`isEmpty ) {
     160        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     161        __cons_handoff( chan, elem );
     162        unlock( mutex_lock );
     163        return true;
     164    }
     165
    147166    if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
     167
     168    __buf_insert( chan, elem );
    149169    unlock( mutex_lock );
    150170    return true;
     
    157177// handles closed case of insert routine
    158178static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
     179    channel_closed except{ &channel_closed_vt, &elem, &chan };
    160180    throwResume except; // throw closed resumption
    161181    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    172192
    173193    #ifdef CHAN_STATS
    174     if ( !closed ) operations++;
     194    if ( !closed ) p_ops++;
    175195    #endif
    176196
     
    182202    }
    183203
    184     // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
     204    // buffer count must be zero if cons are blocked (also handles zero-size case)
     205    ConsEmpty: if ( !cons`isEmpty ) {
     206        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     207        __cons_handoff( chan, elem );
     208        unlock( mutex_lock );
     209        return;
    190210    }
    191211
     
    193213    if ( count == size ) {
    194214        #ifdef CHAN_STATS
    195         blocks++;
     215        p_blocks++;
    196216        #endif
    197217
     
    202222    } // if
    203223
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
    208    
    209     unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     224    __buf_insert( chan, elem );
     225    unlock( mutex_lock );
     226}
     227
     228// does the buffer remove and potentially does waiting producer work
     229static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     230    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    216231    count -= 1;
    217232    front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223233    if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     234        if ( !__handle_waituntil_OR( prods ) ) return;
     235        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    225236        wake_one( prods );
    226237    }
     
    231242    lock( mutex_lock );
    232243    #ifdef CHAN_STATS
    233     operations++;
    234     #endif
     244    c_ops++;
     245    #endif
     246
     247    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     248        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     249        __prods_handoff( chan, retval );
     250        unlock( mutex_lock );
     251        return true;
     252    }
     253
    235254    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     255
    236256    __do_remove( chan, retval );
    237257    unlock( mutex_lock );
     
    244264static inline [T, bool] try_remove( channel(T) & chan ) {
    245265    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
     266    bool success = __internal_try_remove( chan, retval );
     267    return [ retval, success ];
     268}
     269
     270static inline T try_remove( channel(T) & chan ) {
    250271    T retval;
    251272    __internal_try_remove( chan, retval );
     
    255276// handles closed case of insert routine
    256277static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
     278    channel_closed except{ &channel_closed_vt, 0p, &chan };
    258279    throwResume except; // throw resumption
    259280    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    269290
    270291    #ifdef CHAN_STATS
    271     if ( !closed ) operations++;
     292    if ( !closed ) c_ops++;
    272293    #endif
    273294
     
    279300
    280301    // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
     302    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     303        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     304        __prods_handoff( chan, retval );
    284305        unlock( mutex_lock );
    285306        return retval;
     
    287308
    288309    // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
     310    if ( count == 0 ) {
    290311        #ifdef CHAN_STATS
    291         blocks++;
     312        c_blocks++;
    292313        #endif
    293314        // check for if woken due to close
     
    299320    // Remove from buffer
    300321    __do_remove( chan, retval );
    301 
    302322    unlock( mutex_lock );
    303323    return retval;
    304324}
     325
     326///////////////////////////////////////////////////////////////////////////////////////////
     327// The following is support for waituntil (select) statements
     328///////////////////////////////////////////////////////////////////////////////////////////
     329static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     330    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
     331    lock( mutex_lock );
     332    if ( node`isListed ) { // op wasn't performed
     333        remove( node );
     334        unlock( mutex_lock );
     335        return false;
     336    }
     337    unlock( mutex_lock );
     338
     339    // only return true when not special OR case, not exceptional calse and status is SAT
     340    return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
     341}
     342
     343// type used by select statement to capture a chan read as the selected operation
     344struct chan_read {
     345    T & ret;
     346    channel(T) & chan;
     347};
     348
     349static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
     350    &cr.chan = &chan;
     351    &cr.ret = &ret;
     352}
     353static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
     354
     355static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     356    __closed_remove( chan, ret );
     357    // if we get here then the insert succeeded
     358    __make_select_node_available( node );
     359}
     360
     361static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     362    lock( mutex_lock );
     363    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     364
     365    #ifdef CHAN_STATS
     366    if ( !closed ) c_ops++;
     367    #endif
     368
     369    if ( !node.park_counter ) {
     370        // are we special case OR and front of cons is also special case OR
     371        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
     372            if ( !__make_select_node_pending( node ) ) {
     373                unlock( mutex_lock );
     374                return false;
     375            }
     376           
     377            if ( __handle_waituntil_OR( prods ) ) {
     378                __prods_handoff( chan, ret );
     379                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     380                unlock( mutex_lock );
     381                return true;
     382            }
     383            __make_select_node_unsat( node );
     384        }
     385        // check if we can complete operation. If so race to establish winner in special OR case
     386        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
     387            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     388                unlock( mutex_lock );
     389                return false;
     390            }
     391        }
     392    }
     393
     394    if ( unlikely(closed) ) {
     395        unlock( mutex_lock );
     396        __handle_select_closed_read( this, node );
     397        return true;
     398    }
     399
     400    // have to check for the zero size channel case
     401    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     402        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     403        __prods_handoff( chan, ret );
     404        __set_avail_then_unlock( node, mutex_lock );
     405        return true;
     406    }
     407
     408    // wait if buffer is empty, work will be completed by someone else
     409    if ( count == 0 ) {
     410        #ifdef CHAN_STATS
     411        c_blocks++;
     412        #endif
     413       
     414        insert_last( cons, node );
     415        unlock( mutex_lock );
     416        return false;
     417    }
     418
     419    // Remove from buffer
     420    __do_remove( chan, ret );
     421    __set_avail_then_unlock( node, mutex_lock );
     422    return true;
     423}
     424static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     425static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     426    if ( node.extra == 0p ) // check if woken up due to closed channel
     427        __closed_remove( chan, ret );
     428    // This is only reachable if not closed or closed exception was handled
     429    return true;
     430}
     431
     432// type used by select statement to capture a chan write as the selected operation
     433struct chan_write {
     434    T elem;
     435    channel(T) & chan;
     436};
     437
     438static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
     439    &cw.chan = &chan;
     440    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     441}
     442static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
     443
     444static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     445    __closed_insert( chan, elem );
     446    // if we get here then the insert succeeded
     447    __make_select_node_available( node );
     448}
     449
     450static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     451    lock( mutex_lock );
     452    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     453
     454    #ifdef CHAN_STATS
     455    if ( !closed ) p_ops++;
     456    #endif
     457
     458    // special OR case handling
     459    if ( !node.park_counter ) {
     460        // are we special case OR and front of cons is also special case OR
     461        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
     462            if ( !__make_select_node_pending( node ) ) {
     463                unlock( mutex_lock );
     464                return false;
     465            }
     466           
     467            if ( __handle_waituntil_OR( cons ) ) {
     468                __cons_handoff( chan, elem );
     469                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     470                unlock( mutex_lock );
     471                return true;
     472            }
     473            __make_select_node_unsat( node );
     474        }
     475        // check if we can complete operation. If so race to establish winner in special OR case
     476        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
     477            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     478                unlock( mutex_lock );
     479                return false;
     480            }
     481        }
     482    }
     483
     484    // if closed handle
     485    if ( unlikely(closed) ) {
     486        unlock( mutex_lock );
     487        __handle_select_closed_write( this, node );
     488        return true;
     489    }
     490
     491    // handle blocked consumer case via handoff (buffer is implicitly empty)
     492    ConsEmpty: if ( !cons`isEmpty ) {
     493        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     494        __cons_handoff( chan, elem );
     495        __set_avail_then_unlock( node, mutex_lock );
     496        return true;
     497    }
     498
     499    // insert node in list if buffer is full, work will be completed by someone else
     500    if ( count == size ) {
     501        #ifdef CHAN_STATS
     502        p_blocks++;
     503        #endif
     504
     505        insert_last( prods, node );
     506        unlock( mutex_lock );
     507        return false;
     508    } // if
     509
     510    // otherwise carry out write either via normal insert
     511    __buf_insert( chan, elem );
     512    __set_avail_then_unlock( node, mutex_lock );
     513    return true;
     514}
     515static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     516
     517static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     518    if ( node.extra == 0p ) // check if woken up due to closed channel
     519        __closed_insert( chan, elem );
     520
     521    // This is only reachable if not closed or closed exception was handled
     522    return true;
     523}
     524
    305525} // forall( T )
     526
     527
Note: See TracChangeset for help on using the changeset viewer.