Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r6b33e89 rfe293bf  
    5757
    5858forall( T ) {
    59          struct __attribute__((aligned(128))) channel {
    60                 size_t size, front, back, count;
    61                 T * buffer;
    62                 dlist( select_node ) prods, cons;                               // lists of blocked threads
    63                 go_mutex mutex_lock;                                                    // MX lock
    64                 bool closed;                                                                    // indicates channel close/open
    65                 #ifdef CHAN_STATS
    66                 size_t p_blocks, p_ops, c_blocks, c_ops;                // counts total ops and ops resulting in a blocked thd
    67             #endif
    68         };
    69 
    70          // type used by select statement to capture a chan read as the selected operation
    71          struct chan_read {
    72                  T * ret;
    73                  channel(T) * chan;
    74          };
    75          __CFA_SELECT_GET_TYPE( chan_read(T) );
    76 
    77          // type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
    78          struct chan_read_no_ret {
    79                  T retval;
    80                  chan_read( T ) c_read;
    81          };
    82          __CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
    83 
    84          // type used by select statement to capture a chan write as the selected operation
    85          struct chan_write {
    86                  T elem;
    87                  channel(T) * chan;
    88          };
    89          __CFA_SELECT_GET_TYPE( chan_write(T) );
    90 } // distribution
    91 
    92 static inline forall( T ) {
    93         void ?{}( channel(T) & this, channel(T) this2 ) = void;
    94         void ?=?( channel(T) & this, channel(T) this2 ) = void;
    95 
    96         void ?{}( channel(T) &c, size_t _size ) with(c) {
    97                 size = _size;
    98                 front = back = count = 0;
    99                 if ( size != 0 ) buffer = aalloc( size );
    100                 prods{};
    101                 cons{};
    102                 mutex_lock{};
    103                 closed = false;
    104             #ifdef CHAN_STATS
    105                 p_blocks = 0;
    106                 p_ops = 0;
    107                 c_blocks = 0;
    108                 c_ops = 0;
    109             #endif
    110         }
    111 
    112         void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    113         void ^?{}( channel(T) &c ) with(c) {
    114             #ifdef CHAN_STATS
    115                 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
    116                 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
    117                 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
    118             #endif
    119                 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || isEmpty( cons ) && isEmpty( prods ),
    120                                  "Attempted to delete channel with waiting threads (Deadlock).\n" );
    121                 if ( size != 0 ) delete( buffer );
    122         }
    123         size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
    124         size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
    125         bool has_waiters( channel(T) & chan ) with(chan) { return ! isEmpty( cons ) || ! isEmpty( prods ); }
    126         bool has_waiting_consumers( channel(T) & chan ) with(chan) { return ! isEmpty( cons ); }
    127         bool has_waiting_producers( channel(T) & chan ) with(chan) { return ! isEmpty( prods ); }
    128 
    129         // closes the channel and notifies all blocked threads
    130         void close( channel(T) & chan ) with(chan) {
    131                 lock( mutex_lock );
    132                 closed = true;
    133 
    134                 // flush waiting consumers and producers
    135                 while ( has_waiting_consumers( chan ) ) {
    136                         if( ! __handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
    137                                 break;  // if __handle_waituntil_OR returns false cons is empty so break
    138                         first( cons ).extra = 0p;
    139                         wake_one( cons );
    140                 }
    141                 while ( has_waiting_producers( chan ) ) {
    142                         if( ! __handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
    143                                 break;  // if __handle_waituntil_OR returns false prods is empty so break
    144                         first( prods ).extra = 0p;
    145                         wake_one( prods );
    146                 }
    147                 unlock(mutex_lock);
    148         }
    149 
    150         void is_closed( channel(T) & chan ) with(chan) { return closed; }
    151 
    152         // used to hand an element to a blocked consumer and signal it
    153         void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    154                 memcpy( first( cons ).extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
    155                 wake_one( cons );
    156         }
    157 
    158         // used to hand an element to a blocked producer and signal it
    159         void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    160                 memcpy( (void *)&retval, first( prods ).extra, sizeof(T) );
    161                 wake_one( prods );
    162         }
    163 
    164         void flush( channel(T) & chan, T elem ) with(chan) {
    165                 lock( mutex_lock );
    166                 while ( count == 0 && ! isEmpty( cons ) ) {
    167                         __cons_handoff( chan, elem );
    168                 }
    169                 unlock( mutex_lock );
    170         }
    171 
    172         // handles buffer insert
    173         void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    174                 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    175                 count += 1;
    176                 back++;
    177                 if ( back == size ) back = 0;
    178         }
    179 
    180         // needed to avoid an extra copy in closed case
    181         bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    182                 lock( mutex_lock );
    183             #ifdef CHAN_STATS
    184                 p_ops++;
    185             #endif
    186 
    187           ConsEmpty:
    188                 if ( ! isEmpty( cons ) ) {
    189                         if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty;
    190                         __cons_handoff( chan, elem );
    191                         unlock( mutex_lock );
    192                         return true;
    193                 }
    194 
    195                 if ( count == size ) { unlock( mutex_lock ); return false; }
    196 
    197                 __buf_insert( chan, elem );
    198                 unlock( mutex_lock );
    199                 return true;
    200         }
    201 
    202         // attempts a nonblocking insert
    203         // returns true if insert was successful, false otherwise
    204         bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
    205 
    206         // handles closed case of insert routine
    207         void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    208                 channel_closed except{ &channel_closed_vt, &elem, &chan };
    209                 throwResume except; // throw closed resumption
    210                 if ( ! __internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    211         }
    212 
    213         void insert( channel(T) & chan, T elem ) with(chan) {
    214                 // check for close before acquire mx
    215                 if ( unlikely(closed) ) {
    216                         __closed_insert( chan, elem );
    217                         return;
    218                 }
    219 
    220                 lock( mutex_lock );
    221 
    222             #ifdef CHAN_STATS
    223                 if ( ! closed ) p_ops++;
    224             #endif
    225 
    226                 // if closed handle
    227                 if ( unlikely(closed) ) {
    228                         unlock( mutex_lock );
    229                         __closed_insert( chan, elem );
    230                         return;
    231                 }
    232 
    233                 // buffer count must be zero if cons are blocked (also handles zero-size case)
    234           ConsEmpty:
    235                 if ( ! isEmpty( cons ) ) {
    236                         if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty;
    237                         __cons_handoff( chan, elem );
    238                         unlock( mutex_lock );
    239                         return;
    240                 }
    241 
    242                 // wait if buffer is full, work will be completed by someone else
    243                 if ( count == size ) {
    244                 #ifdef CHAN_STATS
    245                         p_blocks++;
    246                 #endif
    247 
    248                         // check for if woken due to close
    249                         if ( unlikely( block( prods, &elem, mutex_lock ) ) )
    250                                 __closed_insert( chan, elem );
    251                         return;
    252                 } // if
    253 
    254                 __buf_insert( chan, elem );
    255                 unlock( mutex_lock );
    256         }
    257 
    258         // does the buffer remove and potentially does waiting producer work
    259         void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    260                 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    261                 count -= 1;
    262                 front = (front + 1) % size;
    263                 if (count == size - 1 && ! isEmpty( prods ) ) {
    264                         if ( ! __handle_waituntil_OR( prods ) ) return;
    265                         __buf_insert( chan, *(T *)first( prods ).extra );  // do waiting producer work
    266                         wake_one( prods );
    267                 }
    268         }
    269 
    270         // needed to avoid an extra copy in closed case and single return val case
    271         bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    272                 lock( mutex_lock );
    273             #ifdef CHAN_STATS
    274                 c_ops++;
    275             #endif
    276 
    277           ZeroSize:
    278                 if ( size == 0 && ! isEmpty( prods ) ) {
    279                         if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize;
    280                         __prods_handoff( chan, retval );
    281                         unlock( mutex_lock );
    282                         return true;
    283                 }
    284 
    285                 if ( count == 0 ) { unlock( mutex_lock ); return false; }
    286 
    287                 __do_remove( chan, retval );
    288                 unlock( mutex_lock );
    289                 return true;
    290         }
    291 
    292         // attempts a nonblocking remove
    293         // returns [T, true] if insert was successful
    294         // returns [T, false] if insert was successful (T uninit)
    295         [T, bool] try_remove( channel(T) & chan ) {
    296                 T retval;
    297                 bool success = __internal_try_remove( chan, retval );
    298                 return [ retval, success ];
    299         }
    300 
    301         T try_remove( channel(T) & chan ) {
    302                 T retval;
    303                 __internal_try_remove( chan, retval );
    304                 return retval;
    305         }
    306 
    307         // handles closed case of insert routine
    308         void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    309                 channel_closed except{ &channel_closed_vt, 0p, &chan };
    310                 throwResume except; // throw resumption
    311                 if ( ! __internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    312         }
    313 
    314         T remove( channel(T) & chan ) with(chan) {
    315                 T retval;
    316                 if ( unlikely(closed) ) {
    317                         __closed_remove( chan, retval );
    318                         return retval;
    319                 }
    320                 lock( mutex_lock );
    321 
    322             #ifdef CHAN_STATS
    323                 if ( ! closed ) c_ops++;
    324                 #endif
    325 
    326                 if ( unlikely(closed) ) {
    327                         unlock( mutex_lock );
    328                         __closed_remove( chan, retval );
    329                         return retval;
    330                 }
    331 
    332                 // have to check for the zero size channel case
    333           ZeroSize:
    334                 if ( size == 0 && ! isEmpty( prods ) ) {
    335                         if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize;
    336                         __prods_handoff( chan, retval );
    337                         unlock( mutex_lock );
    338                         return retval;
    339                 }
    340 
    341                 // wait if buffer is empty, work will be completed by someone else
    342                 if ( count == 0 ) {
    343                 #ifdef CHAN_STATS
    344                         c_blocks++;
    345                 #endif
    346                         // check for if woken due to close
    347                         if ( unlikely( block( cons, &retval, mutex_lock ) ) )
    348                                 __closed_remove( chan, retval );
    349                         return retval;
    350                 }
    351 
    352                 // Remove from buffer
    353                 __do_remove( chan, retval );
    354                 unlock( mutex_lock );
    355                 return retval;
    356         }
    357         void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
    358 
    359 
    360         ///////////////////////////////////////////////////////////////////////////////////////////
    361         // The following is Go-style operator support for channels
    362         ///////////////////////////////////////////////////////////////////////////////////////////
    363 
    364         void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
    365         void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
    366 
    367         ///////////////////////////////////////////////////////////////////////////////////////////
    368         // The following is support for waituntil (select) statements
    369         ///////////////////////////////////////////////////////////////////////////////////////////
    370         bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    371             if ( ! isListed( node ) && ! node.park_counter ) return false; // handle special OR case
    372             lock( mutex_lock );
    373             if ( isListed( node ) ) { // op wasn't performed
    374                 remove( node );
    375                 unlock( mutex_lock );
    376                 return false;
    377             }
    378             unlock( mutex_lock );
    379 
    380             // only return true when not special OR case and status is SAT
    381             return ! node.park_counter ? false : *node.clause_status == __SELECT_SAT;
    382         }
    383 
    384         // special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
    385         bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
    386             while ( ! isEmpty( queue ) ) {
    387                 // if node not a special OR case or if we win the special OR case race break
    388                 if ( ! first( queue ).clause_status || first( queue ).park_counter || __pending_set_other( first( queue ), mine, ((unsigned long int)(&(first( queue )))) ) )
    389                     return true;
     59
     60struct __attribute__((aligned(128))) channel {
     61    size_t size, front, back, count;
     62    T * buffer;
     63    dlist( select_node ) prods, cons; // lists of blocked threads
     64    go_mutex mutex_lock;              // MX lock
     65    bool closed;                      // indicates channel close/open
     66    #ifdef CHAN_STATS
     67    size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
     68    #endif
     69};
     70static inline void ?{}( channel(T) & this, channel(T) this2 ) = void;
     71static inline void ?=?( channel(T) & this, channel(T) this2 ) = void;
     72
     73static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     74    size = _size;
     75    front = back = count = 0;
     76    if ( size != 0 ) buffer = aalloc( size );
     77    prods{};
     78    cons{};
     79    mutex_lock{};
     80    closed = false;
     81    #ifdef CHAN_STATS
     82    p_blocks = 0;
     83    p_ops = 0;
     84    c_blocks = 0;
     85    c_ops = 0;
     86    #endif
     87}
     88
     89static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     90static inline void ^?{}( channel(T) &c ) with(c) {
     91    #ifdef CHAN_STATS
     92    printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
     93    printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
     94    printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
     95    #endif
     96    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
     97        "Attempted to delete channel with waiting threads (Deadlock).\n" );
     98    if ( size != 0 ) delete( buffer );
     99}
     100static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
     101static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
     102static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
     103static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     104static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
     105
     106// closes the channel and notifies all blocked threads
     107static inline void close( channel(T) & chan ) with(chan) {
     108    lock( mutex_lock );
     109    closed = true;
     110
     111    // flush waiting consumers and producers
     112    while ( has_waiting_consumers( chan ) ) {
     113        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     114            break;  // if __handle_waituntil_OR returns false cons is empty so break
     115        cons`first.extra = 0p;
     116        wake_one( cons );
     117    }
     118    while ( has_waiting_producers( chan ) ) {
     119        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     120            break;  // if __handle_waituntil_OR returns false prods is empty so break
     121        prods`first.extra = 0p;
     122        wake_one( prods );
     123    }
     124    unlock(mutex_lock);
     125}
     126
     127static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
     128
     129// used to hand an element to a blocked consumer and signal it
     130static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     131    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     132    wake_one( cons );
     133}
     134
     135// used to hand an element to a blocked producer and signal it
     136static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     137    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     138    wake_one( prods );
     139}
     140
     141static inline void flush( channel(T) & chan, T elem ) with(chan) {
     142    lock( mutex_lock );
     143    while ( count == 0 && !cons`isEmpty ) {
     144        __cons_handoff( chan, elem );
     145    }
     146    unlock( mutex_lock );
     147}
     148
     149// handles buffer insert
     150static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
     151    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
     152    count += 1;
     153    back++;
     154    if ( back == size ) back = 0;
     155}
     156
     157// needed to avoid an extra copy in closed case
     158static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     159    lock( mutex_lock );
     160    #ifdef CHAN_STATS
     161    p_ops++;
     162    #endif
     163
     164    ConsEmpty: if ( !cons`isEmpty ) {
     165        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     166        __cons_handoff( chan, elem );
     167        unlock( mutex_lock );
     168        return true;
     169    }
     170
     171    if ( count == size ) { unlock( mutex_lock ); return false; }
     172
     173    __buf_insert( chan, elem );
     174    unlock( mutex_lock );
     175    return true;
     176}
     177
     178// attempts a nonblocking insert
     179// returns true if insert was successful, false otherwise
     180static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
     181
     182// handles closed case of insert routine
     183static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
     184    channel_closed except{ &channel_closed_vt, &elem, &chan };
     185    throwResume except; // throw closed resumption
     186    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     187}
     188
     189static inline void insert( channel(T) & chan, T elem ) with(chan) {
     190    // check for close before acquire mx
     191    if ( unlikely(closed) ) {
     192        __closed_insert( chan, elem );
     193        return;
     194    }
     195
     196    lock( mutex_lock );
     197
     198    #ifdef CHAN_STATS
     199    if ( !closed ) p_ops++;
     200    #endif
     201
     202    // if closed handle
     203    if ( unlikely(closed) ) {
     204        unlock( mutex_lock );
     205        __closed_insert( chan, elem );
     206        return;
     207    }
     208
     209    // buffer count must be zero if cons are blocked (also handles zero-size case)
     210    ConsEmpty: if ( !cons`isEmpty ) {
     211        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     212        __cons_handoff( chan, elem );
     213        unlock( mutex_lock );
     214        return;
     215    }
     216
     217    // wait if buffer is full, work will be completed by someone else
     218    if ( count == size ) {
     219        #ifdef CHAN_STATS
     220        p_blocks++;
     221        #endif
     222
     223        // check for if woken due to close
     224        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
     225            __closed_insert( chan, elem );
     226        return;
     227    } // if
     228
     229    __buf_insert( chan, elem );
     230    unlock( mutex_lock );
     231}
     232
     233// does the buffer remove and potentially does waiting producer work
     234static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     235    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
     236    count -= 1;
     237    front = (front + 1) % size;
     238    if (count == size - 1 && !prods`isEmpty ) {
     239        if ( !__handle_waituntil_OR( prods ) ) return;
     240        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
     241        wake_one( prods );
     242    }
     243}
     244
     245// needed to avoid an extra copy in closed case and single return val case
     246static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     247    lock( mutex_lock );
     248    #ifdef CHAN_STATS
     249    c_ops++;
     250    #endif
     251
     252    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     253        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     254        __prods_handoff( chan, retval );
     255        unlock( mutex_lock );
     256        return true;
     257    }
     258
     259    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     260
     261    __do_remove( chan, retval );
     262    unlock( mutex_lock );
     263    return true;
     264}
     265
     266// attempts a nonblocking remove
     267// returns [T, true] if insert was successful
     268// returns [T, false] if insert was successful (T uninit)
     269static inline [T, bool] try_remove( channel(T) & chan ) {
     270    T retval;
     271    bool success = __internal_try_remove( chan, retval );
     272    return [ retval, success ];
     273}
     274
     275static inline T try_remove( channel(T) & chan ) {
     276    T retval;
     277    __internal_try_remove( chan, retval );
     278    return retval;
     279}
     280
     281// handles closed case of insert routine
     282static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
     283    channel_closed except{ &channel_closed_vt, 0p, &chan };
     284    throwResume except; // throw resumption
     285    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     286}
     287
     288static inline T remove( channel(T) & chan ) with(chan) {
     289    T retval;
     290    if ( unlikely(closed) ) {
     291        __closed_remove( chan, retval );
     292        return retval;
     293    }
     294    lock( mutex_lock );
     295
     296    #ifdef CHAN_STATS
     297    if ( !closed ) c_ops++;
     298    #endif
     299
     300    if ( unlikely(closed) ) {
     301        unlock( mutex_lock );
     302        __closed_remove( chan, retval );
     303        return retval;
     304    }
     305
     306    // have to check for the zero size channel case
     307    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     308        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     309        __prods_handoff( chan, retval );
     310        unlock( mutex_lock );
     311        return retval;
     312    }
     313
     314    // wait if buffer is empty, work will be completed by someone else
     315    if ( count == 0 ) {
     316        #ifdef CHAN_STATS
     317        c_blocks++;
     318        #endif
     319        // check for if woken due to close
     320        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
     321            __closed_remove( chan, retval );
     322        return retval;
     323    }
     324
     325    // Remove from buffer
     326    __do_remove( chan, retval );
     327    unlock( mutex_lock );
     328    return retval;
     329}
     330static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
     331
     332
     333///////////////////////////////////////////////////////////////////////////////////////////
     334// The following is Go-style operator support for channels
     335///////////////////////////////////////////////////////////////////////////////////////////
     336
     337static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
     338static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
     339
     340///////////////////////////////////////////////////////////////////////////////////////////
     341// The following is support for waituntil (select) statements
     342///////////////////////////////////////////////////////////////////////////////////////////
     343static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     344    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
     345    lock( mutex_lock );
     346    if ( node`isListed ) { // op wasn't performed
     347        remove( node );
     348        unlock( mutex_lock );
     349        return false;
     350    }
     351    unlock( mutex_lock );
     352
     353    // only return true when not special OR case and status is SAT
     354    return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
     355}
     356
     357// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
     358static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
     359    while ( !queue`isEmpty ) {
     360        // if node not a special OR case or if we win the special OR case race break
     361        if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
     362            return true;
    390363       
    391                 // our node lost the race when toggling in __pending_set_other
    392                 if ( *mine.clause_status != __SELECT_PENDING )
    393                     return false;
    394 
    395                 // otherwise we lost the special OR race so discard node
    396                 remove_first( queue );
    397             }
    398             return false;
    399         }
    400 
    401         void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
    402             cr.chan = chan;
    403             cr.ret = ret;
    404         }
    405         chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
    406 
    407                 void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
    408             __closed_remove( *chan, *ret );
    409             // if we get here then the insert succeeded
    410             __make_select_node_available( node );
    411         }
    412 
    413         bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
    414             lock( mutex_lock );
    415             node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
    416 
    417             #ifdef CHAN_STATS
    418             if ( ! closed ) c_ops++;
    419             #endif
    420 
    421             if ( ! node.park_counter ) {
    422                 // are we special case OR and front of cons is also special case OR
    423                 if ( ! unlikely(closed) && ! isEmpty( prods ) && first( prods ).clause_status && ! first( prods ).park_counter ) {
    424                     if ( ! __make_select_node_pending( node ) ) {
    425                         unlock( mutex_lock );
    426                         return false;
    427                     }
    428 
    429                             if ( __handle_pending( prods, node ) ) {
    430                         __prods_handoff( *chan, *ret );
    431                         __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    432                         unlock( mutex_lock );
    433                         return true;
    434                     }
    435                     if ( *node.clause_status == __SELECT_PENDING )
    436                         __make_select_node_unsat( node );
    437                 }
    438                 // check if we can complete operation. If so race to establish winner in special OR case
    439                 if ( count != 0 || ! isEmpty( prods ) || unlikely(closed) ) {
    440                     if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    441                         unlock( mutex_lock );
    442                         return false;
    443                     }
    444                 }
    445             }
    446 
    447             if ( unlikely(closed) ) {
    448                 unlock( mutex_lock );
    449                 __handle_select_closed_read( this, node );
    450                 return true;
    451             }
    452 
    453             // have to check for the zero size channel case
    454             ZeroSize:
    455                 if ( size == 0 && ! isEmpty( prods ) ) {
    456                         if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize;
    457                         __prods_handoff( *chan, *ret );
    458                         __set_avail_then_unlock( node, mutex_lock );
    459                         return true;
    460                 }
    461 
    462                 // wait if buffer is empty, work will be completed by someone else
    463                 if ( count == 0 ) {
    464                 #ifdef CHAN_STATS
    465                 c_blocks++;
    466                 #endif
     364        // our node lost the race when toggling in __pending_set_other
     365        if ( *mine.clause_status != __SELECT_PENDING )
     366            return false;
     367
     368        // otherwise we lost the special OR race so discard node
     369        try_pop_front( queue );
     370    }
     371    return false;
     372}
     373
     374// type used by select statement to capture a chan read as the selected operation
     375struct chan_read {
     376    T * ret;
     377    channel(T) * chan;
     378};
     379__CFA_SELECT_GET_TYPE( chan_read(T) );
     380
     381static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
     382    cr.chan = chan;
     383    cr.ret = ret;
     384}
     385static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
     386
     387static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
     388    __closed_remove( *chan, *ret );
     389    // if we get here then the insert succeeded
     390    __make_select_node_available( node );
     391}
     392
     393static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
     394    lock( mutex_lock );
     395    node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     396
     397    #ifdef CHAN_STATS
     398    if ( !closed ) c_ops++;
     399    #endif
     400
     401    if ( !node.park_counter ) {
     402        // are we special case OR and front of cons is also special case OR
     403        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
     404            if ( !__make_select_node_pending( node ) ) {
     405                unlock( mutex_lock );
     406                return false;
     407            }
     408
     409            if ( __handle_pending( prods, node ) ) {
     410                __prods_handoff( *chan, *ret );
     411                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     412                unlock( mutex_lock );
     413                return true;
     414            }
     415            if ( *node.clause_status == __SELECT_PENDING )
     416                __make_select_node_unsat( node );
     417        }
     418        // check if we can complete operation. If so race to establish winner in special OR case
     419        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
     420            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     421                unlock( mutex_lock );
     422                return false;
     423            }
     424        }
     425    }
     426
     427    if ( unlikely(closed) ) {
     428        unlock( mutex_lock );
     429        __handle_select_closed_read( this, node );
     430        return true;
     431    }
     432
     433    // have to check for the zero size channel case
     434    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     435        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     436        __prods_handoff( *chan, *ret );
     437        __set_avail_then_unlock( node, mutex_lock );
     438        return true;
     439    }
     440
     441    // wait if buffer is empty, work will be completed by someone else
     442    if ( count == 0 ) {
     443        #ifdef CHAN_STATS
     444        c_blocks++;
     445        #endif
    467446       
    468                 insert_last( cons, node );
    469                 unlock( mutex_lock );
    470                 return false;
    471             }
    472 
    473             // Remove from buffer
    474             __do_remove( *chan, *ret );
    475             __set_avail_then_unlock( node, mutex_lock );
    476             return true;
    477         }
    478         bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
    479         bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
    480             if ( unlikely(node.extra == 0p) ) {
    481                 if ( ! exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
    482                 else return false;
    483             }
    484             // This is only reachable if not closed or closed exception was handled
    485             return true;
    486         }
    487 
    488         void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
    489             this.c_read{ &chan, &this.retval };
    490         }
    491 
    492         chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
    493         bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
    494             this.c_read.ret = &this.retval;
    495             return register_select( this.c_read, node );
    496         }
    497         bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
    498         bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
    499 
    500         void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
    501             cw.chan = chan;
    502             memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    503         }
    504         chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
    505         chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
    506 
    507         void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
    508             __closed_insert( *chan, elem );
    509             // if we get here then the insert succeeded
    510             __make_select_node_available( node );
    511         }
    512 
    513         bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
    514             lock( mutex_lock );
    515             node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
    516        
    517             #ifdef CHAN_STATS
    518             if ( ! closed ) p_ops++;
    519             #endif
    520 
    521             // special OR case handling
    522             if ( ! node.park_counter ) {
    523                 // are we special case OR and front of cons is also special case OR
    524                 if ( ! unlikely(closed) && ! isEmpty( cons ) && first( cons ).clause_status && ! first( cons ).park_counter ) {
    525                     if ( ! __make_select_node_pending( node ) ) {
    526                         unlock( mutex_lock );
    527                         return false;
    528                     }
    529                     if ( __handle_pending( cons, node ) ) {
    530                                         __cons_handoff( *chan, elem );
    531                                         __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    532                                         unlock( mutex_lock );
    533                                         return true;
    534                                 }
    535                                 if ( *node.clause_status == __SELECT_PENDING )
    536                                         __make_select_node_unsat( node );
    537                         }
    538                         // check if we can complete operation. If so race to establish winner in special OR case
    539                         if ( count != size || ! isEmpty( cons ) || unlikely(closed) ) {
    540                                 if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    541                                         unlock( mutex_lock );
    542                                         return false;
    543                                 }
    544                         }
    545                 }
    546 
    547                 // if closed handle
    548                 if ( unlikely(closed) ) {
    549                         unlock( mutex_lock );
    550                         __handle_select_closed_write( this, node );
    551                         return true;
    552                 }
    553 
    554                 // handle blocked consumer case via handoff (buffer is implicitly empty)
    555     ConsEmpty:
    556                 if ( ! isEmpty( cons ) ) {
    557                         if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty;
    558                         __cons_handoff( *chan, elem );
    559                         __set_avail_then_unlock( node, mutex_lock );
    560                         return true;
    561                 }
    562 
    563                 // insert node in list if buffer is full, work will be completed by someone else
    564                 if ( count == size ) {
    565                 #ifdef CHAN_STATS
    566                         p_blocks++;
    567                 #endif
    568 
    569                         insert_last( prods, node );
    570                         unlock( mutex_lock );
    571                         return false;
    572                 } // if
    573 
    574                 // otherwise carry out write either via normal insert
    575                 __buf_insert( *chan, elem );
    576                 __set_avail_then_unlock( node, mutex_lock );
    577                 return true;
    578         }
    579         bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
    580 
    581         bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
    582                 if ( unlikely(node.extra == 0p) ) {
    583                         if ( ! exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
    584                         else return false;
    585                 }
    586                 // This is only reachable if not closed or closed exception was handled
    587                 return true;
    588         }
    589 } // distribution
    590 
    591 
     447        insert_last( cons, node );
     448        unlock( mutex_lock );
     449        return false;
     450    }
     451
     452    // Remove from buffer
     453    __do_remove( *chan, *ret );
     454    __set_avail_then_unlock( node, mutex_lock );
     455    return true;
     456}
     457static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
     458static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     459    if ( unlikely(node.extra == 0p) ) {
     460        if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
     461        else return false;
     462    }
     463    // This is only reachable if not closed or closed exception was handled
     464    return true;
     465}
     466
     467// type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
     468struct chan_read_no_ret {
     469    T retval;
     470    chan_read( T ) c_read;
     471};
     472__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
     473
     474static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
     475    this.c_read{ &chan, &this.retval };
     476}
     477
     478static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
     479static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
     480    this.c_read.ret = &this.retval;
     481    return register_select( this.c_read, node );
     482}
     483static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
     484static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
     485
     486// type used by select statement to capture a chan write as the selected operation
     487struct chan_write {
     488    T elem;
     489    channel(T) * chan;
     490};
     491__CFA_SELECT_GET_TYPE( chan_write(T) );
     492
     493static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
     494    cw.chan = chan;
     495    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     496}
     497static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
     498static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
     499
     500static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
     501    __closed_insert( *chan, elem );
     502    // if we get here then the insert succeeded
     503    __make_select_node_available( node );
     504}
     505
     506static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
     507    lock( mutex_lock );
     508    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     509
     510    #ifdef CHAN_STATS
     511    if ( !closed ) p_ops++;
     512    #endif
     513
     514    // special OR case handling
     515    if ( !node.park_counter ) {
     516        // are we special case OR and front of cons is also special case OR
     517        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
     518            if ( !__make_select_node_pending( node ) ) {
     519                unlock( mutex_lock );
     520                return false;
     521            }
     522
     523            if ( __handle_pending( cons, node ) ) {
     524                __cons_handoff( *chan, elem );
     525                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     526                unlock( mutex_lock );
     527                return true;
     528            }
     529            if ( *node.clause_status == __SELECT_PENDING )
     530                __make_select_node_unsat( node );
     531        }
     532        // check if we can complete operation. If so race to establish winner in special OR case
     533        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
     534            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     535                unlock( mutex_lock );
     536                return false;
     537            }
     538        }
     539    }
     540
     541    // if closed handle
     542    if ( unlikely(closed) ) {
     543        unlock( mutex_lock );
     544        __handle_select_closed_write( this, node );
     545        return true;
     546    }
     547
     548    // handle blocked consumer case via handoff (buffer is implicitly empty)
     549    ConsEmpty: if ( !cons`isEmpty ) {
     550        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     551        __cons_handoff( *chan, elem );
     552        __set_avail_then_unlock( node, mutex_lock );
     553        return true;
     554    }
     555
     556    // insert node in list if buffer is full, work will be completed by someone else
     557    if ( count == size ) {
     558        #ifdef CHAN_STATS
     559        p_blocks++;
     560        #endif
     561
     562        insert_last( prods, node );
     563        unlock( mutex_lock );
     564        return false;
     565    } // if
     566
     567    // otherwise carry out write either via normal insert
     568    __buf_insert( *chan, elem );
     569    __set_avail_then_unlock( node, mutex_lock );
     570    return true;
     571}
     572static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
     573
     574static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     575    if ( unlikely(node.extra == 0p) ) {
     576        if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
     577        else return false;
     578    }
     579    // This is only reachable if not closed or closed exception was handled
     580    return true;
     581}
     582
     583} // forall( T )
     584
     585
Note: See TracChangeset for help on using the changeset viewer.