- File:
-
- 1 edited
-
libcfa/src/concurrency/channel.hfa (modified) (18 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
ra45e21c rbeeff61e 4 4 #include <list.hfa> 5 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 6 #include "select.hfa" 30 7 31 8 // returns true if woken due to shutdown 32 9 // blocks thread on list and releases passed lock 33 static inline bool block( dlist( wait_link) & queue, void * elem_ptr, go_mutex & lock ) {34 wait_link w{ active_thread(), elem_ptr };35 insert_last( queue, w);10 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) { 11 select_node sn{ active_thread(), elem_ptr }; 12 insert_last( queue, sn ); 36 13 unlock( lock ); 37 14 park(); 38 return w.elem == 0p; 15 return sn.extra == 0p; 16 } 17 18 // Waituntil support (un)register_select helper routine 19 // Sets select node avail if not special OR case and then unlocks 20 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 21 if ( node.park_counter ) __make_select_node_available( node ); 22 unlock( mutex_lock ); 39 23 } 40 24 … … 59 43 size_t size, front, back, count; 60 44 T * buffer; 61 dlist( wait_link) prods, cons; // lists of blocked threads45 dlist( select_node ) prods, cons; // lists of blocked threads 62 46 go_mutex mutex_lock; // MX lock 63 47 bool closed; // indicates channel close/open … … 70 54 size = _size; 71 55 front = back = count = 0; 72 buffer = aalloc( size );56 if ( size != 0 ) buffer = aalloc( size ); 73 57 prods{}; 74 58 cons{}; … … 87 71 #endif 88 72 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer );73 if ( size != 0 ) delete( buffer ); 90 74 } 91 75 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } … … 102 86 // flush waiting consumers and producers 103 87 while ( has_waiting_consumers( chan ) ) { 104 cons`first.elem = 0p; 88 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 89 break; // if __handle_waituntil_OR returns false cons is empty so break 90 cons`first.extra = 0p; 105 91 wake_one( cons ); 106 92 } 107 93 while ( has_waiting_producers( chan ) ) { 108 prods`first.elem = 0p; 94 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 95 break; // if __handle_waituntil_OR returns false prods is empty so break 96 prods`first.extra = 0p; 109 97 wake_one( prods ); 110 98 } … … 114 102 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 115 103 104 // used to hand an element to a blocked consumer and signal it 105 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 106 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 107 wake_one( cons ); 108 } 109 110 // used to hand an element to a blocked producer and signal it 111 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 112 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 113 wake_one( prods ); 114 } 115 116 116 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 117 lock( mutex_lock ); 118 118 while ( count == 0 && !cons`isEmpty ) { 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 119 __cons_handoff( chan, elem ); 121 120 } 122 121 unlock( mutex_lock ); … … 125 124 // handles buffer insert 126 125 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 127 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));126 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 128 127 count += 1; 129 128 back++; … … 131 130 } 132 131 133 // does the buffer insert or hands elem directly to consumer if one is waiting134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {135 if ( count == 0 && !cons`isEmpty ) {136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work137 wake_one( cons );138 } else __buf_insert( chan, elem );139 }140 141 132 // needed to avoid an extra copy in closed case 142 133 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { … … 145 136 operations++; 146 137 #endif 138 139 ConsEmpty: if ( !cons`isEmpty ) { 140 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 141 __cons_handoff( chan, elem ); 142 unlock( mutex_lock ); 143 return true; 144 } 145 147 146 if ( count == size ) { unlock( mutex_lock ); return false; } 148 __do_insert( chan, elem ); 147 148 __buf_insert( chan, elem ); 149 149 unlock( mutex_lock ); 150 150 return true; … … 157 157 // handles closed case of insert routine 158 158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{ &channel_closed_vt, &elem, &chan };159 channel_closed except{ &channel_closed_vt, &elem, &chan }; 160 160 throwResume except; // throw closed resumption 161 161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 182 182 } 183 183 184 // have to check for the zero size channel case185 if ( size == 0 &&!cons`isEmpty ) {186 memcpy(cons`first.elem, (void *)&elem, sizeof(T));187 wake_one( cons);188 unlock( mutex_lock ); 189 return true;184 // buffer count must be zero if cons are blocked (also handles zero-size case) 185 ConsEmpty: if ( !cons`isEmpty ) { 186 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 187 __cons_handoff( chan, elem ); 188 unlock( mutex_lock ); 189 return; 190 190 } 191 191 … … 202 202 } // if 203 203 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 204 __buf_insert( chan, elem ); 205 unlock( mutex_lock ); 206 } 207 208 // does the buffer remove and potentially does waiting producer work 209 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 210 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 216 211 count -= 1; 217 212 front = (front + 1) % size; 218 }219 220 // does the buffer remove and potentially does waiting producer work221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {222 __buf_remove( chan, retval );223 213 if (count == size - 1 && !prods`isEmpty ) { 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 214 if ( !__handle_waituntil_OR( prods ) ) return; 215 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 225 216 wake_one( prods ); 226 217 } … … 233 224 operations++; 234 225 #endif 226 227 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 228 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 229 __prods_handoff( chan, retval ); 230 unlock( mutex_lock ); 231 return true; 232 } 233 235 234 if ( count == 0 ) { unlock( mutex_lock ); return false; } 235 236 236 __do_remove( chan, retval ); 237 237 unlock( mutex_lock ); … … 244 244 static inline [T, bool] try_remove( channel(T) & chan ) { 245 245 T retval; 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 246 bool success = __internal_try_remove( chan, retval ); 247 return [ retval, success ]; 248 } 249 250 static inline T try_remove( channel(T) & chan ) { 250 251 T retval; 251 252 __internal_try_remove( chan, retval ); … … 255 256 // handles closed case of insert routine 256 257 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 257 channel_closed except{ &channel_closed_vt, 0p, &chan };258 channel_closed except{ &channel_closed_vt, 0p, &chan }; 258 259 throwResume except; // throw resumption 259 260 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 279 280 280 281 // have to check for the zero size channel case 281 if ( size == 0 && !prods`isEmpty ) {282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));283 wake_one( prods);282 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 283 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 284 __prods_handoff( chan, retval ); 284 285 unlock( mutex_lock ); 285 286 return retval; … … 287 288 288 289 // wait if buffer is empty, work will be completed by someone else 289 if ( count == 0) {290 if ( count == 0 ) { 290 291 #ifdef CHAN_STATS 291 292 blocks++; … … 299 300 // Remove from buffer 300 301 __do_remove( chan, retval ); 301 302 302 unlock( mutex_lock ); 303 303 return retval; 304 304 } 305 306 /////////////////////////////////////////////////////////////////////////////////////////// 307 // The following is support for waituntil (select) statements 308 /////////////////////////////////////////////////////////////////////////////////////////// 309 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 310 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case 311 lock( mutex_lock ); 312 if ( node`isListed ) { // op wasn't performed 313 #ifdef CHAN_STATS 314 operations--; 315 #endif 316 remove( node ); 317 unlock( mutex_lock ); 318 return false; 319 } 320 unlock( mutex_lock ); 321 322 // only return true when not special OR case, not exceptional calse and status is SAT 323 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT; 324 } 325 326 // type used by select statement to capture a chan read as the selected operation 327 struct chan_read { 328 channel(T) & chan; 329 T & ret; 330 }; 331 332 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 333 &cr.chan = &chan; 334 &cr.ret = &ret; 335 } 336 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 337 338 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 339 __closed_remove( chan, ret ); 340 // if we get here then the insert succeeded 341 __make_select_node_available( node ); 342 } 343 344 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) { 345 // mutex(sout) sout | "register_read"; 346 lock( mutex_lock ); 347 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 348 349 #ifdef CHAN_STATS 350 if ( !closed ) operations++; 351 #endif 352 353 // check if we can complete operation. If so race to establish winner in special OR case 354 if ( !node.park_counter && ( count != 0 || !prods`isEmpty || unlikely(closed) ) ) { 355 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 356 unlock( mutex_lock ); 357 return false; 358 } 359 } 360 361 if ( unlikely(closed) ) { 362 unlock( mutex_lock ); 363 __handle_select_closed_read( this, node ); 364 return true; 365 } 366 367 // have to check for the zero size channel case 368 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 369 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 370 __prods_handoff( chan, ret ); 371 __set_avail_then_unlock( node, mutex_lock ); 372 return true; 373 } 374 375 // wait if buffer is empty, work will be completed by someone else 376 if ( count == 0 ) { 377 #ifdef CHAN_STATS 378 blocks++; 379 #endif 380 381 insert_last( cons, node ); 382 unlock( mutex_lock ); 383 return false; 384 } 385 386 // Remove from buffer 387 __do_remove( chan, ret ); 388 __set_avail_then_unlock( node, mutex_lock ); 389 return true; 390 } 391 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 392 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 393 if ( node.extra == 0p ) // check if woken up due to closed channel 394 __closed_remove( chan, ret ); 395 // This is only reachable if not closed or closed exception was handled 396 return true; 397 } 398 399 // type used by select statement to capture a chan write as the selected operation 400 struct chan_write { 401 channel(T) & chan; 402 T elem; 403 }; 404 405 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 406 &cw.chan = &chan; 407 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 408 } 409 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; } 410 411 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 412 __closed_insert( chan, elem ); 413 // if we get here then the insert succeeded 414 __make_select_node_available( node ); 415 } 416 417 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) { 418 // mutex(sout) sout | "register_write"; 419 lock( mutex_lock ); 420 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close 421 422 #ifdef CHAN_STATS 423 if ( !closed ) operations++; 424 #endif 425 426 // check if we can complete operation. If so race to establish winner in special OR case 427 if ( !node.park_counter && ( count != size || !cons`isEmpty || unlikely(closed) ) ) { 428 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 429 unlock( mutex_lock ); 430 return false; 431 } 432 } 433 434 // if closed handle 435 if ( unlikely(closed) ) { 436 unlock( mutex_lock ); 437 __handle_select_closed_write( this, node ); 438 return true; 439 } 440 441 // handle blocked consumer case via handoff (buffer is implicitly empty) 442 ConsEmpty: if ( !cons`isEmpty ) { 443 if ( !__handle_waituntil_OR( cons ) ) { 444 // mutex(sout) sout | "empty"; 445 break ConsEmpty; 446 } 447 // mutex(sout) sout | "signal"; 448 __cons_handoff( chan, elem ); 449 __set_avail_then_unlock( node, mutex_lock ); 450 return true; 451 } 452 453 // insert node in list if buffer is full, work will be completed by someone else 454 if ( count == size ) { 455 #ifdef CHAN_STATS 456 blocks++; 457 #endif 458 459 insert_last( prods, node ); 460 unlock( mutex_lock ); 461 return false; 462 } // if 463 464 // otherwise carry out write either via normal insert 465 __buf_insert( chan, elem ); 466 __set_avail_then_unlock( node, mutex_lock ); 467 return true; 468 } 469 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 470 471 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 472 if ( node.extra == 0p ) // check if woken up due to closed channel 473 __closed_insert( chan, elem ); 474 475 // This is only reachable if not closed or closed exception was handled 476 return true; 477 } 478 479 305 480 } // forall( T ) 481 482 483
Note:
See TracChangeset
for help on using the changeset viewer.