Ignore:
Timestamp:
Apr 21, 2023, 5:36:12 PM (2 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT, master
Children:
28f8f15, 6e4c44d
Parents:
2ed94a9 (diff), 699a97d (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r2ed94a9 rb110bcc  
     1#pragma once
     2
    13#include <locks.hfa>
    2 
    3 struct no_reacq_lock {
    4     inline exp_backoff_then_block_lock;
     4#include <list.hfa>
     5#include <mutex_stmt.hfa>
     6
     7// link field used for threads waiting on channel
     8struct wait_link {
     9    // used to put wait_link on a dl queue
     10    inline dlink(wait_link);
     11
     12    // waiting thread
     13    struct thread$ * t;
     14
     15    // shadow field
     16    void * elem;
    517};
    6 
    7 // have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
    8 static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
    9 static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
    10 static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
    11 static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
    12 static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
    13 static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
    14 // override wakeup so that we don't reacquire the lock if using a condvar
    15 static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
     18P9_EMBEDDED( wait_link, dlink(wait_link) )
     19
     20static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
     21    this.t = t;
     22    this.elem = elem;
     23}
     24
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
     30
     31// returns true if woken due to shutdown
     32// blocks thread on list and releases passed lock
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
     36    unlock( lock );
     37    park();
     38    return w.elem == 0p;
     39}
     40
     41// void * used for some fields since exceptions don't work with parametric polymorphism currently
     42exception channel_closed {
     43    // on failed insert elem is a ptr to the element attempting to be inserted
     44    // on failed remove elem ptr is 0p
     45    // on resumption of a failed insert this elem will be inserted
     46    // so a user may modify it in the resumption handler
     47    void * elem;
     48
     49    // pointer to chan that is closed
     50    void * closed_chan;
     51};
     52vtable(channel_closed) channel_closed_vt;
     53
     54// #define CHAN_STATS // define this to get channel stats printed in dtor
    1655
    1756forall( T ) {
    18 struct channel {
    19     size_t size;
    20     size_t front, back, count;
     57
     58struct __attribute__((aligned(128))) channel {
     59    size_t size, front, back, count;
    2160    T * buffer;
    22     fast_cond_var( no_reacq_lock ) prods, cons;
    23     no_reacq_lock mutex_lock;
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
     62    go_mutex mutex_lock;            // MX lock
     63    bool closed;                    // indicates channel close/open
     64    #ifdef CHAN_STATS
     65    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     66    #endif
    2467};
    2568
     
    2770    size = _size;
    2871    front = back = count = 0;
    29     buffer = anew( size );
     72    buffer = aalloc( size );
    3073    prods{};
    3174    cons{};
    3275    mutex_lock{};
     76    closed = false;
     77    #ifdef CHAN_STATS
     78    blocks = 0;
     79    operations = 0;
     80    #endif
    3381}
    3482
    3583static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    36 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     84static inline void ^?{}( channel(T) &c ) with(c) {
     85    #ifdef CHAN_STATS
     86    printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
     87    #endif
     88    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
     89    delete( buffer );
     90}
    3791static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    3892static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    39 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    40 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    41 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    42 
    43 static inline void insert_( channel(T) & chan, T elem ) with(chan) {
     93static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
     94static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     95static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
     96
     97// closes the channel and notifies all blocked threads
     98static inline void close( channel(T) & chan ) with(chan) {
     99    lock( mutex_lock );
     100    closed = true;
     101
     102    // flush waiting consumers and producers
     103    while ( has_waiting_consumers( chan ) ) {
     104        cons`first.elem = 0p;
     105        wake_one( cons );
     106    }
     107    while ( has_waiting_producers( chan ) ) {
     108        prods`first.elem = 0p;
     109        wake_one( prods );
     110    }
     111    unlock(mutex_lock);
     112}
     113
     114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
     115
     116static inline void flush( channel(T) & chan, T elem ) with(chan) {
     117    lock( mutex_lock );
     118    while ( count == 0 && !cons`isEmpty ) {
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
     121    }
     122    unlock( mutex_lock );
     123}
     124
     125// handles buffer insert
     126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    44127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    45128    count += 1;
     
    48131}
    49132
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
     141// needed to avoid an extra copy in closed case
     142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     143    lock( mutex_lock );
     144    #ifdef CHAN_STATS
     145    operations++;
     146    #endif
     147    if ( count == size ) { unlock( mutex_lock ); return false; }
     148    __do_insert( chan, elem );
     149    unlock( mutex_lock );
     150    return true;
     151}
     152
     153// attempts a nonblocking insert
     154// returns true if insert was successful, false otherwise
     155static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
     156
     157// handles closed case of insert routine
     158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
     160    throwResume except; // throw closed resumption
     161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     162}
    50163
    51164static inline void insert( channel(T) & chan, T elem ) with(chan) {
    52     lock( mutex_lock );
     165    // check for close before acquire mx
     166    if ( unlikely(closed) ) {
     167        __closed_insert( chan, elem );
     168        return;
     169    }
     170
     171    lock( mutex_lock );
     172
     173    #ifdef CHAN_STATS
     174    if ( !closed ) operations++;
     175    #endif
     176
     177    // if closed handle
     178    if ( unlikely(closed) ) {
     179        unlock( mutex_lock );
     180        __closed_insert( chan, elem );
     181        return;
     182    }
    53183
    54184    // have to check for the zero size channel case
    55     if ( size == 0 && !empty( cons ) ) {
    56         memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
    57         notify_one( cons );
     185    if ( size == 0 && !cons`isEmpty ) {
     186        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
     187        wake_one( cons );
    58188        unlock( mutex_lock );
    59         return;
     189        return true;
    60190    }
    61191
    62192    // wait if buffer is full, work will be completed by someone else
    63     if ( count == size ) {
    64         wait( prods, mutex_lock, (uintptr_t)&elem );
     193    if ( count == size ) {
     194        #ifdef CHAN_STATS
     195        blocks++;
     196        #endif
     197
     198        // check for if woken due to close
     199        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
     200            __closed_insert( chan, elem );
    65201        return;
    66202    } // if
    67203
    68     if ( count == 0 && !empty( cons ) )
    69         // do waiting consumer work
    70         memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
    71     else insert_( chan, elem );
     204    if ( count == 0 && !cons`isEmpty ) {
     205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     206        wake_one( cons );
     207    } else __buf_insert( chan, elem );
    72208   
    73     notify_one( cons );
    74     unlock( mutex_lock );
    75 }
    76 
    77 static inline T remove( channel(T) & chan ) with(chan) {
    78     lock( mutex_lock );
    79     T retval;
    80 
    81     // have to check for the zero size channel case
    82     if ( size == 0 && !empty( prods ) ) {
    83         memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
    84         notify_one( prods );
    85         unlock( mutex_lock );
    86         return retval;
    87     }
    88 
    89     // wait if buffer is empty, work will be completed by someone else
    90     if (count == 0) {
    91         wait( cons, mutex_lock, (uintptr_t)&retval );
    92         return retval;
    93     }
    94 
    95     // Remove from buffer
     209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    96215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    97216    count -= 1;
    98217    front = (front + 1) % size;
    99 
    100     if (count == size - 1 && !empty( prods ) )
    101         insert_( chan, *((T *)front( prods )) );  // do waiting producer work
    102 
    103     notify_one( prods );
    104     unlock( mutex_lock );
     218}
     219
     220// does the buffer remove and potentially does waiting producer work
     221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     222    __buf_remove( chan, retval );
     223    if (count == size - 1 && !prods`isEmpty ) {
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     225        wake_one( prods );
     226    }
     227}
     228
     229// needed to avoid an extra copy in closed case and single return val case
     230static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     231    lock( mutex_lock );
     232    #ifdef CHAN_STATS
     233    operations++;
     234    #endif
     235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     236    __do_remove( chan, retval );
     237    unlock( mutex_lock );
     238    return true;
     239}
     240
     241// attempts a nonblocking remove
     242// returns [T, true] if insert was successful
     243// returns [T, false] if insert was successful (T uninit)
     244static inline [T, bool] try_remove( channel(T) & chan ) {
     245    T retval;
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
     250    T retval;
     251    __internal_try_remove( chan, retval );
    105252    return retval;
    106253}
    107254
     255// handles closed case of insert routine
     256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
     258    throwResume except; // throw resumption
     259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     260}
     261
     262static inline T remove( channel(T) & chan ) with(chan) {
     263    T retval;
     264    if ( unlikely(closed) ) {
     265        __closed_remove( chan, retval );
     266        return retval;
     267    }
     268    lock( mutex_lock );
     269
     270    #ifdef CHAN_STATS
     271    if ( !closed ) operations++;
     272    #endif
     273
     274    if ( unlikely(closed) ) {
     275        unlock( mutex_lock );
     276        __closed_remove( chan, retval );
     277        return retval;
     278    }
     279
     280    // have to check for the zero size channel case
     281    if ( size == 0 && !prods`isEmpty ) {
     282        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
     283        wake_one( prods );
     284        unlock( mutex_lock );
     285        return retval;
     286    }
     287
     288    // wait if buffer is empty, work will be completed by someone else
     289    if (count == 0) {
     290        #ifdef CHAN_STATS
     291        blocks++;
     292        #endif
     293        // check for if woken due to close
     294        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
     295            __closed_remove( chan, retval );
     296        return retval;
     297    }
     298
     299    // Remove from buffer
     300    __do_remove( chan, retval );
     301
     302    unlock( mutex_lock );
     303    return retval;
     304}
    108305} // forall( T )
Note: See TracChangeset for help on using the changeset viewer.