source: libcfa/src/concurrency/channel.hfa @ edc4813

Last change on this file since edc4813 was 3f0b062, checked in by caparsons <caparson@…>, 14 months ago

ifdef'd the arm fences that were added to channels so that they only appear on the arm

  • Property mode set to 100644
File size: 21.1 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime locks that used with the runtime thread system.
9//
10// Author           : Colby Alexander Parsons
11// Created On       : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count     :
15//
16
17#pragma once
18
19#include <locks.hfa>
20#include <list.hfa>
21#include "select.hfa"
22
23// returns true if woken due to shutdown
24// blocks thread on list and releases passed lock
25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
26    select_node sn{ active_thread(), elem_ptr };
27    insert_last( queue, sn );
28    unlock( lock );
29    park();
30    #if defined(__ARM_ARCH)
31    __atomic_thread_fence( __ATOMIC_SEQ_CST );
32    #endif
33    return sn.extra == 0p;
34}
35
36// Waituntil support (un)register_select helper routine
37// Sets select node avail if not special OR case and then unlocks
38static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
39    if ( node.park_counter ) __make_select_node_available( node );
40    unlock( mutex_lock );
41}
42
43// void * used for some fields since exceptions don't work with parametric polymorphism currently
44exception channel_closed {
45    // on failed insert elem is a ptr to the element attempting to be inserted
46    // on failed remove elem ptr is 0p
47    // on resumption of a failed insert this elem will be inserted
48    // so a user may modify it in the resumption handler
49    void * elem;
50
51    // pointer to chan that is closed
52    void * closed_chan;
53};
54vtable(channel_closed) channel_closed_vt;
55
56static inline bool is_insert( channel_closed & e ) { return e.elem != 0p; }
57static inline bool is_remove( channel_closed & e ) { return e.elem == 0p; }
58
59// #define CHAN_STATS // define this to get channel stats printed in dtor
60
61forall( T ) {
62
63struct __attribute__((aligned(128))) channel {
64    size_t size, front, back, count;
65    T * buffer;
66    dlist( select_node ) prods, cons; // lists of blocked threads
67    go_mutex mutex_lock;              // MX lock
68    bool closed;                      // indicates channel close/open
69    #ifdef CHAN_STATS
70    size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
71    #endif
72};
73static inline void ?{}( channel(T) & this, channel(T) this2 ) = void;
74static inline void ?=?( channel(T) & this, channel(T) this2 ) = void;
75
76static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
77    size = _size;
78    front = back = count = 0;
79    if ( size != 0 ) buffer = aalloc( size );
80    prods{};
81    cons{};
82    mutex_lock{};
83    closed = false;
84    #ifdef CHAN_STATS
85    p_blocks = 0;
86    p_ops = 0;
87    c_blocks = 0;
88    c_ops = 0;
89    #endif
90}
91
92static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
93static inline void ^?{}( channel(T) &c ) with(c) {
94    #ifdef CHAN_STATS
95    printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
96    printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
97    printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
98    #endif
99    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
100        "Attempted to delete channel with waiting threads (Deadlock).\n" );
101    if ( size != 0 ) delete( buffer );
102}
103static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
104static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
105static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
106static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
107static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
108
109// closes the channel and notifies all blocked threads
110static inline void close( channel(T) & chan ) with(chan) {
111    lock( mutex_lock );
112    closed = true;
113
114    // flush waiting consumers and producers
115    while ( has_waiting_consumers( chan ) ) {
116        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
117            break;  // if __handle_waituntil_OR returns false cons is empty so break
118        cons`first.extra = 0p;
119        wake_one( cons );
120    }
121    while ( has_waiting_producers( chan ) ) {
122        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
123            break;  // if __handle_waituntil_OR returns false prods is empty so break
124        prods`first.extra = 0p;
125        wake_one( prods );
126    }
127    unlock(mutex_lock);
128}
129
130static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
131
132// used to hand an element to a blocked consumer and signal it
133static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
134    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
135    #if defined(__ARM_ARCH)
136    __atomic_thread_fence( __ATOMIC_SEQ_CST );
137    #endif
138    wake_one( cons );
139}
140
141// used to hand an element to a blocked producer and signal it
142static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
143    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
144    #if defined(__ARM_ARCH)
145    __atomic_thread_fence( __ATOMIC_SEQ_CST );
146    #endif
147    wake_one( prods );
148}
149
150static inline void flush( channel(T) & chan, T elem ) with(chan) {
151    lock( mutex_lock );
152    while ( count == 0 && !cons`isEmpty ) {
153        __cons_handoff( chan, elem );
154    }
155    unlock( mutex_lock );
156}
157
158// handles buffer insert
159static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
160    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
161    count += 1;
162    back++;
163    if ( back == size ) back = 0;
164}
165
166// needed to avoid an extra copy in closed case
167static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
168    lock( mutex_lock );
169    #ifdef CHAN_STATS
170    p_ops++;
171    #endif
172
173    ConsEmpty: if ( !cons`isEmpty ) {
174        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
175        __cons_handoff( chan, elem );
176        unlock( mutex_lock );
177        return true;
178    }
179
180    if ( count == size ) { unlock( mutex_lock ); return false; }
181
182    __buf_insert( chan, elem );
183    unlock( mutex_lock );
184    return true;
185}
186
187// attempts a nonblocking insert
188// returns true if insert was successful, false otherwise
189static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
190
191// handles closed case of insert routine
192static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
193    channel_closed except{ &channel_closed_vt, &elem, &chan };
194    throwResume except; // throw closed resumption
195    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
196}
197
198static inline void insert( channel(T) & chan, T elem ) with(chan) {
199    // check for close before acquire mx
200    if ( unlikely(closed) ) {
201        __closed_insert( chan, elem );
202        return;
203    }
204
205    lock( mutex_lock );
206
207    #ifdef CHAN_STATS
208    if ( !closed ) p_ops++;
209    #endif
210
211    // if closed handle
212    if ( unlikely(closed) ) {
213        unlock( mutex_lock );
214        __closed_insert( chan, elem );
215        return;
216    }
217
218    // buffer count must be zero if cons are blocked (also handles zero-size case)
219    ConsEmpty: if ( !cons`isEmpty ) {
220        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
221        __cons_handoff( chan, elem );
222        unlock( mutex_lock );
223        return;
224    }
225
226    // wait if buffer is full, work will be completed by someone else
227    if ( count == size ) {
228        #ifdef CHAN_STATS
229        p_blocks++;
230        #endif
231
232        // check for if woken due to close
233        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
234            __closed_insert( chan, elem );
235        return;
236    } // if
237
238    __buf_insert( chan, elem );
239    unlock( mutex_lock );
240}
241
242// does the buffer remove and potentially does waiting producer work
243static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
244    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
245    count -= 1;
246    front = (front + 1) % size;
247    if (count == size - 1 && !prods`isEmpty ) {
248        if ( !__handle_waituntil_OR( prods ) ) return;
249        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
250        wake_one( prods );
251    }
252}
253
254// needed to avoid an extra copy in closed case and single return val case
255static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
256    lock( mutex_lock );
257    #ifdef CHAN_STATS
258    c_ops++;
259    #endif
260
261    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
262        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
263        __prods_handoff( chan, retval );
264        unlock( mutex_lock );
265        return true;
266    }
267
268    if ( count == 0 ) { unlock( mutex_lock ); return false; }
269
270    __do_remove( chan, retval );
271    unlock( mutex_lock );
272    return true;
273}
274
275// attempts a nonblocking remove
276// returns [T, true] if insert was successful
277// returns [T, false] if insert was successful (T uninit)
278static inline [T, bool] try_remove( channel(T) & chan ) {
279    T retval;
280    bool success = __internal_try_remove( chan, retval );
281    return [ retval, success ];
282}
283
284static inline T try_remove( channel(T) & chan ) {
285    T retval;
286    __internal_try_remove( chan, retval );
287    return retval;
288}
289
290// handles closed case of insert routine
291static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
292    channel_closed except{ &channel_closed_vt, 0p, &chan };
293    throwResume except; // throw resumption
294    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
295}
296
297static inline T remove( channel(T) & chan ) with(chan) {
298    T retval;
299    if ( unlikely(closed) ) {
300        __closed_remove( chan, retval );
301        return retval;
302    }
303    lock( mutex_lock );
304
305    #ifdef CHAN_STATS
306    if ( !closed ) c_ops++;
307    #endif
308
309    if ( unlikely(closed) ) {
310        unlock( mutex_lock );
311        __closed_remove( chan, retval );
312        return retval;
313    }
314
315    // have to check for the zero size channel case
316    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
317        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
318        __prods_handoff( chan, retval );
319        unlock( mutex_lock );
320        return retval;
321    }
322
323    // wait if buffer is empty, work will be completed by someone else
324    if ( count == 0 ) {
325        #ifdef CHAN_STATS
326        c_blocks++;
327        #endif
328        // check for if woken due to close
329        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
330            __closed_remove( chan, retval );
331        return retval;
332    }
333
334    // Remove from buffer
335    __do_remove( chan, retval );
336    unlock( mutex_lock );
337    return retval;
338}
339static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
340
341
342///////////////////////////////////////////////////////////////////////////////////////////
343// The following is Go-style operator support for channels
344///////////////////////////////////////////////////////////////////////////////////////////
345
346static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
347static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
348
349///////////////////////////////////////////////////////////////////////////////////////////
350// The following is support for waituntil (select) statements
351///////////////////////////////////////////////////////////////////////////////////////////
352static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
353    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
354    lock( mutex_lock );
355    if ( node`isListed ) { // op wasn't performed
356        remove( node );
357        unlock( mutex_lock );
358        return false;
359    }
360    unlock( mutex_lock );
361
362    // only return true when not special OR case and status is SAT
363    return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
364}
365
366// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
367static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
368    while ( !queue`isEmpty ) {
369        // if node not a special OR case or if we win the special OR case race break
370        if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
371            return true;
372       
373        // our node lost the race when toggling in __pending_set_other
374        if ( *mine.clause_status != __SELECT_PENDING )
375            return false;
376
377        // otherwise we lost the special OR race so discard node
378        try_pop_front( queue );
379    }
380    return false;
381}
382
383// type used by select statement to capture a chan read as the selected operation
384struct chan_read {
385    T * ret;
386    channel(T) * chan;
387};
388__CFA_SELECT_GET_TYPE( chan_read(T) );
389
390static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
391    cr.chan = chan;
392    cr.ret = ret;
393}
394static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
395
396static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
397    __closed_remove( *chan, *ret );
398    // if we get here then the insert succeeded
399    __make_select_node_available( node );
400}
401
402static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
403    lock( mutex_lock );
404    node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
405
406    #ifdef CHAN_STATS
407    if ( !closed ) c_ops++;
408    #endif
409
410    if ( !node.park_counter ) {
411        // are we special case OR and front of cons is also special case OR
412        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
413            if ( !__make_select_node_pending( node ) ) {
414                unlock( mutex_lock );
415                return false;
416            }
417
418            if ( __handle_pending( prods, node ) ) {
419                __prods_handoff( *chan, *ret );
420                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
421                unlock( mutex_lock );
422                return true;
423            }
424            if ( *node.clause_status == __SELECT_PENDING )
425                __make_select_node_unsat( node );
426        }
427        // check if we can complete operation. If so race to establish winner in special OR case
428        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
429            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
430                unlock( mutex_lock );
431                return false;
432            }
433        }
434    }
435
436    if ( unlikely(closed) ) {
437        unlock( mutex_lock );
438        __handle_select_closed_read( this, node );
439        return true;
440    }
441
442    // have to check for the zero size channel case
443    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
444        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
445        __prods_handoff( *chan, *ret );
446        __set_avail_then_unlock( node, mutex_lock );
447        return true;
448    }
449
450    // wait if buffer is empty, work will be completed by someone else
451    if ( count == 0 ) {
452        #ifdef CHAN_STATS
453        c_blocks++;
454        #endif
455       
456        insert_last( cons, node );
457        unlock( mutex_lock );
458        return false;
459    }
460
461    // Remove from buffer
462    __do_remove( *chan, *ret );
463    __set_avail_then_unlock( node, mutex_lock );
464    return true;
465}
466static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
467static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
468    if ( unlikely(node.extra == 0p) ) {
469        if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
470        else return false;
471    }
472    // This is only reachable if not closed or closed exception was handled
473    return true;
474}
475
476// type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
477struct chan_read_no_ret {
478    T retval;
479    chan_read( T ) c_read;
480};
481__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
482
483static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
484    this.c_read{ &chan, &this.retval };
485}
486
487static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
488static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
489    this.c_read.ret = &this.retval;
490    return register_select( this.c_read, node );
491}
492static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
493static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
494
495// type used by select statement to capture a chan write as the selected operation
496struct chan_write {
497    T elem;
498    channel(T) * chan;
499};
500__CFA_SELECT_GET_TYPE( chan_write(T) );
501
502static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
503    cw.chan = chan;
504    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
505}
506static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
507static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
508
509static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
510    __closed_insert( *chan, elem );
511    // if we get here then the insert succeeded
512    __make_select_node_available( node );
513}
514
515static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
516    lock( mutex_lock );
517    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
518
519    #ifdef CHAN_STATS
520    if ( !closed ) p_ops++;
521    #endif
522
523    // special OR case handling
524    if ( !node.park_counter ) {
525        // are we special case OR and front of cons is also special case OR
526        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
527            if ( !__make_select_node_pending( node ) ) {
528                unlock( mutex_lock );
529                return false;
530            }
531
532            if ( __handle_pending( cons, node ) ) {
533                __cons_handoff( *chan, elem );
534                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
535                unlock( mutex_lock );
536                return true;
537            }
538            if ( *node.clause_status == __SELECT_PENDING )
539                __make_select_node_unsat( node );
540        }
541        // check if we can complete operation. If so race to establish winner in special OR case
542        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
543            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
544                unlock( mutex_lock );
545                return false;
546            }
547        }
548    }
549
550    // if closed handle
551    if ( unlikely(closed) ) {
552        unlock( mutex_lock );
553        __handle_select_closed_write( this, node );
554        return true;
555    }
556
557    // handle blocked consumer case via handoff (buffer is implicitly empty)
558    ConsEmpty: if ( !cons`isEmpty ) {
559        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
560        __cons_handoff( *chan, elem );
561        __set_avail_then_unlock( node, mutex_lock );
562        return true;
563    }
564
565    // insert node in list if buffer is full, work will be completed by someone else
566    if ( count == size ) {
567        #ifdef CHAN_STATS
568        p_blocks++;
569        #endif
570
571        insert_last( prods, node );
572        unlock( mutex_lock );
573        return false;
574    } // if
575
576    // otherwise carry out write either via normal insert
577    __buf_insert( *chan, elem );
578    __set_avail_then_unlock( node, mutex_lock );
579    return true;
580}
581static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
582
583static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
584    if ( unlikely(node.extra == 0p) ) {
585        if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
586        else return false;
587    }
588    // This is only reachable if not closed or closed exception was handled
589    return true;
590}
591
592} // forall( T )
593
594
Note: See TracBrowser for help on using the repository browser.