Changeset 3982384 for libcfa/src/concurrency/channel.hfa
- Timestamp:
- May 17, 2023, 1:35:09 AM (2 years ago)
- Branches:
- ADT, master
- Children:
- f11010e
- Parents:
- 6e4c44d (diff), 8db4708 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r6e4c44d r3982384 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // channel.hfa -- LIBCFATHREAD 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2022 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 1 17 #pragma once 2 18 3 19 #include <locks.hfa> 4 20 #include <list.hfa> 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 21 #include "select.hfa" 30 22 31 23 // returns true if woken due to shutdown 32 24 // blocks thread on list and releases passed lock 33 static inline bool block( dlist( wait_link) & queue, void * elem_ptr, go_mutex & lock ) {34 wait_link w{ active_thread(), elem_ptr };35 insert_last( queue, w);25 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) { 26 select_node sn{ active_thread(), elem_ptr }; 27 insert_last( queue, sn ); 36 28 unlock( lock ); 37 29 park(); 38 return w.elem == 0p; 30 return sn.extra == 0p; 31 } 32 33 // Waituntil support (un)register_select helper routine 34 // Sets select node avail if not special OR case and then unlocks 35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 36 if ( node.park_counter ) __make_select_node_available( node ); 37 unlock( mutex_lock ); 39 38 } 40 39 … … 59 58 size_t size, front, back, count; 60 59 T * buffer; 61 dlist( wait_link) prods, cons; // lists of blocked threads62 go_mutex mutex_lock; // MX lock63 bool closed; // indicates channel close/open60 dlist( select_node ) prods, cons; // lists of blocked threads 61 go_mutex mutex_lock; // MX lock 62 bool closed; // indicates channel close/open 64 63 #ifdef CHAN_STATS 65 64 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd … … 70 69 size = _size; 71 70 front = back = count = 0; 72 buffer = aalloc( size );71 if ( size != 0 ) buffer = aalloc( size ); 73 72 prods{}; 74 73 cons{}; … … 87 86 #endif 88 87 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer );90 } 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }88 if ( size != 0 ) delete( buffer ); 89 } 90 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 91 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 93 92 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } 94 93 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } … … 102 101 // flush waiting consumers and producers 103 102 while ( has_waiting_consumers( chan ) ) { 104 cons`first.elem = 0p; 103 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 104 break; // if __handle_waituntil_OR returns false cons is empty so break 105 cons`first.extra = 0p; 105 106 wake_one( cons ); 106 107 } 107 108 while ( has_waiting_producers( chan ) ) { 108 prods`first.elem = 0p; 109 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 110 break; // if __handle_waituntil_OR returns false prods is empty so break 111 prods`first.extra = 0p; 109 112 wake_one( prods ); 110 113 } … … 114 117 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 115 118 119 // used to hand an element to a blocked consumer and signal it 120 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 121 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 122 wake_one( cons ); 123 } 124 125 // used to hand an element to a blocked producer and signal it 126 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 127 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 128 wake_one( prods ); 129 } 130 116 131 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 132 lock( mutex_lock ); 118 133 while ( count == 0 && !cons`isEmpty ) { 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 134 __cons_handoff( chan, elem ); 121 135 } 122 136 unlock( mutex_lock ); … … 125 139 // handles buffer insert 126 140 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 127 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));141 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 128 142 count += 1; 129 143 back++; … … 131 145 } 132 146 133 // does the buffer insert or hands elem directly to consumer if one is waiting134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {135 if ( count == 0 && !cons`isEmpty ) {136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work137 wake_one( cons );138 } else __buf_insert( chan, elem );139 }140 141 147 // needed to avoid an extra copy in closed case 142 148 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { … … 145 151 operations++; 146 152 #endif 153 154 ConsEmpty: if ( !cons`isEmpty ) { 155 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 156 __cons_handoff( chan, elem ); 157 unlock( mutex_lock ); 158 return true; 159 } 160 147 161 if ( count == size ) { unlock( mutex_lock ); return false; } 148 __do_insert( chan, elem ); 162 163 __buf_insert( chan, elem ); 149 164 unlock( mutex_lock ); 150 165 return true; … … 157 172 // handles closed case of insert routine 158 173 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{ &channel_closed_vt, &elem, &chan };174 channel_closed except{ &channel_closed_vt, &elem, &chan }; 160 175 throwResume except; // throw closed resumption 161 176 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 182 197 } 183 198 184 // have to check for the zero size channel case185 if ( size == 0 &&!cons`isEmpty ) {186 memcpy(cons`first.elem, (void *)&elem, sizeof(T));187 wake_one( cons);188 unlock( mutex_lock ); 189 return true;199 // buffer count must be zero if cons are blocked (also handles zero-size case) 200 ConsEmpty: if ( !cons`isEmpty ) { 201 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 202 __cons_handoff( chan, elem ); 203 unlock( mutex_lock ); 204 return; 190 205 } 191 206 … … 202 217 } // if 203 218 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 219 __buf_insert( chan, elem ); 220 unlock( mutex_lock ); 221 } 222 223 // does the buffer remove and potentially does waiting producer work 224 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 225 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 216 226 count -= 1; 217 227 front = (front + 1) % size; 218 }219 220 // does the buffer remove and potentially does waiting producer work221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {222 __buf_remove( chan, retval );223 228 if (count == size - 1 && !prods`isEmpty ) { 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 229 if ( !__handle_waituntil_OR( prods ) ) return; 230 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 225 231 wake_one( prods ); 226 232 } … … 233 239 operations++; 234 240 #endif 241 242 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 243 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 244 __prods_handoff( chan, retval ); 245 unlock( mutex_lock ); 246 return true; 247 } 248 235 249 if ( count == 0 ) { unlock( mutex_lock ); return false; } 250 236 251 __do_remove( chan, retval ); 237 252 unlock( mutex_lock ); … … 244 259 static inline [T, bool] try_remove( channel(T) & chan ) { 245 260 T retval; 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 261 bool success = __internal_try_remove( chan, retval ); 262 return [ retval, success ]; 263 } 264 265 static inline T try_remove( channel(T) & chan ) { 250 266 T retval; 251 267 __internal_try_remove( chan, retval ); … … 255 271 // handles closed case of insert routine 256 272 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 257 channel_closed except{ &channel_closed_vt, 0p, &chan };273 channel_closed except{ &channel_closed_vt, 0p, &chan }; 258 274 throwResume except; // throw resumption 259 275 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 279 295 280 296 // have to check for the zero size channel case 281 if ( size == 0 && !prods`isEmpty ) {282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));283 wake_one( prods);297 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 298 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 299 __prods_handoff( chan, retval ); 284 300 unlock( mutex_lock ); 285 301 return retval; … … 287 303 288 304 // wait if buffer is empty, work will be completed by someone else 289 if ( count == 0) {305 if ( count == 0 ) { 290 306 #ifdef CHAN_STATS 291 307 blocks++; … … 299 315 // Remove from buffer 300 316 __do_remove( chan, retval ); 301 302 317 unlock( mutex_lock ); 303 318 return retval; 304 319 } 320 321 /////////////////////////////////////////////////////////////////////////////////////////// 322 // The following is support for waituntil (select) statements 323 /////////////////////////////////////////////////////////////////////////////////////////// 324 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 325 // if ( !node`isListed && !node.park_counter ) return false; // handle special OR case C_TODO: try adding this back 326 lock( mutex_lock ); 327 if ( node`isListed ) { // op wasn't performed 328 #ifdef CHAN_STATS 329 operations--; 330 #endif 331 remove( node ); 332 unlock( mutex_lock ); 333 return false; 334 } 335 unlock( mutex_lock ); 336 337 // only return true when not special OR case, not exceptional calse and status is SAT 338 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT; 339 } 340 341 // type used by select statement to capture a chan read as the selected operation 342 struct chan_read { 343 T & ret; 344 channel(T) & chan; 345 }; 346 347 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 348 &cr.chan = &chan; 349 &cr.ret = &ret; 350 } 351 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 352 353 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 354 __closed_remove( chan, ret ); 355 // if we get here then the insert succeeded 356 __make_select_node_available( node ); 357 } 358 359 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) { 360 lock( mutex_lock ); 361 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 362 363 #ifdef CHAN_STATS 364 if ( !closed ) operations++; 365 #endif 366 367 if ( !node.park_counter ) { 368 // are we special case OR and front of cons is also special case OR 369 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) { 370 if ( !__make_select_node_pending( node ) ) { 371 unlock( mutex_lock ); 372 return false; 373 } 374 375 if ( __handle_waituntil_OR( prods ) ) { 376 __prods_handoff( chan, ret ); 377 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 378 unlock( mutex_lock ); 379 return true; 380 } 381 __make_select_node_unsat( node ); 382 } 383 // check if we can complete operation. If so race to establish winner in special OR case 384 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) { 385 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 386 unlock( mutex_lock ); 387 return false; 388 } 389 } 390 } 391 392 if ( unlikely(closed) ) { 393 unlock( mutex_lock ); 394 __handle_select_closed_read( this, node ); 395 return true; 396 } 397 398 // have to check for the zero size channel case 399 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 400 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 401 __prods_handoff( chan, ret ); 402 __set_avail_then_unlock( node, mutex_lock ); 403 return true; 404 } 405 406 // wait if buffer is empty, work will be completed by someone else 407 if ( count == 0 ) { 408 #ifdef CHAN_STATS 409 blocks++; 410 #endif 411 412 insert_last( cons, node ); 413 unlock( mutex_lock ); 414 return false; 415 } 416 417 // Remove from buffer 418 __do_remove( chan, ret ); 419 __set_avail_then_unlock( node, mutex_lock ); 420 return true; 421 } 422 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 423 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 424 if ( node.extra == 0p ) // check if woken up due to closed channel 425 __closed_remove( chan, ret ); 426 // This is only reachable if not closed or closed exception was handled 427 return true; 428 } 429 430 // type used by select statement to capture a chan write as the selected operation 431 struct chan_write { 432 T elem; 433 channel(T) & chan; 434 }; 435 436 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 437 &cw.chan = &chan; 438 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 439 } 440 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; } 441 442 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 443 __closed_insert( chan, elem ); 444 // if we get here then the insert succeeded 445 __make_select_node_available( node ); 446 } 447 448 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) { 449 lock( mutex_lock ); 450 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close 451 452 #ifdef CHAN_STATS 453 if ( !closed ) operations++; 454 #endif 455 456 // special OR case handling 457 if ( !node.park_counter ) { 458 // are we special case OR and front of cons is also special case OR 459 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) { 460 if ( !__make_select_node_pending( node ) ) { 461 unlock( mutex_lock ); 462 return false; 463 } 464 465 if ( __handle_waituntil_OR( cons ) ) { 466 __cons_handoff( chan, elem ); 467 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 468 unlock( mutex_lock ); 469 return true; 470 } 471 __make_select_node_unsat( node ); 472 } 473 // check if we can complete operation. If so race to establish winner in special OR case 474 if ( count != size || !cons`isEmpty || unlikely(closed) ) { 475 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 476 unlock( mutex_lock ); 477 return false; 478 } 479 } 480 } 481 482 // if closed handle 483 if ( unlikely(closed) ) { 484 unlock( mutex_lock ); 485 __handle_select_closed_write( this, node ); 486 return true; 487 } 488 489 // handle blocked consumer case via handoff (buffer is implicitly empty) 490 ConsEmpty: if ( !cons`isEmpty ) { 491 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 492 __cons_handoff( chan, elem ); 493 __set_avail_then_unlock( node, mutex_lock ); 494 return true; 495 } 496 497 // insert node in list if buffer is full, work will be completed by someone else 498 if ( count == size ) { 499 #ifdef CHAN_STATS 500 blocks++; 501 #endif 502 503 insert_last( prods, node ); 504 unlock( mutex_lock ); 505 return false; 506 } // if 507 508 // otherwise carry out write either via normal insert 509 __buf_insert( chan, elem ); 510 __set_avail_then_unlock( node, mutex_lock ); 511 return true; 512 } 513 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 514 515 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 516 if ( node.extra == 0p ) // check if woken up due to closed channel 517 __closed_insert( chan, elem ); 518 519 // This is only reachable if not closed or closed exception was handled 520 return true; 521 } 522 305 523 } // forall( T ) 524 525
Note:
See TracChangeset
for help on using the changeset viewer.