source: libcfa/src/concurrency/channel.hfa @ fc0996a

ast-experimental
Last change on this file since fc0996a was 70a4ed5, checked in by caparsons <caparson@…>, 13 months ago

refactored to remove return val from on_selected

  • Property mode set to 100644
File size: 18.0 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
// Runtime channels that are used with the runtime thread system.
9//
10// Author           : Colby Alexander Parsons
11// Created On       : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count     :
15//
16
17#pragma once
18
19#include <locks.hfa>
20#include <list.hfa>
21#include "select.hfa"
22
// Parks the calling thread on `queue` and releases `lock`.
// A select_node built from the active thread and `elem_ptr` is appended to `queue`,
// `lock` is released, and the thread parks until another thread wakes it.
// Returns true if the node's `extra` field is 0p after waking; close() sets
// `extra` to 0p before waking a waiter, so true means "woken due to shutdown".
static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
    select_node waiter{ active_thread(), elem_ptr };   // local node; stays queued while this thread is parked
    insert_last( queue, waiter );
    unlock( lock );                                     // release only once the node is on the list
    park();
    bool woken_by_close = waiter.extra == 0p;
    return woken_by_close;
}
32
// Helper for the waituntil (un)register_select routines.
// When `node` is not the special OR case (i.e. node.park_counter is set) the node is
// marked available; in every case `mutex_lock` is released afterwards.
static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
    if ( node.park_counter ) {
        __make_select_node_available( node );   // result intentionally unused here
    }
    unlock( mutex_lock );
}
39
// Exception used by channel operations that find the channel closed; it is first
// raised as a resumption and, if the retried operation would block, as a termination.
// void * used for some fields since exceptions don't work with parametric polymorphism currently
exception channel_closed {
    // on failed insert elem is a ptr to the element attempting to be inserted
    // on failed remove elem ptr is 0p
    // on resumption of a failed insert this elem will be inserted
    // so a user may modify it in the resumption handler
    void * elem;

    // pointer to chan that is closed
    void * closed_chan;
};
// vtable instance passed as the first initializer whenever a channel_closed is constructed
vtable(channel_closed) channel_closed_vt;
52
53// #define CHAN_STATS // define this to get channel stats printed in dtor
54
55forall( T ) {
56
// Bounded (or zero-size) channel of T, protected by a single mutual-exclusion lock.
struct __attribute__((aligned(128))) channel {
    size_t size;                      // capacity of buffer (0 => no buffer is allocated)
    size_t front;                     // index of the next element to remove
    size_t back;                      // index of the next free slot to insert into
    size_t count;                     // number of elements currently buffered
    T * buffer;                       // circular buffer, allocated only when size != 0
    dlist( select_node ) prods;       // blocked producers
    dlist( select_node ) cons;        // blocked consumers
    go_mutex mutex_lock;              // MX lock
    bool closed;                      // indicates channel close/open
    #ifdef CHAN_STATS
    // total ops and ops resulting in a blocked thd (p_ = producer side, c_ = consumer side)
    size_t p_blocks;
    size_t p_ops;
    size_t c_blocks;
    size_t c_ops;
    #endif
};
67
// Constructs an open, empty channel able to buffer `_size` elements.
// A `_size` of 0 creates a channel with no buffer; `buffer` is then left as 0p
// (previously it was left uninitialized in that case).
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    size = _size;
    front = back = count = 0;
    buffer = 0p;                                // well-defined value for the zero-size case
    if ( size != 0 ) buffer = aalloc( size );
    prods{};
    cons{};
    mutex_lock{};
    closed = false;
    #ifdef CHAN_STATS
    p_blocks = 0;
    p_ops = 0;
    c_blocks = 0;
    c_ops = 0;
    #endif
}
83
// Default constructor: delegates to the sized constructor with a size of 0.
static inline void ?{}( channel(T) &c ) {
    ((channel(T) &)c){ 0 };
}
// Destructor: optionally prints usage statistics (CHAN_STATS), verifies that no thread is
// still blocked on the channel, and releases the buffer when one was allocated (size != 0).
static inline void ^?{}( channel(T) &c ) with(c) {
    #ifdef CHAN_STATS
    // p_* counters are incremented on the insert (producer) paths and c_* on the remove
    // (consumer) paths, so each line must print the counters that match its label.
    printf("Channel %p Blocks: %zu,\t\tOperations: %zu,\t%.2f%% of ops blocked\n", (void *)&c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
    printf("Channel %p Consumer Blocks: %zu,\tConsumer Ops: %zu,\t%.2f%% of Consumer ops blocked\n", (void *)&c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
    printf("Channel %p Producer Blocks: %zu,\tProducer Ops: %zu,\t%.2f%% of Producer ops blocked\n", (void *)&c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
    #endif
    // parentheses only make the original precedence (&& binds tighter than ||) explicit
    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || ( cons`isEmpty && prods`isEmpty ),
        "Attempted to delete channel with waiting threads (Deadlock).\n" );
    if ( size != 0 ) delete( buffer );
}
// Number of elements currently buffered (relaxed atomic load, taken without the lock).
static inline size_t get_count( channel(T) & chan ) with(chan) {
    size_t buffered = __atomic_load_n( &count, __ATOMIC_RELAXED );
    return buffered;
}

// Capacity of the channel's buffer (relaxed atomic load, taken without the lock).
static inline size_t get_size( channel(T) & chan ) with(chan) {
    size_t capacity = __atomic_load_n( &size, __ATOMIC_RELAXED );
    return capacity;
}
// True when at least one consumer or producer node is queued on the channel.
// These predicates read the lists directly and do not acquire the lock themselves.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
    return !( cons`isEmpty && prods`isEmpty );
}

// True when at least one consumer node is queued.
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
    return !cons`isEmpty;
}

// True when at least one producer node is queued.
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
    return !prods`isEmpty;
}
100
// Marks the channel closed and wakes every blocked consumer and producer.
// Each woken node has its `extra` field set to 0p first, which is how the woken
// thread recognises that it was released by a close rather than by a handoff.
static inline void close( channel(T) & chan ) with(chan) {
    lock( mutex_lock );
    closed = true;

    // Flush waiting consumers. __handle_waituntil_OR ensures special OR case threads are
    // only signalled when they win the race; it returns false once cons is empty.
    while ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        cons`first.extra = 0p;
        wake_one( cons );
    }

    // Flush waiting producers in the same way.
    while ( !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        prods`first.extra = 0p;
        wake_one( prods );
    }

    unlock( mutex_lock );
}
121
// Reports whether close() has been called on the channel.
// Reads the flag without acquiring the lock. The return type is bool: the routine
// returns `closed`, which a void return type could not deliver to callers.
static inline bool is_closed( channel(T) & chan ) with(chan) { return closed; }
123
// Hands `elem` directly to the consumer at the front of cons and wakes it.
// The front node's `extra` pointer is the destination the element is copied into.
// Callers in this file invoke it while holding mutex_lock.
static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    void * dest = cons`first.extra;
    memcpy( dest, (void *)&elem, sizeof(T) );   // do waiting consumer work
    wake_one( cons );
}
129
// Takes the element offered by the producer at the front of prods and wakes that producer.
// The front node's `extra` pointer is the source; the element is copied into `retval`.
// Callers in this file invoke it while holding mutex_lock.
static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    void * src = prods`first.extra;
    memcpy( (void *)&retval, src, sizeof(T) );
    wake_one( prods );
}
135
// Hands a copy of `elem` to every consumer currently blocked on the channel.
// Handoffs continue while the buffer is empty and a consumer is queued.
// NOTE(review): unlike the insert paths, no __handle_waituntil_OR check precedes the
// handoff here — confirm this is intended for waituntil nodes.
static inline void flush( channel(T) & chan, T elem ) with(chan) {
    lock( mutex_lock );
    for ( ;; ) {
        if ( count != 0 || cons`isEmpty ) break;
        __cons_handoff( chan, elem );
    }
    unlock( mutex_lock );
}
143
// Copies `elem` into the slot at `back`, bumps the element count and advances `back`,
// wrapping to 0 when it reaches `size`. Performs no capacity check and no locking itself.
static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    count++;
    size_t next_slot = back + 1;
    back = ( next_slot == size ) ? 0 : next_slot;
}
151
// Non-blocking insert taking the element by reference (avoids an extra copy in the closed case).
// Under the lock: hands `elem` to a waiting consumer if one can be signalled, otherwise
// stores it in the buffer if there is room. Returns false, without inserting, when the
// buffer is full.
static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    p_ops++;
    #endif

    bool inserted = true;
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );           // direct handoff to a blocked consumer
    } else if ( count != size ) {
        __buf_insert( chan, elem );
    } else {
        inserted = false;                       // would block
    }

    unlock( mutex_lock );
    return inserted;
}
172
// Attempts a non-blocking insert of `elem`.
// Returns true if the element was inserted, false otherwise.
static inline bool try_insert( channel(T) & chan, T elem ) {
    bool inserted = __internal_try_insert( chan, elem );
    return inserted;
}
176
// Handles an insert attempted on a closed channel.
// Raises channel_closed as a resumption carrying pointers to `elem` and `chan`; if control
// comes back, a non-blocking insert is attempted, and if that would block the same
// exception is thrown as a termination.
static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    channel_closed closed_exc{ &channel_closed_vt, &elem, &chan };
    throwResume closed_exc;                     // throw closed resumption
    bool retried = __internal_try_insert( chan, elem );
    if ( retried ) return;
    throw closed_exc;                           // retry would block: throw termination
}
183
// Blocking insert of `elem`.
// Order of attempts once the lock is held: closed handling, direct handoff to a blocked
// consumer, buffer insert, and finally blocking on prods until another thread completes
// the operation (or the channel is closed).
static inline void insert( channel(T) & chan, T elem ) with(chan) {
    // check for close before acquiring the lock
    if ( unlikely(closed) ) {
        __closed_insert( chan, elem );
        return;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) p_ops++;
    #endif

    // re-check now that the lock is held
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_insert( chan, elem );
        return;
    }

    // buffer count must be zero if cons are blocked (also handles zero-size case)
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // room in the buffer: store and leave
    if ( count != size ) {
        __buf_insert( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // buffer is full: wait, the work will be completed by someone else
    #ifdef CHAN_STATS
    p_blocks++;
    #endif

    bool closed_while_blocked = block( prods, &elem, mutex_lock );   // block() releases the lock
    if ( unlikely( closed_while_blocked ) ) __closed_insert( chan, elem );
}
227
// Removes the element at `front` into `retval` and advances `front` (modulo size).
// If that removal freed the only missing slot (count == size - 1) and a producer is
// waiting and can be signalled, the producer's element is moved into the buffer on its
// behalf and the producer is woken.
static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    count--;
    front = (front + 1) % size;

    if ( count == size - 1 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
        wake_one( prods );
    }
}
239
// Non-blocking remove writing into `retval` (avoids an extra copy in the closed case and
// in the single-return-value case).
// Under the lock: for a zero-size channel, takes the element straight from a waiting
// producer if one can be signalled; otherwise removes from the buffer if it is non-empty.
// Returns false, leaving `retval` untouched, when nothing is available.
static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    c_ops++;
    #endif

    bool removed = true;
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, retval );
    } else if ( count != 0 ) {
        __do_remove( chan, retval );
    } else {
        removed = false;                        // would block
    }

    unlock( mutex_lock );
    return removed;
}
260
// Attempts a non-blocking remove.
// Returns [T, true] if the remove was successful.
// Returns [T, false] if the remove failed (the T is not set by the remove).
static inline [T, bool] try_remove( channel(T) & chan ) {
    T value;
    bool removed = __internal_try_remove( chan, value );
    return [ value, removed ];
}
269
// Attempts a non-blocking remove and returns only the value.
// The success flag is discarded, so the caller cannot tell whether the returned T was
// actually removed from the channel.
static inline T try_remove( channel(T) & chan ) {
    T value;
    __internal_try_remove( chan, value );
    return value;
}
275
// Handles a remove attempted on a closed channel.
// Raises channel_closed as a resumption (elem pointer is 0p for removes); if control comes
// back, a non-blocking remove into `retval` is attempted, and if that would block the same
// exception is thrown as a termination.
static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    channel_closed closed_exc{ &channel_closed_vt, 0p, &chan };
    throwResume closed_exc;                     // throw resumption
    bool retried = __internal_try_remove( chan, retval );
    if ( retried ) return;
    throw closed_exc;                           // retry would block: throw termination
}
282
// Blocking remove; returns the removed element.
// Order of attempts once the lock is held: closed handling, direct handoff from a blocked
// producer (zero-size channel), buffer remove, and finally blocking on cons until another
// thread completes the operation (or the channel is closed).
static inline T remove( channel(T) & chan ) with(chan) {
    T retval;

    // check for close before acquiring the lock
    if ( unlikely(closed) ) {
        __closed_remove( chan, retval );
        return retval;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) c_ops++;
    #endif

    // re-check now that the lock is held
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_remove( chan, retval );
        return retval;
    }

    // have to check for the zero size channel case
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // something is buffered: remove it and leave
    if ( count != 0 ) {
        __do_remove( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // buffer is empty: wait, the work will be completed by someone else
    #ifdef CHAN_STATS
    c_blocks++;
    #endif

    bool closed_while_blocked = block( cons, &retval, mutex_lock );  // block() releases the lock
    if ( unlikely( closed_while_blocked ) ) __closed_remove( chan, retval );
    return retval;
}
325
326///////////////////////////////////////////////////////////////////////////////////////////
327// The following is support for waituntil (select) statements
328///////////////////////////////////////////////////////////////////////////////////////////
// Shared unregister step for channel read/write clauses of a waituntil statement.
// Returns false for the special OR case, when the node was still queued (the operation was
// never performed, so the node is unlinked here), and for the exceptional (closed) case.
// Otherwise returns whether the clause status is __SELECT_SAT.
static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    // handle special OR case without taking the lock
    if ( !node`isListed && !node.park_counter ) return false;

    lock( mutex_lock );
    bool still_listed = node`isListed;
    if ( still_listed ) remove( node );         // op wasn't performed
    unlock( mutex_lock );
    if ( still_listed ) return false;

    // only return true when not special OR case, not exceptional case and status is SAT
    if ( node.extra == 0p || !node.park_counter ) return false;
    return *node.clause_status == __SELECT_SAT;
}
342
// type used by select statement to capture a chan read as the selected operation
struct chan_read {
    T & ret;                // destination that receives the element read from the channel
    channel(T) & chan;      // channel being read from
};
348
// Binds a chan_read to the destination `ret` and the channel `chan` (both held by reference).
static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    &cr.ret = &ret;
    &cr.chan = &chan;
}
// `ret << chan`: builds the chan_read clause object that reads from `chan` into `ret`.
static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) {
    chan_read(T) reader{ chan, ret };
    return reader;
}
354
// Closed-channel path of register_select for reads: runs the closed-remove protocol and,
// if it returns normally, marks the select node available.
static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    __closed_remove( chan, ret );
    // reaching this point means the remove succeeded
    __make_select_node_available( node );
}
360
// Registers a channel read as a clause of a waituntil statement.
// Returns true when the read completed during registration (or the closed case was
// handled), and false when the node was queued on cons to wait or when this node lost
// the race in the special OR case (node.park_counter not set).
static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    lock( mutex_lock );
    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close

    #ifdef CHAN_STATS
    if ( !closed ) c_ops++;
    #endif

    if ( !node.park_counter ) {
        // are we special case OR and front of prods is also special case OR
        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
            if ( !__make_select_node_pending( node ) ) {
                unlock( mutex_lock );
                return false;
            }

            if ( __handle_waituntil_OR( prods ) ) {
                __prods_handoff( chan, ret );
                // must mark SAT now that the operation is done, or threads could get stuck in __mark_select_node
                __make_select_node_sat( node );
                unlock( mutex_lock );
                return true;
            }
            __make_select_node_unsat( node );
        }

        // check if we can complete operation. If so race to establish winner in special OR case
        bool can_complete = count != 0 || !prods`isEmpty || unlikely(closed);
        if ( can_complete && !__make_select_node_available( node ) ) {
            // we didn't win the race so give up on registering
            unlock( mutex_lock );
            return false;
        }
    }

    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __handle_select_closed_read( this, node );
        return true;
    }

    // have to check for the zero size channel case
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, ret );
        if ( node.park_counter ) __make_select_node_available( node );
        unlock( mutex_lock );
        return true;
    }

    // something is buffered: remove it now
    if ( count != 0 ) {
        __do_remove( chan, ret );
        if ( node.park_counter ) __make_select_node_available( node );
        unlock( mutex_lock );
        return true;
    }

    // buffer is empty: queue the node, work will be completed by someone else
    #ifdef CHAN_STATS
    c_blocks++;
    #endif

    insert_last( cons, node );
    unlock( mutex_lock );
    return false;
}
// Unregisters a channel-read clause by delegating to unregister_chan on its channel.
static inline bool unregister_select( chan_read(T) & this, select_node & node ) {
    bool completed = unregister_chan( this.chan, node );
    return completed;
}
// Called when a channel-read clause is selected.
// A node.extra of 0p means the waiter was woken by a channel close, in which case the
// closed-remove protocol runs; otherwise nothing is left to do.
static inline void on_selected( chan_read(T) & this, select_node & node ) with(this) {
    if ( node.extra != 0p ) return;     // not woken by close
    __closed_remove( chan, ret );
    // only reachable if the closed exception was handled
}
430
// type used by select statement to capture a chan write as the selected operation
struct chan_write {
    T elem;                 // by-value copy of the element to insert
    channel(T) & chan;      // channel being written to
};
436
// Builds a chan_write: copies `elem` byte-wise into the clause object and binds `chan` by reference.
static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    &cw.chan = &chan;
}
// `elem >> chan`: builds the chan_write clause object that writes `elem` into `chan`.
static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) {
    chan_write(T) writer{ chan, elem };
    return writer;
}
442
// Closed-channel path of register_select for writes: runs the closed-insert protocol and,
// if it returns normally, marks the select node available.
static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    __closed_insert( chan, elem );
    // reaching this point means the insert succeeded
    __make_select_node_available( node );
}
448
// Registers a channel write as a clause of a waituntil statement.
// Returns true when the write completed during registration (or the closed case was
// handled), and false when the node was queued on prods to wait or when this node lost
// the race in the special OR case (node.park_counter not set).
static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    lock( mutex_lock );
    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close

    #ifdef CHAN_STATS
    if ( !closed ) p_ops++;
    #endif

    // special OR case handling
    if ( !node.park_counter ) {
        // are we special case OR and front of cons is also special case OR
        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
            if ( !__make_select_node_pending( node ) ) {
                unlock( mutex_lock );
                return false;
            }

            if ( __handle_waituntil_OR( cons ) ) {
                __cons_handoff( chan, elem );
                // must mark SAT now that the operation is done, or threads could get stuck in __mark_select_node
                __make_select_node_sat( node );
                unlock( mutex_lock );
                return true;
            }
            __make_select_node_unsat( node );
        }

        // check if we can complete operation. If so race to establish winner in special OR case
        bool can_complete = count != size || !cons`isEmpty || unlikely(closed);
        if ( can_complete && !__make_select_node_available( node ) ) {
            // we didn't win the race so give up on registering
            unlock( mutex_lock );
            return false;
        }
    }

    // if closed handle
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __handle_select_closed_write( this, node );
        return true;
    }

    // handle blocked consumer case via handoff (buffer is implicitly empty)
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );
        if ( node.park_counter ) __make_select_node_available( node );
        unlock( mutex_lock );
        return true;
    }

    // room in the buffer: carry out the write via a normal insert
    if ( count != size ) {
        __buf_insert( chan, elem );
        if ( node.park_counter ) __make_select_node_available( node );
        unlock( mutex_lock );
        return true;
    }

    // buffer is full: queue the node, work will be completed by someone else
    #ifdef CHAN_STATS
    p_blocks++;
    #endif

    insert_last( prods, node );
    unlock( mutex_lock );
    return false;
}
// Unregisters a channel-write clause by delegating to unregister_chan on its channel.
static inline bool unregister_select( chan_write(T) & this, select_node & node ) {
    bool completed = unregister_chan( this.chan, node );
    return completed;
}
515
// Called when a channel-write clause is selected.
// A node.extra of 0p means the waiter was woken by a channel close, in which case the
// closed-insert protocol runs; otherwise nothing is left to do.
static inline void on_selected( chan_write(T) & this, select_node & node ) with(this) {
    if ( node.extra != 0p ) return;     // not woken by close
    __closed_insert( chan, elem );
    // only reachable if the closed exception was handled
}
522
523} // forall( T )
524
525
Note: See TracBrowser for help on using the repository browser.