- File:
-
- 1 edited
-
libcfa/src/concurrency/channel.hfa (modified) (18 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r5908fb4 ra45e21c 1 //2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo3 //4 // The contents of this file are covered under the licence agreement in the5 // file "LICENCE" distributed with Cforall.6 //7 // channel.hfa -- LIBCFATHREAD8 // Runtime locks that used with the runtime thread system.9 //10 // Author : Colby Alexander Parsons11 // Created On : Thu Jan 21 19:46:50 202212 // Last Modified By :13 // Last Modified On :14 // Update Count :15 //16 17 1 #pragma once 18 2 19 3 #include <locks.hfa> 20 4 #include <list.hfa> 21 #include "select.hfa" 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 22 30 23 31 // returns true if woken due to shutdown 24 32 // blocks thread on list and releases passed lock 25 static inline bool block( dlist( select_node) & queue, void * elem_ptr, go_mutex & lock ) {26 select_node sn{ active_thread(), elem_ptr };27 insert_last( queue, sn);33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) { 34 wait_link w{ active_thread(), elem_ptr }; 35 insert_last( queue, w ); 28 36 unlock( lock ); 29 37 park(); 30 return sn.extra == 0p; 31 } 32 33 // Waituntil support (un)register_select helper routine 34 // Sets select node avail if not special OR case and then unlocks 35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 36 if ( node.park_counter ) __make_select_node_available( node ); 37 unlock( mutex_lock ); 38 return w.elem == 0p; 38 39 } 39 40 … … 58 59 size_t size, front, back, count; 59 60 T * buffer; 60 dlist( select_node) prods, cons; // lists of blocked threads61 go_mutex mutex_lock; // MX lock62 bool closed; // indicates channel close/open61 dlist( wait_link ) prods, cons; // lists of blocked threads 62 go_mutex mutex_lock; // MX lock 63 bool closed; // indicates channel close/open 63 64 #ifdef CHAN_STATS 64 65 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd … … 69 70 size = _size; 70 71 front = back = count = 0; 71 if ( size != 0 )buffer = aalloc( size );72 buffer = aalloc( size ); 72 73 prods{}; 73 74 cons{}; … … 86 87 #endif 87 88 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 88 if ( size != 0 )delete( buffer );89 } 90 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }91 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }89 delete( buffer ); 90 } 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 92 93 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } 93 94 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } … … 101 102 // flush waiting consumers and producers 102 103 while ( has_waiting_consumers( chan ) ) { 103 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 104 break; // if __handle_waituntil_OR returns false cons is empty so break 105 cons`first.extra = 0p; 104 cons`first.elem = 0p; 106 105 wake_one( cons ); 107 106 } 108 107 while ( has_waiting_producers( chan ) ) { 109 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 110 break; // if __handle_waituntil_OR returns false prods is empty so break 111 prods`first.extra = 0p; 108 prods`first.elem = 0p; 112 109 wake_one( prods ); 113 110 } … … 117 114 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 118 115 119 // used to hand an element to a blocked consumer and signal it120 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {121 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work122 wake_one( cons );123 }124 125 // used to hand an element to a blocked producer and signal it126 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {127 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );128 wake_one( prods );129 }130 131 116 static inline void flush( channel(T) & chan, T elem ) with(chan) { 132 117 lock( mutex_lock ); 133 118 while ( count == 0 && !cons`isEmpty ) { 134 __cons_handoff( chan, elem ); 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 135 121 } 136 122 unlock( mutex_lock ); … … 139 125 // handles buffer insert 140 126 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 141 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));127 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 142 128 count += 1; 143 129 back++; … … 145 131 } 146 132 133 // does the buffer insert or hands elem directly to consumer if one is waiting 134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) { 135 if ( count == 0 && !cons`isEmpty ) { 136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 137 wake_one( cons ); 138 } else __buf_insert( chan, elem ); 139 } 140 147 141 // needed to avoid an extra copy in closed case 148 142 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { … … 151 145 operations++; 152 146 #endif 153 154 ConsEmpty: if ( !cons`isEmpty ) {155 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;156 __cons_handoff( chan, elem );157 unlock( mutex_lock );158 return true;159 }160 161 147 if ( count == size ) { unlock( mutex_lock ); return false; } 162 163 __buf_insert( chan, elem ); 148 __do_insert( chan, elem ); 164 149 unlock( mutex_lock ); 165 150 return true; … … 172 157 // handles closed case of insert routine 173 158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 174 channel_closed except{ &channel_closed_vt, &elem, &chan };159 channel_closed except{&channel_closed_vt, &elem, &chan }; 175 160 throwResume except; // throw closed resumption 176 161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 197 182 } 198 183 199 // buffer count must be zero if cons are blocked (also handles zero-size case)200 ConsEmpty: if (!cons`isEmpty ) {201 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;202 __cons_handoff( chan, elem);184 // have to check for the zero size channel case 185 if ( size == 0 && !cons`isEmpty ) { 186 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); 187 wake_one( cons ); 203 188 unlock( mutex_lock ); 204 return ;189 return true; 205 190 } 206 191 … … 217 202 } // if 218 203 219 __buf_insert( chan, elem ); 220 unlock( mutex_lock ); 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 216 count -= 1; 217 front = (front + 1) % size; 221 218 } 222 219 223 220 // does the buffer remove and potentially does waiting producer work 224 221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 225 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 226 count -= 1; 227 front = (front + 1) % size; 222 __buf_remove( chan, retval ); 228 223 if (count == size - 1 && !prods`isEmpty ) { 229 if ( !__handle_waituntil_OR( prods ) ) return; 230 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 231 225 wake_one( prods ); 232 226 } … … 239 233 operations++; 240 234 #endif 241 242 ZeroSize: if ( size == 0 && !prods`isEmpty ) {243 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;244 __prods_handoff( chan, retval );245 unlock( mutex_lock );246 return true;247 }248 249 235 if ( count == 0 ) { unlock( mutex_lock ); return false; } 250 251 236 __do_remove( chan, retval ); 252 237 unlock( mutex_lock ); … … 259 244 static inline [T, bool] try_remove( channel(T) & chan ) { 260 245 T retval; 261 bool success = __internal_try_remove( chan, retval ); 262 return [ retval, success ]; 263 } 264 265 static inline T try_remove( channel(T) & chan ) { 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 266 250 T retval; 267 251 __internal_try_remove( chan, retval ); … … 271 255 // handles closed case of insert routine 272 256 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 273 channel_closed except{ &channel_closed_vt, 0p, &chan };257 channel_closed except{&channel_closed_vt, 0p, &chan }; 274 258 throwResume except; // throw resumption 275 259 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 295 279 296 280 // have to check for the zero size channel case 297 ZeroSize:if ( size == 0 && !prods`isEmpty ) {298 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;299 __prods_handoff( chan, retval);281 if ( size == 0 && !prods`isEmpty ) { 282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T)); 283 wake_one( prods ); 300 284 unlock( mutex_lock ); 301 285 return retval; … … 303 287 304 288 // wait if buffer is empty, work will be completed by someone else 305 if ( count == 0) {289 if (count == 0) { 306 290 #ifdef CHAN_STATS 307 291 blocks++; … … 315 299 // Remove from buffer 316 300 __do_remove( chan, retval ); 301 317 302 unlock( mutex_lock ); 318 303 return retval; 319 304 } 320 321 ///////////////////////////////////////////////////////////////////////////////////////////322 // The following is support for waituntil (select) statements323 ///////////////////////////////////////////////////////////////////////////////////////////324 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {325 // if ( !node`isListed && !node.park_counter ) return false; // handle special OR case C_TODO: try adding this back326 lock( mutex_lock );327 if ( node`isListed ) { // op wasn't performed328 #ifdef CHAN_STATS329 operations--;330 #endif331 remove( node );332 unlock( mutex_lock );333 return false;334 }335 unlock( mutex_lock );336 337 // only return true when not special OR case, not exceptional calse and status is SAT338 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;339 }340 341 // type used by select statement to capture a chan read as the selected operation342 struct chan_read {343 T & ret;344 channel(T) & chan;345 };346 347 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {348 &cr.chan = &chan;349 &cr.ret = &ret;350 }351 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }352 353 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {354 __closed_remove( chan, ret );355 // if we get here then the insert succeeded356 __make_select_node_available( node );357 }358 359 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {360 lock( mutex_lock );361 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close362 363 #ifdef CHAN_STATS364 if ( !closed ) operations++;365 #endif366 367 if ( !node.park_counter ) {368 // are we special case OR and front of cons is also special case OR369 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {370 if ( !__make_select_node_pending( node ) ) {371 unlock( mutex_lock );372 return false;373 }374 375 if ( __handle_waituntil_OR( prods ) ) {376 __prods_handoff( chan, ret );377 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node378 unlock( mutex_lock );379 return true;380 }381 __make_select_node_unsat( node );382 }383 // check if we can complete operation. If so race to establish winner in special OR case384 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {385 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering386 unlock( mutex_lock );387 return false;388 }389 }390 }391 392 if ( unlikely(closed) ) {393 unlock( mutex_lock );394 __handle_select_closed_read( this, node );395 return true;396 }397 398 // have to check for the zero size channel case399 ZeroSize: if ( size == 0 && !prods`isEmpty ) {400 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;401 __prods_handoff( chan, ret );402 __set_avail_then_unlock( node, mutex_lock );403 return true;404 }405 406 // wait if buffer is empty, work will be completed by someone else407 if ( count == 0 ) {408 #ifdef CHAN_STATS409 blocks++;410 #endif411 412 insert_last( cons, node );413 unlock( mutex_lock );414 return false;415 }416 417 // Remove from buffer418 __do_remove( chan, ret );419 __set_avail_then_unlock( node, mutex_lock );420 return true;421 }422 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }423 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {424 if ( node.extra == 0p ) // check if woken up due to closed channel425 __closed_remove( chan, ret );426 // This is only reachable if not closed or closed exception was handled427 return true;428 }429 430 // type used by select statement to capture a chan write as the selected operation431 struct chan_write {432 T elem;433 channel(T) & chan;434 };435 436 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {437 &cw.chan = &chan;438 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );439 }440 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }441 442 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {443 __closed_insert( chan, elem );444 // if we get here then the insert succeeded445 __make_select_node_available( node );446 }447 448 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {449 lock( mutex_lock );450 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close451 452 #ifdef CHAN_STATS453 if ( !closed ) operations++;454 #endif455 456 // special OR case handling457 if ( !node.park_counter ) {458 // are we special case OR and front of cons is also special case OR459 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {460 if ( !__make_select_node_pending( node ) ) {461 unlock( mutex_lock );462 return false;463 }464 465 if ( __handle_waituntil_OR( cons ) ) {466 __cons_handoff( chan, elem );467 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node468 unlock( mutex_lock );469 return true;470 }471 __make_select_node_unsat( node );472 }473 // check if we can complete operation. If so race to establish winner in special OR case474 if ( count != size || !cons`isEmpty || unlikely(closed) ) {475 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering476 unlock( mutex_lock );477 return false;478 }479 }480 }481 482 // if closed handle483 if ( unlikely(closed) ) {484 unlock( mutex_lock );485 __handle_select_closed_write( this, node );486 return true;487 }488 489 // handle blocked consumer case via handoff (buffer is implicitly empty)490 ConsEmpty: if ( !cons`isEmpty ) {491 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;492 __cons_handoff( chan, elem );493 __set_avail_then_unlock( node, mutex_lock );494 return true;495 }496 497 // insert node in list if buffer is full, work will be completed by someone else498 if ( count == size ) {499 #ifdef CHAN_STATS500 blocks++;501 #endif502 503 insert_last( prods, node );504 unlock( mutex_lock );505 return false;506 } // if507 508 // otherwise carry out write either via normal insert509 __buf_insert( chan, elem );510 __set_avail_then_unlock( node, mutex_lock );511 return true;512 }513 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }514 515 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {516 if ( node.extra == 0p ) // check if woken up due to closed channel517 __closed_insert( chan, elem );518 519 // This is only reachable if not closed or closed exception was handled520 return true;521 }522 523 305 } // forall( T ) 524 525
Note:
See TracChangeset
for help on using the changeset viewer.