source: libcfa/src/concurrency/channel.hfa@ 8941b6b

Last change on this file since 8941b6b was 3f0b062, checked in by caparsons <caparson@…>, 2 years ago

ifdef'd the arm fences that were added to channels so that they only appear on the arm

  • Property mode set to 100644
File size: 21.1 KB
RevLine 
[629c95a]1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime locks that used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
[5e4a830]17#pragma once
18
[4a962d8]19#include <locks.hfa>
[d30e3eb]20#include <list.hfa>
[beeff61e]21#include "select.hfa"
[a45e21c]22
23// returns true if woken due to shutdown
24// blocks thread on list and releases passed lock
[beeff61e]25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
26 select_node sn{ active_thread(), elem_ptr };
27 insert_last( queue, sn );
[a45e21c]28 unlock( lock );
29 park();
[3f0b062]30 #if defined(__ARM_ARCH)
[ca995e3]31 __atomic_thread_fence( __ATOMIC_SEQ_CST );
[3f0b062]32 #endif
[beeff61e]33 return sn.extra == 0p;
34}
35
36// Waituntil support (un)register_select helper routine
37// Sets select node avail if not special OR case and then unlocks
38static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
39 if ( node.park_counter ) __make_select_node_available( node );
40 unlock( mutex_lock );
[a45e21c]41}
42
43// void * used for some fields since exceptions don't work with parametric polymorphism currently
44exception channel_closed {
45 // on failed insert elem is a ptr to the element attempting to be inserted
46 // on failed remove elem ptr is 0p
47 // on resumption of a failed insert this elem will be inserted
48 // so a user may modify it in the resumption handler
49 void * elem;
50
51 // pointer to chan that is closed
52 void * closed_chan;
53};
54vtable(channel_closed) channel_closed_vt;
55
[ded6c2a6]56static inline bool is_insert( channel_closed & e ) { return e.elem != 0p; }
57static inline bool is_remove( channel_closed & e ) { return e.elem == 0p; }
[3eeeb88]58
[a45e21c]59// #define CHAN_STATS // define this to get channel stats printed in dtor
60
[4a962d8]61forall( T ) {
[d30e3eb]62
[a45e21c]63struct __attribute__((aligned(128))) channel {
64 size_t size, front, back, count;
[4a962d8]65 T * buffer;
[beeff61e]66 dlist( select_node ) prods, cons; // lists of blocked threads
[629c95a]67 go_mutex mutex_lock; // MX lock
68 bool closed; // indicates channel close/open
[a45e21c]69 #ifdef CHAN_STATS
[88b49bb]70 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd
[a45e21c]71 #endif
[4a962d8]72};
[7a2c6b18]73static inline void ?{}( channel(T) & this, channel(T) this2 ) = void;
74static inline void ?=?( channel(T) & this, channel(T) this2 ) = void;
[4a962d8]75
76static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
77 size = _size;
78 front = back = count = 0;
[beeff61e]79 if ( size != 0 ) buffer = aalloc( size );
[4a962d8]80 prods{};
81 cons{};
82 mutex_lock{};
[a45e21c]83 closed = false;
84 #ifdef CHAN_STATS
[88b49bb]85 p_blocks = 0;
86 p_ops = 0;
87 c_blocks = 0;
88 c_ops = 0;
[a45e21c]89 #endif
[4a962d8]90}
91
92static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
[a45e21c]93static inline void ^?{}( channel(T) &c ) with(c) {
94 #ifdef CHAN_STATS
[88b49bb]95 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
96 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
97 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
[a45e21c]98 #endif
[88b49bb]99 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
100 "Attempted to delete channel with waiting threads (Deadlock).\n" );
[beeff61e]101 if ( size != 0 ) delete( buffer );
[a45e21c]102}
[5908fb4]103static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
104static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
[d30e3eb]105static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
106static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
107static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
[4a962d8]108
[a45e21c]109// closes the channel and notifies all blocked threads
110static inline void close( channel(T) & chan ) with(chan) {
111 lock( mutex_lock );
112 closed = true;
113
114 // flush waiting consumers and producers
115 while ( has_waiting_consumers( chan ) ) {
[beeff61e]116 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
117 break; // if __handle_waituntil_OR returns false cons is empty so break
118 cons`first.extra = 0p;
[a45e21c]119 wake_one( cons );
120 }
121 while ( has_waiting_producers( chan ) ) {
[beeff61e]122 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
123 break; // if __handle_waituntil_OR returns false prods is empty so break
124 prods`first.extra = 0p;
[a45e21c]125 wake_one( prods );
126 }
127 unlock(mutex_lock);
128}
129
130static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
131
[beeff61e]132// used to hand an element to a blocked consumer and signal it
133static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
134 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
[3f0b062]135 #if defined(__ARM_ARCH)
[02c5880]136 __atomic_thread_fence( __ATOMIC_SEQ_CST );
[3f0b062]137 #endif
[beeff61e]138 wake_one( cons );
139}
140
141// used to hand an element to a blocked producer and signal it
142static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
143 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
[3f0b062]144 #if defined(__ARM_ARCH)
[02c5880]145 __atomic_thread_fence( __ATOMIC_SEQ_CST );
[3f0b062]146 #endif
[beeff61e]147 wake_one( prods );
148}
149
[a45e21c]150static inline void flush( channel(T) & chan, T elem ) with(chan) {
151 lock( mutex_lock );
152 while ( count == 0 && !cons`isEmpty ) {
[beeff61e]153 __cons_handoff( chan, elem );
[a45e21c]154 }
155 unlock( mutex_lock );
156}
157
158// handles buffer insert
159static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]160 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
[4a962d8]161 count += 1;
162 back++;
163 if ( back == size ) back = 0;
164}
165
[a45e21c]166// needed to avoid an extra copy in closed case
167static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
168 lock( mutex_lock );
169 #ifdef CHAN_STATS
[88b49bb]170 p_ops++;
[a45e21c]171 #endif
[beeff61e]172
173 ConsEmpty: if ( !cons`isEmpty ) {
174 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
175 __cons_handoff( chan, elem );
176 unlock( mutex_lock );
177 return true;
178 }
179
[a45e21c]180 if ( count == size ) { unlock( mutex_lock ); return false; }
[beeff61e]181
182 __buf_insert( chan, elem );
[a45e21c]183 unlock( mutex_lock );
184 return true;
185}
186
187// attempts a nonblocking insert
188// returns true if insert was successful, false otherwise
189static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
190
191// handles closed case of insert routine
192static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]193 channel_closed except{ &channel_closed_vt, &elem, &chan };
[a45e21c]194 throwResume except; // throw closed resumption
195 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
[d30e3eb]196}
[4a962d8]197
[42b739d7]198static inline void insert( channel(T) & chan, T elem ) with(chan) {
[a45e21c]199 // check for close before acquire mx
200 if ( unlikely(closed) ) {
201 __closed_insert( chan, elem );
202 return;
203 }
204
[4a962d8]205 lock( mutex_lock );
206
[a45e21c]207 #ifdef CHAN_STATS
[88b49bb]208 if ( !closed ) p_ops++;
[a45e21c]209 #endif
210
211 // if closed handle
212 if ( unlikely(closed) ) {
213 unlock( mutex_lock );
214 __closed_insert( chan, elem );
215 return;
216 }
217
[beeff61e]218 // buffer count must be zero if cons are blocked (also handles zero-size case)
219 ConsEmpty: if ( !cons`isEmpty ) {
220 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
221 __cons_handoff( chan, elem );
[5c931e0]222 unlock( mutex_lock );
[beeff61e]223 return;
[5c931e0]224 }
225
[4a962d8]226 // wait if buffer is full, work will be completed by someone else
[d30e3eb]227 if ( count == size ) {
[a45e21c]228 #ifdef CHAN_STATS
[88b49bb]229 p_blocks++;
[a45e21c]230 #endif
231
232 // check for if woken due to close
233 if ( unlikely( block( prods, &elem, mutex_lock ) ) )
234 __closed_insert( chan, elem );
[4a962d8]235 return;
236 } // if
237
[beeff61e]238 __buf_insert( chan, elem );
[4a962d8]239 unlock( mutex_lock );
[a45e21c]240}
[4a962d8]241
[a45e21c]242// does the buffer remove and potentially does waiting producer work
243static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]244 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
245 count -= 1;
246 front = (front + 1) % size;
[d30e3eb]247 if (count == size - 1 && !prods`isEmpty ) {
[beeff61e]248 if ( !__handle_waituntil_OR( prods ) ) return;
249 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work
[d30e3eb]250 wake_one( prods );
251 }
[4a962d8]252}
[0e16a2d]253
[a45e21c]254// needed to avoid an extra copy in closed case and single return val case
255static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
[0e16a2d]256 lock( mutex_lock );
[a45e21c]257 #ifdef CHAN_STATS
[88b49bb]258 c_ops++;
[a45e21c]259 #endif
[beeff61e]260
261 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
262 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
263 __prods_handoff( chan, retval );
264 unlock( mutex_lock );
265 return true;
266 }
267
[a45e21c]268 if ( count == 0 ) { unlock( mutex_lock ); return false; }
[beeff61e]269
[a45e21c]270 __do_remove( chan, retval );
[0e16a2d]271 unlock( mutex_lock );
[a45e21c]272 return true;
[0e16a2d]273}
274
[a45e21c]275// attempts a nonblocking remove
276// returns [T, true] if insert was successful
277// returns [T, false] if insert was successful (T uninit)
278static inline [T, bool] try_remove( channel(T) & chan ) {
[0e16a2d]279 T retval;
[beeff61e]280 bool success = __internal_try_remove( chan, retval );
281 return [ retval, success ];
[a45e21c]282}
[0e16a2d]283
[beeff61e]284static inline T try_remove( channel(T) & chan ) {
[a45e21c]285 T retval;
286 __internal_try_remove( chan, retval );
[0e16a2d]287 return retval;
288}
289
[a45e21c]290// handles closed case of insert routine
291static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]292 channel_closed except{ &channel_closed_vt, 0p, &chan };
[a45e21c]293 throwResume except; // throw resumption
294 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
[0e16a2d]295}
296
[a45e21c]297static inline T remove( channel(T) & chan ) with(chan) {
298 T retval;
299 if ( unlikely(closed) ) {
300 __closed_remove( chan, retval );
301 return retval;
302 }
303 lock( mutex_lock );
[0e16a2d]304
[a45e21c]305 #ifdef CHAN_STATS
[88b49bb]306 if ( !closed ) c_ops++;
[a45e21c]307 #endif
[0e16a2d]308
[a45e21c]309 if ( unlikely(closed) ) {
310 unlock( mutex_lock );
311 __closed_remove( chan, retval );
312 return retval;
313 }
[0e16a2d]314
[a45e21c]315 // have to check for the zero size channel case
[beeff61e]316 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
317 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
318 __prods_handoff( chan, retval );
[a45e21c]319 unlock( mutex_lock );
320 return retval;
321 }
[0e16a2d]322
[a45e21c]323 // wait if buffer is empty, work will be completed by someone else
[beeff61e]324 if ( count == 0 ) {
[a45e21c]325 #ifdef CHAN_STATS
[88b49bb]326 c_blocks++;
[a45e21c]327 #endif
328 // check for if woken due to close
329 if ( unlikely( block( cons, &retval, mutex_lock ) ) )
330 __closed_remove( chan, retval );
331 return retval;
332 }
[0e16a2d]333
334 // Remove from buffer
[a45e21c]335 __do_remove( chan, retval );
[0e16a2d]336 unlock( mutex_lock );
337 return retval;
338}
[c44705c]339static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
340
[ca22a7c]341
[a0b59ed]342///////////////////////////////////////////////////////////////////////////////////////////
343// The following is Go-style operator support for channels
344///////////////////////////////////////////////////////////////////////////////////////////
345
[ca22a7c]346static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
[bf55f32]347static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
[beeff61e]348
349///////////////////////////////////////////////////////////////////////////////////////////
350// The following is support for waituntil (select) statements
351///////////////////////////////////////////////////////////////////////////////////////////
352static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
[88b49bb]353 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
[beeff61e]354 lock( mutex_lock );
355 if ( node`isListed ) { // op wasn't performed
356 remove( node );
357 unlock( mutex_lock );
358 return false;
359 }
360 unlock( mutex_lock );
361
[00b046f]362 // only return true when not special OR case and status is SAT
363 return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
[beeff61e]364}
365
[6f774be]366// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
367static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
368 while ( !queue`isEmpty ) {
369 // if node not a special OR case or if we win the special OR case race break
370 if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
371 return true;
372
373 // our node lost the race when toggling in __pending_set_other
374 if ( *mine.clause_status != __SELECT_PENDING )
375 return false;
376
377 // otherwise we lost the special OR race so discard node
378 try_pop_front( queue );
379 }
380 return false;
381}
382
[beeff61e]383// type used by select statement to capture a chan read as the selected operation
384struct chan_read {
[7a2c6b18]385 T * ret;
386 channel(T) * chan;
[beeff61e]387};
[bf55f32]388__CFA_SELECT_GET_TYPE( chan_read(T) );
[beeff61e]389
[7a2c6b18]390static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
391 cr.chan = chan;
392 cr.ret = ret;
[beeff61e]393}
[7a2c6b18]394static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
[beeff61e]395
[7a2c6b18]396static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
397 __closed_remove( *chan, *ret );
[beeff61e]398 // if we get here then the insert succeeded
399 __make_select_node_available( node );
400}
401
[7a2c6b18]402static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
[beeff61e]403 lock( mutex_lock );
[7a2c6b18]404 node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
[beeff61e]405
406 #ifdef CHAN_STATS
[88b49bb]407 if ( !closed ) c_ops++;
[beeff61e]408 #endif
409
[c4f411e]410 if ( !node.park_counter ) {
411 // are we special case OR and front of cons is also special case OR
412 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
413 if ( !__make_select_node_pending( node ) ) {
414 unlock( mutex_lock );
415 return false;
416 }
[6f774be]417
418 if ( __handle_pending( prods, node ) ) {
[7a2c6b18]419 __prods_handoff( *chan, *ret );
[629c95a]420 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
[c4f411e]421 unlock( mutex_lock );
422 return true;
423 }
[6f774be]424 if ( *node.clause_status == __SELECT_PENDING )
425 __make_select_node_unsat( node );
[c4f411e]426 }
427 // check if we can complete operation. If so race to establish winner in special OR case
428 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
429 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
430 unlock( mutex_lock );
431 return false;
432 }
[beeff61e]433 }
434 }
435
436 if ( unlikely(closed) ) {
437 unlock( mutex_lock );
438 __handle_select_closed_read( this, node );
439 return true;
440 }
441
442 // have to check for the zero size channel case
443 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
444 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
[7a2c6b18]445 __prods_handoff( *chan, *ret );
[beeff61e]446 __set_avail_then_unlock( node, mutex_lock );
447 return true;
448 }
449
450 // wait if buffer is empty, work will be completed by someone else
451 if ( count == 0 ) {
452 #ifdef CHAN_STATS
[88b49bb]453 c_blocks++;
[beeff61e]454 #endif
455
456 insert_last( cons, node );
457 unlock( mutex_lock );
458 return false;
459 }
460
461 // Remove from buffer
[7a2c6b18]462 __do_remove( *chan, *ret );
[beeff61e]463 __set_avail_then_unlock( node, mutex_lock );
464 return true;
465}
[7a2c6b18]466static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
[b93bf85]467static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
468 if ( unlikely(node.extra == 0p) ) {
[7a2c6b18]469 if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
[b93bf85]470 else return false;
471 }
[beeff61e]472 // This is only reachable if not closed or closed exception was handled
[b93bf85]473 return true;
[beeff61e]474}
475
[c44705c]476// type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
477struct chan_read_no_ret {
[7a2c6b18]478 T retval;
479 chan_read( T ) c_read;
[c44705c]480};
481__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
482
483static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
[7a2c6b18]484 this.c_read{ &chan, &this.retval };
485}
486
487static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
488static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
489 this.c_read.ret = &this.retval;
490 return register_select( this.c_read, node );
[c44705c]491}
[7a2c6b18]492static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
493static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
[c44705c]494
[beeff61e]495// type used by select statement to capture a chan write as the selected operation
496struct chan_write {
497 T elem;
[7a2c6b18]498 channel(T) * chan;
[beeff61e]499};
[bf55f32]500__CFA_SELECT_GET_TYPE( chan_write(T) );
[beeff61e]501
[7a2c6b18]502static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
503 cw.chan = chan;
[beeff61e]504 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
505}
[7a2c6b18]506static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
507static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
[beeff61e]508
[7a2c6b18]509static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
510 __closed_insert( *chan, elem );
[beeff61e]511 // if we get here then the insert succeeded
512 __make_select_node_available( node );
513}
514
[7a2c6b18]515static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
[beeff61e]516 lock( mutex_lock );
517 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
518
519 #ifdef CHAN_STATS
[88b49bb]520 if ( !closed ) p_ops++;
[beeff61e]521 #endif
522
[c4f411e]523 // special OR case handling
524 if ( !node.park_counter ) {
525 // are we special case OR and front of cons is also special case OR
526 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
527 if ( !__make_select_node_pending( node ) ) {
528 unlock( mutex_lock );
529 return false;
530 }
[6f774be]531
532 if ( __handle_pending( cons, node ) ) {
[7a2c6b18]533 __cons_handoff( *chan, elem );
[629c95a]534 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
[c4f411e]535 unlock( mutex_lock );
536 return true;
537 }
[6f774be]538 if ( *node.clause_status == __SELECT_PENDING )
539 __make_select_node_unsat( node );
[c4f411e]540 }
541 // check if we can complete operation. If so race to establish winner in special OR case
542 if ( count != size || !cons`isEmpty || unlikely(closed) ) {
543 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
544 unlock( mutex_lock );
545 return false;
546 }
[beeff61e]547 }
548 }
549
550 // if closed handle
551 if ( unlikely(closed) ) {
552 unlock( mutex_lock );
553 __handle_select_closed_write( this, node );
554 return true;
555 }
556
557 // handle blocked consumer case via handoff (buffer is implicitly empty)
558 ConsEmpty: if ( !cons`isEmpty ) {
[cb69fba]559 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
[7a2c6b18]560 __cons_handoff( *chan, elem );
[beeff61e]561 __set_avail_then_unlock( node, mutex_lock );
562 return true;
563 }
564
565 // insert node in list if buffer is full, work will be completed by someone else
566 if ( count == size ) {
567 #ifdef CHAN_STATS
[88b49bb]568 p_blocks++;
[beeff61e]569 #endif
570
571 insert_last( prods, node );
572 unlock( mutex_lock );
573 return false;
574 } // if
575
576 // otherwise carry out write either via normal insert
[7a2c6b18]577 __buf_insert( *chan, elem );
[beeff61e]578 __set_avail_then_unlock( node, mutex_lock );
579 return true;
580}
[7a2c6b18]581static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
[beeff61e]582
[b93bf85]583static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
584 if ( unlikely(node.extra == 0p) ) {
[7a2c6b18]585 if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
[b93bf85]586 else return false;
587 }
[beeff61e]588 // This is only reachable if not closed or closed exception was handled
[b93bf85]589 return true;
[beeff61e]590}
591
[0e16a2d]592} // forall( T )
[beeff61e]593
594
Note: See TracBrowser for help on using the repository browser.