- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
ra45e21c r88b49bb 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // channel.hfa -- LIBCFATHREAD 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2022 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 1 17 #pragma once 2 18 3 19 #include <locks.hfa> 4 20 #include <list.hfa> 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 21 #include "select.hfa" 30 22 31 23 // returns true if woken due to shutdown 32 24 // blocks thread on list and releases passed lock 33 static inline bool block( dlist( wait_link) & queue, void * elem_ptr, go_mutex & lock ) {34 wait_link w{ active_thread(), elem_ptr };35 insert_last( queue, w);25 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) { 26 select_node sn{ active_thread(), elem_ptr }; 27 insert_last( queue, sn ); 36 28 unlock( lock ); 37 29 park(); 38 return w.elem == 0p; 30 return sn.extra == 0p; 31 } 32 33 // Waituntil support (un)register_select helper routine 34 // Sets select node avail if not special OR case and then unlocks 35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 36 if ( node.park_counter ) __make_select_node_available( node ); 37 unlock( mutex_lock ); 39 38 } 40 39 … … 59 58 size_t size, front, back, count; 60 59 T * buffer; 61 dlist( wait_link) prods, cons; // lists of blocked threads62 go_mutex mutex_lock; // MX lock63 bool closed; // indicates channel close/open64 #ifdef CHAN_STATS 65 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd60 dlist( select_node ) prods, cons; // lists of blocked threads 61 go_mutex mutex_lock; // MX lock 62 bool closed; // indicates channel close/open 63 #ifdef CHAN_STATS 64 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd 66 65 #endif 67 66 }; … … 70 69 size = _size; 71 70 front = back = count = 0; 72 buffer = aalloc( size );71 if ( size != 0 ) buffer = aalloc( size ); 73 72 prods{}; 74 73 cons{}; … … 76 75 closed = false; 77 76 #ifdef CHAN_STATS 78 blocks = 0; 79 operations = 0; 77 p_blocks = 0; 78 p_ops = 0; 79 c_blocks = 0; 80 c_ops = 0; 80 81 #endif 81 82 } … … 84 85 static inline void ^?{}( channel(T) &c ) with(c) { 85 86 #ifdef CHAN_STATS 86 printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100); 87 #endif 88 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer ); 90 } 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 87 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100); 88 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100); 89 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100); 90 #endif 91 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty, 92 "Attempted to delete channel with waiting threads (Deadlock).\n" ); 93 if ( size != 0 ) delete( buffer ); 94 } 95 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 96 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 93 97 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } 94 98 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } … … 102 106 // flush waiting consumers and producers 103 107 while ( has_waiting_consumers( chan ) ) { 104 cons`first.elem = 0p; 108 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 109 break; // if __handle_waituntil_OR returns false cons is empty so break 110 cons`first.extra = 0p; 105 111 wake_one( cons ); 106 112 } 107 113 while ( has_waiting_producers( chan ) ) { 108 prods`first.elem = 0p; 114 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 115 break; // if __handle_waituntil_OR returns false prods is empty so break 116 prods`first.extra = 0p; 109 117 wake_one( prods ); 110 118 } … … 114 122 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 115 123 124 // used to hand an element to a blocked consumer and signal it 125 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 126 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 127 wake_one( cons ); 128 } 129 130 // used to hand an element to a blocked producer and signal it 131 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 132 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 133 wake_one( prods ); 134 } 135 116 136 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 137 lock( mutex_lock ); 118 138 while ( count == 0 && !cons`isEmpty ) { 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 139 __cons_handoff( chan, elem ); 121 140 } 122 141 unlock( mutex_lock ); … … 125 144 // handles buffer insert 126 145 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 127 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));146 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 128 147 count += 1; 129 148 back++; … … 131 150 } 132 151 133 // does the buffer insert or hands elem directly to consumer if one is waiting134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {135 if ( count == 0 && !cons`isEmpty ) {136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work137 wake_one( cons );138 } else __buf_insert( chan, elem );139 }140 141 152 // needed to avoid an extra copy in closed case 142 153 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { 143 154 lock( mutex_lock ); 144 155 #ifdef CHAN_STATS 145 operations++; 146 #endif 156 p_ops++; 157 #endif 158 159 ConsEmpty: if ( !cons`isEmpty ) { 160 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 161 __cons_handoff( chan, elem ); 162 unlock( mutex_lock ); 163 return true; 164 } 165 147 166 if ( count == size ) { unlock( mutex_lock ); return false; } 148 __do_insert( chan, elem ); 167 168 __buf_insert( chan, elem ); 149 169 unlock( mutex_lock ); 150 170 return true; … … 157 177 // handles closed case of insert routine 158 178 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{ &channel_closed_vt, &elem, &chan };179 channel_closed except{ &channel_closed_vt, &elem, &chan }; 160 180 throwResume except; // throw closed resumption 161 181 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 172 192 173 193 #ifdef CHAN_STATS 174 if ( !closed ) operations++;194 if ( !closed ) p_ops++; 175 195 #endif 176 196 … … 182 202 } 183 203 184 // have to check for the zero size channel case185 if ( size == 0 &&!cons`isEmpty ) {186 memcpy(cons`first.elem, (void *)&elem, sizeof(T));187 wake_one( cons);188 unlock( mutex_lock ); 189 return true;204 // buffer count must be zero if cons are blocked (also handles zero-size case) 205 ConsEmpty: if ( !cons`isEmpty ) { 206 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 207 __cons_handoff( chan, elem ); 208 unlock( mutex_lock ); 209 return; 190 210 } 191 211 … … 193 213 if ( count == size ) { 194 214 #ifdef CHAN_STATS 195 blocks++;215 p_blocks++; 196 216 #endif 197 217 … … 202 222 } // if 203 223 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 224 __buf_insert( chan, elem ); 225 unlock( mutex_lock ); 226 } 227 228 // does the buffer remove and potentially does waiting producer work 229 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 230 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 216 231 count -= 1; 217 232 front = (front + 1) % size; 218 }219 220 // does the buffer remove and potentially does waiting producer work221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {222 __buf_remove( chan, retval );223 233 if (count == size - 1 && !prods`isEmpty ) { 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 234 if ( !__handle_waituntil_OR( prods ) ) return; 235 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 225 236 wake_one( prods ); 226 237 } … … 231 242 lock( mutex_lock ); 232 243 #ifdef CHAN_STATS 233 operations++; 234 #endif 244 c_ops++; 245 #endif 246 247 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 248 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 249 __prods_handoff( chan, retval ); 250 unlock( mutex_lock ); 251 return true; 252 } 253 235 254 if ( count == 0 ) { unlock( mutex_lock ); return false; } 255 236 256 __do_remove( chan, retval ); 237 257 unlock( mutex_lock ); … … 244 264 static inline [T, bool] try_remove( channel(T) & chan ) { 245 265 T retval; 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 266 bool success = __internal_try_remove( chan, retval ); 267 return [ retval, success ]; 268 } 269 270 static inline T try_remove( channel(T) & chan ) { 250 271 T retval; 251 272 __internal_try_remove( chan, retval ); … … 255 276 // handles closed case of insert routine 256 277 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 257 channel_closed except{ &channel_closed_vt, 0p, &chan };278 channel_closed except{ &channel_closed_vt, 0p, &chan }; 258 279 throwResume except; // throw resumption 259 280 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 269 290 270 291 #ifdef CHAN_STATS 271 if ( !closed ) operations++;292 if ( !closed ) c_ops++; 272 293 #endif 273 294 … … 279 300 280 301 // have to check for the zero size channel case 281 if ( size == 0 && !prods`isEmpty ) {282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));283 wake_one( prods);302 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 303 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 304 __prods_handoff( chan, retval ); 284 305 unlock( mutex_lock ); 285 306 return retval; … … 287 308 288 309 // wait if buffer is empty, work will be completed by someone else 289 if ( count == 0) {310 if ( count == 0 ) { 290 311 #ifdef CHAN_STATS 291 blocks++;312 c_blocks++; 292 313 #endif 293 314 // check for if woken due to close … … 299 320 // Remove from buffer 300 321 __do_remove( chan, retval ); 301 302 322 unlock( mutex_lock ); 303 323 return retval; 304 324 } 325 326 /////////////////////////////////////////////////////////////////////////////////////////// 327 // The following is support for waituntil (select) statements 328 /////////////////////////////////////////////////////////////////////////////////////////// 329 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 330 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case 331 lock( mutex_lock ); 332 if ( node`isListed ) { // op wasn't performed 333 remove( node ); 334 unlock( mutex_lock ); 335 return false; 336 } 337 unlock( mutex_lock ); 338 339 // only return true when not special OR case, not exceptional calse and status is SAT 340 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT; 341 } 342 343 // type used by select statement to capture a chan read as the selected operation 344 struct chan_read { 345 T & ret; 346 channel(T) & chan; 347 }; 348 349 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 350 &cr.chan = &chan; 351 &cr.ret = &ret; 352 } 353 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 354 355 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 356 __closed_remove( chan, ret ); 357 // if we get here then the insert succeeded 358 __make_select_node_available( node ); 359 } 360 361 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) { 362 lock( mutex_lock ); 363 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 364 365 #ifdef CHAN_STATS 366 if ( !closed ) c_ops++; 367 #endif 368 369 if ( !node.park_counter ) { 370 // are we special case OR and front of cons is also special case OR 371 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) { 372 if ( !__make_select_node_pending( node ) ) { 373 unlock( mutex_lock ); 374 return false; 375 } 376 377 if ( __handle_waituntil_OR( prods ) ) { 378 __prods_handoff( chan, ret ); 379 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 380 unlock( mutex_lock ); 381 return true; 382 } 383 __make_select_node_unsat( node ); 384 } 385 // check if we can complete operation. If so race to establish winner in special OR case 386 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) { 387 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 388 unlock( mutex_lock ); 389 return false; 390 } 391 } 392 } 393 394 if ( unlikely(closed) ) { 395 unlock( mutex_lock ); 396 __handle_select_closed_read( this, node ); 397 return true; 398 } 399 400 // have to check for the zero size channel case 401 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 402 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 403 __prods_handoff( chan, ret ); 404 __set_avail_then_unlock( node, mutex_lock ); 405 return true; 406 } 407 408 // wait if buffer is empty, work will be completed by someone else 409 if ( count == 0 ) { 410 #ifdef CHAN_STATS 411 c_blocks++; 412 #endif 413 414 insert_last( cons, node ); 415 unlock( mutex_lock ); 416 return false; 417 } 418 419 // Remove from buffer 420 __do_remove( chan, ret ); 421 __set_avail_then_unlock( node, mutex_lock ); 422 return true; 423 } 424 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 425 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 426 if ( node.extra == 0p ) // check if woken up due to closed channel 427 __closed_remove( chan, ret ); 428 // This is only reachable if not closed or closed exception was handled 429 return true; 430 } 431 432 // type used by select statement to capture a chan write as the selected operation 433 struct chan_write { 434 T elem; 435 channel(T) & chan; 436 }; 437 438 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 439 &cw.chan = &chan; 440 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 441 } 442 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; } 443 444 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 445 __closed_insert( chan, elem ); 446 // if we get here then the insert succeeded 447 __make_select_node_available( node ); 448 } 449 450 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) { 451 lock( mutex_lock ); 452 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close 453 454 #ifdef CHAN_STATS 455 if ( !closed ) p_ops++; 456 #endif 457 458 // special OR case handling 459 if ( !node.park_counter ) { 460 // are we special case OR and front of cons is also special case OR 461 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) { 462 if ( !__make_select_node_pending( node ) ) { 463 unlock( mutex_lock ); 464 return false; 465 } 466 467 if ( __handle_waituntil_OR( cons ) ) { 468 __cons_handoff( chan, elem ); 469 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 470 unlock( mutex_lock ); 471 return true; 472 } 473 __make_select_node_unsat( node ); 474 } 475 // check if we can complete operation. If so race to establish winner in special OR case 476 if ( count != size || !cons`isEmpty || unlikely(closed) ) { 477 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 478 unlock( mutex_lock ); 479 return false; 480 } 481 } 482 } 483 484 // if closed handle 485 if ( unlikely(closed) ) { 486 unlock( mutex_lock ); 487 __handle_select_closed_write( this, node ); 488 return true; 489 } 490 491 // handle blocked consumer case via handoff (buffer is implicitly empty) 492 ConsEmpty: if ( !cons`isEmpty ) { 493 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 494 __cons_handoff( chan, elem ); 495 __set_avail_then_unlock( node, mutex_lock ); 496 return true; 497 } 498 499 // insert node in list if buffer is full, work will be completed by someone else 500 if ( count == size ) { 501 #ifdef CHAN_STATS 502 p_blocks++; 503 #endif 504 505 insert_last( prods, node ); 506 unlock( mutex_lock ); 507 return false; 508 } // if 509 510 // otherwise carry out write either via normal insert 511 __buf_insert( chan, elem ); 512 __set_avail_then_unlock( node, mutex_lock ); 513 return true; 514 } 515 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 516 517 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 518 if ( node.extra == 0p ) // check if woken up due to closed channel 519 __closed_insert( chan, elem ); 520 521 // This is only reachable if not closed or closed exception was handled 522 return true; 523 } 524 305 525 } // forall( T ) 526 527
Note:
See TracChangeset
for help on using the changeset viewer.