source: libcfa/src/concurrency/channel.hfa@ 49ae2bc

Last change on this file since 49ae2bc was 3f0b062, checked in by caparsons <caparson@…>, 2 years ago

ifdef'd the arm fences that were added to channels so that they only appear on the arm

  • Property mode set to 100644
File size: 21.1 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime locks that used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
17#pragma once
18
19#include <locks.hfa>
20#include <list.hfa>
21#include "select.hfa"
22
23// returns true if woken due to shutdown
24// blocks thread on list and releases passed lock
25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
26 select_node sn{ active_thread(), elem_ptr };
27 insert_last( queue, sn );
28 unlock( lock );
29 park();
30 #if defined(__ARM_ARCH)
31 __atomic_thread_fence( __ATOMIC_SEQ_CST );
32 #endif
33 return sn.extra == 0p;
34}
35
36// Waituntil support (un)register_select helper routine
37// Sets select node avail if not special OR case and then unlocks
38static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
39 if ( node.park_counter ) __make_select_node_available( node );
40 unlock( mutex_lock );
41}
42
43// void * used for some fields since exceptions don't work with parametric polymorphism currently
44exception channel_closed {
45 // on failed insert elem is a ptr to the element attempting to be inserted
46 // on failed remove elem ptr is 0p
47 // on resumption of a failed insert this elem will be inserted
48 // so a user may modify it in the resumption handler
49 void * elem;
50
51 // pointer to chan that is closed
52 void * closed_chan;
53};
54vtable(channel_closed) channel_closed_vt;
55
56static inline bool is_insert( channel_closed & e ) { return e.elem != 0p; }
57static inline bool is_remove( channel_closed & e ) { return e.elem == 0p; }
58
59// #define CHAN_STATS // define this to get channel stats printed in dtor
60
61forall( T ) {
62
63struct __attribute__((aligned(128))) channel {
64 size_t size, front, back, count;
65 T * buffer;
66 dlist( select_node ) prods, cons; // lists of blocked threads
67 go_mutex mutex_lock; // MX lock
68 bool closed; // indicates channel close/open
69 #ifdef CHAN_STATS
70 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd
71 #endif
72};
73static inline void ?{}( channel(T) & this, channel(T) this2 ) = void;
74static inline void ?=?( channel(T) & this, channel(T) this2 ) = void;
75
76static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
77 size = _size;
78 front = back = count = 0;
79 if ( size != 0 ) buffer = aalloc( size );
80 prods{};
81 cons{};
82 mutex_lock{};
83 closed = false;
84 #ifdef CHAN_STATS
85 p_blocks = 0;
86 p_ops = 0;
87 c_blocks = 0;
88 c_ops = 0;
89 #endif
90}
91
92static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
93static inline void ^?{}( channel(T) &c ) with(c) {
94 #ifdef CHAN_STATS
95 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
96 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
97 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
98 #endif
99 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
100 "Attempted to delete channel with waiting threads (Deadlock).\n" );
101 if ( size != 0 ) delete( buffer );
102}
103static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
104static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
105static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
106static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
107static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
108
109// closes the channel and notifies all blocked threads
110static inline void close( channel(T) & chan ) with(chan) {
111 lock( mutex_lock );
112 closed = true;
113
114 // flush waiting consumers and producers
115 while ( has_waiting_consumers( chan ) ) {
116 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
117 break; // if __handle_waituntil_OR returns false cons is empty so break
118 cons`first.extra = 0p;
119 wake_one( cons );
120 }
121 while ( has_waiting_producers( chan ) ) {
122 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
123 break; // if __handle_waituntil_OR returns false prods is empty so break
124 prods`first.extra = 0p;
125 wake_one( prods );
126 }
127 unlock(mutex_lock);
128}
129
130static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
131
132// used to hand an element to a blocked consumer and signal it
133static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
134 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
135 #if defined(__ARM_ARCH)
136 __atomic_thread_fence( __ATOMIC_SEQ_CST );
137 #endif
138 wake_one( cons );
139}
140
141// used to hand an element to a blocked producer and signal it
142static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
143 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
144 #if defined(__ARM_ARCH)
145 __atomic_thread_fence( __ATOMIC_SEQ_CST );
146 #endif
147 wake_one( prods );
148}
149
150static inline void flush( channel(T) & chan, T elem ) with(chan) {
151 lock( mutex_lock );
152 while ( count == 0 && !cons`isEmpty ) {
153 __cons_handoff( chan, elem );
154 }
155 unlock( mutex_lock );
156}
157
158// handles buffer insert
159static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
160 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
161 count += 1;
162 back++;
163 if ( back == size ) back = 0;
164}
165
166// needed to avoid an extra copy in closed case
167static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
168 lock( mutex_lock );
169 #ifdef CHAN_STATS
170 p_ops++;
171 #endif
172
173 ConsEmpty: if ( !cons`isEmpty ) {
174 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
175 __cons_handoff( chan, elem );
176 unlock( mutex_lock );
177 return true;
178 }
179
180 if ( count == size ) { unlock( mutex_lock ); return false; }
181
182 __buf_insert( chan, elem );
183 unlock( mutex_lock );
184 return true;
185}
186
187// attempts a nonblocking insert
188// returns true if insert was successful, false otherwise
189static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
190
191// handles closed case of insert routine
192static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
193 channel_closed except{ &channel_closed_vt, &elem, &chan };
194 throwResume except; // throw closed resumption
195 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
196}
197
198static inline void insert( channel(T) & chan, T elem ) with(chan) {
199 // check for close before acquire mx
200 if ( unlikely(closed) ) {
201 __closed_insert( chan, elem );
202 return;
203 }
204
205 lock( mutex_lock );
206
207 #ifdef CHAN_STATS
208 if ( !closed ) p_ops++;
209 #endif
210
211 // if closed handle
212 if ( unlikely(closed) ) {
213 unlock( mutex_lock );
214 __closed_insert( chan, elem );
215 return;
216 }
217
218 // buffer count must be zero if cons are blocked (also handles zero-size case)
219 ConsEmpty: if ( !cons`isEmpty ) {
220 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
221 __cons_handoff( chan, elem );
222 unlock( mutex_lock );
223 return;
224 }
225
226 // wait if buffer is full, work will be completed by someone else
227 if ( count == size ) {
228 #ifdef CHAN_STATS
229 p_blocks++;
230 #endif
231
232 // check for if woken due to close
233 if ( unlikely( block( prods, &elem, mutex_lock ) ) )
234 __closed_insert( chan, elem );
235 return;
236 } // if
237
238 __buf_insert( chan, elem );
239 unlock( mutex_lock );
240}
241
242// does the buffer remove and potentially does waiting producer work
243static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
244 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
245 count -= 1;
246 front = (front + 1) % size;
247 if (count == size - 1 && !prods`isEmpty ) {
248 if ( !__handle_waituntil_OR( prods ) ) return;
249 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work
250 wake_one( prods );
251 }
252}
253
254// needed to avoid an extra copy in closed case and single return val case
255static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
256 lock( mutex_lock );
257 #ifdef CHAN_STATS
258 c_ops++;
259 #endif
260
261 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
262 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
263 __prods_handoff( chan, retval );
264 unlock( mutex_lock );
265 return true;
266 }
267
268 if ( count == 0 ) { unlock( mutex_lock ); return false; }
269
270 __do_remove( chan, retval );
271 unlock( mutex_lock );
272 return true;
273}
274
275// attempts a nonblocking remove
276// returns [T, true] if insert was successful
277// returns [T, false] if insert was successful (T uninit)
278static inline [T, bool] try_remove( channel(T) & chan ) {
279 T retval;
280 bool success = __internal_try_remove( chan, retval );
281 return [ retval, success ];
282}
283
284static inline T try_remove( channel(T) & chan ) {
285 T retval;
286 __internal_try_remove( chan, retval );
287 return retval;
288}
289
290// handles closed case of insert routine
291static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
292 channel_closed except{ &channel_closed_vt, 0p, &chan };
293 throwResume except; // throw resumption
294 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
295}
296
297static inline T remove( channel(T) & chan ) with(chan) {
298 T retval;
299 if ( unlikely(closed) ) {
300 __closed_remove( chan, retval );
301 return retval;
302 }
303 lock( mutex_lock );
304
305 #ifdef CHAN_STATS
306 if ( !closed ) c_ops++;
307 #endif
308
309 if ( unlikely(closed) ) {
310 unlock( mutex_lock );
311 __closed_remove( chan, retval );
312 return retval;
313 }
314
315 // have to check for the zero size channel case
316 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
317 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
318 __prods_handoff( chan, retval );
319 unlock( mutex_lock );
320 return retval;
321 }
322
323 // wait if buffer is empty, work will be completed by someone else
324 if ( count == 0 ) {
325 #ifdef CHAN_STATS
326 c_blocks++;
327 #endif
328 // check for if woken due to close
329 if ( unlikely( block( cons, &retval, mutex_lock ) ) )
330 __closed_remove( chan, retval );
331 return retval;
332 }
333
334 // Remove from buffer
335 __do_remove( chan, retval );
336 unlock( mutex_lock );
337 return retval;
338}
339static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
340
341
342///////////////////////////////////////////////////////////////////////////////////////////
343// The following is Go-style operator support for channels
344///////////////////////////////////////////////////////////////////////////////////////////
345
346static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
347static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
348
349///////////////////////////////////////////////////////////////////////////////////////////
350// The following is support for waituntil (select) statements
351///////////////////////////////////////////////////////////////////////////////////////////
352static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
353 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
354 lock( mutex_lock );
355 if ( node`isListed ) { // op wasn't performed
356 remove( node );
357 unlock( mutex_lock );
358 return false;
359 }
360 unlock( mutex_lock );
361
362 // only return true when not special OR case and status is SAT
363 return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
364}
365
366// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
367static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
368 while ( !queue`isEmpty ) {
369 // if node not a special OR case or if we win the special OR case race break
370 if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
371 return true;
372
373 // our node lost the race when toggling in __pending_set_other
374 if ( *mine.clause_status != __SELECT_PENDING )
375 return false;
376
377 // otherwise we lost the special OR race so discard node
378 try_pop_front( queue );
379 }
380 return false;
381}
382
383// type used by select statement to capture a chan read as the selected operation
384struct chan_read {
385 T * ret;
386 channel(T) * chan;
387};
388__CFA_SELECT_GET_TYPE( chan_read(T) );
389
390static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
391 cr.chan = chan;
392 cr.ret = ret;
393}
394static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
395
396static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
397 __closed_remove( *chan, *ret );
398 // if we get here then the insert succeeded
399 __make_select_node_available( node );
400}
401
402static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
403 lock( mutex_lock );
404 node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
405
406 #ifdef CHAN_STATS
407 if ( !closed ) c_ops++;
408 #endif
409
410 if ( !node.park_counter ) {
411 // are we special case OR and front of cons is also special case OR
412 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
413 if ( !__make_select_node_pending( node ) ) {
414 unlock( mutex_lock );
415 return false;
416 }
417
418 if ( __handle_pending( prods, node ) ) {
419 __prods_handoff( *chan, *ret );
420 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
421 unlock( mutex_lock );
422 return true;
423 }
424 if ( *node.clause_status == __SELECT_PENDING )
425 __make_select_node_unsat( node );
426 }
427 // check if we can complete operation. If so race to establish winner in special OR case
428 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
429 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
430 unlock( mutex_lock );
431 return false;
432 }
433 }
434 }
435
436 if ( unlikely(closed) ) {
437 unlock( mutex_lock );
438 __handle_select_closed_read( this, node );
439 return true;
440 }
441
442 // have to check for the zero size channel case
443 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
444 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
445 __prods_handoff( *chan, *ret );
446 __set_avail_then_unlock( node, mutex_lock );
447 return true;
448 }
449
450 // wait if buffer is empty, work will be completed by someone else
451 if ( count == 0 ) {
452 #ifdef CHAN_STATS
453 c_blocks++;
454 #endif
455
456 insert_last( cons, node );
457 unlock( mutex_lock );
458 return false;
459 }
460
461 // Remove from buffer
462 __do_remove( *chan, *ret );
463 __set_avail_then_unlock( node, mutex_lock );
464 return true;
465}
466static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
467static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
468 if ( unlikely(node.extra == 0p) ) {
469 if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
470 else return false;
471 }
472 // This is only reachable if not closed or closed exception was handled
473 return true;
474}
475
476// type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to
477struct chan_read_no_ret {
478 T retval;
479 chan_read( T ) c_read;
480};
481__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
482
483static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
484 this.c_read{ &chan, &this.retval };
485}
486
487static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
488static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
489 this.c_read.ret = &this.retval;
490 return register_select( this.c_read, node );
491}
492static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
493static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
494
495// type used by select statement to capture a chan write as the selected operation
496struct chan_write {
497 T elem;
498 channel(T) * chan;
499};
500__CFA_SELECT_GET_TYPE( chan_write(T) );
501
502static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
503 cw.chan = chan;
504 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
505}
506static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
507static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
508
509static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
510 __closed_insert( *chan, elem );
511 // if we get here then the insert succeeded
512 __make_select_node_available( node );
513}
514
515static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
516 lock( mutex_lock );
517 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
518
519 #ifdef CHAN_STATS
520 if ( !closed ) p_ops++;
521 #endif
522
523 // special OR case handling
524 if ( !node.park_counter ) {
525 // are we special case OR and front of cons is also special case OR
526 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
527 if ( !__make_select_node_pending( node ) ) {
528 unlock( mutex_lock );
529 return false;
530 }
531
532 if ( __handle_pending( cons, node ) ) {
533 __cons_handoff( *chan, elem );
534 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
535 unlock( mutex_lock );
536 return true;
537 }
538 if ( *node.clause_status == __SELECT_PENDING )
539 __make_select_node_unsat( node );
540 }
541 // check if we can complete operation. If so race to establish winner in special OR case
542 if ( count != size || !cons`isEmpty || unlikely(closed) ) {
543 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
544 unlock( mutex_lock );
545 return false;
546 }
547 }
548 }
549
550 // if closed handle
551 if ( unlikely(closed) ) {
552 unlock( mutex_lock );
553 __handle_select_closed_write( this, node );
554 return true;
555 }
556
557 // handle blocked consumer case via handoff (buffer is implicitly empty)
558 ConsEmpty: if ( !cons`isEmpty ) {
559 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
560 __cons_handoff( *chan, elem );
561 __set_avail_then_unlock( node, mutex_lock );
562 return true;
563 }
564
565 // insert node in list if buffer is full, work will be completed by someone else
566 if ( count == size ) {
567 #ifdef CHAN_STATS
568 p_blocks++;
569 #endif
570
571 insert_last( prods, node );
572 unlock( mutex_lock );
573 return false;
574 } // if
575
576 // otherwise carry out write either via normal insert
577 __buf_insert( *chan, elem );
578 __set_avail_then_unlock( node, mutex_lock );
579 return true;
580}
581static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
582
583static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
584 if ( unlikely(node.extra == 0p) ) {
585 if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
586 else return false;
587 }
588 // This is only reachable if not closed or closed exception was handled
589 return true;
590}
591
592} // forall( T )
593
594
Note: See TracBrowser for help on using the repository browser.