- File:
-
- 1 edited
-
libcfa/src/concurrency/channel.hfa (modified) (18 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
rbeeff61e ra45e21c 4 4 #include <list.hfa> 5 5 #include <mutex_stmt.hfa> 6 #include "select.hfa" 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 7 30 8 31 // returns true if woken due to shutdown 9 32 // blocks thread on list and releases passed lock 10 static inline bool block( dlist( select_node) & queue, void * elem_ptr, go_mutex & lock ) {11 select_node sn{ active_thread(), elem_ptr };12 insert_last( queue, sn);33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) { 34 wait_link w{ active_thread(), elem_ptr }; 35 insert_last( queue, w ); 13 36 unlock( lock ); 14 37 park(); 15 return sn.extra == 0p; 16 } 17 18 // Waituntil support (un)register_select helper routine 19 // Sets select node avail if not special OR case and then unlocks 20 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 21 if ( node.park_counter ) __make_select_node_available( node ); 22 unlock( mutex_lock ); 38 return w.elem == 0p; 23 39 } 24 40 … … 43 59 size_t size, front, back, count; 44 60 T * buffer; 45 dlist( select_node) prods, cons; // lists of blocked threads61 dlist( wait_link ) prods, cons; // lists of blocked threads 46 62 go_mutex mutex_lock; // MX lock 47 63 bool closed; // indicates channel close/open … … 54 70 size = _size; 55 71 front = back = count = 0; 56 if ( size != 0 )buffer = aalloc( size );72 buffer = aalloc( size ); 57 73 prods{}; 58 74 cons{}; … … 71 87 #endif 72 88 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 73 if ( size != 0 )delete( buffer );89 delete( buffer ); 74 90 } 75 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } … … 86 102 // flush waiting consumers and producers 87 103 while ( has_waiting_consumers( chan ) ) { 88 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 89 break; // if __handle_waituntil_OR returns false cons is empty so break 90 cons`first.extra = 0p; 104 cons`first.elem = 0p; 91 105 wake_one( cons ); 92 106 } 93 107 while ( has_waiting_producers( chan ) ) { 94 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 95 break; // if __handle_waituntil_OR returns false prods is empty so break 96 prods`first.extra = 0p; 108 prods`first.elem = 0p; 97 109 wake_one( prods ); 98 110 } … … 102 114 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 103 115 104 // used to hand an element to a blocked consumer and signal it105 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {106 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work107 wake_one( cons );108 }109 110 // used to hand an element to a blocked producer and signal it111 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {112 memcpy( (void *)&retval, prods`first.extra, sizeof(T) );113 wake_one( prods );114 }115 116 116 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 117 lock( mutex_lock ); 118 118 while ( count == 0 && !cons`isEmpty ) { 119 __cons_handoff( chan, elem ); 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 120 121 } 121 122 unlock( mutex_lock ); … … 124 125 // handles buffer insert 125 126 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 126 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));127 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 127 128 count += 1; 128 129 back++; … … 130 131 } 131 132 133 // does the buffer insert or hands elem directly to consumer if one is waiting 134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) { 135 if ( count == 0 && !cons`isEmpty ) { 136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 137 wake_one( cons ); 138 } else __buf_insert( chan, elem ); 139 } 140 132 141 // needed to avoid an extra copy in closed case 133 142 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { … … 136 145 operations++; 137 146 #endif 138 139 ConsEmpty: if ( !cons`isEmpty ) {140 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;141 __cons_handoff( chan, elem );142 unlock( mutex_lock );143 return true;144 }145 146 147 if ( count == size ) { unlock( mutex_lock ); return false; } 147 148 __buf_insert( chan, elem ); 148 __do_insert( chan, elem ); 149 149 unlock( mutex_lock ); 150 150 return true; … … 157 157 // handles closed case of insert routine 158 158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{ &channel_closed_vt, &elem, &chan };159 channel_closed except{&channel_closed_vt, &elem, &chan }; 160 160 throwResume except; // throw closed resumption 161 161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 182 182 } 183 183 184 // buffer count must be zero if cons are blocked (also handles zero-size case)185 ConsEmpty: if (!cons`isEmpty ) {186 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;187 __cons_handoff( chan, elem);184 // have to check for the zero size channel case 185 if ( size == 0 && !cons`isEmpty ) { 186 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); 187 wake_one( cons ); 188 188 unlock( mutex_lock ); 189 return ;189 return true; 190 190 } 191 191 … … 202 202 } // if 203 203 204 __buf_insert( chan, elem ); 205 unlock( mutex_lock ); 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 216 count -= 1; 217 front = (front + 1) % size; 206 218 } 207 219 208 220 // does the buffer remove and potentially does waiting producer work 209 221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 210 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 211 count -= 1; 212 front = (front + 1) % size; 222 __buf_remove( chan, retval ); 213 223 if (count == size - 1 && !prods`isEmpty ) { 214 if ( !__handle_waituntil_OR( prods ) ) return; 215 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 216 225 wake_one( prods ); 217 226 } … … 224 233 operations++; 225 234 #endif 226 227 ZeroSize: if ( size == 0 && !prods`isEmpty ) {228 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;229 __prods_handoff( chan, retval );230 unlock( mutex_lock );231 return true;232 }233 234 235 if ( count == 0 ) { unlock( mutex_lock ); return false; } 235 236 236 __do_remove( chan, retval ); 237 237 unlock( mutex_lock ); … … 244 244 static inline [T, bool] try_remove( channel(T) & chan ) { 245 245 T retval; 246 bool success = __internal_try_remove( chan, retval ); 247 return [ retval, success ]; 248 } 249 250 static inline T try_remove( channel(T) & chan ) { 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 251 250 T retval; 252 251 __internal_try_remove( chan, retval ); … … 256 255 // handles closed case of insert routine 257 256 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 258 channel_closed except{ &channel_closed_vt, 0p, &chan };257 channel_closed except{&channel_closed_vt, 0p, &chan }; 259 258 throwResume except; // throw resumption 260 259 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 280 279 281 280 // have to check for the zero size channel case 282 ZeroSize:if ( size == 0 && !prods`isEmpty ) {283 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;284 __prods_handoff( chan, retval);281 if ( size == 0 && !prods`isEmpty ) { 282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T)); 283 wake_one( prods ); 285 284 unlock( mutex_lock ); 286 285 return retval; … … 288 287 289 288 // wait if buffer is empty, work will be completed by someone else 290 if ( count == 0) {289 if (count == 0) { 291 290 #ifdef CHAN_STATS 292 291 blocks++; … … 300 299 // Remove from buffer 301 300 __do_remove( chan, retval ); 301 302 302 unlock( mutex_lock ); 303 303 return retval; 304 304 } 305 306 ///////////////////////////////////////////////////////////////////////////////////////////307 // The following is support for waituntil (select) statements308 ///////////////////////////////////////////////////////////////////////////////////////////309 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {310 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case311 lock( mutex_lock );312 if ( node`isListed ) { // op wasn't performed313 #ifdef CHAN_STATS314 operations--;315 #endif316 remove( node );317 unlock( mutex_lock );318 return false;319 }320 unlock( mutex_lock );321 322 // only return true when not special OR case, not exceptional calse and status is SAT323 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;324 }325 326 // type used by select statement to capture a chan read as the selected operation327 struct chan_read {328 channel(T) & chan;329 T & ret;330 };331 332 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {333 &cr.chan = &chan;334 &cr.ret = &ret;335 }336 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }337 338 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {339 __closed_remove( chan, ret );340 // if we get here then the insert succeeded341 __make_select_node_available( node );342 }343 344 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {345 // mutex(sout) sout | "register_read";346 lock( mutex_lock );347 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close348 349 #ifdef CHAN_STATS350 if ( !closed ) operations++;351 #endif352 353 // check if we can complete operation. If so race to establish winner in special OR case354 if ( !node.park_counter && ( count != 0 || !prods`isEmpty || unlikely(closed) ) ) {355 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering356 unlock( mutex_lock );357 return false;358 }359 }360 361 if ( unlikely(closed) ) {362 unlock( mutex_lock );363 __handle_select_closed_read( this, node );364 return true;365 }366 367 // have to check for the zero size channel case368 ZeroSize: if ( size == 0 && !prods`isEmpty ) {369 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;370 __prods_handoff( chan, ret );371 __set_avail_then_unlock( node, mutex_lock );372 return true;373 }374 375 // wait if buffer is empty, work will be completed by someone else376 if ( count == 0 ) {377 #ifdef CHAN_STATS378 blocks++;379 #endif380 381 insert_last( cons, node );382 unlock( mutex_lock );383 return false;384 }385 386 // Remove from buffer387 __do_remove( chan, ret );388 __set_avail_then_unlock( node, mutex_lock );389 return true;390 }391 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }392 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {393 if ( node.extra == 0p ) // check if woken up due to closed channel394 __closed_remove( chan, ret );395 // This is only reachable if not closed or closed exception was handled396 return true;397 }398 399 // type used by select statement to capture a chan write as the selected operation400 struct chan_write {401 channel(T) & chan;402 T elem;403 };404 405 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {406 &cw.chan = &chan;407 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );408 }409 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }410 411 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {412 __closed_insert( chan, elem );413 // if we get here then the insert succeeded414 __make_select_node_available( node );415 }416 417 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {418 // mutex(sout) sout | "register_write";419 lock( mutex_lock );420 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close421 422 #ifdef CHAN_STATS423 if ( !closed ) operations++;424 #endif425 426 // check if we can complete operation. If so race to establish winner in special OR case427 if ( !node.park_counter && ( count != size || !cons`isEmpty || unlikely(closed) ) ) {428 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering429 unlock( mutex_lock );430 return false;431 }432 }433 434 // if closed handle435 if ( unlikely(closed) ) {436 unlock( mutex_lock );437 __handle_select_closed_write( this, node );438 return true;439 }440 441 // handle blocked consumer case via handoff (buffer is implicitly empty)442 ConsEmpty: if ( !cons`isEmpty ) {443 if ( !__handle_waituntil_OR( cons ) ) {444 // mutex(sout) sout | "empty";445 break ConsEmpty;446 }447 // mutex(sout) sout | "signal";448 __cons_handoff( chan, elem );449 __set_avail_then_unlock( node, mutex_lock );450 return true;451 }452 453 // insert node in list if buffer is full, work will be completed by someone else454 if ( count == size ) {455 #ifdef CHAN_STATS456 blocks++;457 #endif458 459 insert_last( prods, node );460 unlock( mutex_lock );461 return false;462 } // if463 464 // otherwise carry out write either via normal insert465 __buf_insert( chan, elem );466 __set_avail_then_unlock( node, mutex_lock );467 return true;468 }469 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }470 471 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {472 if ( node.extra == 0p ) // check if woken up due to closed channel473 __closed_insert( chan, elem );474 475 // This is only reachable if not closed or closed exception was handled476 return true;477 }478 479 480 305 } // forall( T ) 481 482 483
Note:
See TracChangeset
for help on using the changeset viewer.