source: libcfa/src/concurrency/channel.hfa@ b28ce93

Last change on this file since b28ce93 was 6b33e89, checked in by Peter A. Buhr <pabuhr@…>, 5 months ago

change backquote call to regular call

  • Property mode set to 100644
File size: 19.9 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Channels (buffered and zero-size) for passing elements between threads of the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
17#pragma once
18
19#include <locks.hfa>
20#include <list.hfa>
21#include "select.hfa"
22
// Parks the calling thread on `queue` until another thread wakes it.
// A select_node for the active thread (carrying elem_ptr) is appended to the queue,
// the passed lock is released, and only then does the thread park.
// Returns true when the node's extra field is 0p after waking; close() sets extra to 0p
// on the nodes it wakes, so true means the wake-up was caused by the channel closing.
static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
	select_node waiter{ active_thread(), elem_ptr };
	insert_last( queue, waiter );
	unlock( lock );							// release only after the node is enqueued
	park();
	bool woken_by_close = waiter.extra == 0p;
	return woken_by_close;
}
32
// Helper for the waituntil (un)register_select routines.
// Marks the select node available unless its park_counter is zero (the special OR case,
// which is resolved elsewhere), then releases the given lock.
static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
	if ( node.park_counter ) {
		__make_select_node_available( node );
	}
	unlock( mutex_lock );
}
39
// Exception raised by channel operations that encounter a closed channel.
// It is first raised as a resumption (throwResume) and, if the retried non-blocking
// operation still cannot complete, as a termination (throw).
// void * is used for the fields since exceptions don't work with parametric polymorphism currently.
exception channel_closed {
	// On a failed insert, elem points to the element that was being inserted.
	// On a failed remove, elem is 0p.
	// After a resumption handler for a failed insert returns, the element elem points to
	// is what gets inserted, so the handler may modify it.
	void * elem;

	// pointer to the channel that is closed
	void * closed_chan;
};
// vtable instance used when constructing channel_closed exceptions
vtable(channel_closed) channel_closed_vt;
52
// True when the exception came from an insert: insert failures carry a non-null element pointer.
static inline bool is_insert( channel_closed & e ) {
	if ( e.elem == 0p ) return false;
	return true;
}
// True when the exception came from a remove: remove failures carry a null element pointer.
static inline bool is_remove( channel_closed & e ) {
	if ( e.elem != 0p ) return false;
	return true;
}
55
56// #define CHAN_STATS // define this to get channel stats printed in dtor
57
forall( T ) {
	// Channel of elements of type T, backed by a circular buffer of `size` slots.
	// A size of zero gives a channel with no buffer, where elements are handed directly
	// between a blocked thread and its partner.
	struct __attribute__((aligned(128))) channel {
		size_t size;						// buffer capacity passed to the constructor
		size_t front;						// index of the next element to remove
		size_t back;						// index of the next free slot to insert into
		size_t count;						// number of elements currently buffered
		T * buffer;							// circular buffer storage (allocated only when size != 0)
		dlist( select_node ) prods;			// blocked producers
		dlist( select_node ) cons;			// blocked consumers
		go_mutex mutex_lock;				// mutual-exclusion lock protecting the channel state
		bool closed;						// true once close() has been called
		#ifdef CHAN_STATS
		size_t p_blocks;					// producer operations that blocked
		size_t p_ops;						// total producer operations
		size_t c_blocks;					// consumer operations that blocked
		size_t c_ops;						// total consumer operations
		#endif
	};

	// Captures a channel read used as a clause of a select (waituntil) statement.
	struct chan_read {
		T * ret;							// destination for the element that is read
		channel(T) * chan;					// channel being read
	};
	__CFA_SELECT_GET_TYPE( chan_read(T) );

	// Captures a channel read in a select statement when the caller supplies no destination;
	// the element is stored in retval instead.
	struct chan_read_no_ret {
		T retval;							// storage for the element that is read
		chan_read( T ) c_read;				// underlying read whose ret points at retval
	};
	__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );

	// Captures a channel write used as a clause of a select (waituntil) statement.
	struct chan_write {
		T elem;								// copy of the element to insert
		channel(T) * chan;					// channel being written
	};
	__CFA_SELECT_GET_TYPE( chan_write(T) );
} // distribution
91
92static inline forall( T ) {
// Channels are not copyable: copy construction and assignment are deleted.
void ?{}( channel(T) & this, channel(T) this2 ) = void;
void ?=?( channel(T) & this, channel(T) this2 ) = void;
95
// Constructs an open, empty channel with room for _size buffered elements.
// No buffer is allocated when _size is zero.
void ?{}( channel(T) & c, size_t _size ) with(c) {
	size = _size;
	front = 0;
	back = 0;
	count = 0;
	if ( size != 0 ) {
		buffer = aalloc( size );
	}
	prods{};
	cons{};
	mutex_lock{};
	closed = false;
	#ifdef CHAN_STATS
	p_blocks = 0;
	p_ops = 0;
	c_blocks = 0;
	c_ops = 0;
	#endif
}
111
// Default constructor: delegates to the sized constructor with a size of zero.
void ?{}( channel(T) & c ) {
	((channel(T) &)c){ 0 };
}
// Destructor: optionally prints usage statistics, checks the waiter queues, and frees
// the buffer when one was allocated.
// Fixes relative to the previous version (all only visible with CHAN_STATS defined):
//  - the "Consumer" line now reports the consumer counters (c_*) and the "Producer" line
//    the producer counters (p_*); they were swapped,
//  - size_t values are printed with %zu and the %p argument is passed as void *.
void ^?{}( channel(T) & c ) with(c) {
	#ifdef CHAN_STATS
	printf( "Channel %p Blocks: %zu,\t\tOperations: %zu,\t%.2f%% of ops blocked\n", (void *)&c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100 );
	printf( "Channel %p Consumer Blocks: %zu,\tConsumer Ops: %zu,\t%.2f%% of Consumer ops blocked\n", (void *)&c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100 );
	printf( "Channel %p Producer Blocks: %zu,\tProducer Ops: %zu,\t%.2f%% of Producer ops blocked\n", (void *)&c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100 );
	#endif
	// NOTE(review): the first two disjuncts let this check pass whenever __handle_waituntil_OR
	// reports true for a queue, which elsewhere in this file means the queue is not empty.
	// Confirm this is the intended condition for the "waiting threads" message.
	verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || ( isEmpty( cons ) && isEmpty( prods ) ),
		"Attempted to delete channel with waiting threads (Deadlock).\n" );
	if ( size != 0 ) delete( buffer );		// a buffer exists only for non-zero sizes
}
// Returns the number of buffered elements, read with a relaxed atomic load (no lock taken).
size_t get_count( channel(T) & chan ) with(chan) {
	size_t buffered = __atomic_load_n( &count, __ATOMIC_RELAXED );
	return buffered;
}
// Returns the buffer capacity, read with a relaxed atomic load (no lock taken).
size_t get_size( channel(T) & chan ) with(chan) {
	size_t capacity = __atomic_load_n( &size, __ATOMIC_RELAXED );
	return capacity;
}
// True when at least one consumer or producer is queued on the channel (no lock taken).
bool has_waiters( channel(T) & chan ) with(chan) {
	return ! ( isEmpty( cons ) && isEmpty( prods ) );
}
// True when at least one consumer is queued on the channel (no lock taken).
bool has_waiting_consumers( channel(T) & chan ) with(chan) {
	if ( isEmpty( cons ) ) return false;
	return true;
}
// True when at least one producer is queued on the channel (no lock taken).
bool has_waiting_producers( channel(T) & chan ) with(chan) {
	if ( isEmpty( prods ) ) return false;
	return true;
}
128
// Closes the channel and wakes every blocked consumer and producer.
// Each woken node has its extra field set to 0p, which is how the woken thread learns
// that the wake-up was caused by the close.
void close( channel(T) & chan ) with(chan) {
	lock( mutex_lock );
	closed = true;

	// Drain the consumers. __handle_waituntil_OR returning false means the queue is now
	// empty, so the loop stops; special OR case threads are only signalled when they win the race.
	while ( ! isEmpty( cons ) && __handle_waituntil_OR( cons ) ) {
		first( cons ).extra = 0p;
		wake_one( cons );
	}

	// Drain the producers in the same way.
	while ( ! isEmpty( prods ) && __handle_waituntil_OR( prods ) ) {
		first( prods ).extra = 0p;
		wake_one( prods );
	}

	unlock( mutex_lock );
}
149
// Returns true once the channel has been closed (reads the flag without taking the lock).
// The return type was previously declared void even though the body returns the flag.
bool is_closed( channel(T) & chan ) with(chan) { return closed; }
151
// Hands elem directly to the first blocked consumer and wakes it.
// The element is copied into the location the consumer stored in its node's extra field.
void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
	void * dest = first( cons ).extra;
	memcpy( dest, (void *)&elem, sizeof(T) );	// do the waiting consumer's work for it
	wake_one( cons );
}
157
// Takes the element offered by the first blocked producer and wakes that producer.
// The element is copied into retval from the location stored in the producer node's extra field.
void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
	void * src = first( prods ).extra;
	memcpy( (void *)&retval, src, sizeof(T) );
	wake_one( prods );
}
163
// Hands a copy of elem to every blocked consumer, for as long as the buffer is empty
// and consumers remain queued.
void flush( channel(T) & chan, T elem ) with(chan) {
	lock( mutex_lock );
	for ( ;; ) {
		if ( count != 0 ) break;
		if ( isEmpty( cons ) ) break;
		__cons_handoff( chan, elem );
	}
	unlock( mutex_lock );
}
171
// Copies elem into the slot at `back`, then advances `back` (wrapping to 0 at `size`)
// and bumps `count`. The caller must ensure a free slot exists.
void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
	memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
	count++;
	back = ( back + 1 == size ) ? 0 : back + 1;
}
179
// Non-blocking insert taking the element by reference (avoids an extra copy in the closed case).
// Returns true when the element was handed to a waiting consumer or stored in the buffer,
// and false when the buffer is full so the insert would have blocked.
bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
	lock( mutex_lock );
	#ifdef CHAN_STATS
	p_ops++;
	#endif

	// a consumer is waiting and can still be signalled: hand the element straight over
	if ( ! isEmpty( cons ) && __handle_waituntil_OR( cons ) ) {
		__cons_handoff( chan, elem );
		unlock( mutex_lock );
		return true;
	}

	bool has_room = count != size;
	if ( has_room ) __buf_insert( chan, elem );
	unlock( mutex_lock );
	return has_room;
}
201
// Non-blocking insert.
// Returns true if elem was inserted, false if the insert would have blocked.
bool try_insert( channel(T) & chan, T elem ) {
	bool inserted = __internal_try_insert( chan, elem );
	return inserted;
}
205
// Insert path for a closed channel.
// Raises channel_closed as a resumption first; if a handler resumes, a non-blocking
// insert is attempted, and if that would block the same exception is raised as a termination.
void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
	channel_closed closed_ex{ &channel_closed_vt, &elem, &chan };
	throwResume closed_ex;
	if ( __internal_try_insert( chan, elem ) ) return;
	throw closed_ex;
}
212
// Blocking insert of elem into the channel.
// The element goes to a waiting consumer if there is one, otherwise into the buffer;
// when the buffer is full the caller blocks until another thread completes the insert
// on its behalf. A closed channel is handled by __closed_insert.
void insert( channel(T) & chan, T elem ) with(chan) {
	// check for close before acquiring the lock
	if ( unlikely(closed) ) {
		__closed_insert( chan, elem );
		return;
	}

	lock( mutex_lock );

	#ifdef CHAN_STATS
	if ( ! closed ) p_ops++;
	#endif

	// check again now that the lock is held
	if ( unlikely(closed) ) {
		unlock( mutex_lock );
		__closed_insert( chan, elem );
		return;
	}

	// consumers only block when the buffer is empty, so hand off directly
	// (this also covers the zero-size channel)
	if ( ! isEmpty( cons ) && __handle_waituntil_OR( cons ) ) {
		__cons_handoff( chan, elem );
		unlock( mutex_lock );
		return;
	}

	// room in the buffer: store the element and finish
	if ( count != size ) {
		__buf_insert( chan, elem );
		unlock( mutex_lock );
		return;
	}

	// buffer is full: wait, the insert will be completed by someone else
	#ifdef CHAN_STATS
	p_blocks++;
	#endif
	bool woken_by_close = block( prods, &elem, mutex_lock );	// block() releases the lock
	if ( unlikely( woken_by_close ) ) __closed_insert( chan, elem );
}
257
// Copies the element at `front` into retval and advances `front`.
// If that remove freed the only slot of a full buffer while producers are blocked,
// the first producer's element is inserted into the buffer and that producer is woken.
void __do_remove( channel(T) & chan, T & retval ) with(chan) {
	memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
	count--;
	front = ( front + 1 ) % size;

	bool was_full = count == size - 1;
	if ( was_full && ! isEmpty( prods ) && __handle_waituntil_OR( prods ) ) {
		__buf_insert( chan, *(T *)first( prods ).extra );	// do the waiting producer's work
		wake_one( prods );
	}
}
269
// Non-blocking remove writing into retval (avoids an extra copy in the closed case and
// in the single-return-value case).
// Returns true when an element was obtained, false when the remove would have blocked.
bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
	lock( mutex_lock );
	#ifdef CHAN_STATS
	c_ops++;
	#endif

	// zero-size channel: take the element straight from a blocked producer
	if ( size == 0 && ! isEmpty( prods ) && __handle_waituntil_OR( prods ) ) {
		__prods_handoff( chan, retval );
		unlock( mutex_lock );
		return true;
	}

	bool has_elem = count != 0;
	if ( has_elem ) __do_remove( chan, retval );
	unlock( mutex_lock );
	return has_elem;
}
291
// Non-blocking remove.
// Returns [elem, true] if an element was removed.
// Returns [elem, false] if the remove would have blocked; elem is then uninitialized.
[T, bool] try_remove( channel(T) & chan ) {
	T retval;
	bool removed = __internal_try_remove( chan, retval );
	return [ retval, removed ];
}
300
// Non-blocking remove returning only the element.
// The success flag is discarded, so the result is uninitialized when nothing was removed.
T try_remove( channel(T) & chan ) {
	T retval;
	(void)__internal_try_remove( chan, retval );
	return retval;
}
306
// Remove path for a closed channel.
// Raises channel_closed (with a null element pointer) as a resumption first; if a handler
// resumes, a non-blocking remove is attempted, and if that would block the same exception
// is raised as a termination.
void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
	channel_closed closed_ex{ &channel_closed_vt, 0p, &chan };
	throwResume closed_ex;
	if ( __internal_try_remove( chan, retval ) ) return;
	throw closed_ex;
}
313
// Blocking remove; returns the removed element.
// The element comes from a blocked producer (zero-size channel) or from the buffer;
// when the buffer is empty the caller blocks until another thread delivers an element.
// A closed channel is handled by __closed_remove.
T remove( channel(T) & chan ) with(chan) {
	T retval;

	// check for close before acquiring the lock
	if ( unlikely(closed) ) {
		__closed_remove( chan, retval );
		return retval;
	}

	lock( mutex_lock );

	#ifdef CHAN_STATS
	if ( ! closed ) c_ops++;
	#endif

	// check again now that the lock is held
	if ( unlikely(closed) ) {
		unlock( mutex_lock );
		__closed_remove( chan, retval );
		return retval;
	}

	// zero-size channel: take the element straight from a blocked producer
	if ( size == 0 && ! isEmpty( prods ) && __handle_waituntil_OR( prods ) ) {
		__prods_handoff( chan, retval );
		unlock( mutex_lock );
		return retval;
	}

	// something is buffered: remove it and finish
	if ( count != 0 ) {
		__do_remove( chan, retval );
		unlock( mutex_lock );
		return retval;
	}

	// buffer is empty: wait, the remove will be completed by someone else
	#ifdef CHAN_STATS
	c_blocks++;
	#endif
	bool woken_by_close = block( cons, &retval, mutex_lock );	// block() releases the lock
	if ( unlikely( woken_by_close ) ) __closed_remove( chan, retval );
	return retval;
}
// Blocking remove that discards the element.
// The (T) cast selects the overload of remove that returns T.
void remove( channel(T) & chan ) {
	T discarded = (T)remove( chan );
}
358
359
360 ///////////////////////////////////////////////////////////////////////////////////////////
361 // The following is Go-style operator support for channels
362 ///////////////////////////////////////////////////////////////////////////////////////////
363
// Go-style send: `chan << elem` performs a blocking insert.
void ?<<?( channel(T) & chan, T elem ) {
	insert( chan, elem );
}
// Go-style receive: `ret << chan` performs a blocking remove and assigns the element to ret.
void ?<<?( T & ret, channel(T) & chan ) {
	ret = remove( chan );
}
366
367 ///////////////////////////////////////////////////////////////////////////////////////////
368 // The following is support for waituntil (select) statements
369 ///////////////////////////////////////////////////////////////////////////////////////////
// Shared unregister step for channel clauses of a waituntil statement.
// If the node is still on a wait queue the operation was never performed: the node is
// removed and false is returned. Otherwise true is returned only when the node is not
// the special OR case (park_counter set) and its clause status is __SELECT_SAT.
bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
	// special OR case that is not queued: nothing to undo
	if ( ! ( isListed( node ) || node.park_counter ) ) return false;

	lock( mutex_lock );
	bool still_queued = isListed( node );
	if ( still_queued ) remove( node );		// op wasn't performed, take the node back off the queue
	unlock( mutex_lock );

	if ( still_queued ) return false;
	if ( ! node.park_counter ) return false;
	return *node.clause_status == __SELECT_SAT;
}
383
// Variant of __handle_waituntil_OR that does extra work to avoid a starvation/deadlock case
// when both our node (`mine`) and the node at the front of `queue` are special OR cases.
// Returns true when the front of the queue can be serviced, false when our node lost its
// race or the queue was drained.
bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
	// each pass that falls through discards the front node, which lost the special OR race
	for ( ; ! isEmpty( queue ); remove_first( queue ) ) {
		// front node is not a special OR case
		if ( ! first( queue ).clause_status ) return true;
		if ( first( queue ).park_counter ) return true;

		// front node is a special OR case and we won the race
		if ( __pending_set_other( first( queue ), mine, ((unsigned long int)(&(first( queue )))) ) ) return true;

		// our node lost the race when toggling in __pending_set_other
		if ( *mine.clause_status != __SELECT_PENDING ) return false;
	}
	return false;
}
400
// Constructs a select read clause for channel `chan` that stores its result through `ret`.
void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) with(cr) {
	cr.chan = chan;
	cr.ret = ret;
}
// Select-statement form of `ret << chan`: builds the read clause instead of performing the read.
chan_read(T) ?<<?( T & ret, channel(T) & chan ) {
	chan_read(T) read_clause{ &chan, &ret };
	return read_clause;
}
406
// Closed-channel path for a select read: runs the closed remove protocol and, if it
// returns normally, marks the select node available.
void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
	__closed_remove( *chan, *ret );
	// reaching this point means the remove succeeded
	__make_select_node_available( node );
}
412
// Registers a channel read as a clause of a waituntil statement.
// Returns true when the read was completed immediately, and false when the node was
// queued as a waiting consumer or the clause lost a special OR race.
bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
	lock( mutex_lock );
	node.extra = ret;	// non-null now, so a 0p seen later in on_selected means the channel was closed

	#ifdef CHAN_STATS
	if ( ! closed ) c_ops++;
	#endif

	// special OR case handling
	if ( ! node.park_counter ) {
		// are we a special OR case and is the front of prods also a special OR case
		if ( ! unlikely(closed) && ! isEmpty( prods ) && first( prods ).clause_status && ! first( prods ).park_counter ) {
			if ( ! __make_select_node_pending( node ) ) {
				unlock( mutex_lock );
				return false;
			}

			if ( __handle_pending( prods, node ) ) {
				__prods_handoff( *chan, *ret );
				// mark SAT now that the operation is done, or else threads could get stuck in __mark_select_node
				__make_select_node_sat( node );
				unlock( mutex_lock );
				return true;
			}

			if ( *node.clause_status == __SELECT_PENDING ) {
				__make_select_node_unsat( node );
			}
		}

		// if the operation can complete, race to establish the winner of the special OR case;
		// losing the race means giving up on registering
		if ( ( count != 0 || ! isEmpty( prods ) || unlikely(closed) ) && ! __make_select_node_available( node ) ) {
			unlock( mutex_lock );
			return false;
		}
	}

	if ( unlikely(closed) ) {
		unlock( mutex_lock );
		__handle_select_closed_read( this, node );
		return true;
	}

	// zero-size channel: take the element straight from a blocked producer
	if ( size == 0 && ! isEmpty( prods ) && __handle_waituntil_OR( prods ) ) {
		__prods_handoff( *chan, *ret );
		__set_avail_then_unlock( node, mutex_lock );
		return true;
	}

	// buffer is empty: queue the node, the read will be completed by someone else
	if ( count == 0 ) {
		#ifdef CHAN_STATS
		c_blocks++;
		#endif

		insert_last( cons, node );
		unlock( mutex_lock );
		return false;
	}

	// remove from the buffer
	__do_remove( *chan, *ret );
	__set_avail_then_unlock( node, mutex_lock );
	return true;
}
// Unregisters a select read clause; forwards to unregister_chan on the clause's channel.
bool unregister_select( chan_read(T) & this, select_node & node ) {
	return unregister_chan( *this.chan, node );
}
// Called when the read clause is selected.
// A null node.extra means the thread was woken by a channel close: the closed remove
// protocol is run, unless an exception is already in flight, in which case false is returned.
bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
	if ( unlikely(node.extra == 0p) ) {
		if ( exception_in_flight() ) return false;
		__closed_remove( *chan, *ret );
	}
	// only reachable if not closed or the closed exception was handled
	return true;
}
487
// Constructs a destination-less select read: the inner read clause targets this
// object's own retval field.
void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
	this.c_read{ &chan, &this.retval };
}
491
// Select-statement form of remove( chan ): builds a read clause with its own result storage.
chan_read_no_ret(T) remove( channel(T) & chan ) {
	chan_read_no_ret(T) read_clause{ chan };
	return read_clause;
}
// Registers a destination-less select read.
// The inner clause's ret pointer is re-aimed at this object's retval before forwarding.
bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
	this.c_read.ret = &this.retval;
	bool completed = register_select( this.c_read, node );
	return completed;
}
// Unregisters a destination-less select read by forwarding to the inner read clause.
bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) {
	return unregister_select( this.c_read, node );
}
// Selection callback for a destination-less select read; forwards to the inner read clause.
bool on_selected( chan_read_no_ret(T) & this, select_node & node ) {
	return on_selected( this.c_read, node );
}
499
// Constructs a select write clause for channel `chan`, storing a byte copy of elem.
void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
	cw.chan = chan;
	void * dest = (void *)&cw.elem;
	memcpy( dest, (void *)&elem, sizeof(T) );
}
// Select-statement form of `chan << elem`: builds the write clause instead of performing the write.
chan_write(T) ?<<?( channel(T) & chan, T elem ) {
	chan_write(T) write_clause{ &chan, elem };
	return write_clause;
}
// Select-statement form of insert; note the element comes first and the channel second.
chan_write(T) insert( T elem, channel(T) & chan) {
	chan_write(T) write_clause{ &chan, elem };
	return write_clause;
}
506
// Closed-channel path for a select write: runs the closed insert protocol and, if it
// returns normally, marks the select node available.
void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
	__closed_insert( *chan, elem );
	// reaching this point means the insert succeeded
	__make_select_node_available( node );
}
512
// Registers a channel write as a clause of a waituntil statement.
// Returns true when the write was completed immediately, and false when the node was
// queued as a waiting producer or the clause lost a special OR race.
bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
	lock( mutex_lock );
	node.extra = &elem;	// non-null now, so a 0p seen later in on_selected means the channel was closed

	#ifdef CHAN_STATS
	if ( ! closed ) p_ops++;
	#endif

	// special OR case handling
	if ( ! node.park_counter ) {
		// are we a special OR case and is the front of cons also a special OR case
		if ( ! unlikely(closed) && ! isEmpty( cons ) && first( cons ).clause_status && ! first( cons ).park_counter ) {
			if ( ! __make_select_node_pending( node ) ) {
				unlock( mutex_lock );
				return false;
			}

			if ( __handle_pending( cons, node ) ) {
				__cons_handoff( *chan, elem );
				// mark SAT now that the operation is done, or else threads could get stuck in __mark_select_node
				__make_select_node_sat( node );
				unlock( mutex_lock );
				return true;
			}

			if ( *node.clause_status == __SELECT_PENDING ) {
				__make_select_node_unsat( node );
			}
		}

		// if the operation can complete, race to establish the winner of the special OR case;
		// losing the race means giving up on registering
		if ( ( count != size || ! isEmpty( cons ) || unlikely(closed) ) && ! __make_select_node_available( node ) ) {
			unlock( mutex_lock );
			return false;
		}
	}

	// closed channel
	if ( unlikely(closed) ) {
		unlock( mutex_lock );
		__handle_select_closed_write( this, node );
		return true;
	}

	// a consumer is blocked (the buffer is implicitly empty): hand the element over directly
	if ( ! isEmpty( cons ) && __handle_waituntil_OR( cons ) ) {
		__cons_handoff( *chan, elem );
		__set_avail_then_unlock( node, mutex_lock );
		return true;
	}

	// buffer is full: queue the node, the write will be completed by someone else
	if ( count == size ) {
		#ifdef CHAN_STATS
		p_blocks++;
		#endif

		insert_last( prods, node );
		unlock( mutex_lock );
		return false;
	}

	// otherwise carry out the write as a normal buffer insert
	__buf_insert( *chan, elem );
	__set_avail_then_unlock( node, mutex_lock );
	return true;
}
// Unregisters a select write clause; forwards to unregister_chan on the clause's channel.
bool unregister_select( chan_write(T) & this, select_node & node ) {
	return unregister_chan( *this.chan, node );
}
580
// Called when the write clause is selected.
// A null node.extra means the thread was woken by a channel close: the closed insert
// protocol is run, unless an exception is already in flight, in which case false is returned.
bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
	if ( unlikely(node.extra == 0p) ) {
		if ( exception_in_flight() ) return false;
		__closed_insert( *chan, elem );
	}
	// only reachable if not closed or the closed exception was handled
	return true;
}
589} // distribution
590
591
Note: See TracBrowser for help on using the repository browser.