source: libcfa/src/concurrency/channel.hfa@ 9eb7f07c

ADT ast-experimental
Last change on this file since 9eb7f07c was c4f411e, checked in by caparsons <caparson@…>, 2 years ago

fixed a bug which may potentially fix build

  • Property mode set to 100644
File size: 16.8 KB
RevLine 
[5e4a830]1#pragma once
2
[4a962d8]3#include <locks.hfa>
[d30e3eb]4#include <list.hfa>
[a45e21c]5#include <mutex_stmt.hfa>
[beeff61e]6#include "select.hfa"
[a45e21c]7
8// returns true if woken due to shutdown
9// blocks thread on list and releases passed lock
[beeff61e]10static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
11 select_node sn{ active_thread(), elem_ptr };
12 insert_last( queue, sn );
[a45e21c]13 unlock( lock );
14 park();
[beeff61e]15 return sn.extra == 0p;
16}
17
18// Waituntil support (un)register_select helper routine
19// Sets select node avail if not special OR case and then unlocks
20static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
21 if ( node.park_counter ) __make_select_node_available( node );
22 unlock( mutex_lock );
[a45e21c]23}
24
25// void * used for some fields since exceptions don't work with parametric polymorphism currently
26exception channel_closed {
27 // on failed insert elem is a ptr to the element attempting to be inserted
28 // on failed remove elem ptr is 0p
29 // on resumption of a failed insert this elem will be inserted
30 // so a user may modify it in the resumption handler
31 void * elem;
32
33 // pointer to chan that is closed
34 void * closed_chan;
35};
36vtable(channel_closed) channel_closed_vt;
37
38// #define CHAN_STATS // define this to get channel stats printed in dtor
39
[4a962d8]40forall( T ) {
[d30e3eb]41
[a45e21c]42struct __attribute__((aligned(128))) channel {
43 size_t size, front, back, count;
[4a962d8]44 T * buffer;
[beeff61e]45 dlist( select_node ) prods, cons; // lists of blocked threads
[a45e21c]46 go_mutex mutex_lock; // MX lock
47 bool closed; // indicates channel close/open
48 #ifdef CHAN_STATS
49 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd
50 #endif
[4a962d8]51};
52
53static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
54 size = _size;
55 front = back = count = 0;
[beeff61e]56 if ( size != 0 ) buffer = aalloc( size );
[4a962d8]57 prods{};
58 cons{};
59 mutex_lock{};
[a45e21c]60 closed = false;
61 #ifdef CHAN_STATS
62 blocks = 0;
63 operations = 0;
64 #endif
[4a962d8]65}
66
67static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
[a45e21c]68static inline void ^?{}( channel(T) &c ) with(c) {
69 #ifdef CHAN_STATS
70 printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
71 #endif
72 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
[beeff61e]73 if ( size != 0 ) delete( buffer );
[a45e21c]74}
[42b739d7]75static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
76static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
[d30e3eb]77static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
78static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
79static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
[4a962d8]80
[a45e21c]81// closes the channel and notifies all blocked threads
82static inline void close( channel(T) & chan ) with(chan) {
83 lock( mutex_lock );
84 closed = true;
85
86 // flush waiting consumers and producers
87 while ( has_waiting_consumers( chan ) ) {
[beeff61e]88 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
89 break; // if __handle_waituntil_OR returns false cons is empty so break
90 cons`first.extra = 0p;
[a45e21c]91 wake_one( cons );
92 }
93 while ( has_waiting_producers( chan ) ) {
[beeff61e]94 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
95 break; // if __handle_waituntil_OR returns false prods is empty so break
96 prods`first.extra = 0p;
[a45e21c]97 wake_one( prods );
98 }
99 unlock(mutex_lock);
100}
101
102static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
103
[beeff61e]104// used to hand an element to a blocked consumer and signal it
105static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
106 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
107 wake_one( cons );
108}
109
110// used to hand an element to a blocked producer and signal it
111static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
112 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
113 wake_one( prods );
114}
115
[a45e21c]116static inline void flush( channel(T) & chan, T elem ) with(chan) {
117 lock( mutex_lock );
118 while ( count == 0 && !cons`isEmpty ) {
[beeff61e]119 __cons_handoff( chan, elem );
[a45e21c]120 }
121 unlock( mutex_lock );
122}
123
124// handles buffer insert
125static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]126 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
[4a962d8]127 count += 1;
128 back++;
129 if ( back == size ) back = 0;
130}
131
[a45e21c]132// needed to avoid an extra copy in closed case
133static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
134 lock( mutex_lock );
135 #ifdef CHAN_STATS
136 operations++;
137 #endif
[beeff61e]138
139 ConsEmpty: if ( !cons`isEmpty ) {
140 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
141 __cons_handoff( chan, elem );
142 unlock( mutex_lock );
143 return true;
144 }
145
[a45e21c]146 if ( count == size ) { unlock( mutex_lock ); return false; }
[beeff61e]147
148 __buf_insert( chan, elem );
[a45e21c]149 unlock( mutex_lock );
150 return true;
151}
152
153// attempts a nonblocking insert
154// returns true if insert was successful, false otherwise
155static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
156
157// handles closed case of insert routine
158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
[beeff61e]159 channel_closed except{ &channel_closed_vt, &elem, &chan };
[a45e21c]160 throwResume except; // throw closed resumption
161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
[d30e3eb]162}
[4a962d8]163
[42b739d7]164static inline void insert( channel(T) & chan, T elem ) with(chan) {
[a45e21c]165 // check for close before acquire mx
166 if ( unlikely(closed) ) {
167 __closed_insert( chan, elem );
168 return;
169 }
170
[4a962d8]171 lock( mutex_lock );
172
[a45e21c]173 #ifdef CHAN_STATS
174 if ( !closed ) operations++;
175 #endif
176
177 // if closed handle
178 if ( unlikely(closed) ) {
179 unlock( mutex_lock );
180 __closed_insert( chan, elem );
181 return;
182 }
183
[beeff61e]184 // buffer count must be zero if cons are blocked (also handles zero-size case)
185 ConsEmpty: if ( !cons`isEmpty ) {
186 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
187 __cons_handoff( chan, elem );
[5c931e0]188 unlock( mutex_lock );
[beeff61e]189 return;
[5c931e0]190 }
191
[4a962d8]192 // wait if buffer is full, work will be completed by someone else
[d30e3eb]193 if ( count == size ) {
[a45e21c]194 #ifdef CHAN_STATS
195 blocks++;
196 #endif
197
198 // check for if woken due to close
199 if ( unlikely( block( prods, &elem, mutex_lock ) ) )
200 __closed_insert( chan, elem );
[4a962d8]201 return;
202 } // if
203
[beeff61e]204 __buf_insert( chan, elem );
[4a962d8]205 unlock( mutex_lock );
[a45e21c]206}
[4a962d8]207
[a45e21c]208// does the buffer remove and potentially does waiting producer work
209static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]210 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
211 count -= 1;
212 front = (front + 1) % size;
[d30e3eb]213 if (count == size - 1 && !prods`isEmpty ) {
[beeff61e]214 if ( !__handle_waituntil_OR( prods ) ) return;
215 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work
[d30e3eb]216 wake_one( prods );
217 }
[4a962d8]218}
[0e16a2d]219
[a45e21c]220// needed to avoid an extra copy in closed case and single return val case
221static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
[0e16a2d]222 lock( mutex_lock );
[a45e21c]223 #ifdef CHAN_STATS
224 operations++;
225 #endif
[beeff61e]226
227 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
228 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
229 __prods_handoff( chan, retval );
230 unlock( mutex_lock );
231 return true;
232 }
233
[a45e21c]234 if ( count == 0 ) { unlock( mutex_lock ); return false; }
[beeff61e]235
[a45e21c]236 __do_remove( chan, retval );
[0e16a2d]237 unlock( mutex_lock );
[a45e21c]238 return true;
[0e16a2d]239}
240
[a45e21c]241// attempts a nonblocking remove
242// returns [T, true] if insert was successful
243// returns [T, false] if insert was successful (T uninit)
244static inline [T, bool] try_remove( channel(T) & chan ) {
[0e16a2d]245 T retval;
[beeff61e]246 bool success = __internal_try_remove( chan, retval );
247 return [ retval, success ];
[a45e21c]248}
[0e16a2d]249
[beeff61e]250static inline T try_remove( channel(T) & chan ) {
[a45e21c]251 T retval;
252 __internal_try_remove( chan, retval );
[0e16a2d]253 return retval;
254}
255
[a45e21c]256// handles closed case of insert routine
257static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
[beeff61e]258 channel_closed except{ &channel_closed_vt, 0p, &chan };
[a45e21c]259 throwResume except; // throw resumption
260 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
[0e16a2d]261}
262
[a45e21c]263static inline T remove( channel(T) & chan ) with(chan) {
264 T retval;
265 if ( unlikely(closed) ) {
266 __closed_remove( chan, retval );
267 return retval;
268 }
269 lock( mutex_lock );
[0e16a2d]270
[a45e21c]271 #ifdef CHAN_STATS
272 if ( !closed ) operations++;
273 #endif
[0e16a2d]274
[a45e21c]275 if ( unlikely(closed) ) {
276 unlock( mutex_lock );
277 __closed_remove( chan, retval );
278 return retval;
279 }
[0e16a2d]280
[a45e21c]281 // have to check for the zero size channel case
[beeff61e]282 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
283 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
284 __prods_handoff( chan, retval );
[a45e21c]285 unlock( mutex_lock );
286 return retval;
287 }
[0e16a2d]288
[a45e21c]289 // wait if buffer is empty, work will be completed by someone else
[beeff61e]290 if ( count == 0 ) {
[a45e21c]291 #ifdef CHAN_STATS
292 blocks++;
293 #endif
294 // check for if woken due to close
295 if ( unlikely( block( cons, &retval, mutex_lock ) ) )
296 __closed_remove( chan, retval );
297 return retval;
298 }
[0e16a2d]299
300 // Remove from buffer
[a45e21c]301 __do_remove( chan, retval );
[0e16a2d]302 unlock( mutex_lock );
303 return retval;
304}
[beeff61e]305
306///////////////////////////////////////////////////////////////////////////////////////////
307// The following is support for waituntil (select) statements
308///////////////////////////////////////////////////////////////////////////////////////////
309static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
[d5187a0]310 // if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
[beeff61e]311 lock( mutex_lock );
312 if ( node`isListed ) { // op wasn't performed
313 #ifdef CHAN_STATS
314 operations--;
315 #endif
316 remove( node );
317 unlock( mutex_lock );
318 return false;
319 }
320 unlock( mutex_lock );
321
322 // only return true when not special OR case, not exceptional calse and status is SAT
323 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
324}
325
326// type used by select statement to capture a chan read as the selected operation
327struct chan_read {
328 T & ret;
[8607a72]329 channel(T) & chan;
[beeff61e]330};
331
332static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
333 &cr.chan = &chan;
334 &cr.ret = &ret;
335}
336static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
337
338static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
339 __closed_remove( chan, ret );
340 // if we get here then the insert succeeded
341 __make_select_node_available( node );
342}
343
344static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
345 lock( mutex_lock );
346 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
347
348 #ifdef CHAN_STATS
349 if ( !closed ) operations++;
350 #endif
351
[c4f411e]352 if ( !node.park_counter ) {
353 // are we special case OR and front of cons is also special case OR
354 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
355 if ( !__make_select_node_pending( node ) ) {
356 unlock( mutex_lock );
357 return false;
358 }
359
360 if ( __handle_waituntil_OR( prods ) ) {
361 __prods_handoff( chan, ret );
362 unlock( mutex_lock );
363 return true;
364 }
365 __make_select_node_unsat( node );
366 }
367 // check if we can complete operation. If so race to establish winner in special OR case
368 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
369 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
370 unlock( mutex_lock );
371 return false;
372 }
[beeff61e]373 }
374 }
375
376 if ( unlikely(closed) ) {
377 unlock( mutex_lock );
378 __handle_select_closed_read( this, node );
379 return true;
380 }
381
382 // have to check for the zero size channel case
383 ZeroSize: if ( size == 0 && !prods`isEmpty ) {
384 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
385 __prods_handoff( chan, ret );
386 __set_avail_then_unlock( node, mutex_lock );
387 return true;
388 }
389
390 // wait if buffer is empty, work will be completed by someone else
391 if ( count == 0 ) {
392 #ifdef CHAN_STATS
393 blocks++;
394 #endif
395
396 insert_last( cons, node );
397 unlock( mutex_lock );
398 return false;
399 }
400
401 // Remove from buffer
402 __do_remove( chan, ret );
403 __set_avail_then_unlock( node, mutex_lock );
404 return true;
405}
406static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
407static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
408 if ( node.extra == 0p ) // check if woken up due to closed channel
409 __closed_remove( chan, ret );
410 // This is only reachable if not closed or closed exception was handled
411 return true;
412}
413
414// type used by select statement to capture a chan write as the selected operation
415struct chan_write {
416 T elem;
[8607a72]417 channel(T) & chan;
[beeff61e]418};
419
420static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
421 &cw.chan = &chan;
422 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
423}
424static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
425
426static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
427 __closed_insert( chan, elem );
428 // if we get here then the insert succeeded
429 __make_select_node_available( node );
430}
431
432static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
433 lock( mutex_lock );
434 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
435
436 #ifdef CHAN_STATS
437 if ( !closed ) operations++;
438 #endif
439
[c4f411e]440 // special OR case handling
441 if ( !node.park_counter ) {
442 // are we special case OR and front of cons is also special case OR
443 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
444 if ( !__make_select_node_pending( node ) ) {
445 unlock( mutex_lock );
446 return false;
447 }
448
449 if ( __handle_waituntil_OR( cons ) ) {
450 __cons_handoff( chan, elem );
451 unlock( mutex_lock );
452 return true;
453 }
454 __make_select_node_unsat( node );
455 }
456 // check if we can complete operation. If so race to establish winner in special OR case
457 if ( count != size || !cons`isEmpty || unlikely(closed) ) {
458 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
459 unlock( mutex_lock );
460 return false;
461 }
[beeff61e]462 }
463 }
464
465 // if closed handle
466 if ( unlikely(closed) ) {
467 unlock( mutex_lock );
468 __handle_select_closed_write( this, node );
469 return true;
470 }
471
472 // handle blocked consumer case via handoff (buffer is implicitly empty)
473 ConsEmpty: if ( !cons`isEmpty ) {
[cb69fba]474 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
[beeff61e]475 __cons_handoff( chan, elem );
476 __set_avail_then_unlock( node, mutex_lock );
477 return true;
478 }
479
480 // insert node in list if buffer is full, work will be completed by someone else
481 if ( count == size ) {
482 #ifdef CHAN_STATS
483 blocks++;
484 #endif
485
486 insert_last( prods, node );
487 unlock( mutex_lock );
488 return false;
489 } // if
490
491 // otherwise carry out write either via normal insert
492 __buf_insert( chan, elem );
493 __set_avail_then_unlock( node, mutex_lock );
494 return true;
495}
496static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
497
498static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
499 if ( node.extra == 0p ) // check if woken up due to closed channel
500 __closed_insert( chan, elem );
501
502 // This is only reachable if not closed or closed exception was handled
503 return true;
504}
505
506
[0e16a2d]507} // forall( T )
[beeff61e]508
509
510
Note: See TracBrowser for help on using the repository browser.