Ignore:
Timestamp:
Apr 25, 2025, 7:39:09 AM (5 months ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
master
Children:
65bd3c2
Parents:
b195498
Message:

change backquote call to regular call

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    rb195498 r6b33e89  
    5757
    5858forall( T ) {
    59 
    60 struct __attribute__((aligned(128))) channel {
    61     size_t size, front, back, count;
    62     T * buffer;
    63     dlist( select_node ) prods, cons; // lists of blocked threads
    64     go_mutex mutex_lock;              // MX lock
    65     bool closed;                      // indicates channel close/open
    66     #ifdef CHAN_STATS
    67     size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
    68     #endif
    69 };
    70 static inline void ?{}( channel(T) & this, channel(T) this2 ) = void;
    71 static inline void ?=?( channel(T) & this, channel(T) this2 ) = void;
    72 
    73 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    74     size = _size;
    75     front = back = count = 0;
    76     if ( size != 0 ) buffer = aalloc( size );
    77     prods{};
    78     cons{};
    79     mutex_lock{};
    80     closed = false;
    81     #ifdef CHAN_STATS
    82     p_blocks = 0;
    83     p_ops = 0;
    84     c_blocks = 0;
    85     c_ops = 0;
    86     #endif
    87 }
    88 
    89 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    90 static inline void ^?{}( channel(T) &c ) with(c) {
    91     #ifdef CHAN_STATS
    92     printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
    93     printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
    94     printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
    95     #endif
    96     verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
    97         "Attempted to delete channel with waiting threads (Deadlock).\n" );
    98     if ( size != 0 ) delete( buffer );
    99 }
    100 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
    101 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
    102 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    103 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
    104 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
    105 
    106 // closes the channel and notifies all blocked threads
    107 static inline void close( channel(T) & chan ) with(chan) {
    108     lock( mutex_lock );
    109     closed = true;
    110 
    111     // flush waiting consumers and producers
    112     while ( has_waiting_consumers( chan ) ) {
    113         if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
    114             break;  // if __handle_waituntil_OR returns false cons is empty so break
    115         cons`first.extra = 0p;
    116         wake_one( cons );
    117     }
    118     while ( has_waiting_producers( chan ) ) {
    119         if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
    120             break;  // if __handle_waituntil_OR returns false prods is empty so break
    121         prods`first.extra = 0p;
    122         wake_one( prods );
    123     }
    124     unlock(mutex_lock);
    125 }
    126 
    127 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    128 
    129 // used to hand an element to a blocked consumer and signal it
    130 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    131     memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
    132     wake_one( cons );
    133 }
    134 
    135 // used to hand an element to a blocked producer and signal it
    136 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    137     memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
    138     wake_one( prods );
    139 }
    140 
    141 static inline void flush( channel(T) & chan, T elem ) with(chan) {
    142     lock( mutex_lock );
    143     while ( count == 0 && !cons`isEmpty ) {
    144         __cons_handoff( chan, elem );
    145     }
    146     unlock( mutex_lock );
    147 }
    148 
    149 // handles buffer insert
    150 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    151     memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    152     count += 1;
    153     back++;
    154     if ( back == size ) back = 0;
    155 }
    156 
    157 // needed to avoid an extra copy in closed case
    158 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    159     lock( mutex_lock );
    160     #ifdef CHAN_STATS
    161     p_ops++;
    162     #endif
    163 
    164     ConsEmpty: if ( !cons`isEmpty ) {
    165         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    166         __cons_handoff( chan, elem );
    167         unlock( mutex_lock );
    168         return true;
    169     }
    170 
    171     if ( count == size ) { unlock( mutex_lock ); return false; }
    172 
    173     __buf_insert( chan, elem );
    174     unlock( mutex_lock );
    175     return true;
    176 }
    177 
    178 // attempts a nonblocking insert
    179 // returns true if insert was successful, false otherwise
    180 static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
    181 
    182 // handles closed case of insert routine
    183 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    184     channel_closed except{ &channel_closed_vt, &elem, &chan };
    185     throwResume except; // throw closed resumption
    186     if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    187 }
    188 
    189 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    190     // check for close before acquire mx
    191     if ( unlikely(closed) ) {
    192         __closed_insert( chan, elem );
    193         return;
    194     }
    195 
    196     lock( mutex_lock );
    197 
    198     #ifdef CHAN_STATS
    199     if ( !closed ) p_ops++;
    200     #endif
    201 
    202     // if closed handle
    203     if ( unlikely(closed) ) {
    204         unlock( mutex_lock );
    205         __closed_insert( chan, elem );
    206         return;
    207     }
    208 
    209     // buffer count must be zero if cons are blocked (also handles zero-size case)
    210     ConsEmpty: if ( !cons`isEmpty ) {
    211         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    212         __cons_handoff( chan, elem );
    213         unlock( mutex_lock );
    214         return;
    215     }
    216 
    217     // wait if buffer is full, work will be completed by someone else
    218     if ( count == size ) {
    219         #ifdef CHAN_STATS
    220         p_blocks++;
    221         #endif
    222 
    223         // check for if woken due to close
    224         if ( unlikely( block( prods, &elem, mutex_lock ) ) )
    225             __closed_insert( chan, elem );
    226         return;
    227     } // if
    228 
    229     __buf_insert( chan, elem );
    230     unlock( mutex_lock );
    231 }
    232 
    233 // does the buffer remove and potentially does waiting producer work
    234 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    235     memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    236     count -= 1;
    237     front = (front + 1) % size;
    238     if (count == size - 1 && !prods`isEmpty ) {
    239         if ( !__handle_waituntil_OR( prods ) ) return;
    240         __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    241         wake_one( prods );
    242     }
    243 }
    244 
    245 // needed to avoid an extra copy in closed case and single return val case
    246 static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    247     lock( mutex_lock );
    248     #ifdef CHAN_STATS
    249     c_ops++;
    250     #endif
    251 
    252     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    253         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    254         __prods_handoff( chan, retval );
    255         unlock( mutex_lock );
    256         return true;
    257     }
    258 
    259     if ( count == 0 ) { unlock( mutex_lock ); return false; }
    260 
    261     __do_remove( chan, retval );
    262     unlock( mutex_lock );
    263     return true;
    264 }
    265 
    266 // attempts a nonblocking remove
    267 // returns [T, true] if insert was successful
    268 // returns [T, false] if insert was successful (T uninit)
    269 static inline [T, bool] try_remove( channel(T) & chan ) {
    270     T retval;
    271     bool success = __internal_try_remove( chan, retval );
    272     return [ retval, success ];
    273 }
    274 
    275 static inline T try_remove( channel(T) & chan ) {
    276     T retval;
    277     __internal_try_remove( chan, retval );
    278     return retval;
    279 }
    280 
    281 // handles closed case of insert routine
    282 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    283     channel_closed except{ &channel_closed_vt, 0p, &chan };
    284     throwResume except; // throw resumption
    285     if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    286 }
    287 
    288 static inline T remove( channel(T) & chan ) with(chan) {
    289     T retval;
    290     if ( unlikely(closed) ) {
    291         __closed_remove( chan, retval );
    292         return retval;
    293     }
    294     lock( mutex_lock );
    295 
    296     #ifdef CHAN_STATS
    297     if ( !closed ) c_ops++;
    298     #endif
    299 
    300     if ( unlikely(closed) ) {
    301         unlock( mutex_lock );
    302         __closed_remove( chan, retval );
    303         return retval;
    304     }
    305 
    306     // have to check for the zero size channel case
    307     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    308         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    309         __prods_handoff( chan, retval );
    310         unlock( mutex_lock );
    311         return retval;
    312     }
    313 
    314     // wait if buffer is empty, work will be completed by someone else
    315     if ( count == 0 ) {
    316         #ifdef CHAN_STATS
    317         c_blocks++;
    318         #endif
    319         // check for if woken due to close
    320         if ( unlikely( block( cons, &retval, mutex_lock ) ) )
    321             __closed_remove( chan, retval );
    322         return retval;
    323     }
    324 
    325     // Remove from buffer
    326     __do_remove( chan, retval );
    327     unlock( mutex_lock );
    328     return retval;
    329 }
    330 static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
    331 
    332 
    333 ///////////////////////////////////////////////////////////////////////////////////////////
    334 // The following is Go-style operator support for channels
    335 ///////////////////////////////////////////////////////////////////////////////////////////
    336 
    337 static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
    338 static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
    339 
    340 ///////////////////////////////////////////////////////////////////////////////////////////
    341 // The following is support for waituntil (select) statements
    342 ///////////////////////////////////////////////////////////////////////////////////////////
    343 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    344     if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
    345     lock( mutex_lock );
    346     if ( node`isListed ) { // op wasn't performed
    347         remove( node );
    348         unlock( mutex_lock );
    349         return false;
    350     }
    351     unlock( mutex_lock );
    352 
    353     // only return true when not special OR case and status is SAT
    354     return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
    355 }
    356 
    357 // special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
    358 static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
    359     while ( !queue`isEmpty ) {
    360         // if node not a special OR case or if we win the special OR case race break
    361         if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
    362             return true;
     59         struct __attribute__((aligned(128))) channel {
     60                size_t size, front, back, count;
     61                T * buffer;
     62                dlist( select_node ) prods, cons;                               // lists of blocked threads
     63                go_mutex mutex_lock;                                                    // MX lock
     64                bool closed;                                                                    // indicates channel close/open
     65                #ifdef CHAN_STATS
     66                size_t p_blocks, p_ops, c_blocks, c_ops;                // counts total ops and ops resulting in a blocked thd
     67            #endif
     68        };
     69
     70         // type used by select statement to capture a chan read as the selected operation
     71         struct chan_read {
     72                 T * ret;
     73                 channel(T) * chan;
     74         };
     75         __CFA_SELECT_GET_TYPE( chan_read(T) );
     76
     77         // type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
     78         struct chan_read_no_ret {
     79                 T retval;
     80                 chan_read( T ) c_read;
     81         };
     82         __CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
     83
     84         // type used by select statement to capture a chan write as the selected operation
     85         struct chan_write {
     86                 T elem;
     87                 channel(T) * chan;
     88         };
     89         __CFA_SELECT_GET_TYPE( chan_write(T) );
     90} // distribution
     91
     92static inline forall( T ) {
     93        void ?{}( channel(T) & this, channel(T) this2 ) = void;
     94        void ?=?( channel(T) & this, channel(T) this2 ) = void;
     95
     96        void ?{}( channel(T) &c, size_t _size ) with(c) {
     97                size = _size;
     98                front = back = count = 0;
     99                if ( size != 0 ) buffer = aalloc( size );
     100                prods{};
     101                cons{};
     102                mutex_lock{};
     103                closed = false;
     104            #ifdef CHAN_STATS
     105                p_blocks = 0;
     106                p_ops = 0;
     107                c_blocks = 0;
     108                c_ops = 0;
     109            #endif
     110        }
     111
     112        void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     113        void ^?{}( channel(T) &c ) with(c) {
     114            #ifdef CHAN_STATS
     115                printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
     116                printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
     117                printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
     118            #endif
     119                verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || isEmpty( cons ) && isEmpty( prods ),
     120                                 "Attempted to delete channel with waiting threads (Deadlock).\n" );
     121                if ( size != 0 ) delete( buffer );
     122        }
     123        size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
     124        size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
     125        bool has_waiters( channel(T) & chan ) with(chan) { return ! isEmpty( cons ) || ! isEmpty( prods ); }
     126        bool has_waiting_consumers( channel(T) & chan ) with(chan) { return ! isEmpty( cons ); }
     127        bool has_waiting_producers( channel(T) & chan ) with(chan) { return ! isEmpty( prods ); }
     128
     129        // closes the channel and notifies all blocked threads
     130        void close( channel(T) & chan ) with(chan) {
     131                lock( mutex_lock );
     132                closed = true;
     133
     134                // flush waiting consumers and producers
     135                while ( has_waiting_consumers( chan ) ) {
     136                        if( ! __handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     137                                break;  // if __handle_waituntil_OR returns false cons is empty so break
     138                        first( cons ).extra = 0p;
     139                        wake_one( cons );
     140                }
     141                while ( has_waiting_producers( chan ) ) {
     142                        if( ! __handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     143                                break;  // if __handle_waituntil_OR returns false prods is empty so break
     144                        first( prods ).extra = 0p;
     145                        wake_one( prods );
     146                }
     147                unlock(mutex_lock);
     148        }
     149
     150        void is_closed( channel(T) & chan ) with(chan) { return closed; }
     151
     152        // used to hand an element to a blocked consumer and signal it
     153        void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     154                memcpy( first( cons ).extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     155                wake_one( cons );
     156        }
     157
     158        // used to hand an element to a blocked producer and signal it
     159        void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     160                memcpy( (void *)&retval, first( prods ).extra, sizeof(T) );
     161                wake_one( prods );
     162        }
     163
     164        void flush( channel(T) & chan, T elem ) with(chan) {
     165                lock( mutex_lock );
     166                while ( count == 0 && ! isEmpty( cons ) ) {
     167                        __cons_handoff( chan, elem );
     168                }
     169                unlock( mutex_lock );
     170        }
     171
     172        // handles buffer insert
     173        void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
     174                memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
     175                count += 1;
     176                back++;
     177                if ( back == size ) back = 0;
     178        }
     179
     180        // needed to avoid an extra copy in closed case
     181        bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     182                lock( mutex_lock );
     183            #ifdef CHAN_STATS
     184                p_ops++;
     185            #endif
     186
     187          ConsEmpty:
     188                if ( ! isEmpty( cons ) ) {
     189                        if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty;
     190                        __cons_handoff( chan, elem );
     191                        unlock( mutex_lock );
     192                        return true;
     193                }
     194
     195                if ( count == size ) { unlock( mutex_lock ); return false; }
     196
     197                __buf_insert( chan, elem );
     198                unlock( mutex_lock );
     199                return true;
     200        }
     201
     202        // attempts a nonblocking insert
     203        // returns true if insert was successful, false otherwise
     204        bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
     205
     206        // handles closed case of insert routine
     207        void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
     208                channel_closed except{ &channel_closed_vt, &elem, &chan };
     209                throwResume except; // throw closed resumption
     210                if ( ! __internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     211        }
     212
     213        void insert( channel(T) & chan, T elem ) with(chan) {
     214                // check for close before acquire mx
     215                if ( unlikely(closed) ) {
     216                        __closed_insert( chan, elem );
     217                        return;
     218                }
     219
     220                lock( mutex_lock );
     221
     222            #ifdef CHAN_STATS
     223                if ( ! closed ) p_ops++;
     224            #endif
     225
     226                // if closed handle
     227                if ( unlikely(closed) ) {
     228                        unlock( mutex_lock );
     229                        __closed_insert( chan, elem );
     230                        return;
     231                }
     232
     233                // buffer count must be zero if cons are blocked (also handles zero-size case)
     234          ConsEmpty:
     235                if ( ! isEmpty( cons ) ) {
     236                        if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty;
     237                        __cons_handoff( chan, elem );
     238                        unlock( mutex_lock );
     239                        return;
     240                }
     241
     242                // wait if buffer is full, work will be completed by someone else
     243                if ( count == size ) {
     244                #ifdef CHAN_STATS
     245                        p_blocks++;
     246                #endif
     247
     248                        // check for if woken due to close
     249                        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
     250                                __closed_insert( chan, elem );
     251                        return;
     252                } // if
     253
     254                __buf_insert( chan, elem );
     255                unlock( mutex_lock );
     256        }
     257
     258        // does the buffer remove and potentially does waiting producer work
     259        void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     260                memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
     261                count -= 1;
     262                front = (front + 1) % size;
     263                if (count == size - 1 && ! isEmpty( prods ) ) {
     264                        if ( ! __handle_waituntil_OR( prods ) ) return;
     265                        __buf_insert( chan, *(T *)first( prods ).extra );  // do waiting producer work
     266                        wake_one( prods );
     267                }
     268        }
     269
     270        // needed to avoid an extra copy in closed case and single return val case
     271        bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     272                lock( mutex_lock );
     273            #ifdef CHAN_STATS
     274                c_ops++;
     275            #endif
     276
     277          ZeroSize:
     278                if ( size == 0 && ! isEmpty( prods ) ) {
     279                        if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize;
     280                        __prods_handoff( chan, retval );
     281                        unlock( mutex_lock );
     282                        return true;
     283                }
     284
     285                if ( count == 0 ) { unlock( mutex_lock ); return false; }
     286
     287                __do_remove( chan, retval );
     288                unlock( mutex_lock );
     289                return true;
     290        }
     291
     292        // attempts a nonblocking remove
     293        // returns [T, true] if insert was successful
     294        // returns [T, false] if insert was successful (T uninit)
     295        [T, bool] try_remove( channel(T) & chan ) {
     296                T retval;
     297                bool success = __internal_try_remove( chan, retval );
     298                return [ retval, success ];
     299        }
     300
     301        T try_remove( channel(T) & chan ) {
     302                T retval;
     303                __internal_try_remove( chan, retval );
     304                return retval;
     305        }
     306
     307        // handles closed case of insert routine
     308        void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
     309                channel_closed except{ &channel_closed_vt, 0p, &chan };
     310                throwResume except; // throw resumption
     311                if ( ! __internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     312        }
     313
     314        T remove( channel(T) & chan ) with(chan) {
     315                T retval;
     316                if ( unlikely(closed) ) {
     317                        __closed_remove( chan, retval );
     318                        return retval;
     319                }
     320                lock( mutex_lock );
     321
     322            #ifdef CHAN_STATS
     323                if ( ! closed ) c_ops++;
     324                #endif
     325
     326                if ( unlikely(closed) ) {
     327                        unlock( mutex_lock );
     328                        __closed_remove( chan, retval );
     329                        return retval;
     330                }
     331
     332                // have to check for the zero size channel case
     333          ZeroSize:
     334                if ( size == 0 && ! isEmpty( prods ) ) {
     335                        if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize;
     336                        __prods_handoff( chan, retval );
     337                        unlock( mutex_lock );
     338                        return retval;
     339                }
     340
     341                // wait if buffer is empty, work will be completed by someone else
     342                if ( count == 0 ) {
     343                #ifdef CHAN_STATS
     344                        c_blocks++;
     345                #endif
     346                        // check for if woken due to close
     347                        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
     348                                __closed_remove( chan, retval );
     349                        return retval;
     350                }
     351
     352                // Remove from buffer
     353                __do_remove( chan, retval );
     354                unlock( mutex_lock );
     355                return retval;
     356        }
     357        void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
     358
     359
     360        ///////////////////////////////////////////////////////////////////////////////////////////
     361        // The following is Go-style operator support for channels
     362        ///////////////////////////////////////////////////////////////////////////////////////////
     363
     364        void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
     365        void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
     366
     367        ///////////////////////////////////////////////////////////////////////////////////////////
     368        // The following is support for waituntil (select) statements
     369        ///////////////////////////////////////////////////////////////////////////////////////////
     370        bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     371            if ( ! isListed( node ) && ! node.park_counter ) return false; // handle special OR case
     372            lock( mutex_lock );
     373            if ( isListed( node ) ) { // op wasn't performed
     374                remove( node );
     375                unlock( mutex_lock );
     376                return false;
     377            }
     378            unlock( mutex_lock );
     379
     380            // only return true when not special OR case and status is SAT
     381            return ! node.park_counter ? false : *node.clause_status == __SELECT_SAT;
     382        }
     383
     384        // special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
     385        bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
     386            while ( ! isEmpty( queue ) ) {
     387                // if node not a special OR case or if we win the special OR case race break
     388                if ( ! first( queue ).clause_status || first( queue ).park_counter || __pending_set_other( first( queue ), mine, ((unsigned long int)(&(first( queue )))) ) )
     389                    return true;
    363390       
    364         // our node lost the race when toggling in __pending_set_other
    365         if ( *mine.clause_status != __SELECT_PENDING )
    366             return false;
    367 
    368         // otherwise we lost the special OR race so discard node
    369         try_pop_front( queue );
    370     }
    371     return false;
    372 }
    373 
    374 // type used by select statement to capture a chan read as the selected operation
    375 struct chan_read {
    376     T * ret;
    377     channel(T) * chan;
    378 };
    379 __CFA_SELECT_GET_TYPE( chan_read(T) );
    380 
    381 static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
    382     cr.chan = chan;
    383     cr.ret = ret;
    384 }
    385 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
    386 
    387 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
    388     __closed_remove( *chan, *ret );
    389     // if we get here then the insert succeeded
    390     __make_select_node_available( node );
    391 }
    392 
    393 static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
    394     lock( mutex_lock );
    395     node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
    396 
    397     #ifdef CHAN_STATS
    398     if ( !closed ) c_ops++;
    399     #endif
    400 
    401     if ( !node.park_counter ) {
    402         // are we special case OR and front of cons is also special case OR
    403         if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
    404             if ( !__make_select_node_pending( node ) ) {
    405                 unlock( mutex_lock );
    406                 return false;
    407             }
    408 
    409             if ( __handle_pending( prods, node ) ) {
    410                 __prods_handoff( *chan, *ret );
    411                 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    412                 unlock( mutex_lock );
    413                 return true;
    414             }
    415             if ( *node.clause_status == __SELECT_PENDING )
    416                 __make_select_node_unsat( node );
    417         }
    418         // check if we can complete operation. If so race to establish winner in special OR case
    419         if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
    420             if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    421                 unlock( mutex_lock );
    422                 return false;
    423             }
    424         }
    425     }
    426 
    427     if ( unlikely(closed) ) {
    428         unlock( mutex_lock );
    429         __handle_select_closed_read( this, node );
    430         return true;
    431     }
    432 
    433     // have to check for the zero size channel case
    434     ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    435         if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    436         __prods_handoff( *chan, *ret );
    437         __set_avail_then_unlock( node, mutex_lock );
    438         return true;
    439     }
    440 
    441     // wait if buffer is empty, work will be completed by someone else
    442     if ( count == 0 ) {
    443         #ifdef CHAN_STATS
    444         c_blocks++;
    445         #endif
     391                // our node lost the race when toggling in __pending_set_other
     392                if ( *mine.clause_status != __SELECT_PENDING )
     393                    return false;
     394
     395                // otherwise we lost the special OR race so discard node
     396                remove_first( queue );
     397            }
     398            return false;
     399        }
     400
     401        void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
     402            cr.chan = chan;
     403            cr.ret = ret;
     404        }
     405        chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
     406
     407                void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
     408            __closed_remove( *chan, *ret );
     409            // if we get here then the insert succeeded
     410            __make_select_node_available( node );
     411        }
     412
     413        bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
     414            lock( mutex_lock );
     415            node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     416
     417            #ifdef CHAN_STATS
     418            if ( ! closed ) c_ops++;
     419            #endif
     420
     421            if ( ! node.park_counter ) {
     422                // are we special case OR and front of cons is also special case OR
     423                if ( ! unlikely(closed) && ! isEmpty( prods ) && first( prods ).clause_status && ! first( prods ).park_counter ) {
     424                    if ( ! __make_select_node_pending( node ) ) {
     425                        unlock( mutex_lock );
     426                        return false;
     427                    }
     428
     429                            if ( __handle_pending( prods, node ) ) {
     430                        __prods_handoff( *chan, *ret );
     431                        __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     432                        unlock( mutex_lock );
     433                        return true;
     434                    }
     435                    if ( *node.clause_status == __SELECT_PENDING )
     436                        __make_select_node_unsat( node );
     437                }
     438                // check if we can complete operation. If so race to establish winner in special OR case
     439                if ( count != 0 || ! isEmpty( prods ) || unlikely(closed) ) {
     440                    if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     441                        unlock( mutex_lock );
     442                        return false;
     443                    }
     444                }
     445            }
     446
     447            if ( unlikely(closed) ) {
     448                unlock( mutex_lock );
     449                __handle_select_closed_read( this, node );
     450                return true;
     451            }
     452
     453            // have to check for the zero size channel case
     454            ZeroSize:
     455                if ( size == 0 && ! isEmpty( prods ) ) {
     456                        if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize;
     457                        __prods_handoff( *chan, *ret );
     458                        __set_avail_then_unlock( node, mutex_lock );
     459                        return true;
     460                }
     461
     462                // wait if buffer is empty, work will be completed by someone else
     463                if ( count == 0 ) {
     464                #ifdef CHAN_STATS
     465                c_blocks++;
     466                #endif
    446467       
    447         insert_last( cons, node );
    448         unlock( mutex_lock );
    449         return false;
    450     }
    451 
    452     // Remove from buffer
    453     __do_remove( *chan, *ret );
    454     __set_avail_then_unlock( node, mutex_lock );
    455     return true;
    456 }
    457 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
    458 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
    459     if ( unlikely(node.extra == 0p) ) {
    460         if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
    461         else return false;
    462     }
    463     // This is only reachable if not closed or closed exception was handled
    464     return true;
    465 }
    466 
    467 // type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
    468 struct chan_read_no_ret {
    469     T retval;
    470     chan_read( T ) c_read;
    471 };
    472 __CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
    473 
    474 static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
    475     this.c_read{ &chan, &this.retval };
    476 }
    477 
    478 static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
    479 static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
    480     this.c_read.ret = &this.retval;
    481     return register_select( this.c_read, node );
    482 }
    483 static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
    484 static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
    485 
    486 // type used by select statement to capture a chan write as the selected operation
    487 struct chan_write {
    488     T elem;
    489     channel(T) * chan;
    490 };
    491 __CFA_SELECT_GET_TYPE( chan_write(T) );
    492 
    493 static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
    494     cw.chan = chan;
    495     memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    496 }
    497 static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
    498 static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
    499 
    500 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
    501     __closed_insert( *chan, elem );
    502     // if we get here then the insert succeeded
    503     __make_select_node_available( node );
    504 }
    505 
    506 static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
    507     lock( mutex_lock );
    508     node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
    509 
    510     #ifdef CHAN_STATS
    511     if ( !closed ) p_ops++;
    512     #endif
    513 
    514     // special OR case handling
    515     if ( !node.park_counter ) {
    516         // are we special case OR and front of cons is also special case OR
    517         if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
    518             if ( !__make_select_node_pending( node ) ) {
    519                 unlock( mutex_lock );
    520                 return false;
    521             }
    522 
    523             if ( __handle_pending( cons, node ) ) {
    524                 __cons_handoff( *chan, elem );
    525                 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    526                 unlock( mutex_lock );
    527                 return true;
    528             }
    529             if ( *node.clause_status == __SELECT_PENDING )
    530                 __make_select_node_unsat( node );
    531         }
    532         // check if we can complete operation. If so race to establish winner in special OR case
    533         if ( count != size || !cons`isEmpty || unlikely(closed) ) {
    534             if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
    535                 unlock( mutex_lock );
    536                 return false;
    537             }
    538         }
    539     }
    540 
    541     // if closed handle
    542     if ( unlikely(closed) ) {
    543         unlock( mutex_lock );
    544         __handle_select_closed_write( this, node );
    545         return true;
    546     }
    547 
    548     // handle blocked consumer case via handoff (buffer is implicitly empty)
    549     ConsEmpty: if ( !cons`isEmpty ) {
    550         if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    551         __cons_handoff( *chan, elem );
    552         __set_avail_then_unlock( node, mutex_lock );
    553         return true;
    554     }
    555 
    556     // insert node in list if buffer is full, work will be completed by someone else
    557     if ( count == size ) {
    558         #ifdef CHAN_STATS
    559         p_blocks++;
    560         #endif
    561 
    562         insert_last( prods, node );
    563         unlock( mutex_lock );
    564         return false;
    565     } // if
    566 
    567     // otherwise carry out write either via normal insert
    568     __buf_insert( *chan, elem );
    569     __set_avail_then_unlock( node, mutex_lock );
    570     return true;
    571 }
    572 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
    573 
    574 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
    575     if ( unlikely(node.extra == 0p) ) {
    576         if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
    577         else return false;
    578     }
    579     // This is only reachable if not closed or closed exception was handled
    580     return true;
    581 }
    582 
    583 } // forall( T )
    584 
    585 
     468                insert_last( cons, node );
     469                unlock( mutex_lock );
     470                return false;
     471            }
     472
     473            // Remove from buffer
     474            __do_remove( *chan, *ret );
     475            __set_avail_then_unlock( node, mutex_lock );
     476            return true;
     477        }
     478        bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
     479        bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     480            if ( unlikely(node.extra == 0p) ) {
     481                if ( ! exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
     482                else return false;
     483            }
     484            // This is only reachable if not closed or closed exception was handled
     485            return true;
     486        }
     487
     488        void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
     489            this.c_read{ &chan, &this.retval };
     490        }
     491
     492        chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
     493        bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
     494            this.c_read.ret = &this.retval;
     495            return register_select( this.c_read, node );
     496        }
     497        bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
     498        bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
     499
     500        void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
     501            cw.chan = chan;
     502            memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     503        }
     504        chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
     505        chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
     506
     507        void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
     508            __closed_insert( *chan, elem );
     509            // if we get here then the insert succeeded
     510            __make_select_node_available( node );
     511        }
     512
     513        bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
     514            lock( mutex_lock );
     515            node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     516       
     517            #ifdef CHAN_STATS
     518            if ( ! closed ) p_ops++;
     519            #endif
     520
     521            // special OR case handling
     522            if ( ! node.park_counter ) {
     523                // are we special case OR and front of cons is also special case OR
     524                if ( ! unlikely(closed) && ! isEmpty( cons ) && first( cons ).clause_status && ! first( cons ).park_counter ) {
     525                    if ( ! __make_select_node_pending( node ) ) {
     526                        unlock( mutex_lock );
     527                        return false;
     528                    }
     529                    if ( __handle_pending( cons, node ) ) {
     530                                        __cons_handoff( *chan, elem );
     531                                        __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     532                                        unlock( mutex_lock );
     533                                        return true;
     534                                }
     535                                if ( *node.clause_status == __SELECT_PENDING )
     536                                        __make_select_node_unsat( node );
     537                        }
     538                        // check if we can complete operation. If so race to establish winner in special OR case
     539                        if ( count != size || ! isEmpty( cons ) || unlikely(closed) ) {
     540                                if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     541                                        unlock( mutex_lock );
     542                                        return false;
     543                                }
     544                        }
     545                }
     546
     547                // if closed handle
     548                if ( unlikely(closed) ) {
     549                        unlock( mutex_lock );
     550                        __handle_select_closed_write( this, node );
     551                        return true;
     552                }
     553
     554                // handle blocked consumer case via handoff (buffer is implicitly empty)
     555    ConsEmpty:
     556                if ( ! isEmpty( cons ) ) {
     557                        if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty;
     558                        __cons_handoff( *chan, elem );
     559                        __set_avail_then_unlock( node, mutex_lock );
     560                        return true;
     561                }
     562
     563                // insert node in list if buffer is full, work will be completed by someone else
     564                if ( count == size ) {
     565                #ifdef CHAN_STATS
     566                        p_blocks++;
     567                #endif
     568
     569                        insert_last( prods, node );
     570                        unlock( mutex_lock );
     571                        return false;
     572                } // if
     573
     574                // otherwise carry out write either via normal insert
     575                __buf_insert( *chan, elem );
     576                __set_avail_then_unlock( node, mutex_lock );
     577                return true;
     578        }
     579        bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
     580
     581        bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     582                if ( unlikely(node.extra == 0p) ) {
     583                        if ( ! exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
     584                        else return false;
     585                }
     586                // This is only reachable if not closed or closed exception was handled
     587                return true;
     588        }
     589} // distribution
     590
     591
Note: See TracChangeset for help on using the changeset viewer.