source: libcfa/src/concurrency/channel.hfa@ 6f774be

ast-experimental
Last change on this file since 6f774be was 6f774be, checked in by caparson <caparson@…>, 2 years ago

fixed bug where waituntil deadlock could occur

  • Property mode set to 100644
File size: 18.9 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// channel.hfa -- LIBCFATHREAD
8// Runtime locks that used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2022
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
17#pragma once
18
19#include <locks.hfa>
20#include <list.hfa>
21#include "select.hfa"
22
23// returns true if woken due to shutdown
24// blocks thread on list and releases passed lock
25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
26 select_node sn{ active_thread(), elem_ptr };
27 insert_last( queue, sn );
28 unlock( lock );
29 park();
30 return sn.extra == 0p;
31}
32
33// Waituntil support (un)register_select helper routine
34// Sets select node avail if not special OR case and then unlocks
35static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
36 if ( node.park_counter ) __make_select_node_available( node );
37 unlock( mutex_lock );
38}
39
40// void * used for some fields since exceptions don't work with parametric polymorphism currently
41exception channel_closed {
42 // on failed insert elem is a ptr to the element attempting to be inserted
43 // on failed remove elem ptr is 0p
44 // on resumption of a failed insert this elem will be inserted
45 // so a user may modify it in the resumption handler
46 void * elem;
47
48 // pointer to chan that is closed
49 void * closed_chan;
50};
51vtable(channel_closed) channel_closed_vt;
52
53// #define CHAN_STATS // define this to get channel stats printed in dtor
54
55forall( T ) {
56
57struct __attribute__((aligned(128))) channel {
58 size_t size, front, back, count;
59 T * buffer;
60 dlist( select_node ) prods, cons; // lists of blocked threads
61 go_mutex mutex_lock; // MX lock
62 bool closed; // indicates channel close/open
63 #ifdef CHAN_STATS
64 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd
65 #endif
66};
67
68static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
69 size = _size;
70 front = back = count = 0;
71 if ( size != 0 ) buffer = aalloc( size );
72 prods{};
73 cons{};
74 mutex_lock{};
75 closed = false;
76 #ifdef CHAN_STATS
77 p_blocks = 0;
78 p_ops = 0;
79 c_blocks = 0;
80 c_ops = 0;
81 #endif
82}
83
84static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
85static inline void ^?{}( channel(T) &c ) with(c) {
86 #ifdef CHAN_STATS
87 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
88 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
89 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
90 #endif
91 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
92 "Attempted to delete channel with waiting threads (Deadlock).\n" );
93 if ( size != 0 ) delete( buffer );
94}
95static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
96static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
97static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
98static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
99static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
100
101// closes the channel and notifies all blocked threads
102static inline void close( channel(T) & chan ) with(chan) {
103 lock( mutex_lock );
104 closed = true;
105
106 // flush waiting consumers and producers
107 while ( has_waiting_consumers( chan ) ) {
108 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
109 break; // if __handle_waituntil_OR returns false cons is empty so break
110 cons`first.extra = 0p;
111 wake_one( cons );
112 }
113 while ( has_waiting_producers( chan ) ) {
114 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
115 break; // if __handle_waituntil_OR returns false prods is empty so break
116 prods`first.extra = 0p;
117 wake_one( prods );
118 }
119 unlock(mutex_lock);
120}
121
122static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
123
124// used to hand an element to a blocked consumer and signal it
125static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
126 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
127 wake_one( cons );
128}
129
130// used to hand an element to a blocked producer and signal it
131static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
132 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
133 wake_one( prods );
134}
135
136static inline void flush( channel(T) & chan, T elem ) with(chan) {
137 lock( mutex_lock );
138 while ( count == 0 && !cons`isEmpty ) {
139 __cons_handoff( chan, elem );
140 }
141 unlock( mutex_lock );
142}
143
144// handles buffer insert
145static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
146 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
147 count += 1;
148 back++;
149 if ( back == size ) back = 0;
150}
151
152// needed to avoid an extra copy in closed case
153static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
154 lock( mutex_lock );
155 #ifdef CHAN_STATS
156 p_ops++;
157 #endif
158
159 ConsEmpty: if ( !cons`isEmpty ) {
160 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
161 __cons_handoff( chan, elem );
162 unlock( mutex_lock );
163 return true;
164 }
165
166 if ( count == size ) { unlock( mutex_lock ); return false; }
167
168 __buf_insert( chan, elem );
169 unlock( mutex_lock );
170 return true;
171}
172
173// attempts a nonblocking insert
174// returns true if insert was successful, false otherwise
175static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
176
177// handles closed case of insert routine
178static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
179 channel_closed except{ &channel_closed_vt, &elem, &chan };
180 throwResume except; // throw closed resumption
181 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
182}
183
184static inline void insert( channel(T) & chan, T elem ) with(chan) {
185 // check for close before acquire mx
186 if ( unlikely(closed) ) {
187 __closed_insert( chan, elem );
188 return;
189 }
190
191 lock( mutex_lock );
192
193 #ifdef CHAN_STATS
194 if ( !closed ) p_ops++;
195 #endif
196
197 // if closed handle
198 if ( unlikely(closed) ) {
199 unlock( mutex_lock );
200 __closed_insert( chan, elem );
201 return;
202 }
203
204 // buffer count must be zero if cons are blocked (also handles zero-size case)
205 ConsEmpty: if ( !cons`isEmpty ) {
206 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
207 __cons_handoff( chan, elem );
208 unlock( mutex_lock );
209 return;
210 }
211
212 // wait if buffer is full, work will be completed by someone else
213 if ( count == size ) {
214 #ifdef CHAN_STATS
215 p_blocks++;
216 #endif
217
218 // check for if woken due to close
219 if ( unlikely( block( prods, &elem, mutex_lock ) ) )
220 __closed_insert( chan, elem );
221 return;
222 } // if
223
224 __buf_insert( chan, elem );
225 unlock( mutex_lock );
226}
227
228// does the buffer remove and potentially does waiting producer work
229static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
230 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
231 count -= 1;
232 front = (front + 1) % size;
233 if (count == size - 1 && !prods`isEmpty ) {
234 if ( !__handle_waituntil_OR( prods ) ) return;
235 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work
236 wake_one( prods );
237 }
238}
239
240// needed to avoid an extra copy in closed case and single return val case
241static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
242 lock( mutex_lock );
243 #ifdef CHAN_STATS
244 c_ops++;
245 #endif
246
247 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
248 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
249 __prods_handoff( chan, retval );
250 unlock( mutex_lock );
251 return true;
252 }
253
254 if ( count == 0 ) { unlock( mutex_lock ); return false; }
255
256 __do_remove( chan, retval );
257 unlock( mutex_lock );
258 return true;
259}
260
261// attempts a nonblocking remove
262// returns [T, true] if insert was successful
263// returns [T, false] if insert was successful (T uninit)
264static inline [T, bool] try_remove( channel(T) & chan ) {
265 T retval;
266 bool success = __internal_try_remove( chan, retval );
267 return [ retval, success ];
268}
269
270static inline T try_remove( channel(T) & chan ) {
271 T retval;
272 __internal_try_remove( chan, retval );
273 return retval;
274}
275
276// handles closed case of insert routine
277static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
278 channel_closed except{ &channel_closed_vt, 0p, &chan };
279 throwResume except; // throw resumption
280 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
281}
282
283static inline T remove( channel(T) & chan ) with(chan) {
284 T retval;
285 if ( unlikely(closed) ) {
286 __closed_remove( chan, retval );
287 return retval;
288 }
289 lock( mutex_lock );
290
291 #ifdef CHAN_STATS
292 if ( !closed ) c_ops++;
293 #endif
294
295 if ( unlikely(closed) ) {
296 unlock( mutex_lock );
297 __closed_remove( chan, retval );
298 return retval;
299 }
300
301 // have to check for the zero size channel case
302 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
303 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
304 __prods_handoff( chan, retval );
305 unlock( mutex_lock );
306 return retval;
307 }
308
309 // wait if buffer is empty, work will be completed by someone else
310 if ( count == 0 ) {
311 #ifdef CHAN_STATS
312 c_blocks++;
313 #endif
314 // check for if woken due to close
315 if ( unlikely( block( cons, &retval, mutex_lock ) ) )
316 __closed_remove( chan, retval );
317 return retval;
318 }
319
320 // Remove from buffer
321 __do_remove( chan, retval );
322 unlock( mutex_lock );
323 return retval;
324}
325
326///////////////////////////////////////////////////////////////////////////////////////////
327// The following is support for waituntil (select) statements
328///////////////////////////////////////////////////////////////////////////////////////////
329static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
330 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
331 lock( mutex_lock );
332 if ( node`isListed ) { // op wasn't performed
333 remove( node );
334 unlock( mutex_lock );
335 return false;
336 }
337 unlock( mutex_lock );
338
339 // only return true when not special OR case, not exceptional calse and status is SAT
340 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
341}
342
343// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
344static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
345 while ( !queue`isEmpty ) {
346 // if node not a special OR case or if we win the special OR case race break
347 if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
348 return true;
349
350 // our node lost the race when toggling in __pending_set_other
351 if ( *mine.clause_status != __SELECT_PENDING )
352 return false;
353
354 // otherwise we lost the special OR race so discard node
355 try_pop_front( queue );
356 }
357 return false;
358}
359
360// type used by select statement to capture a chan read as the selected operation
361struct chan_read {
362 T & ret;
363 channel(T) & chan;
364};
365
366static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
367 &cr.chan = &chan;
368 &cr.ret = &ret;
369}
370static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
371
372static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
373 __closed_remove( chan, ret );
374 // if we get here then the insert succeeded
375 __make_select_node_available( node );
376}
377
378static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
379 lock( mutex_lock );
380 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
381
382 #ifdef CHAN_STATS
383 if ( !closed ) c_ops++;
384 #endif
385
386 if ( !node.park_counter ) {
387 // are we special case OR and front of cons is also special case OR
388 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
389 if ( !__make_select_node_pending( node ) ) {
390 unlock( mutex_lock );
391 return false;
392 }
393
394 if ( __handle_pending( prods, node ) ) {
395 __prods_handoff( chan, ret );
396 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
397 unlock( mutex_lock );
398 return true;
399 }
400 if ( *node.clause_status == __SELECT_PENDING )
401 __make_select_node_unsat( node );
402 }
403 // check if we can complete operation. If so race to establish winner in special OR case
404 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
405 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
406 unlock( mutex_lock );
407 return false;
408 }
409 }
410 }
411
412 if ( unlikely(closed) ) {
413 unlock( mutex_lock );
414 __handle_select_closed_read( this, node );
415 return true;
416 }
417
418 // have to check for the zero size channel case
419 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
420 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
421 __prods_handoff( chan, ret );
422 __set_avail_then_unlock( node, mutex_lock );
423 return true;
424 }
425
426 // wait if buffer is empty, work will be completed by someone else
427 if ( count == 0 ) {
428 #ifdef CHAN_STATS
429 c_blocks++;
430 #endif
431
432 insert_last( cons, node );
433 unlock( mutex_lock );
434 return false;
435 }
436
437 // Remove from buffer
438 __do_remove( chan, ret );
439 __set_avail_then_unlock( node, mutex_lock );
440 return true;
441}
442static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
443static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
444 if ( node.extra == 0p ) // check if woken up due to closed channel
445 __closed_remove( chan, ret );
446 // This is only reachable if not closed or closed exception was handled
447 return true;
448}
449
450// type used by select statement to capture a chan write as the selected operation
451struct chan_write {
452 T elem;
453 channel(T) & chan;
454};
455
456static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
457 &cw.chan = &chan;
458 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
459}
460static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
461
462static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
463 __closed_insert( chan, elem );
464 // if we get here then the insert succeeded
465 __make_select_node_available( node );
466}
467
468static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
469 lock( mutex_lock );
470 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
471
472 #ifdef CHAN_STATS
473 if ( !closed ) p_ops++;
474 #endif
475
476 // special OR case handling
477 if ( !node.park_counter ) {
478 // are we special case OR and front of cons is also special case OR
479 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
480 if ( !__make_select_node_pending( node ) ) {
481 unlock( mutex_lock );
482 return false;
483 }
484
485 if ( __handle_pending( cons, node ) ) {
486 __cons_handoff( chan, elem );
487 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
488 unlock( mutex_lock );
489 return true;
490 }
491 if ( *node.clause_status == __SELECT_PENDING )
492 __make_select_node_unsat( node );
493 }
494 // check if we can complete operation. If so race to establish winner in special OR case
495 if ( count != size || !cons`isEmpty || unlikely(closed) ) {
496 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
497 unlock( mutex_lock );
498 return false;
499 }
500 }
501 }
502
503 // if closed handle
504 if ( unlikely(closed) ) {
505 unlock( mutex_lock );
506 __handle_select_closed_write( this, node );
507 return true;
508 }
509
510 // handle blocked consumer case via handoff (buffer is implicitly empty)
511 ConsEmpty: if ( !cons`isEmpty ) {
512 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
513 __cons_handoff( chan, elem );
514 __set_avail_then_unlock( node, mutex_lock );
515 return true;
516 }
517
518 // insert node in list if buffer is full, work will be completed by someone else
519 if ( count == size ) {
520 #ifdef CHAN_STATS
521 p_blocks++;
522 #endif
523
524 insert_last( prods, node );
525 unlock( mutex_lock );
526 return false;
527 } // if
528
529 // otherwise carry out write either via normal insert
530 __buf_insert( chan, elem );
531 __set_avail_then_unlock( node, mutex_lock );
532 return true;
533}
534static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
535
536static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
537 if ( node.extra == 0p ) // check if woken up due to closed channel
538 __closed_insert( chan, elem );
539
540 // This is only reachable if not closed or closed exception was handled
541 return true;
542}
543
544} // forall( T )
545
546
Note: See TracBrowser for help on using the repository browser.