source: libcfa/src/concurrency/channel.hfa @ a09552d

Last change on this file since a09552d was c44705c, checked in by caparsons <caparson@…>, 12 months ago

fixed remove ambiguity issue

  • Property mode set to 100644
File size: 20.6 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime channels that are used with the runtime thread system.
9//
10// Author           : Colby Alexander Parsons
11// Created On       : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count     :
15//
16
17#pragma once
18
19#include <locks.hfa>
20#include <list.hfa>
21#include "select.hfa"
22
// Enqueues the calling thread on `queue` and parks it.
// The caller must hold `lock`; it is released once the node is on the list and before parking.
// Returns true when the node's extra field was cleared (0p) by the waker, which is what close() does.
static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
    select_node waiter{ active_thread(), elem_ptr };
    insert_last( queue, waiter );
    unlock( lock );
    park();
    bool woken_by_close = ( waiter.extra == 0p );
    return woken_by_close;
}
32
// Helper shared by the waituntil (un)register_select routines.
// Marks the select node available unless it is the special OR case (park_counter unset),
// then releases the channel lock in either case.
static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
    if ( node.park_counter ) {
        __make_select_node_available( node );
    }
    unlock( mutex_lock );
}
39
40// void * used for some fields since exceptions don't work with parametric polymorphism currently
// channel_closed is raised by the insert/remove paths of a closed channel: first as a resumption
// and, if the retried operation would block, as a termination.
41exception channel_closed {
42    // on failed insert elem is a ptr to the element attempting to be inserted
43    // on failed remove elem ptr is 0p
44    // on resumption of a failed insert this elem will be inserted
45    // so a user may modify it in the resumption handler
46    void * elem;
47
48    // pointer to chan that is closed
49    void * closed_chan;
50};
// vtable instance passed to every channel_closed exception constructed in this file
51vtable(channel_closed) channel_closed_vt;
52
// True when the exception was raised by a failed insert (it carries a pointer to the element).
static inline bool is_insert( channel_closed & e ) {
    return e.elem != 0p;
}
// True when the exception was raised by a failed remove (no element pointer is attached).
static inline bool is_remove( channel_closed & e ) {
    return e.elem == 0p;
}
55
56// #define CHAN_STATS // define this to get channel stats printed in dtor
57
58forall( T ) {
59
// Channel of T protected by one mutex. Elements are stored in a circular buffer of `size` slots;
// a size of 0 gives a channel with no buffer where elements are handed directly between threads.
60struct __attribute__((aligned(128))) channel {
61    size_t size, front, back, count;  // capacity, index of next remove, index of next insert, number of buffered elements
62    T * buffer;                       // element storage, allocated by the constructor only when size != 0
63    dlist( select_node ) prods, cons; // lists of blocked threads
64    go_mutex mutex_lock;              // MX lock
65    bool closed;                      // indicates channel close/open
66    #ifdef CHAN_STATS
67    size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
68    #endif
69};
70
// Constructs an open, empty channel with room for `_size` buffered elements.
// A `_size` of 0 allocates no buffer; the pointer is set to 0p so the field never holds an
// indeterminate value (the destructor only frees it when size != 0).
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    size = _size;
    front = back = count = 0;
    buffer = 0p;
    if ( size != 0 ) buffer = aalloc( size );
    prods{};
    cons{};
    mutex_lock{};
    closed = false;
    #ifdef CHAN_STATS
    p_blocks = 0;
    p_ops = 0;
    c_blocks = 0;
    c_ops = 0;
    #endif
}
86
// Default constructor: delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ) {
    ((channel(T) &)c){ 0 };
}
// Destructor: optionally prints statistics, checks for stranded waiters and frees the buffer.
// With CHAN_STATS defined, p_* counters are updated by the insert (producer) paths and
// c_* counters by the remove (consumer) paths, so each line prints the matching pair.
static inline void ^?{}( channel(T) &c ) with(c) {
    #ifdef CHAN_STATS
    printf("Channel %p Blocks: %zu,\t\tOperations: %zu,\t%.2f%% of ops blocked\n", (void *)&c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
    printf("Channel %p Consumer Blocks: %zu,\tConsumer Ops: %zu,\t%.2f%% of Consumer ops blocked\n", (void *)&c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
    printf("Channel %p Producer Blocks: %zu,\tProducer Ops: %zu,\t%.2f%% of Producer ops blocked\n", (void *)&c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
    #endif
    // NOTE(review): this condition also holds when __handle_waituntil_OR reports a serviceable
    // waiter on either list -- confirm that is the intended check for "no waiting threads".
    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
        "Attempted to delete channel with waiting threads (Deadlock).\n" );
    if ( size != 0 ) delete( buffer );      // the buffer only exists for buffered channels
}
// Number of elements currently buffered; read with a relaxed atomic load, without taking the lock.
static inline size_t get_count( channel(T) & chan ) with(chan) {
    return __atomic_load_n( &count, __ATOMIC_RELAXED );
}
// Buffer capacity of the channel; read with a relaxed atomic load, without taking the lock.
static inline size_t get_size( channel(T) & chan ) with(chan) {
    return __atomic_load_n( &size, __ATOMIC_RELAXED );
}
// True when at least one consumer or producer node is queued on the channel.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
    return !cons`isEmpty || !prods`isEmpty;
}
// True when the consumer wait list is non-empty.
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
    return !cons`isEmpty;
}
// True when the producer wait list is non-empty.
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
    return !prods`isEmpty;
}
103
// Marks the channel closed and wakes every thread blocked on it.
// Each woken node has its extra field set to 0p so the waiter can tell the wake-up came from close.
static inline void close( channel(T) & chan ) with(chan) {
    lock( mutex_lock );
    closed = true;

    // Flush blocked consumers. __handle_waituntil_OR ensures special OR case threads are only
    // signalled when they win their race; it returns false once the list is empty.
    while ( has_waiting_consumers( chan ) && __handle_waituntil_OR( cons ) ) {
        cons`first.extra = 0p;
        wake_one( cons );
    }

    // Flush blocked producers in the same way.
    while ( has_waiting_producers( chan ) && __handle_waituntil_OR( prods ) ) {
        prods`first.extra = 0p;
        wake_one( prods );
    }

    unlock(mutex_lock);
}
124
// Reports whether the channel has been closed. The flag is read without taking the channel lock.
// Returns bool: the routine returns the `closed` flag, so a void return type was incorrect.
static inline bool is_closed( channel(T) & chan ) with(chan) {
    return closed;
}
126
// Copies `elem` into the destination recorded by the consumer at the front of `cons`,
// then wakes that consumer. The call sites in this file invoke it with the channel lock held.
static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    void * dest = cons`first.extra;                 // where the waiting consumer wants the element
    memcpy( dest, (void *)&elem, sizeof(T) );       // do waiting consumer work
    wake_one( cons );
}
132
// Copies the element offered by the producer at the front of `prods` into `retval`,
// then wakes that producer. The call sites in this file invoke it with the channel lock held.
static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    void * src = prods`first.extra;                 // element the waiting producer is offering
    memcpy( (void *)&retval, src, sizeof(T) );
    wake_one( prods );
}
138
// Hands a copy of `elem` to every consumer currently blocked on the channel.
// Stops as soon as the buffer holds an element or no consumer is waiting.
static inline void flush( channel(T) & chan, T elem ) with(chan) {
    lock( mutex_lock );
    for ( ;; ) {
        if ( count != 0 || cons`isEmpty ) break;
        __cons_handoff( chan, elem );
    }
    unlock( mutex_lock );
}
146
// Copies `elem` into the slot at `back`, bumps the element count and advances `back`,
// wrapping it to 0 when it reaches the end of the buffer. Performs no capacity check itself.
static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    T * slot = &buffer[back];
    memcpy( (void *)slot, (void *)&elem, sizeof(T) );
    count += 1;
    back += 1;
    if ( back == size ) {
        back = 0;
    }
}
154
// Non-blocking insert that takes the element by reference (avoids an extra copy in the closed case).
// Returns true if the element was handed to a waiting consumer or stored in the buffer,
// false if the buffer is full.
static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    p_ops++;
    #endif

    bool inserted = true;
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        // a consumer is waiting: give it the element directly
        __cons_handoff( chan, elem );
    } else if ( count == size ) {
        // no room and nobody to hand off to
        inserted = false;
    } else {
        __buf_insert( chan, elem );
    }

    unlock( mutex_lock );
    return inserted;
}
175
// Non-blocking insert.
// Returns true when `elem` was inserted, false otherwise.
static inline bool try_insert( channel(T) & chan, T elem ) {
    return __internal_try_insert( chan, elem );
}
179
// Insert path for a closed channel.
// Raises channel_closed as a resumption; if a handler resumes, the insert is retried without
// blocking, and the same exception is thrown as a termination when that retry fails.
static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    channel_closed closed_ex{ &channel_closed_vt, &elem, &chan };
    throwResume closed_ex;                                  // throw closed resumption
    bool inserted = __internal_try_insert( chan, elem );
    if ( !inserted ) throw closed_ex;                       // retry would block: throw termination
}
186
// Blocking insert of `elem`.
// The element goes to a waiting consumer if there is one, otherwise into the buffer; when the
// buffer is full the caller parks until another thread completes the insert on its behalf.
// On a closed channel the work is delegated to __closed_insert (which raises channel_closed).
static inline void insert( channel(T) & chan, T elem ) with(chan) {
    // cheap closed check before acquiring the lock
    if ( unlikely(closed) ) {
        __closed_insert( chan, elem );
        return;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) p_ops++;
    #endif

    // re-check under the lock
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_insert( chan, elem );
        return;
    }

    // blocked consumers imply an empty buffer (this also covers the zero-size case)
    if ( !cons`isEmpty && __handle_waituntil_OR( cons ) ) {
        __cons_handoff( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // room in the buffer: store and leave
    if ( count != size ) {
        __buf_insert( chan, elem );
        unlock( mutex_lock );
        return;
    }

    // buffer is full: wait, the work will be completed by someone else
    #ifdef CHAN_STATS
    p_blocks++;
    #endif

    bool woken_by_close = block( prods, &elem, mutex_lock );    // block() releases the lock
    if ( unlikely( woken_by_close ) ) __closed_insert( chan, elem );
}
230
// Copies the element at `front` into `retval` and advances the circular buffer.
// If that freed the only missing slot and a producer is blocked, the producer's element is
// inserted on its behalf and the producer is woken.
static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    count -= 1;
    front = (front + 1) % size;

    // nothing more to do unless the buffer was full and a producer is waiting
    if ( count != size - 1 || prods`isEmpty ) return;
    if ( !__handle_waituntil_OR( prods ) ) return;

    __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    wake_one( prods );
}
242
// Non-blocking remove that writes into `retval` (avoids an extra copy in the closed case and in
// the single-return-value case).
// Returns true if an element was obtained from a waiting producer or from the buffer,
// false if nothing was available.
static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    c_ops++;
    #endif

    bool removed = true;
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        // zero-size channel: take the element straight from the waiting producer
        __prods_handoff( chan, retval );
    } else if ( count == 0 ) {
        removed = false;
    } else {
        __do_remove( chan, retval );
    }

    unlock( mutex_lock );
    return removed;
}
263
// Non-blocking remove.
// Returns [T, true] if the remove was successful.
// Returns [T, false] if the remove was unsuccessful (T is left uninitialized).
static inline [T, bool] try_remove( channel(T) & chan ) {
    T retval;
    bool success = __internal_try_remove( chan, retval );
    return [ retval, success ];
}
272
// Non-blocking remove returning only the element.
// The success flag of the attempt is discarded, so the caller cannot tell whether the returned
// value was actually written.
static inline T try_remove( channel(T) & chan ) {
    T retval;
    __internal_try_remove( chan, retval );
    return retval;
}
278
// Remove path for a closed channel.
// Raises channel_closed (with a 0p element pointer) as a resumption; if a handler resumes, the
// remove is retried without blocking, and the same exception is thrown as a termination when
// that retry fails.
static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    channel_closed closed_ex{ &channel_closed_vt, 0p, &chan };
    throwResume closed_ex;                                  // throw resumption
    bool removed = __internal_try_remove( chan, retval );
    if ( !removed ) throw closed_ex;                        // retry would block: throw termination
}
285
// Blocking remove.
// Takes an element from a waiting producer (zero-size channel) or from the buffer; when nothing
// is available the caller parks until another thread delivers an element into `retval`.
// On a closed channel the work is delegated to __closed_remove (which raises channel_closed).
static inline T remove( channel(T) & chan ) with(chan) {
    T retval;

    // cheap closed check before acquiring the lock
    if ( unlikely(closed) ) {
        __closed_remove( chan, retval );
        return retval;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) c_ops++;
    #endif

    // re-check under the lock
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_remove( chan, retval );
        return retval;
    }

    // zero-size channel: elements only ever come directly from a blocked producer
    if ( size == 0 && !prods`isEmpty && __handle_waituntil_OR( prods ) ) {
        __prods_handoff( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // buffer has an element: take it
    if ( count != 0 ) {
        __do_remove( chan, retval );
        unlock( mutex_lock );
        return retval;
    }

    // buffer is empty: wait, the work will be completed by someone else
    #ifdef CHAN_STATS
    c_blocks++;
    #endif

    bool woken_by_close = block( cons, &retval, mutex_lock );   // block() releases the lock
    if ( unlikely( woken_by_close ) ) __closed_remove( chan, retval );
    return retval;
}
// Blocking remove that throws the element away.
// The cast to T selects the value-returning remove overload.
static inline void remove( channel(T) & chan ) {
    T discarded = (T)remove( chan );
}
329
330
331///////////////////////////////////////////////////////////////////////////////////////////
332// The following is Go-style operator support for channels
333///////////////////////////////////////////////////////////////////////////////////////////
334
// Go-style send: `chan << elem` performs a blocking insert.
static inline void ?<<?( channel(T) & chan, T elem ) {
    insert( chan, elem );
}
// Go-style receive: `ret << chan` performs a blocking remove and assigns the result to `ret`.
static inline void ?<<?( T & ret, channel(T) & chan ) {
    ret = remove( chan );
}
337
338///////////////////////////////////////////////////////////////////////////////////////////
339// The following is support for waituntil (select) statements
340///////////////////////////////////////////////////////////////////////////////////////////
// Shared unregister logic for channel waituntil clauses.
// Removes `node` from whichever wait list it is still on. Returns true only when the node is
// not the special OR case and its clause status is __SELECT_SAT.
static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
    // special OR case that is no longer listed: nothing to do (checked without the lock)
    if ( !node`isListed && !node.park_counter ) return false;

    lock( mutex_lock );
    bool still_listed = node`isListed;
    if ( still_listed ) remove( node );         // op wasn't performed, take the node off the list
    unlock( mutex_lock );

    if ( still_listed ) return false;
    if ( !node.park_counter ) return false;     // special OR case never reports true here
    return *node.clause_status == __SELECT_SAT;
}
354
355// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
// Called by the register_select routines after the caller's node `mine` was marked pending.
// Returns true when the node at the front of `queue` can be serviced by the caller.
// Returns false when `mine` is no longer pending, or when the queue ran empty.
356static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
357    while ( !queue`isEmpty ) {
358        // if node not a special OR case or if we win the special OR case race break
359        if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
360            return true;
361       
362        // our node lost the race when toggling in __pending_set_other
363        if ( *mine.clause_status != __SELECT_PENDING )
364            return false;
365
366        // otherwise we lost the special OR race so discard node
367        try_pop_front( queue );
368    }
369    return false;
370}
371
372// type used by select statement to capture a chan read as the selected operation
373struct chan_read {
374    T & ret;             // destination the removed element is written to
375    channel(T) & chan;   // channel the read is performed on
376};
// NOTE(review): macro is defined outside this file; presumably it registers chan_read(T) with the select machinery -- confirm in select.hfa
377__CFA_SELECT_GET_TYPE( chan_read(T) );
378
// Binds both reference members of a chan_read: the destination `ret` and the source `chan`.
static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    &cr.ret = &ret;
    &cr.chan = &chan;
}
// `ret << chan` overload that builds a chan_read capturing the operation instead of performing it.
static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) {
    chan_read(T) cr{ chan, ret };
    return cr;
}
384
// Closed-channel path of register_select for reads: runs the closed remove and, when it
// returns normally, marks the select node available.
static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    __closed_remove( chan, ret );
    // reaching this point means the remove succeeded
    __make_select_node_available( node );
}
390
// Registers a channel read as a waituntil clause.
// Returns true when the read was carried out during registration (from a waiting producer, from
// the buffer, or through the closed-channel path). Returns false when the node was queued on
// `cons` to wait, or when a special OR case node lost its race and gave up registering.
// The channel lock is taken on entry and released on every return path.
391static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
392    lock( mutex_lock );
393    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
394
395    #ifdef CHAN_STATS
396    if ( !closed ) c_ops++;
397    #endif
398
    // special OR case handling
399    if ( !node.park_counter ) {
400        // are we special case OR and front of prods is also special case OR
401        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
402            if ( !__make_select_node_pending( node ) ) {
403                unlock( mutex_lock );
404                return false;
405            }
406
            // a serviceable producer is at the front of prods: take its element directly
407            if ( __handle_pending( prods, node ) ) {
408                __prods_handoff( chan, ret );
409                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
410                unlock( mutex_lock );
411                return true;
412            }
            // no handoff happened and our node is still pending: reset it to unsat
413            if ( *node.clause_status == __SELECT_PENDING )
414                __make_select_node_unsat( node );
415        }
416        // check if we can complete operation. If so race to establish winner in special OR case
417        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
418            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
419                unlock( mutex_lock );
420                return false;
421            }
422        }
423    }
424
    // closed channel: release the lock and go through the closed remove path
425    if ( unlikely(closed) ) {
426        unlock( mutex_lock );
427        __handle_select_closed_read( this, node );
428        return true;
429    }
430
431    // have to check for the zero size channel case
432    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
433        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
434        __prods_handoff( chan, ret );
435        __set_avail_then_unlock( node, mutex_lock );
436        return true;
437    }
438
439    // wait if buffer is empty, work will be completed by someone else
440    if ( count == 0 ) {
441        #ifdef CHAN_STATS
442        c_blocks++;
443        #endif
444       
445        insert_last( cons, node );
446        unlock( mutex_lock );
447        return false;
448    }
449
450    // Remove from buffer
451    __do_remove( chan, ret );
452    __set_avail_then_unlock( node, mutex_lock );
453    return true;
454}
// Unregisters a read clause; all of the work is done by unregister_chan on the underlying channel.
static inline bool unregister_select( chan_read(T) & this, select_node & node ) {
    return unregister_chan( this.chan, node );
}
// Runs when a read clause is selected.
// A 0p extra field means the node was woken by a channel close: the closed remove path is run
// unless an exception is already in flight, in which case false is returned.
// Returns true when the channel was not closed or the closed exception was handled.
static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
    if ( !unlikely(node.extra == 0p) ) return true;     // normal wake-up

    if ( exception_in_flight() ) return false;
    __closed_remove( chan, ret );                       // woken up due to closed channel
    return true;
}
464
465// type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
466struct chan_read_no_ret {
467    T ret;               // storage that receives the element; the constructor binds cr.ret to it
468    chan_read( T ) cr;   // the wrapped read operation that all select routines forward to
469};
// NOTE(review): macro is defined outside this file; presumably it registers chan_read_no_ret(T) with the select machinery -- confirm in select.hfa
470__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
471
// Constructs the wrapped chan_read so that it reads from `chan` into this object's own `ret` member.
static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
    this.cr{ chan, this.ret };
}
// remove overload that builds a chan_read_no_ret capturing the read instead of performing it.
static inline chan_read_no_ret(T) remove( channel(T) & chan ) {
    chan_read_no_ret(T) cr{ chan };
    return cr;
}
// Forwards registration to the wrapped chan_read.
static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
    return register_select( this.cr, node );
}
// Forwards unregistration to the wrapped chan_read.
static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) {
    return unregister_select( this.cr, node );
}
// Forwards the on_selected notification to the wrapped chan_read.
static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) {
    return on_selected( this.cr, node );
}
479
480// type used by select statement to capture a chan write as the selected operation
481struct chan_write {
482    T elem;              // private copy of the element to insert (filled by memcpy in the constructor)
483    channel(T) & chan;   // channel the write is performed on
484};
// NOTE(review): macro is defined outside this file; presumably it registers chan_write(T) with the select machinery -- confirm in select.hfa
485__CFA_SELECT_GET_TYPE( chan_write(T) );
486
// Captures a pending write: byte-copies `elem` into the object and binds the channel reference.
static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    &cw.chan = &chan;
}
// `chan << elem` overload that builds a chan_write capturing the operation instead of performing it.
static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) {
    chan_write(T) cw{ chan, elem };
    return cw;
}
// insert overload (element first, channel second) that builds a chan_write capturing the
// operation instead of performing it.
static inline chan_write(T) insert( T elem, channel(T) & chan) {
    chan_write(T) cw{ chan, elem };
    return cw;
}
493
// Closed-channel path of register_select for writes: runs the closed insert and, when it
// returns normally, marks the select node available.
static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    __closed_insert( chan, elem );
    // reaching this point means the insert succeeded
    __make_select_node_available( node );
}
499
// Registers a channel write as a waituntil clause.
// Returns true when the write was carried out during registration (handed to a waiting consumer,
// stored in the buffer, or through the closed-channel path). Returns false when the node was
// queued on `prods` to wait, or when a special OR case node lost its race and gave up registering.
// The channel lock is taken on entry and released on every return path.
500static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
501    lock( mutex_lock );
502    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
503
504    #ifdef CHAN_STATS
505    if ( !closed ) p_ops++;
506    #endif
507
508    // special OR case handling
509    if ( !node.park_counter ) {
510        // are we special case OR and front of cons is also special case OR
511        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
512            if ( !__make_select_node_pending( node ) ) {
513                unlock( mutex_lock );
514                return false;
515            }
516
            // a serviceable consumer is at the front of cons: hand the element over directly
517            if ( __handle_pending( cons, node ) ) {
518                __cons_handoff( chan, elem );
519                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
520                unlock( mutex_lock );
521                return true;
522            }
            // no handoff happened and our node is still pending: reset it to unsat
523            if ( *node.clause_status == __SELECT_PENDING )
524                __make_select_node_unsat( node );
525        }
526        // check if we can complete operation. If so race to establish winner in special OR case
527        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
528            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
529                unlock( mutex_lock );
530                return false;
531            }
532        }
533    }
534
535    // if closed handle
536    if ( unlikely(closed) ) {
537        unlock( mutex_lock );
538        __handle_select_closed_write( this, node );
539        return true;
540    }
541
542    // handle blocked consumer case via handoff (buffer is implicitly empty)
543    ConsEmpty: if ( !cons`isEmpty ) {
544        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
545        __cons_handoff( chan, elem );
546        __set_avail_then_unlock( node, mutex_lock );
547        return true;
548    }
549
550    // insert node in list if buffer is full, work will be completed by someone else
551    if ( count == size ) {
552        #ifdef CHAN_STATS
553        p_blocks++;
554        #endif
555
556        insert_last( prods, node );
557        unlock( mutex_lock );
558        return false;
559    } // if
560
561    // otherwise carry out the write via a normal buffer insert
562    __buf_insert( chan, elem );
563    __set_avail_then_unlock( node, mutex_lock );
564    return true;
565}
// Unregisters a write clause; all of the work is done by unregister_chan on the underlying channel.
static inline bool unregister_select( chan_write(T) & this, select_node & node ) {
    return unregister_chan( this.chan, node );
}
567
// Runs when a write clause is selected.
// A 0p extra field means the node was woken by a channel close: the closed insert path is run
// unless an exception is already in flight, in which case false is returned.
// Returns true when the channel was not closed or the closed exception was handled.
static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
    if ( !unlikely(node.extra == 0p) ) return true;     // normal wake-up

    if ( exception_in_flight() ) return false;
    __closed_insert( chan, elem );                      // woken up due to closed channel
    return true;
}
576
577} // forall( T )
578
579
Note: See TracBrowser for help on using the repository browser.