source: libcfa/src/concurrency/channel.hfa @ c5a2c96

ADTast-experimental
Last change on this file since c5a2c96 was c5a2c96, checked in by caparsons <caparson@…>, 12 months ago

added atomic to eliminate if failing test deadlock is a staleness issue

  • Property mode set to 100644
File size: 16.9 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <list.hfa>
5#include <mutex_stmt.hfa>
6#include "select.hfa"
7
// Parks the calling thread on `queue` and releases `lock` (held by the caller on entry).
// A select_node built from the active thread and elem_ptr is appended to the queue,
// the lock is released, and the thread parks until it is woken.
// Returns true when the node's extra field is 0p after waking, which is how close()
// marks the nodes it wakes (i.e. true means woken due to shutdown).
static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
    select_node waiter{ active_thread(), elem_ptr };
    insert_last( queue, waiter );
    unlock( lock );
    park();
    bool woken_by_close = ( waiter.extra == 0p );
    return woken_by_close;
}
17
// Helper for the waituntil (un)register_select routines.
// Marks the select node available unless it is the special OR case
// (park_counter is null/zero), then releases mutex_lock.
static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
    if ( node.park_counter ) {
        __make_select_node_available( node );
    }
    unlock( mutex_lock );
}
24
25// Raised by channel operations performed on a closed channel; void * is used for the fields since exceptions don't work with parametric polymorphism currently
26exception channel_closed {
27    // on a failed insert, elem is a ptr to the element that was being inserted
28    // on a failed remove, elem is 0p
29    // when a failed insert is resumed, the element elem points at is what gets inserted,
30    // so a user may modify it in the resumption handler
31    void * elem;
32
33    // pointer to the channel that is closed
34    void * closed_chan;
35};
36vtable(channel_closed) channel_closed_vt;
37
38// #define CHAN_STATS // define this to get channel stats printed in dtor
39
40forall( T ) {
41
// Channel of T backed by a circular buffer of `size` elements (no buffer when size is 0).
// The structure is aligned to 128 bytes.
struct __attribute__((aligned(128))) channel {
    size_t size;                    // buffer capacity in elements
    size_t front;                   // index of the next element to remove
    size_t back;                    // index of the next slot to insert into
    size_t count;                   // number of elements currently in the buffer
    T * buffer;                     // circular buffer storage (only allocated when size != 0)
    dlist( select_node ) prods;     // list of blocked producer threads
    dlist( select_node ) cons;      // list of blocked consumer threads
    go_mutex mutex_lock;            // MX lock
    bool closed;                    // indicates channel close/open
    #ifdef CHAN_STATS
    size_t blocks;                  // counts ops resulting in a blocked thd
    size_t operations;              // counts total ops
    #endif
};
52
// Constructs an open, empty channel with room for _size buffered elements.
// With _size == 0 no buffer is allocated; buffer is then set to 0p so the field
// never holds an indeterminate pointer.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    size = _size;
    front = back = count = 0;
    if ( size != 0 ) buffer = aalloc( size );
    else buffer = 0p;               // zero-size channel: nothing to allocate
    prods{};
    cons{};
    mutex_lock{};
    closed = false;
    #ifdef CHAN_STATS
    blocks = 0;
    operations = 0;
    #endif
}
66
// Default constructor: delegates to the sized constructor with a size of 0.
static inline void ?{}( channel(T) &c ) {
    ((channel(T) &)c){ 0 };
}
// Destructor: optionally prints statistics, verifies that no thread is still
// blocked on the channel, and frees the buffer when one was allocated.
static inline void ^?{}( channel(T) &c ) with(c) {
    #ifdef CHAN_STATS
    // size_t is printed with %zu and %p takes a void *;
    // the percentage is guarded so an unused channel does not divide by zero
    double pct_blocked = operations != 0 ? ((double)blocks) / operations * 100 : 0.0;
    printf("Channel %p Blocks: %zu, Operations: %zu, %.2f%% of ops blocked\n", (void *)&c, blocks, operations, pct_blocked);
    #endif
    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    if ( size != 0 ) delete( buffer );  // buffer only exists for non-zero size
}
// Number of elements currently buffered, read with a sequentially consistent atomic load.
static inline size_t get_count( channel(T) & chan ) with(chan) {
    return __atomic_load_n( &count, __ATOMIC_SEQ_CST );
}

