- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r88b49bb ra45e21c 1 //2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo3 //4 // The contents of this file are covered under the licence agreement in the5 // file "LICENCE" distributed with Cforall.6 //7 // channel.hfa -- LIBCFATHREAD8 // Runtime locks that used with the runtime thread system.9 //10 // Author : Colby Alexander Parsons11 // Created On : Thu Jan 21 19:46:50 202212 // Last Modified By :13 // Last Modified On :14 // Update Count :15 //16 17 1 #pragma once 18 2 19 3 #include <locks.hfa> 20 4 #include <list.hfa> 21 #include "select.hfa" 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 22 30 23 31 // returns true if woken due to shutdown 24 32 // blocks thread on list and releases passed lock 25 static inline bool block( dlist( select_node) & queue, void * elem_ptr, go_mutex & lock ) {26 select_node sn{ active_thread(), elem_ptr };27 insert_last( queue, sn);33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) { 34 wait_link w{ active_thread(), elem_ptr }; 35 insert_last( queue, w ); 28 36 unlock( lock ); 29 37 park(); 30 return sn.extra == 0p; 31 } 32 33 // Waituntil support (un)register_select helper routine 34 // Sets select node avail if not special OR case and then unlocks 35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 36 if ( node.park_counter ) __make_select_node_available( node ); 37 unlock( mutex_lock ); 38 return w.elem == 0p; 38 39 } 39 40 … … 58 59 size_t size, front, back, count; 59 60 T * buffer; 60 dlist( select_node) prods, cons; // lists of blocked threads61 go_mutex mutex_lock; 62 bool closed; 63 #ifdef CHAN_STATS 64 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd61 dlist( wait_link ) prods, cons; // lists of blocked threads 62 go_mutex mutex_lock; // MX lock 63 bool closed; // indicates channel close/open 64 #ifdef CHAN_STATS 65 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd 65 66 #endif 66 67 }; … … 69 70 size = _size; 70 71 front = back = count = 0; 71 if ( size != 0 )buffer = aalloc( size );72 buffer = aalloc( size ); 72 73 prods{}; 73 74 cons{}; … … 75 76 closed = false; 76 77 #ifdef CHAN_STATS 77 p_blocks = 0; 78 p_ops = 0; 79 c_blocks = 0; 80 c_ops = 0; 78 blocks = 0; 79 operations = 0; 81 80 #endif 82 81 } … … 85 84 static inline void ^?{}( channel(T) &c ) with(c) { 86 85 #ifdef CHAN_STATS 87 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100); 88 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100); 89 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100); 90 #endif 91 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty, 92 "Attempted to delete channel with waiting threads (Deadlock).\n" ); 93 if ( size != 0 ) delete( buffer ); 94 } 95 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 96 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 86 printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100); 87 #endif 88 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer ); 90 } 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 97 93 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } 98 94 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } … … 106 102 // flush waiting consumers and producers 107 103 while ( has_waiting_consumers( chan ) ) { 108 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 109 break; // if __handle_waituntil_OR returns false cons is empty so break 110 cons`first.extra = 0p; 104 cons`first.elem = 0p; 111 105 wake_one( cons ); 112 106 } 113 107 while ( has_waiting_producers( chan ) ) { 114 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 115 break; // if __handle_waituntil_OR returns false prods is empty so break 116 prods`first.extra = 0p; 108 prods`first.elem = 0p; 117 109 wake_one( prods ); 118 110 } … … 122 114 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 123 115 124 // used to hand an element to a blocked consumer and signal it125 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {126 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work127 wake_one( cons );128 }129 130 // used to hand an element to a blocked producer and signal it131 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {132 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );133 wake_one( prods );134 }135 136 116 static inline void flush( channel(T) & chan, T elem ) with(chan) { 137 117 lock( mutex_lock ); 138 118 while ( count == 0 && !cons`isEmpty ) { 139 __cons_handoff( chan, elem ); 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 140 121 } 141 122 unlock( mutex_lock ); … … 144 125 // handles buffer insert 145 126 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 146 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));127 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 147 128 count += 1; 148 129 back++; … … 150 131 } 151 132 133 // does the buffer insert or hands elem directly to consumer if one is waiting 134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) { 135 if ( count == 0 && !cons`isEmpty ) { 136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 137 wake_one( cons ); 138 } else __buf_insert( chan, elem ); 139 } 140 152 141 // needed to avoid an extra copy in closed case 153 142 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { 154 143 lock( mutex_lock ); 155 144 #ifdef CHAN_STATS 156 p_ops++; 157 #endif 158 159 ConsEmpty: if ( !cons`isEmpty ) { 160 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 161 __cons_handoff( chan, elem ); 162 unlock( mutex_lock ); 163 return true; 164 } 165 145 operations++; 146 #endif 166 147 if ( count == size ) { unlock( mutex_lock ); return false; } 167 168 __buf_insert( chan, elem ); 148 __do_insert( chan, elem ); 169 149 unlock( mutex_lock ); 170 150 return true; … … 177 157 // handles closed case of insert routine 178 158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 179 channel_closed except{ 159 channel_closed except{&channel_closed_vt, &elem, &chan }; 180 160 throwResume except; // throw closed resumption 181 161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 192 172 193 173 #ifdef CHAN_STATS 194 if ( !closed ) p_ops++;174 if ( !closed ) operations++; 195 175 #endif 196 176 … … 202 182 } 203 183 204 // buffer count must be zero if cons are blocked (also handles zero-size case)205 ConsEmpty: if (!cons`isEmpty ) {206 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;207 __cons_handoff( chan, elem);184 // have to check for the zero size channel case 185 if ( size == 0 && !cons`isEmpty ) { 186 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); 187 wake_one( cons ); 208 188 unlock( mutex_lock ); 209 return ;189 return true; 210 190 } 211 191 … … 213 193 if ( count == size ) { 214 194 #ifdef CHAN_STATS 215 p_blocks++;195 blocks++; 216 196 #endif 217 197 … … 222 202 } // if 223 203 224 __buf_insert( chan, elem ); 225 unlock( mutex_lock ); 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 216 count -= 1; 217 front = (front + 1) % size; 226 218 } 227 219 228 220 // does the buffer remove and potentially does waiting producer work 229 221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 230 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 231 count -= 1; 232 front = (front + 1) % size; 222 __buf_remove( chan, retval ); 233 223 if (count == size - 1 && !prods`isEmpty ) { 234 if ( !__handle_waituntil_OR( prods ) ) return; 235 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 236 225 wake_one( prods ); 237 226 } … … 242 231 lock( mutex_lock ); 243 232 #ifdef CHAN_STATS 244 c_ops++; 245 #endif 246 247 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 248 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 249 __prods_handoff( chan, retval ); 250 unlock( mutex_lock ); 251 return true; 252 } 253 233 operations++; 234 #endif 254 235 if ( count == 0 ) { unlock( mutex_lock ); return false; } 255 256 236 __do_remove( chan, retval ); 257 237 unlock( mutex_lock ); … … 264 244 static inline [T, bool] try_remove( channel(T) & chan ) { 265 245 T retval; 266 bool success = __internal_try_remove( chan, retval ); 267 return [ retval, success ]; 268 } 269 270 static inline T try_remove( channel(T) & chan ) { 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 271 250 T retval; 272 251 __internal_try_remove( chan, retval ); … … 276 255 // handles closed case of insert routine 277 256 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 278 channel_closed except{ 257 channel_closed except{&channel_closed_vt, 0p, &chan }; 279 258 throwResume except; // throw resumption 280 259 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 290 269 291 270 #ifdef CHAN_STATS 292 if ( !closed ) c_ops++;271 if ( !closed ) operations++; 293 272 #endif 294 273 … … 300 279 301 280 // have to check for the zero size channel case 302 ZeroSize:if ( size == 0 && !prods`isEmpty ) {303 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;304 __prods_handoff( chan, retval);281 if ( size == 0 && !prods`isEmpty ) { 282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T)); 283 wake_one( prods ); 305 284 unlock( mutex_lock ); 306 285 return retval; … … 308 287 309 288 // wait if buffer is empty, work will be completed by someone else 310 if ( count == 0) {289 if (count == 0) { 311 290 #ifdef CHAN_STATS 312 c_blocks++;291 blocks++; 313 292 #endif 314 293 // check for if woken due to close … … 320 299 // Remove from buffer 321 300 __do_remove( chan, retval ); 301 322 302 unlock( mutex_lock ); 323 303 return retval; 324 304 } 325 326 ///////////////////////////////////////////////////////////////////////////////////////////327 // The following is support for waituntil (select) statements328 ///////////////////////////////////////////////////////////////////////////////////////////329 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {330 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case331 lock( mutex_lock );332 if ( node`isListed ) { // op wasn't performed333 remove( node );334 unlock( mutex_lock );335 return false;336 }337 unlock( mutex_lock );338 339 // only return true when not special OR case, not exceptional calse and status is SAT340 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;341 }342 343 // type used by select statement to capture a chan read as the selected operation344 struct chan_read {345 T & ret;346 channel(T) & chan;347 };348 349 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {350 &cr.chan = &chan;351 &cr.ret = &ret;352 }353 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }354 355 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {356 __closed_remove( chan, ret );357 // if we get here then the insert succeeded358 __make_select_node_available( node );359 }360 361 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {362 lock( mutex_lock );363 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close364 365 #ifdef CHAN_STATS366 if ( !closed ) c_ops++;367 #endif368 369 if ( !node.park_counter ) {370 // are we special case OR and front of cons is also special case OR371 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {372 if ( !__make_select_node_pending( node ) ) {373 unlock( mutex_lock );374 return false;375 }376 377 if ( __handle_waituntil_OR( prods ) ) {378 __prods_handoff( chan, ret );379 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node380 unlock( mutex_lock );381 return true;382 }383 __make_select_node_unsat( node );384 }385 // check if we can complete operation. If so race to establish winner in special OR case386 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {387 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering388 unlock( mutex_lock );389 return false;390 }391 }392 }393 394 if ( unlikely(closed) ) {395 unlock( mutex_lock );396 __handle_select_closed_read( this, node );397 return true;398 }399 400 // have to check for the zero size channel case401 ZeroSize: if ( size == 0 && !prods`isEmpty ) {402 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;403 __prods_handoff( chan, ret );404 __set_avail_then_unlock( node, mutex_lock );405 return true;406 }407 408 // wait if buffer is empty, work will be completed by someone else409 if ( count == 0 ) {410 #ifdef CHAN_STATS411 c_blocks++;412 #endif413 414 insert_last( cons, node );415 unlock( mutex_lock );416 return false;417 }418 419 // Remove from buffer420 __do_remove( chan, ret );421 __set_avail_then_unlock( node, mutex_lock );422 return true;423 }424 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }425 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {426 if ( node.extra == 0p ) // check if woken up due to closed channel427 __closed_remove( chan, ret );428 // This is only reachable if not closed or closed exception was handled429 return true;430 }431 432 // type used by select statement to capture a chan write as the selected operation433 struct chan_write {434 T elem;435 channel(T) & chan;436 };437 438 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {439 &cw.chan = &chan;440 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );441 }442 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }443 444 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {445 __closed_insert( chan, elem );446 // if we get here then the insert succeeded447 __make_select_node_available( node );448 }449 450 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {451 lock( mutex_lock );452 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close453 454 #ifdef CHAN_STATS455 if ( !closed ) p_ops++;456 #endif457 458 // special OR case handling459 if ( !node.park_counter ) {460 // are we special case OR and front of cons is also special case OR461 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {462 if ( !__make_select_node_pending( node ) ) {463 unlock( mutex_lock );464 return false;465 }466 467 if ( __handle_waituntil_OR( cons ) ) {468 __cons_handoff( chan, elem );469 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node470 unlock( mutex_lock );471 return true;472 }473 __make_select_node_unsat( node );474 }475 // check if we can complete operation. If so race to establish winner in special OR case476 if ( count != size || !cons`isEmpty || unlikely(closed) ) {477 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering478 unlock( mutex_lock );479 return false;480 }481 }482 }483 484 // if closed handle485 if ( unlikely(closed) ) {486 unlock( mutex_lock );487 __handle_select_closed_write( this, node );488 return true;489 }490 491 // handle blocked consumer case via handoff (buffer is implicitly empty)492 ConsEmpty: if ( !cons`isEmpty ) {493 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;494 __cons_handoff( chan, elem );495 __set_avail_then_unlock( node, mutex_lock );496 return true;497 }498 499 // insert node in list if buffer is full, work will be completed by someone else500 if ( count == size ) {501 #ifdef CHAN_STATS502 p_blocks++;503 #endif504 505 insert_last( prods, node );506 unlock( mutex_lock );507 return false;508 } // if509 510 // otherwise carry out write either via normal insert511 __buf_insert( chan, elem );512 __set_avail_then_unlock( node, mutex_lock );513 return true;514 }515 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }516 517 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {518 if ( node.extra == 0p ) // check if woken up due to closed channel519 __closed_insert( chan, elem );520 521 // This is only reachable if not closed or closed exception was handled522 return true;523 }524 525 305 } // forall( T ) 526 527
Note:
See TracChangeset
for help on using the changeset viewer.