source: libcfa/src/concurrency/channel.hfa @ 0aef549

ast-experimental
Last change on this file since 0aef549 was 0aef549, checked in by caparsons <caparson@…>, 16 months ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 18.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime channels that are used with the runtime thread system.
9//
10// Author           : Colby Alexander Parsons
11// Created On       : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count     :
15//
16
17#pragma once
18
19#include <locks.hfa>
20#include <list.hfa>
21#include "select.hfa"
22
// Blocks the calling thread on `queue` and releases `lock` (which the caller holds).
// A stack-allocated select_node built from the active thread and elem_ptr is
// appended to the queue before the lock is dropped and the thread parks.
// Returns true iff the node's .extra field is 0p after wakeup, which is how
// close() marks a waiter that was woken because the channel shut down.
static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
    select_node waiter{ active_thread(), elem_ptr };
    insert_last( queue, waiter );
    unlock( lock );
    park();

    bool woken_by_close = ( waiter.extra == 0p );
    return woken_by_close;
}
32
// Helper for the waituntil (un)register_select routines.
// If node is not the special OR case (i.e. park_counter is set) the select
// node is marked available; the mutex is released in both cases.
static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
    if ( node.park_counter ) {
        __make_select_node_available( node );
    }
    unlock( mutex_lock );
}
39
// Exception raised (first as a resumption, then possibly as a termination) when an
// operation is attempted on a closed channel.
// void * used for some fields since exceptions don't work with parametric polymorphism currently
exception channel_closed {
    // on failed insert elem is a ptr to the element attempting to be inserted
    // on failed remove elem ptr is 0p
    // on resumption of a failed insert this elem will be inserted
    // so a user may modify it in the resumption handler
    void * elem;

    // pointer to chan that is closed (a channel(T) * erased to void *)
    void * closed_chan;
};
// vtable instance passed when constructing channel_closed exceptions in this file
vtable(channel_closed) channel_closed_vt;
52
53// #define CHAN_STATS // define this to get channel stats printed in dtor
54
55forall( T ) {
56
// Channel of T: a circular buffer of `size` elements plus lists of blocked
// producers and consumers. A size of 0 gives a channel with no buffer at all.
// NOTE(review): the 128-byte alignment is presumably there to avoid false sharing -- confirm
struct __attribute__((aligned(128))) channel {
    // size: buffer capacity; front: index of next element to remove;
    // back: index of next free slot; count: number of elements currently buffered
    size_t size, front, back, count;
    T * buffer;                       // element storage, only allocated when size != 0
    dlist( select_node ) prods, cons; // lists of blocked threads
    go_mutex mutex_lock;              // MX lock
    bool closed;                      // indicates channel close/open
    #ifdef CHAN_STATS
    size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
    #endif
};
67
// Constructs an open, empty channel able to buffer _size elements.
// When _size is 0 no storage is allocated; buffer is set to 0p so the field is
// never left indeterminate (the destructor only frees it when size != 0).
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    size = _size;
    front = back = count = 0;
    if ( size != 0 ) buffer = aalloc( size );
    else buffer = 0p;
    prods{};
    cons{};
    mutex_lock{};
    closed = false;
    #ifdef CHAN_STATS
    p_blocks = 0;
    p_ops = 0;
    c_blocks = 0;
    c_ops = 0;
    #endif
}
83
// Default constructor: builds a zero-size channel by delegating to the sized constructor.
static inline void ?{}( channel(T) &c ) {
    ((channel(T) &)c){ 0 };
}
// Destructor: optionally prints statistics, checks for leftover waiters and frees the buffer.
// With CHAN_STATS defined, p_* counters are bumped by insert-side (producer) code
// and c_* counters by remove-side (consumer) code, so each label is paired with
// the matching counters. size_t values are printed with %zu.
static inline void ^?{}( channel(T) &c ) with(c) {
    #ifdef CHAN_STATS
    printf("Channel %p Blocks: %zu,\t\tOperations: %zu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
    printf("Channel %p Consumer Blocks: %zu,\tConsumer Ops: %zu,\t%.2f%% of Consumer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
    printf("Channel %p Producer Blocks: %zu,\tProducer Ops: %zu,\t%.2f%% of Producer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
    #endif
    // NOTE(review): close() treats a false return from __handle_waituntil_OR as "list is empty";
    // if a true return means a live waiter exists, this condition also passes when threads
    // are still blocked -- confirm the intended check against select.hfa
    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
        "Attempted to delete channel with waiting threads (Deadlock).\n" );
    if ( size != 0 ) delete( buffer );
}
// Number of elements currently buffered; relaxed atomic load, mutex_lock is not taken.
static inline size_t get_count( channel(T) & chan ) with(chan) {
    return __atomic_load_n( &count, __ATOMIC_RELAXED );
}

// Buffer capacity; relaxed atomic load, mutex_lock is not taken.
static inline size_t get_size( channel(T) & chan ) with(chan) {
    return __atomic_load_n( &size, __ATOMIC_RELAXED );
}

// True if at least one of the two wait lists is non-empty (read without the lock).
static inline bool has_waiters( channel(T) & chan ) with(chan) {
    return !( cons`isEmpty && prods`isEmpty );
}

// True if the consumer wait list is non-empty (read without the lock).
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
    return !cons`isEmpty;
}

// True if the producer wait list is non-empty (read without the lock).
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
    return !prods`isEmpty;
}
100
// Marks the channel closed and wakes every blocked thread.
// Each waiter has its .extra field cleared to 0p before being woken; that is
// the signal the woken thread uses to detect it was woken by a close.
static inline void close( channel(T) & chan ) with(chan) {
    lock( mutex_lock );
    closed = true;

    // Drain consumers. __handle_waituntil_OR makes sure special OR case threads
    // are only signalled when they win the race; when it returns false the
    // list is empty, which ends the loop.
    while ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        cons`first.extra = 0p;
        wake_one( cons );
    }

    // Drain producers in the same way.
    while ( !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        prods`first.extra = 0p;
        wake_one( prods );
    }

    unlock( mutex_lock );
}
121
// Returns true once close() has been called on the channel.
// The flag is read without acquiring mutex_lock.
// (Previously declared void while returning a value; the return type is now bool.)
static inline bool is_closed( channel(T) & chan ) with(chan) { return closed; }
123
// Hands elem directly to the consumer at the front of cons and signals it.
// The bytes of elem are copied to the address held in the waiting node's .extra field.
// Caller must hold mutex_lock and have checked that cons is non-empty.
static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    void * consumer_slot = cons`first.extra;
    memcpy( consumer_slot, (void *)&elem, sizeof(T) ); // do waiting consumer work
    wake_one( cons );
}
129
// Takes an element directly from the producer at the front of prods and signals it.
// The bytes at the address held in the waiting node's .extra field are copied into retval.
// Caller must hold mutex_lock and have checked that prods is non-empty.
static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    void * producer_elem = prods`first.extra;
    memcpy( (void *)&retval, producer_elem, sizeof(T) );
    wake_one( prods );
}
135
// While the buffer is empty and consumers are waiting, gives a copy of elem to
// the consumer at the front of cons and wakes it.
// NOTE(review): unlike the other handoff paths this loop does not filter the
// list through __handle_waituntil_OR first -- confirm that is intended.
static inline void flush( channel(T) & chan, T elem ) with(chan) {
    lock( mutex_lock );
    while ( count == 0 && !cons`isEmpty ) {
        // same work as __cons_handoff: copy into the waiter's slot, then signal
        memcpy( cons`first.extra, (void *)&elem, sizeof(T) );
        wake_one( cons );
    }
    unlock( mutex_lock );
}
143
// Copies elem into the slot at index back, then advances back (wrapping to 0
// at size) and bumps count. Caller must hold mutex_lock and ensure there is room.
static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    T * slot = &buffer[back];
    memcpy( (void *)slot, (void *)&elem, sizeof(T) );
    count += 1;
    back = ( back + 1 == size ) ? 0 : back + 1;
}
151
// Non-blocking insert taking elem by reference (avoids an extra copy in the closed case).
// Order of attempts: hand off to a waiting consumer, otherwise store in the buffer.
// Returns false, without blocking, when the buffer is full.
static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    p_ops++;
    #endif

    // a consumer is waiting and (after special OR case filtering) still there: direct handoff
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );
        unlock( mutex_lock );
        return true;
    }

    bool has_room = ( count != size );
    if ( has_room ) __buf_insert( chan, elem );
    unlock( mutex_lock );
    return has_room;
}
172
// Non-blocking insert of elem into chan.
// Returns true if the element was handed off or buffered, false if that would have required blocking.
static inline bool try_insert( channel(T) & chan, T elem ) {
    return __internal_try_insert( chan, elem );
}
176
// Closed-channel path of the insert routines.
// Raises channel_closed as a resumption first; if a handler resumes, one
// non-blocking insert of elem is attempted. If that attempt fails (it would
// block), the same exception is thrown as a termination.
static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    channel_closed except{ &channel_closed_vt, &elem, &chan };
    throwResume except;

    bool inserted = __internal_try_insert( chan, elem );
    if ( !inserted ) throw except;
}
183
// Blocking insert of elem into chan.
// Closed channels are routed through __closed_insert (which raises channel_closed).
// Otherwise the element is handed to a waiting consumer, stored in the buffer,
// or -- when the buffer is full -- the caller blocks on prods until another
// thread completes the insert on its behalf or the channel is closed.
static inline void insert( channel(T) & chan, T elem ) with(chan) {
    // unlocked peek at closed before paying for the mutex
    if ( unlikely(closed) ) {
        __closed_insert( chan, elem );
        return;
    }

    lock( mutex_lock );

    // re-check now that the lock is held
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_insert( chan, elem );
        return;
    }

    #ifdef CHAN_STATS
    p_ops++;
    #endif

    // buffer count must be zero if cons are blocked (also handles zero-size case)
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // room in the buffer: store and leave
    if ( count != size ) {
        __buf_insert( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // buffer is full: wait, the work will be completed by someone else
    #ifdef CHAN_STATS
    p_blocks++;
    #endif

    // block() releases mutex_lock; it returns true if we were woken due to close
    if ( unlikely( block( prods, &elem, mutex_lock ) ) )
        __closed_insert( chan, elem );
}
227
// Copies the element at index front into retval and advances the read position.
// If that removal just opened a slot in a previously full buffer and a producer
// is blocked, the producer's pending element is inserted for it and it is woken.
// Caller must hold mutex_lock and ensure count != 0.
static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    count -= 1;
    front = (front + 1) % size;

    bool was_full = ( count == size - 1 );
    if ( was_full && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
        wake_one( prods );
    }
}
239
// Non-blocking remove writing into retval (avoids an extra copy in the closed
// case and in the single return value case).
// Returns true if an element was obtained, false if the operation would block.
static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    c_ops++;
    #endif

    // zero-size channel: take the element straight from a blocked producer
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, retval );
        unlock( mutex_lock );
        return true;
    }

    bool has_elem = ( count != 0 );
    if ( has_elem ) __do_remove( chan, retval );
    unlock( mutex_lock );
    return has_elem;
}
260
// attempts a nonblocking remove
// returns [T, true] if the remove was successful
// returns [T, false] if the remove was unsuccessful (T is not assigned by the remove)
static inline [T, bool] try_remove( channel(T) & chan ) {
    T elem;
    bool removed = __internal_try_remove( chan, elem );
    return [ elem, removed ];
}
269
// Single-value variant of the nonblocking remove.
// The success flag is discarded, so the caller cannot tell whether the returned
// value came from the channel or was never assigned by the remove.
static inline T try_remove( channel(T) & chan ) {
    T elem;
    __internal_try_remove( chan, elem );
    return elem;
}
275
// Closed-channel path of the remove routines.
// Raises channel_closed (elem == 0p) as a resumption first; if a handler
// resumes, one non-blocking remove into retval is attempted. If that attempt
// fails (it would block), the same exception is thrown as a termination.
static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    channel_closed except{ &channel_closed_vt, 0p, &chan };
    throwResume except;

    bool removed = __internal_try_remove( chan, retval );
    if ( !removed ) throw except;
}
282
// Blocking remove from chan.
// Closed channels are routed through __closed_remove (which raises channel_closed).
// Otherwise the element comes from a blocked producer (zero-size channel), from
// the buffer, or -- when the buffer is empty -- the caller blocks on cons until
// another thread fills in the result or the channel is closed.
static inline T remove( channel(T) & chan ) with(chan) {
    T retval;

    // unlocked peek at closed before paying for the mutex
    if ( unlikely(closed) ) {
        __closed_remove( chan, retval );
        return retval;
    }

    lock( mutex_lock );

    // re-check now that the lock is held
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_remove( chan, retval );
        return retval;
    }

    #ifdef CHAN_STATS
    c_ops++;
    #endif

    // have to check for the zero size channel case
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // something is buffered: take it
    if ( count != 0 ) {
        __do_remove( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // buffer is empty: wait, the work will be completed by someone else
    #ifdef CHAN_STATS
    c_blocks++;
    #endif

    // block() releases mutex_lock; it returns true if we were woken due to close
    if ( unlikely( block( cons, &retval, mutex_lock ) ) )
        __closed_remove( chan, retval );
    return retval;
}
325
326///////////////////////////////////////////////////////////////////////////////////////////
327// The following is support for waituntil (select) statements
328///////////////////////////////////////////////////////////////////////////////////////////
// Shared implementation of unregister_select for channel read and write clauses.
// If node is still on a wait list the operation was never performed: the node is
// removed under the lock and false is returned.
// Returns true only when node is not the special OR case, the wakeup was not
// due to close (.extra != 0p) and the clause status is __SELECT_SAT.
static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    // unlocked fast path: not listed and no park_counter
    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
    lock( mutex_lock );
    if ( node`isListed ) { // op wasn't performed
        remove( node );
        unlock( mutex_lock );
        return false;
    }
    unlock( mutex_lock );

    // only return true when not special OR case, not exceptional case and status is SAT
    return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
}
342
// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
// Walks queue from the front on behalf of `mine` (the caller's own select node).
// Returns true when the front node can be serviced: it is not a special OR case
// node, or __pending_set_other succeeded for it.
// Returns false when mine's clause status is no longer __SELECT_PENDING, or when
// the queue has been emptied by discarding nodes that lost their race.
static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
    while ( !queue`isEmpty ) {
        // if node not a special OR case or if we win the special OR case race break
        if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
            return true;

        // our node lost the race when toggling in __pending_set_other
        if ( *mine.clause_status != __SELECT_PENDING )
            return false;

        // otherwise we lost the special OR race so discard node
        try_pop_front( queue );
    }
    return false;
}
359
// type used by select statement to capture a chan read as the selected operation
// Built by the ?<<? operator (ret << chan).
struct chan_read {
    T & ret;            // destination that receives the removed element
    channel(T) & chan;  // channel being read from
};
365
// Binds the chan_read's two reference members to the given destination and channel.
static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    &cr.ret = &ret;
    &cr.chan = &chan;
}
// `ret << chan` : packages a read of chan into ret as a chan_read for a waituntil clause.
static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) {
    chan_read(T) read_op{ chan, ret };
    return read_op;
}
371
// Closed-channel path of register_select for a read clause.
// __closed_remove raises channel_closed; control only continues past it when
// the exception was handled and the retry succeeded.
static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    __closed_remove( this.chan, this.ret );
    // if we get here then the remove succeeded
    __make_select_node_available( node );
}
377
// Registers a channel read as a clause of a waituntil statement.
// Returns true when the read was completed immediately (from a blocked producer,
// from the buffer, or via the closed-channel path).
// Returns false when node was queued on cons to wait, or when node is the
// special OR case and lost the race to become the selected clause.
static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    lock( mutex_lock );
    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close

    #ifdef CHAN_STATS
    if ( !closed ) c_ops++;
    #endif

    // special OR case handling
    if ( !node.park_counter ) {
        // are we special case OR and front of prods is also special case OR
        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
            if ( !__make_select_node_pending( node ) ) {
                unlock( mutex_lock );
                return false;
            }

            if ( __handle_pending( prods, node ) ) {
                __prods_handoff( chan, ret );
                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
                unlock( mutex_lock );
                return true;
            }
            // still pending after a failed __handle_pending: revert to unsat before continuing
            if ( *node.clause_status == __SELECT_PENDING )
                __make_select_node_unsat( node );
        }
        // check if we can complete operation. If so race to establish winner in special OR case
        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
                unlock( mutex_lock );
                return false;
            }
        }
    }

    // if closed handle (lock is released first since the handler may raise)
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __handle_select_closed_read( this, node );
        return true;
    }

    // have to check for the zero size channel case
    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
        __prods_handoff( chan, ret );
        __set_avail_then_unlock( node, mutex_lock );
        return true;
    }

    // wait if buffer is empty, work will be completed by someone else
    if ( count == 0 ) {
        #ifdef CHAN_STATS
        c_blocks++;
        #endif

        insert_last( cons, node );
        unlock( mutex_lock );
        return false;
    }

    // Remove from buffer
    __do_remove( chan, ret );
    __set_avail_then_unlock( node, mutex_lock );
    return true;
}
// Unregisters a read clause; all the work is done by unregister_chan on the underlying channel.
static inline bool unregister_select( chan_read(T) & this, select_node & node ) {
    return unregister_chan( this.chan, node );
}
// Runs when a read clause is selected.
// A node whose .extra is 0p was woken because the channel closed, so the
// closed-channel path is taken; otherwise there is nothing left to do.
static inline void on_selected( chan_read(T) & this, select_node & node ) with(this) {
    if ( node.extra != 0p ) return;

    __closed_remove( chan, ret );
    // only reachable if the closed exception was handled
}
448
// type used by select statement to capture a chan write as the selected operation
// Built by the ?>>? operator (elem >> chan).
struct chan_write {
    T elem;             // private copy of the element to be inserted
    channel(T) & chan;  // channel being written to
};
454
// Stores a bitwise copy of elem in the chan_write and binds its channel reference.
static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    &cw.chan = &chan;
}
// `elem >> chan` : packages a write of elem to chan as a chan_write for a waituntil clause.
static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) {
    chan_write(T) write_op{ chan, elem };
    return write_op;
}
460
// Closed-channel path of register_select for a write clause.
// __closed_insert raises channel_closed; control only continues past it when
// the exception was handled and the retry succeeded.
static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    __closed_insert( this.chan, this.elem );
    // if we get here then the insert succeeded
    __make_select_node_available( node );
}
466
// Registers a channel write as a clause of a waituntil statement.
// Returns true when the write was completed immediately (handed to a blocked
// consumer, stored in the buffer, or via the closed-channel path).
// Returns false when node was queued on prods to wait, or when node is the
// special OR case and lost the race to become the selected clause.
static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    lock( mutex_lock );
    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close

    #ifdef CHAN_STATS
    if ( !closed ) p_ops++;
    #endif

    // special OR case handling
    if ( !node.park_counter ) {
        // are we special case OR and front of cons is also special case OR
        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
            if ( !__make_select_node_pending( node ) ) {
                unlock( mutex_lock );
                return false;
            }

            if ( __handle_pending( cons, node ) ) {
                __cons_handoff( chan, elem );
                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
                unlock( mutex_lock );
                return true;
            }
            // still pending after a failed __handle_pending: revert to unsat before continuing
            if ( *node.clause_status == __SELECT_PENDING )
                __make_select_node_unsat( node );
        }
        // check if we can complete operation. If so race to establish winner in special OR case
        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
                unlock( mutex_lock );
                return false;
            }
        }
    }

    // if closed handle (lock is released first since the handler may raise)
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __handle_select_closed_write( this, node );
        return true;
    }

    // handle blocked consumer case via handoff (buffer is implicitly empty)
    ConsEmpty: if ( !cons`isEmpty ) {
        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
        __cons_handoff( chan, elem );
        __set_avail_then_unlock( node, mutex_lock );
        return true;
    }

    // insert node in list if buffer is full, work will be completed by someone else
    if ( count == size ) {
        #ifdef CHAN_STATS
        p_blocks++;
        #endif

        insert_last( prods, node );
        unlock( mutex_lock );
        return false;
    } // if

    // otherwise carry out the write via a normal buffer insert
    __buf_insert( chan, elem );
    __set_avail_then_unlock( node, mutex_lock );
    return true;
}
// Unregisters a write clause; all the work is done by unregister_chan on the underlying channel.
static inline bool unregister_select( chan_write(T) & this, select_node & node ) {
    return unregister_chan( this.chan, node );
}
534
// Runs when a write clause is selected.
// A node whose .extra is 0p was woken because the channel closed, so the
// closed-channel path is taken; otherwise there is nothing left to do.
static inline void on_selected( chan_write(T) & this, select_node & node ) with(this) {
    if ( node.extra != 0p ) return;

    __closed_insert( chan, elem );
    // only reachable if the closed exception was handled
}
541
542} // forall( T )
543
544
Note: See TracBrowser for help on using the repository browser.