source: libcfa/src/concurrency/channel.hfa @ 0d41b2e

Last change on this file since 0d41b2e was 3f0b062, checked in by caparsons <caparson@…>, 9 months ago

ifdef'd the arm fences that were added to channels so that they only appear on the arm

  • Property mode set to 100644
File size: 21.1 KB
RevLine 
[629c95a]1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime locks that used with the runtime thread system.
9//
10// Author           : Colby Alexander Parsons
11// Created On       : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count     :
15//
16
[5e4a830]17#pragma once
18
[4a962d8]19#include <locks.hfa>
[d30e3eb]20#include <list.hfa>
[beeff61e]21#include "select.hfa"
[a45e21c]22
23// returns true if woken due to shutdown
24// blocks thread on list and releases passed lock
[beeff61e]25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
26    select_node sn{ active_thread(), elem_ptr };
27    insert_last( queue, sn );
[a45e21c]28    unlock( lock );
29    park();
[3f0b062]30    #if defined(__ARM_ARCH)
[ca995e3]31    __atomic_thread_fence( __ATOMIC_SEQ_CST );
[3f0b062]32    #endif
[beeff61e]33    return sn.extra == 0p;
34}
35
36// Waituntil support (un)register_select helper routine
37// Sets select node avail if not special OR case and then unlocks
38static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
39    if ( node.park_counter ) __make_select_node_available( node );
40    unlock( mutex_lock );
[a45e21c]41}
42
43// void * used for some fields since exceptions don't work with parametric polymorphism currently
44exception channel_closed {
45    // on failed insert elem is a ptr to the element attempting to be inserted
46    // on failed remove elem ptr is 0p
47    // on resumption of a failed insert this elem will be inserted
48    // so a user may modify it in the resumption handler
49    void * elem;
50
51    // pointer to chan that is closed
52    void * closed_chan;
53};
54vtable(channel_closed) channel_closed_vt;
55
[ded6c2a6]56static inline bool is_insert( channel_closed & e ) { return e.elem != 0p; }
57static inline bool is_remove( channel_closed & e ) { return e.elem == 0p; }
[3eeeb88]58
[a45e21c]59// #define CHAN_STATS // define this to get channel stats printed in dtor
60
[4a962d8]61forall( T ) {
[d30e3eb]62
[a45e21c]63struct __attribute__((aligned(128))) channel {
64    size_t size, front, back, count;
[4a962d8]65    T * buffer;
[beeff61e]66    dlist( select_node ) prods, cons; // lists of blocked threads
[629c95a]67    go_mutex mutex_lock;              // MX lock
68    bool closed;                      // indicates channel close/open
[a45e21c]69    #ifdef CHAN_STATS
[88b49bb]70    size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
[a45e21c]71    #endif
[4a962d8]72};
[7a2c6b18]73static inline void ?{}( channel(T) & this, channel(T) this2 ) = void;
74static inline void ?=?( channel(T) & this, channel(T) this2 ) = void;
[4a962d8]75
76static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
77    size = _size;
78    front = back = count = 0;
[beeff61e]79    if ( size != 0 ) buffer = aalloc( size );
[4a962d8]80    prods{};
81    cons{};
82    mutex_lock{};
[a45e21c]83    closed = false;
84    #ifdef CHAN_STATS
[88b49bb]85    p_blocks = 0;
86    p_ops = 0;
87    c_blocks = 0;
88    c_ops = 0;
[a45e21c]89    #endif
[4a962d8]90}
91
92static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
[a45e21c]93static inline void ^?{}( channel(T) &c ) with(c) {
94    #ifdef CHAN_STATS
[88b49bb]95    printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
96    printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
97    printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
[a45e21c]98    #endif
[88b49bb]99    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
100        "Attempted to delete channel with waiting threads (Deadlock).\n" );
[beeff61e]101    if ( size != 0 ) delete( buffer );
[a45e21c]102}
[5908fb4]103static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
104static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
[d30e3eb]105static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
106static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
107static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
[4a962d8]108
[a45e21c]109// closes the channel and notifies all blocked threads
110static inline void close( channel(T) & chan ) with(chan) {
111    lock( mutex_lock );
112    closed = true;
113
114    // flush waiting consumers and producers
115    while ( has_waiting_consumers( chan ) ) {
[beeff61e]116        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
117            break;  // if __handle_waituntil_OR returns false cons is empty so break
118        cons`first.extra = 0p;
[a45e21c]119        wake_one( cons );
120    }
121    while ( has_waiting_producers( chan ) ) {
[beeff61e]122        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
123            break;  // if __handle_waituntil_OR returns false prods is empty so break
124        prods`first.extra = 0p;
[a45e21c]125        wake_one( prods );
126    }
127    unlock(mutex_lock);
128}
129
130static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
131
[beeff61e]132// used to hand an element to a blocked consumer and signal it
133static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
134    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
[3f0b062]135    #if defined(__ARM_ARCH)
[02c5880]136    __atomic_thread_fence( __ATOMIC_SEQ_CST );
[3f0b062]137    #endif
[beeff61e]138    wake_one( cons );
139}
140
141// used to hand an element to a blocked producer and signal it
142static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
143    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
[3f0b062]144    #if defined(__ARM_ARCH)
[02c5880]145    __atomic_thread_fence( __ATOMIC_SEQ_CST );
[3f0b062]146    #endif
[beeff61e]147    wake_one( prods );
148}
149
[a45e21c]150static inline void flush( channel(T) & chan, T elem ) with(chan) {
151    lock( mutex_lock );
152    while ( count == 0 && !cons`isEmpty ) {
[beeff61e]153        __cons_handoff( chan, elem );
[a45e21c]154    }
155    unlock( mutex_lock );
156}
157
158// handles buffer insert
159static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]160    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
[4a962d8]161    count += 1;
162    back++;
163    if ( back == size ) back = 0;
164}
165
[a45e21c]166// needed to avoid an extra copy in closed case
167static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
168    lock( mutex_lock );
169    #ifdef CHAN_STATS
[88b49bb]170    p_ops++;
[a45e21c]171    #endif
[beeff61e]172
173    ConsEmpty: if ( !cons`isEmpty ) {
174        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
175        __cons_handoff( chan, elem );
176        unlock( mutex_lock );
177        return true;
178    }
179
[a45e21c]180    if ( count == size ) { unlock( mutex_lock ); return false; }
[beeff61e]181
182    __buf_insert( chan, elem );
[a45e21c]183    unlock( mutex_lock );
184    return true;
185}
186
187// attempts a nonblocking insert
188// returns true if insert was successful, false otherwise
189static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
190
191// handles closed case of insert routine
192static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]193    channel_closed except{ &channel_closed_vt, &elem, &chan };
[a45e21c]194    throwResume except; // throw closed resumption
195    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
[d30e3eb]196}
[4a962d8]197
[42b739d7]198static inline void insert( channel(T) & chan, T elem ) with(chan) {
[a45e21c]199    // check for close before acquire mx
200    if ( unlikely(closed) ) {
201        __closed_insert( chan, elem );
202        return;
203    }
204
[4a962d8]205    lock( mutex_lock );
206
[a45e21c]207    #ifdef CHAN_STATS
[88b49bb]208    if ( !closed ) p_ops++;
[a45e21c]209    #endif
210
211    // if closed handle
212    if ( unlikely(closed) ) {
213        unlock( mutex_lock );
214        __closed_insert( chan, elem );
215        return;
216    }
217
[beeff61e]218    // buffer count must be zero if cons are blocked (also handles zero-size case)
219    ConsEmpty: if ( !cons`isEmpty ) {
220        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
221        __cons_handoff( chan, elem );
[5c931e0]222        unlock( mutex_lock );
[beeff61e]223        return;
[5c931e0]224    }
225
[4a962d8]226    // wait if buffer is full, work will be completed by someone else
[d30e3eb]227    if ( count == size ) {
[a45e21c]228        #ifdef CHAN_STATS
[88b49bb]229        p_blocks++;
[a45e21c]230        #endif
231
232        // check for if woken due to close
233        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
234            __closed_insert( chan, elem );
[4a962d8]235        return;
236    } // if
237
[beeff61e]238    __buf_insert( chan, elem );
[4a962d8]239    unlock( mutex_lock );
[a45e21c]240}
[4a962d8]241
[a45e21c]242// does the buffer remove and potentially does waiting producer work
243static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]244    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
245    count -= 1;
246    front = (front + 1) % size;
[d30e3eb]247    if (count == size - 1 && !prods`isEmpty ) {
[beeff61e]248        if ( !__handle_waituntil_OR( prods ) ) return;
249        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
[d30e3eb]250        wake_one( prods );
251    }
[4a962d8]252}
[0e16a2d]253
[a45e21c]254// needed to avoid an extra copy in closed case and single return val case
255static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
[0e16a2d]256    lock( mutex_lock );
[a45e21c]257    #ifdef CHAN_STATS
[88b49bb]258    c_ops++;
[a45e21c]259    #endif
[beeff61e]260
261    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
262        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
263        __prods_handoff( chan, retval );
264        unlock( mutex_lock );
265        return true;
266    }
267
[a45e21c]268    if ( count == 0 ) { unlock( mutex_lock ); return false; }
[beeff61e]269
[a45e21c]270    __do_remove( chan, retval );
[0e16a2d]271    unlock( mutex_lock );
[a45e21c]272    return true;
[0e16a2d]273}
274
[a45e21c]275// attempts a nonblocking remove
276// returns [T, true] if insert was successful
277// returns [T, false] if insert was successful (T uninit)
278static inline [T, bool] try_remove( channel(T) & chan ) {
[0e16a2d]279    T retval;
[beeff61e]280    bool success = __internal_try_remove( chan, retval );
281    return [ retval, success ];
[a45e21c]282}
[0e16a2d]283
[beeff61e]284static inline T try_remove( channel(T) & chan ) {
[a45e21c]285    T retval;
286    __internal_try_remove( chan, retval );
[0e16a2d]287    return retval;
288}
289
[a45e21c]290// handles closed case of insert routine
291static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]292    channel_closed except{ &channel_closed_vt, 0p, &chan };
[a45e21c]293    throwResume except; // throw resumption
294    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
[0e16a2d]295}
296
[a45e21c]297static inline T remove( channel(T) & chan ) with(chan) {
298    T retval;
299    if ( unlikely(closed) ) {
300        __closed_remove( chan, retval );
301        return retval;
302    }
303    lock( mutex_lock );
[0e16a2d]304
[a45e21c]305    #ifdef CHAN_STATS
[88b49bb]306    if ( !closed ) c_ops++;
[a45e21c]307    #endif
[0e16a2d]308
[a45e21c]309    if ( unlikely(closed) ) {
310        unlock( mutex_lock );
311        __closed_remove( chan, retval );
312        return retval;
313    }
[0e16a2d]314
[a45e21c]315    // have to check for the zero size channel case
[beeff61e]316    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
317        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
318        __prods_handoff( chan, retval );
[a45e21c]319        unlock( mutex_lock );
320        return retval;
321    }
[0e16a2d]322
[a45e21c]323    // wait if buffer is empty, work will be completed by someone else
[beeff61e]324    if ( count == 0 ) {
[a45e21c]325        #ifdef CHAN_STATS
[88b49bb]326        c_blocks++;
[a45e21c]327        #endif
328        // check for if woken due to close
329        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
330            __closed_remove( chan, retval );
331        return retval;
332    }
[0e16a2d]333
334    // Remove from buffer
[a45e21c]335    __do_remove( chan, retval );
[0e16a2d]336    unlock( mutex_lock );
337    return retval;
338}
[c44705c]339static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
340
[ca22a7c]341
[a0b59ed]342///////////////////////////////////////////////////////////////////////////////////////////
343// The following is Go-style operator support for channels
344///////////////////////////////////////////////////////////////////////////////////////////
345
[ca22a7c]346static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
[bf55f32]347static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
[beeff61e]348
349///////////////////////////////////////////////////////////////////////////////////////////
350// The following is support for waituntil (select) statements
351///////////////////////////////////////////////////////////////////////////////////////////
352static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
[88b49bb]353    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
[beeff61e]354    lock( mutex_lock );
355    if ( node`isListed ) { // op wasn't performed
356        remove( node );
357        unlock( mutex_lock );
358        return false;
359    }
360    unlock( mutex_lock );
361
[00b046f]362    // only return true when not special OR case and status is SAT
363    return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
[beeff61e]364}
365
[6f774be]366// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
367static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
368    while ( !queue`isEmpty ) {
369        // if node not a special OR case or if we win the special OR case race break
370        if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
371            return true;
372       
373        // our node lost the race when toggling in __pending_set_other
374        if ( *mine.clause_status != __SELECT_PENDING )
375            return false;
376
377        // otherwise we lost the special OR race so discard node
378        try_pop_front( queue );
379    }
380    return false;
381}
382
[beeff61e]383// type used by select statement to capture a chan read as the selected operation
384struct chan_read {
[7a2c6b18]385    T * ret;
386    channel(T) * chan;
[beeff61e]387};
[bf55f32]388__CFA_SELECT_GET_TYPE( chan_read(T) );
[beeff61e]389
[7a2c6b18]390static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
391    cr.chan = chan;
392    cr.ret = ret;
[beeff61e]393}
[7a2c6b18]394static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
[beeff61e]395
[7a2c6b18]396static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
397    __closed_remove( *chan, *ret );
[beeff61e]398    // if we get here then the insert succeeded
399    __make_select_node_available( node );
400}
401
[7a2c6b18]402static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
[beeff61e]403    lock( mutex_lock );
[7a2c6b18]404    node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
[beeff61e]405
406    #ifdef CHAN_STATS
[88b49bb]407    if ( !closed ) c_ops++;
[beeff61e]408    #endif
409
[c4f411e]410    if ( !node.park_counter ) {
411        // are we special case OR and front of cons is also special case OR
412        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
413            if ( !__make_select_node_pending( node ) ) {
414                unlock( mutex_lock );
415                return false;
416            }
[6f774be]417
418            if ( __handle_pending( prods, node ) ) {
[7a2c6b18]419                __prods_handoff( *chan, *ret );
[629c95a]420                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
[c4f411e]421                unlock( mutex_lock );
422                return true;
423            }
[6f774be]424            if ( *node.clause_status == __SELECT_PENDING )
425                __make_select_node_unsat( node );
[c4f411e]426        }
427        // check if we can complete operation. If so race to establish winner in special OR case
428        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
429            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
430                unlock( mutex_lock );
431                return false;
432            }
[beeff61e]433        }
434    }
435
436    if ( unlikely(closed) ) {
437        unlock( mutex_lock );
438        __handle_select_closed_read( this, node );
439        return true;
440    }
441
442    // have to check for the zero size channel case
443    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
444        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
[7a2c6b18]445        __prods_handoff( *chan, *ret );
[beeff61e]446        __set_avail_then_unlock( node, mutex_lock );
447        return true;
448    }
449
450    // wait if buffer is empty, work will be completed by someone else
451    if ( count == 0 ) {
452        #ifdef CHAN_STATS
[88b49bb]453        c_blocks++;
[beeff61e]454        #endif
455       
456        insert_last( cons, node );
457        unlock( mutex_lock );
458        return false;
459    }
460
461    // Remove from buffer
[7a2c6b18]462    __do_remove( *chan, *ret );
[beeff61e]463    __set_avail_then_unlock( node, mutex_lock );
464    return true;
465}
[7a2c6b18]466static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
[b93bf85]467static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
468    if ( unlikely(node.extra == 0p) ) {
[7a2c6b18]469        if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
[b93bf85]470        else return false;
471    }
[beeff61e]472    // This is only reachable if not closed or closed exception was handled
[b93bf85]473    return true;
[beeff61e]474}
475
[c44705c]476// type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
477struct chan_read_no_ret {
[7a2c6b18]478    T retval;
479    chan_read( T ) c_read;
[c44705c]480};
481__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
482
483static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
[7a2c6b18]484    this.c_read{ &chan, &this.retval };
485}
486
487static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
488static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
489    this.c_read.ret = &this.retval;
490    return register_select( this.c_read, node );
[c44705c]491}
[7a2c6b18]492static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
493static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
[c44705c]494
[beeff61e]495// type used by select statement to capture a chan write as the selected operation
496struct chan_write {
497    T elem;
[7a2c6b18]498    channel(T) * chan;
[beeff61e]499};
[bf55f32]500__CFA_SELECT_GET_TYPE( chan_write(T) );
[beeff61e]501
[7a2c6b18]502static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
503    cw.chan = chan;
[beeff61e]504    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
505}
[7a2c6b18]506static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
507static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
[beeff61e]508
[7a2c6b18]509static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
510    __closed_insert( *chan, elem );
[beeff61e]511    // if we get here then the insert succeeded
512    __make_select_node_available( node );
513}
514
[7a2c6b18]515static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
[beeff61e]516    lock( mutex_lock );
517    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
518
519    #ifdef CHAN_STATS
[88b49bb]520    if ( !closed ) p_ops++;
[beeff61e]521    #endif
522
[c4f411e]523    // special OR case handling
524    if ( !node.park_counter ) {
525        // are we special case OR and front of cons is also special case OR
526        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
527            if ( !__make_select_node_pending( node ) ) {
528                unlock( mutex_lock );
529                return false;
530            }
[6f774be]531
532            if ( __handle_pending( cons, node ) ) {
[7a2c6b18]533                __cons_handoff( *chan, elem );
[629c95a]534                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
[c4f411e]535                unlock( mutex_lock );
536                return true;
537            }
[6f774be]538            if ( *node.clause_status == __SELECT_PENDING )
539                __make_select_node_unsat( node );
[c4f411e]540        }
541        // check if we can complete operation. If so race to establish winner in special OR case
542        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
543            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
544                unlock( mutex_lock );
545                return false;
546            }
[beeff61e]547        }
548    }
549
550    // if closed handle
551    if ( unlikely(closed) ) {
552        unlock( mutex_lock );
553        __handle_select_closed_write( this, node );
554        return true;
555    }
556
557    // handle blocked consumer case via handoff (buffer is implicitly empty)
558    ConsEmpty: if ( !cons`isEmpty ) {
[cb69fba]559        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
[7a2c6b18]560        __cons_handoff( *chan, elem );
[beeff61e]561        __set_avail_then_unlock( node, mutex_lock );
562        return true;
563    }
564
565    // insert node in list if buffer is full, work will be completed by someone else
566    if ( count == size ) {
567        #ifdef CHAN_STATS
[88b49bb]568        p_blocks++;
[beeff61e]569        #endif
570
571        insert_last( prods, node );
572        unlock( mutex_lock );
573        return false;
574    } // if
575
576    // otherwise carry out write either via normal insert
[7a2c6b18]577    __buf_insert( *chan, elem );
[beeff61e]578    __set_avail_then_unlock( node, mutex_lock );
579    return true;
580}
[7a2c6b18]581static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
[beeff61e]582
[b93bf85]583static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
584    if ( unlikely(node.extra == 0p) ) {
[7a2c6b18]585        if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
[b93bf85]586        else return false;
587    }
[beeff61e]588    // This is only reachable if not closed or closed exception was handled
[b93bf85]589    return true;
[beeff61e]590}
591
[0e16a2d]592} // forall( T )
[beeff61e]593
594
Note: See TracBrowser for help on using the repository browser.