source: libcfa/src/concurrency/channel.hfa@ db94b5d

Last change on this file since db94b5d was c44705c, checked in by caparsons <caparson@…>, 2 years ago

fixed remove ambiguity issue

  • Property mode set to 100644
File size: 20.6 KB
RevLine 
[629c95a]1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime locks that used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
[5e4a830]17#pragma once
18
[4a962d8]19#include <locks.hfa>
[d30e3eb]20#include <list.hfa>
[beeff61e]21#include "select.hfa"
[a45e21c]22
23// returns true if woken due to shutdown
24// blocks thread on list and releases passed lock
[beeff61e]25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
26 select_node sn{ active_thread(), elem_ptr };
27 insert_last( queue, sn );
[a45e21c]28 unlock( lock );
29 park();
[beeff61e]30 return sn.extra == 0p;
31}
32
33// Waituntil support (un)register_select helper routine
34// Sets select node avail if not special OR case and then unlocks
35static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
36 if ( node.park_counter ) __make_select_node_available( node );
37 unlock( mutex_lock );
[a45e21c]38}
39
40// void * used for some fields since exceptions don't work with parametric polymorphism currently
41exception channel_closed {
42 // on failed insert elem is a ptr to the element attempting to be inserted
43 // on failed remove elem ptr is 0p
44 // on resumption of a failed insert this elem will be inserted
45 // so a user may modify it in the resumption handler
46 void * elem;
47
48 // pointer to chan that is closed
49 void * closed_chan;
50};
51vtable(channel_closed) channel_closed_vt;
52
[ded6c2a6]53static inline bool is_insert( channel_closed & e ) { return e.elem != 0p; }
54static inline bool is_remove( channel_closed & e ) { return e.elem == 0p; }
[3eeeb88]55
[a45e21c]56// #define CHAN_STATS // define this to get channel stats printed in dtor
57
[4a962d8]58forall( T ) {
[d30e3eb]59
[a45e21c]60struct __attribute__((aligned(128))) channel {
61 size_t size, front, back, count;
[4a962d8]62 T * buffer;
[beeff61e]63 dlist( select_node ) prods, cons; // lists of blocked threads
[629c95a]64 go_mutex mutex_lock; // MX lock
65 bool closed; // indicates channel close/open
[a45e21c]66 #ifdef CHAN_STATS
[88b49bb]67 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd
[a45e21c]68 #endif
[4a962d8]69};
70
71static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
72 size = _size;
73 front = back = count = 0;
[beeff61e]74 if ( size != 0 ) buffer = aalloc( size );
[4a962d8]75 prods{};
76 cons{};
77 mutex_lock{};
[a45e21c]78 closed = false;
79 #ifdef CHAN_STATS
[88b49bb]80 p_blocks = 0;
81 p_ops = 0;
82 c_blocks = 0;
83 c_ops = 0;
[a45e21c]84 #endif
[4a962d8]85}
86
87static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
[a45e21c]88static inline void ^?{}( channel(T) &c ) with(c) {
89 #ifdef CHAN_STATS
[88b49bb]90 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
91 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
92 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
[a45e21c]93 #endif
[88b49bb]94 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
95 "Attempted to delete channel with waiting threads (Deadlock).\n" );
[beeff61e]96 if ( size != 0 ) delete( buffer );
[a45e21c]97}
[5908fb4]98static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
99static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
[d30e3eb]100static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
101static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
102static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
[4a962d8]103
[a45e21c]104// closes the channel and notifies all blocked threads
105static inline void close( channel(T) & chan ) with(chan) {
106 lock( mutex_lock );
107 closed = true;
108
109 // flush waiting consumers and producers
110 while ( has_waiting_consumers( chan ) ) {
[beeff61e]111 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
112 break; // if __handle_waituntil_OR returns false cons is empty so break
113 cons`first.extra = 0p;
[a45e21c]114 wake_one( cons );
115 }
116 while ( has_waiting_producers( chan ) ) {
[beeff61e]117 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
118 break; // if __handle_waituntil_OR returns false prods is empty so break
119 prods`first.extra = 0p;
[a45e21c]120 wake_one( prods );
121 }
122 unlock(mutex_lock);
123}
124
125static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
126
[beeff61e]127// used to hand an element to a blocked consumer and signal it
128static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
129 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
130 wake_one( cons );
131}
132
133// used to hand an element to a blocked producer and signal it
134static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
135 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
136 wake_one( prods );
137}
138
[a45e21c]139static inline void flush( channel(T) & chan, T elem ) with(chan) {
140 lock( mutex_lock );
141 while ( count == 0 && !cons`isEmpty ) {
[beeff61e]142 __cons_handoff( chan, elem );
[a45e21c]143 }
144 unlock( mutex_lock );
145}
146
147// handles buffer insert
148static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]149 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
[4a962d8]150 count += 1;
151 back++;
152 if ( back == size ) back = 0;
153}
154
[a45e21c]155// needed to avoid an extra copy in closed case
156static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
157 lock( mutex_lock );
158 #ifdef CHAN_STATS
[88b49bb]159 p_ops++;
[a45e21c]160 #endif
[beeff61e]161
162 ConsEmpty: if ( !cons`isEmpty ) {
163 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
164 __cons_handoff( chan, elem );
165 unlock( mutex_lock );
166 return true;
167 }
168
[a45e21c]169 if ( count == size ) { unlock( mutex_lock ); return false; }
[beeff61e]170
171 __buf_insert( chan, elem );
[a45e21c]172 unlock( mutex_lock );
173 return true;
174}
175
176// attempts a nonblocking insert
177// returns true if insert was successful, false otherwise
178static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
179
180// handles closed case of insert routine
181static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]182 channel_closed except{ &channel_closed_vt, &elem, &chan };
[a45e21c]183 throwResume except; // throw closed resumption
184 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
[d30e3eb]185}
[4a962d8]186
[42b739d7]187static inline void insert( channel(T) & chan, T elem ) with(chan) {
[a45e21c]188 // check for close before acquire mx
189 if ( unlikely(closed) ) {
190 __closed_insert( chan, elem );
191 return;
192 }
193
[4a962d8]194 lock( mutex_lock );
195
[a45e21c]196 #ifdef CHAN_STATS
[88b49bb]197 if ( !closed ) p_ops++;
[a45e21c]198 #endif
199
200 // if closed handle
201 if ( unlikely(closed) ) {
202 unlock( mutex_lock );
203 __closed_insert( chan, elem );
204 return;
205 }
206
[beeff61e]207 // buffer count must be zero if cons are blocked (also handles zero-size case)
208 ConsEmpty: if ( !cons`isEmpty ) {
209 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
210 __cons_handoff( chan, elem );
[5c931e0]211 unlock( mutex_lock );
[beeff61e]212 return;
[5c931e0]213 }
214
[4a962d8]215 // wait if buffer is full, work will be completed by someone else
[d30e3eb]216 if ( count == size ) {
[a45e21c]217 #ifdef CHAN_STATS
[88b49bb]218 p_blocks++;
[a45e21c]219 #endif
220
221 // check for if woken due to close
222 if ( unlikely( block( prods, &elem, mutex_lock ) ) )
223 __closed_insert( chan, elem );
[4a962d8]224 return;
225 } // if
226
[beeff61e]227 __buf_insert( chan, elem );
[4a962d8]228 unlock( mutex_lock );
[a45e21c]229}
[4a962d8]230
[a45e21c]231// does the buffer remove and potentially does waiting producer work
232static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]233 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
234 count -= 1;
235 front = (front + 1) % size;
[d30e3eb]236 if (count == size - 1 && !prods`isEmpty ) {
[beeff61e]237 if ( !__handle_waituntil_OR( prods ) ) return;
238 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work
[d30e3eb]239 wake_one( prods );
240 }
[4a962d8]241}
[0e16a2d]242
[a45e21c]243// needed to avoid an extra copy in closed case and single return val case
244static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
[0e16a2d]245 lock( mutex_lock );
[a45e21c]246 #ifdef CHAN_STATS
[88b49bb]247 c_ops++;
[a45e21c]248 #endif
[beeff61e]249
250 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
251 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
252 __prods_handoff( chan, retval );
253 unlock( mutex_lock );
254 return true;
255 }
256
[a45e21c]257 if ( count == 0 ) { unlock( mutex_lock ); return false; }
[beeff61e]258
[a45e21c]259 __do_remove( chan, retval );
[0e16a2d]260 unlock( mutex_lock );
[a45e21c]261 return true;
[0e16a2d]262}
263
[a45e21c]264// attempts a nonblocking remove
265// returns [T, true] if insert was successful
266// returns [T, false] if insert was successful (T uninit)
267static inline [T, bool] try_remove( channel(T) & chan ) {
[0e16a2d]268 T retval;
[beeff61e]269 bool success = __internal_try_remove( chan, retval );
270 return [ retval, success ];
[a45e21c]271}
[0e16a2d]272
[beeff61e]273static inline T try_remove( channel(T) & chan ) {
[a45e21c]274 T retval;
275 __internal_try_remove( chan, retval );
[0e16a2d]276 return retval;
277}
278
[a45e21c]279// handles closed case of insert routine
280static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]281 channel_closed except{ &channel_closed_vt, 0p, &chan };
[a45e21c]282 throwResume except; // throw resumption
283 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
[0e16a2d]284}
285
[a45e21c]286static inline T remove( channel(T) & chan ) with(chan) {
287 T retval;
288 if ( unlikely(closed) ) {
289 __closed_remove( chan, retval );
290 return retval;
291 }
292 lock( mutex_lock );
[0e16a2d]293
[a45e21c]294 #ifdef CHAN_STATS
[88b49bb]295 if ( !closed ) c_ops++;
[a45e21c]296 #endif
[0e16a2d]297
[a45e21c]298 if ( unlikely(closed) ) {
299 unlock( mutex_lock );
300 __closed_remove( chan, retval );
301 return retval;
302 }
[0e16a2d]303
[a45e21c]304 // have to check for the zero size channel case
[beeff61e]305 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
306 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
307 __prods_handoff( chan, retval );
[a45e21c]308 unlock( mutex_lock );
309 return retval;
310 }
[0e16a2d]311
[a45e21c]312 // wait if buffer is empty, work will be completed by someone else
[beeff61e]313 if ( count == 0 ) {
[a45e21c]314 #ifdef CHAN_STATS
[88b49bb]315 c_blocks++;
[a45e21c]316 #endif
317 // check for if woken due to close
318 if ( unlikely( block( cons, &retval, mutex_lock ) ) )
319 __closed_remove( chan, retval );
320 return retval;
321 }
[0e16a2d]322
323 // Remove from buffer
[a45e21c]324 __do_remove( chan, retval );
[0e16a2d]325 unlock( mutex_lock );
326 return retval;
327}
[c44705c]328static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
329
[ca22a7c]330
[a0b59ed]331///////////////////////////////////////////////////////////////////////////////////////////
332// The following is Go-style operator support for channels
333///////////////////////////////////////////////////////////////////////////////////////////
334
[ca22a7c]335static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
[bf55f32]336static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
[beeff61e]337
338///////////////////////////////////////////////////////////////////////////////////////////
339// The following is support for waituntil (select) statements
340///////////////////////////////////////////////////////////////////////////////////////////
341static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
[88b49bb]342 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
[beeff61e]343 lock( mutex_lock );
344 if ( node`isListed ) { // op wasn't performed
345 remove( node );
346 unlock( mutex_lock );
347 return false;
348 }
349 unlock( mutex_lock );
350
[00b046f]351 // only return true when not special OR case and status is SAT
352 return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
[beeff61e]353}
354
[6f774be]355// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
356static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
357 while ( !queue`isEmpty ) {
358 // if node not a special OR case or if we win the special OR case race break
359 if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
360 return true;
361
362 // our node lost the race when toggling in __pending_set_other
363 if ( *mine.clause_status != __SELECT_PENDING )
364 return false;
365
366 // otherwise we lost the special OR race so discard node
367 try_pop_front( queue );
368 }
369 return false;
370}
371
[beeff61e]372// type used by select statement to capture a chan read as the selected operation
373struct chan_read {
374 T & ret;
[8607a72]375 channel(T) & chan;
[beeff61e]376};
[bf55f32]377__CFA_SELECT_GET_TYPE( chan_read(T) );
[beeff61e]378
379static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
380 &cr.chan = &chan;
381 &cr.ret = &ret;
382}
383static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
384
385static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
386 __closed_remove( chan, ret );
387 // if we get here then the insert succeeded
388 __make_select_node_available( node );
389}
390
391static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
392 lock( mutex_lock );
393 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
394
395 #ifdef CHAN_STATS
[88b49bb]396 if ( !closed ) c_ops++;
[beeff61e]397 #endif
398
[c4f411e]399 if ( !node.park_counter ) {
400 // are we special case OR and front of cons is also special case OR
401 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
402 if ( !__make_select_node_pending( node ) ) {
403 unlock( mutex_lock );
404 return false;
405 }
[6f774be]406
407 if ( __handle_pending( prods, node ) ) {
[c4f411e]408 __prods_handoff( chan, ret );
[629c95a]409 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
[c4f411e]410 unlock( mutex_lock );
411 return true;
412 }
[6f774be]413 if ( *node.clause_status == __SELECT_PENDING )
414 __make_select_node_unsat( node );
[c4f411e]415 }
416 // check if we can complete operation. If so race to establish winner in special OR case
417 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
418 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
419 unlock( mutex_lock );
420 return false;
421 }
[beeff61e]422 }
423 }
424
425 if ( unlikely(closed) ) {
426 unlock( mutex_lock );
427 __handle_select_closed_read( this, node );
428 return true;
429 }
430
431 // have to check for the zero size channel case
432 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
433 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
434 __prods_handoff( chan, ret );
435 __set_avail_then_unlock( node, mutex_lock );
436 return true;
437 }
438
439 // wait if buffer is empty, work will be completed by someone else
440 if ( count == 0 ) {
441 #ifdef CHAN_STATS
[88b49bb]442 c_blocks++;
[beeff61e]443 #endif
444
445 insert_last( cons, node );
446 unlock( mutex_lock );
447 return false;
448 }
449
450 // Remove from buffer
451 __do_remove( chan, ret );
452 __set_avail_then_unlock( node, mutex_lock );
453 return true;
454}
455static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
[b93bf85]456static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
457 if ( unlikely(node.extra == 0p) ) {
458 if ( !exception_in_flight() ) __closed_remove( chan, ret ); // check if woken up due to closed channel
459 else return false;
460 }
[beeff61e]461 // This is only reachable if not closed or closed exception was handled
[b93bf85]462 return true;
[beeff61e]463}
464
[c44705c]465// type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
466struct chan_read_no_ret {
467 T ret;
468 chan_read( T ) cr;
469};
470__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
471
472static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
473 this.cr{ chan, this.ret };
474}
475static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) cr{ chan }; return cr; }
476static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) { return register_select( this.cr, node ); }
477static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.cr, node ); }
478static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.cr, node ); }
479
[beeff61e]480// type used by select statement to capture a chan write as the selected operation
481struct chan_write {
482 T elem;
[8607a72]483 channel(T) & chan;
[beeff61e]484};
[bf55f32]485__CFA_SELECT_GET_TYPE( chan_write(T) );
[beeff61e]486
487static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
488 &cw.chan = &chan;
489 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
490}
[ca22a7c]491static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ chan, elem }; return cw; }
[a1467c1]492static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ chan, elem }; return cw; }
[beeff61e]493
494static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
495 __closed_insert( chan, elem );
496 // if we get here then the insert succeeded
497 __make_select_node_available( node );
498}
499
500static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
501 lock( mutex_lock );
502 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
503
504 #ifdef CHAN_STATS
[88b49bb]505 if ( !closed ) p_ops++;
[beeff61e]506 #endif
507
[c4f411e]508 // special OR case handling
509 if ( !node.park_counter ) {
510 // are we special case OR and front of cons is also special case OR
511 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
512 if ( !__make_select_node_pending( node ) ) {
513 unlock( mutex_lock );
514 return false;
515 }
[6f774be]516
517 if ( __handle_pending( cons, node ) ) {
[c4f411e]518 __cons_handoff( chan, elem );
[629c95a]519 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
[c4f411e]520 unlock( mutex_lock );
521 return true;
522 }
[6f774be]523 if ( *node.clause_status == __SELECT_PENDING )
524 __make_select_node_unsat( node );
[c4f411e]525 }
526 // check if we can complete operation. If so race to establish winner in special OR case
527 if ( count != size || !cons`isEmpty || unlikely(closed) ) {
528 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
529 unlock( mutex_lock );
530 return false;
531 }
[beeff61e]532 }
533 }
534
535 // if closed handle
536 if ( unlikely(closed) ) {
537 unlock( mutex_lock );
538 __handle_select_closed_write( this, node );
539 return true;
540 }
541
542 // handle blocked consumer case via handoff (buffer is implicitly empty)
543 ConsEmpty: if ( !cons`isEmpty ) {
[cb69fba]544 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
[beeff61e]545 __cons_handoff( chan, elem );
546 __set_avail_then_unlock( node, mutex_lock );
547 return true;
548 }
549
550 // insert node in list if buffer is full, work will be completed by someone else
551 if ( count == size ) {
552 #ifdef CHAN_STATS
[88b49bb]553 p_blocks++;
[beeff61e]554 #endif
555
556 insert_last( prods, node );
557 unlock( mutex_lock );
558 return false;
559 } // if
560
561 // otherwise carry out write either via normal insert
562 __buf_insert( chan, elem );
563 __set_avail_then_unlock( node, mutex_lock );
564 return true;
565}
566static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
567
[b93bf85]568static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
569 if ( unlikely(node.extra == 0p) ) {
570 if ( !exception_in_flight() ) __closed_insert( chan, elem ); // check if woken up due to closed channel
571 else return false;
572 }
[beeff61e]573 // This is only reachable if not closed or closed exception was handled
[b93bf85]574 return true;
[beeff61e]575}
576
[0e16a2d]577} // forall( T )
[beeff61e]578
579
Note: See TracBrowser for help on using the repository browser.