// Buffer capacity the channel was constructed with.
static inline size_t get_size( channel(T) & chan ) with(chan) {
    return size;
}

// True if at least one consumer or producer is on a wait list.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
    return !( cons`isEmpty && prods`isEmpty );
}

// True if at least one consumer is on the cons list.
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
    return !cons`isEmpty;
}

// True if at least one producer is on the prods list.
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
    return !prods`isEmpty;
}
80
// Closes the channel and notifies all blocked threads.
// Under mutex_lock: sets closed, then flushes the consumer list followed by the
// producer list. Each node has extra set to 0p before it is woken, which is the
// marker a woken thread uses to detect that the wakeup came from a close.
static inline void close( channel(T) & chan ) with(chan) {
    lock( mutex_lock );
    closed = true;

    // Flush waiting consumers. __handle_waituntil_OR ensures special OR case threads are
    // only signalled when they win the race; when it returns false the list is empty.
    while ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        cons`first.extra = 0p;
        wake_one( cons );
    }

    // Flush waiting producers in the same manner.
    while ( !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        prods`first.extra = 0p;
        wake_one( prods );
    }

    unlock( mutex_lock );
}
101
// Returns true if the channel has been closed.
// The flag is read without acquiring mutex_lock.
static inline bool is_closed( channel(T) & chan ) with(chan) { return closed; }
103
// Hands elem directly to the consumer at the front of cons and signals it.
// The element is byte-copied into the location the consumer's node points at via extra.
static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    void * consumer_slot = cons`first.extra;
    memcpy( consumer_slot, (void *)&elem, sizeof(T) ); // do waiting consumer work
    wake_one( cons );
}
109
// Takes the element offered by the producer at the front of prods and signals it.
// The element the producer's node points at via extra is byte-copied into retval.
static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    void * producer_elem = prods`first.extra;
    memcpy( (void *)&retval, producer_elem, sizeof(T) );
    wake_one( prods );
}
115
// Hands a copy of elem to every blocked consumer.
// Under mutex_lock, keeps handing off while the buffer is empty and the cons list
// is non-empty.
static inline void flush( channel(T) & chan, T elem ) with(chan) {
    lock( mutex_lock );
    for ( ;; ) {
        if ( count != 0 || cons`isEmpty ) break;
        __cons_handoff( chan, elem );
    }
    unlock( mutex_lock );
}
123
// Copies elem into the buffer slot at back, bumps count, and advances back,
// wrapping to 0 when it reaches size. Performs no locking and no capacity check.
static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    count++;
    back = ( back + 1 == size ) ? 0 : back + 1;
}
131
// Non-blocking insert that takes the element by reference
// (needed to avoid an extra copy in the closed case).
// Under mutex_lock: hands off to a blocked consumer if one can be signalled,
// otherwise inserts into the buffer if there is room.
// Returns true if the element was delivered, false if the buffer is full.
static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    operations++;
    #endif

    bool inserted;
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        // a consumer is waiting and can be signalled: direct handoff
        __cons_handoff( chan, elem );
        inserted = true;
    } else if ( count == size ) {
        // no room; insert would block
        inserted = false;
    } else {
        __buf_insert( chan, elem );
        inserted = true;
    }

    unlock( mutex_lock );
    return inserted;
}
152
// Attempts a non-blocking insert of elem.
// Returns true if the insert was successful, false otherwise.
static inline bool try_insert( channel(T) & chan, T elem ) {
    return __internal_try_insert( chan, elem );
}
156
// Handles an insert attempted on a closed channel.
// Raises channel_closed as a resumption carrying pointers to the element and the channel.
// If the resumption returns, a non-blocking insert is attempted; if that fails
// (it would block), the same exception is raised as a termination.
static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    channel_closed closed_ex{ &channel_closed_vt, &elem, &chan };
    throwResume closed_ex;
    bool inserted = __internal_try_insert( chan, elem );
    if ( !inserted ) throw closed_ex;
}
163
// Blocking insert of elem into chan.
// Order of attempts: closed handling, direct handoff to a blocked consumer,
// buffer insert, and finally parking on prods when the buffer is full.
// On a closed channel the work is delegated to __closed_insert, which raises channel_closed.
static inline void insert( channel(T) & chan, T elem ) with(chan) {
    // check for close before acquiring the lock
    if ( unlikely(closed) ) {
        __closed_insert( chan, elem );
        return;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) operations++;
    #endif

    // re-check for close now that the lock is held
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_insert( chan, elem );
        return;
    }

    // buffer count must be zero if consumers are blocked (also handles zero-size case)
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // room available: plain buffer insert
    if ( count != size ) {
        __buf_insert( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // buffer is full: wait, the insert will be completed by someone else
    #ifdef CHAN_STATS
    blocks++;
    #endif

    // block() releases mutex_lock and returns true when woken due to close
    if ( unlikely( block( prods, &elem, mutex_lock ) ) )
        __closed_insert( chan, elem );
}
207
// Removes the element at front of the buffer into retval and, if that frees the
// slot a blocked producer was waiting for, performs that producer's insert and wakes it.
// Performs no locking and no emptiness check.
static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    count--;
    front = (front + 1) % size;

    // only when the buffer just went from full to one free slot can a producer be waiting on it
    if ( count != size - 1 || prods`isEmpty ) return;
    if ( !__handle_waituntil_OR( prods ) ) return;

    T * pending = (T *)prods`first.extra;
    __buf_insert( chan, *pending );  // do waiting producer work
    wake_one( prods );
}
219
// Non-blocking remove that writes the result through a reference
// (needed to avoid an extra copy in the closed case and the single return value case).
// Under mutex_lock: for a zero-size channel takes the element straight from a blocked
// producer if one can be signalled, otherwise removes from the buffer if it is non-empty.
// Returns true if retval was filled, false if the remove would block.
static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    operations++;
    #endif

    bool removed;
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        // zero-size channel with a producer waiting: direct handoff
        __prods_handoff( chan, retval );
        removed = true;
    } else if ( count == 0 ) {
        // nothing buffered; remove would block
        removed = false;
    } else {
        __do_remove( chan, retval );
        removed = true;
    }

    unlock( mutex_lock );
    return removed;
}
240
// Attempts a non-blocking remove.
// Returns [T, true] if the remove was successful.
// Returns [T, false] if the remove was unsuccessful (T is uninitialized).
static inline [T, bool] try_remove( channel(T) & chan ) {
    T retval;
    bool removed = __internal_try_remove( chan, retval );
    return [ retval, removed ];
}
249
// Attempts a non-blocking remove and returns only the value.
// The success flag of the underlying attempt is discarded, so when nothing could be
// removed the returned T is whatever the local held without being assigned by the remove.
static inline T try_remove( channel(T) & chan ) {
    T result;
    __internal_try_remove( chan, result );
    return result;
}
255
// Handles a remove attempted on a closed channel.
// Raises channel_closed as a resumption with elem set to 0p and a pointer to the channel.
// If the resumption returns, a non-blocking remove is attempted; if that fails
// (it would block), the same exception is raised as a termination.
static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    channel_closed closed_ex{ &channel_closed_vt, 0p, &chan };
    throwResume closed_ex;
    bool removed = __internal_try_remove( chan, retval );
    if ( !removed ) throw closed_ex;
}
262
// Blocking remove from chan; returns the removed element.
// Order of attempts: closed handling, direct handoff from a blocked producer on a
// zero-size channel, buffer remove, and finally parking on cons when the buffer is empty.
// On a closed channel the work is delegated to __closed_remove, which raises channel_closed.
static inline T remove( channel(T) & chan ) with(chan) {
    T retval;

    // check for close before acquiring the lock
    if ( unlikely(closed) ) {
        __closed_remove( chan, retval );
        return retval;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) operations++;
    #endif

    // re-check for close now that the lock is held
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_remove( chan, retval );
        return retval;
    }

    // zero-size channel: take the element straight from a blocked producer
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // something is buffered: remove from buffer
    if ( count != 0 ) {
        __do_remove( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // buffer is empty: wait, the remove will be completed by someone else
    #ifdef CHAN_STATS
    blocks++;
    #endif

    // block() releases mutex_lock and returns true when woken due to close
    if ( unlikely( block( cons, &retval, mutex_lock ) ) )
        __closed_remove( chan, retval );
    return retval;
}
305
306///////////////////////////////////////////////////////////////////////////////////////////
307// The following is support for waituntil (select) statements
308///////////////////////////////////////////////////////////////////////////////////////////
// Shared unregister routine for channel select clauses.
// If the node is still on a wait list the operation was not performed: the node is
// unlinked under mutex_lock and false is returned.
// Otherwise returns true only when this is not the special OR case (park_counter set),
// not the exceptional case (extra != 0p), and the clause status is __SELECT_SAT.
static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    lock( mutex_lock );
    bool still_listed = node`isListed;
    if ( still_listed ) { // op wasn't performed
        #ifdef CHAN_STATS
        operations--;
        #endif
        remove( node );
    }
    unlock( mutex_lock );

    if ( still_listed ) return false;
    if ( node.extra == 0p || !node.park_counter ) return false;
    return *node.clause_status == __SELECT_SAT;
}
325
326// type used by a select (waituntil) statement to capture a channel read as the selected operation
327struct chan_read {
328    T & ret;                // destination that receives the value read from chan
329    channel(T) & chan;      // channel being read from
330};
331
// Binds the chan_read's two reference members to the given destination and channel.
static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    &cr.ret = &ret;
    &cr.chan = &chan;
}
// `ret << chan`: builds the chan_read describing a read from chan into ret.
static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) {
    chan_read(T) read_op{ chan, ret };
    return read_op;
}
337
// Closed-channel path of register_select for reads.
// Delegates to __closed_remove, which raises channel_closed; control only continues
// past it if the remove succeeded, in which case the select node is marked available.
static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    __closed_remove( chan, ret );

    // reaching this point means the remove succeeded
    __make_select_node_available( node );
}
343
// Registers a waituntil clause that reads from the channel.
// Returns true if the read was carried out during registration.
// Returns false if the node was queued on cons to be completed later, or if the
// special OR case race was lost and registration was abandoned.
static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    lock( mutex_lock );
    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close

    #ifdef CHAN_STATS
    if ( !closed ) operations++;
    #endif

    // special OR case handling
    if ( !node.park_counter ) {
        // are we special case OR and front of prods is also special case OR
        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
            if ( !__make_select_node_pending( node ) ) {
                unlock( mutex_lock );
                return false;
            }

            if ( __handle_waituntil_OR( prods ) ) {
                __prods_handoff( chan, ret );
                unlock( mutex_lock );
                return true;
            }
            __make_select_node_unsat( node );
        }

        // check if we can complete operation. If so race to establish winner in special OR case
        bool can_complete = ( count != 0 || !prods`isEmpty || unlikely(closed) );
        if ( can_complete && !__make_select_node_available( node ) ) {
            // we didn't win the race so give up on registering
            unlock( mutex_lock );
            return false;
        }
    }

    // if closed handle
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __handle_select_closed_read( this, node );
        return true;
    }

    // have to check for the zero size channel case
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, ret );
        __set_avail_then_unlock( node, mutex_lock );
        return true;
    }

    // something is buffered: remove from buffer
    if ( count != 0 ) {
        __do_remove( chan, ret );
        __set_avail_then_unlock( node, mutex_lock );
        return true;
    }

    // buffer is empty: queue the node, work will be completed by someone else
    #ifdef CHAN_STATS
    blocks++;
    #endif

    insert_last( cons, node );
    unlock( mutex_lock );
    return false;
}
// Unregisters a read clause by delegating to the shared unregister_chan routine.
static inline bool unregister_select( chan_read(T) & this, select_node & node ) {
    return unregister_chan( this.chan, node );
}
// Called when the read clause is selected.
// A 0p extra means the node was woken due to a closed channel, which is handled by
// __closed_remove (raises channel_closed). Always returns true when it returns.
static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
    bool woken_by_close = ( node.extra == 0p );
    if ( woken_by_close ) __closed_remove( chan, ret );

    // This is only reachable if not closed or closed exception was handled
    return true;
}
413
414// type used by a select (waituntil) statement to capture a channel write as the selected operation
415struct chan_write {
416    T elem;                 // copy of the element to be written to chan
417    channel(T) & chan;      // channel being written to
418};
419
// Binds the chan_write to chan and stores a byte-wise copy of elem in it.
static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    &cw.chan = &chan;
}
// `elem >> chan`: builds the chan_write describing a write of elem to chan.
static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) {
    chan_write(T) write_op{ chan, elem };
    return write_op;
}
425
// Closed-channel path of register_select for writes.
// Delegates to __closed_insert, which raises channel_closed; control only continues
// past it if the insert succeeded, in which case the select node is marked available.
static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    __closed_insert( chan, elem );

    // reaching this point means the insert succeeded
    __make_select_node_available( node );
}
431
// Registers a waituntil clause that writes to the channel.
// Returns true if the write was carried out during registration.
// Returns false if the node was queued on prods to be completed later, or if the
// special OR case race was lost and registration was abandoned.
static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    lock( mutex_lock );
    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close

    #ifdef CHAN_STATS
    if ( !closed ) operations++;
    #endif

    // special OR case handling
    if ( !node.park_counter ) {
        // are we special case OR and front of cons is also special case OR
        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
            if ( !__make_select_node_pending( node ) ) {
                unlock( mutex_lock );
                return false;
            }

            if ( __handle_waituntil_OR( cons ) ) {
                __cons_handoff( chan, elem );
                unlock( mutex_lock );
                return true;
            }
            __make_select_node_unsat( node );
        }

        // check if we can complete operation. If so race to establish winner in special OR case
        bool can_complete = ( count != size || !cons`isEmpty || unlikely(closed) );
        if ( can_complete && !__make_select_node_available( node ) ) {
            // we didn't win the race so give up on registering
            unlock( mutex_lock );
            return false;
        }
    }

    // if closed handle
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __handle_select_closed_write( this, node );
        return true;
    }

    // handle blocked consumer case via handoff (buffer is implicitly empty)
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );
        __set_avail_then_unlock( node, mutex_lock );
        return true;
    }

    // room in the buffer: carry out the write via a normal insert
    if ( count != size ) {
        __buf_insert( chan, elem );
        __set_avail_then_unlock( node, mutex_lock );
        return true;
    }

    // buffer is full: queue the node, work will be completed by someone else
    #ifdef CHAN_STATS
    blocks++;
    #endif

    insert_last( prods, node );
    unlock( mutex_lock );
    return false;
}
// Unregisters a write clause by delegating to the shared unregister_chan routine.
static inline bool unregister_select( chan_write(T) & this, select_node & node ) {
    return unregister_chan( this.chan, node );
}
497
// Called when the write clause is selected.
// A 0p extra means the node was woken due to a closed channel, which is handled by
// __closed_insert (raises channel_closed). Always returns true when it returns.
static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
    bool woken_by_close = ( node.extra == 0p );
    if ( woken_by_close ) __closed_insert( chan, elem );

    // This is only reachable if not closed or closed exception was handled
    return true;
}
505
506
507} // forall( T )
508
509
510
Note: See TracBrowser for help on using the repository browser.