Changeset 2a301ff for libcfa/src/concurrency/channel.hfa
- Timestamp:
- Aug 31, 2023, 11:31:15 PM (2 years ago)
- Branches:
- master
- Children:
- 950c58e
- Parents:
- 92355883 (diff), 686912c (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - File:
-
- 1 edited
-
libcfa/src/concurrency/channel.hfa (modified) (10 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r92355883 r2a301ff 68 68 #endif 69 69 }; 70 static inline void ?{}( channel(T) & this, channel(T) this2 ) = void; 71 static inline void ?=?( channel(T) & this, channel(T) this2 ) = void; 70 72 71 73 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { … … 326 328 return retval; 327 329 } 330 static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); } 331 332 333 /////////////////////////////////////////////////////////////////////////////////////////// 334 // The following is Go-style operator support for channels 335 /////////////////////////////////////////////////////////////////////////////////////////// 336 337 static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); } 338 static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); } 328 339 329 340 /////////////////////////////////////////////////////////////////////////////////////////// … … 340 351 unlock( mutex_lock ); 341 352 342 // only return true when not special OR case , not exceptional calseand status is SAT343 return ( node.extra == 0p || !node.park_counter )? false : *node.clause_status == __SELECT_SAT;353 // only return true when not special OR case and status is SAT 354 return !node.park_counter ? false : *node.clause_status == __SELECT_SAT; 344 355 } 345 356 … … 363 374 // type used by select statement to capture a chan read as the selected operation 364 375 struct chan_read { 365 T &ret;366 channel(T) &chan;376 T * ret; 377 channel(T) * chan; 367 378 }; 368 369 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 370 &cr.chan = &chan; 371 &cr.ret = &ret; 372 } 373 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 374 375 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 376 __closed_remove( chan, ret ); 379 __CFA_SELECT_GET_TYPE( chan_read(T) ); 380 381 static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) { 382 cr.chan = chan; 383 cr.ret = ret; 384 } 385 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; } 386 387 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 388 __closed_remove( *chan, *ret ); 377 389 // if we get here then the insert succeeded 378 390 __make_select_node_available( node ); 379 391 } 380 392 381 static inline bool register_select( chan_read(T) & this, select_node & node ) with( this.chan, this) {382 lock( mutex_lock ); 383 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close393 static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 394 lock( mutex_lock ); 395 node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 384 396 385 397 #ifdef CHAN_STATS … … 396 408 397 409 if ( __handle_pending( prods, node ) ) { 398 __prods_handoff( chan,ret );410 __prods_handoff( *chan, *ret ); 399 411 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 400 412 unlock( mutex_lock ); … … 422 434 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 423 435 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 424 __prods_handoff( chan,ret );436 __prods_handoff( *chan, *ret ); 425 437 __set_avail_then_unlock( node, mutex_lock ); 426 438 return true; … … 439 451 440 452 // Remove from buffer 441 __do_remove( chan,ret );453 __do_remove( *chan, *ret ); 442 454 __set_avail_then_unlock( node, mutex_lock ); 443 455 return true; 444 456 } 445 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 446 static inline void on_selected( chan_read(T) & this, select_node & node ) with(this) { 447 if ( node.extra == 0p ) // check if woken up due to closed channel 448 __closed_remove( chan, ret ); 457 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 458 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 459 if ( unlikely(node.extra == 0p) ) { 460 if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel 461 else return false; 462 } 449 463 // This is only reachable if not closed or closed exception was handled 450 } 464 return true; 465 } 466 467 // type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to 468 struct chan_read_no_ret { 469 T retval; 470 chan_read( T ) c_read; 471 }; 472 __CFA_SELECT_GET_TYPE( chan_read_no_ret(T) ); 473 474 static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) { 475 this.c_read{ &chan, &this.retval }; 476 } 477 478 static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; } 479 static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) { 480 this.c_read.ret = &this.retval; 481 return register_select( this.c_read, node ); 482 } 483 static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); } 484 static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); } 451 485 452 486 // type used by select statement to capture a chan write as the selected operation 453 487 struct chan_write { 454 488 T elem; 455 channel(T) &chan;489 channel(T) * chan; 456 490 }; 457 458 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 459 &cw.chan = &chan; 491 __CFA_SELECT_GET_TYPE( chan_write(T) ); 492 493 static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) { 494 cw.chan = chan; 460 495 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 461 496 } 462 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; } 463 464 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 465 __closed_insert( chan, elem ); 497 static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; } 498 static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; } 499 500 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 501 __closed_insert( *chan, elem ); 466 502 // if we get here then the insert succeeded 467 503 __make_select_node_available( node ); 468 504 } 469 505 470 static inline bool register_select( chan_write(T) & this, select_node & node ) with( this.chan, this) {506 static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 471 507 lock( mutex_lock ); 472 508 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close … … 486 522 487 523 if ( __handle_pending( cons, node ) ) { 488 __cons_handoff( chan, elem );524 __cons_handoff( *chan, elem ); 489 525 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 490 526 unlock( mutex_lock ); … … 513 549 ConsEmpty: if ( !cons`isEmpty ) { 514 550 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 515 __cons_handoff( chan, elem );551 __cons_handoff( *chan, elem ); 516 552 __set_avail_then_unlock( node, mutex_lock ); 517 553 return true; … … 530 566 531 567 // otherwise carry out write either via normal insert 532 __buf_insert( chan, elem );568 __buf_insert( *chan, elem ); 533 569 __set_avail_then_unlock( node, mutex_lock ); 534 570 return true; 535 571 } 536 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 537 538 static inline void on_selected( chan_write(T) & this, select_node & node ) with(this) { 539 if ( node.extra == 0p ) // check if woken up due to closed channel 540 __closed_insert( chan, elem ); 541 572 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 573 574 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 575 if ( unlikely(node.extra == 0p) ) { 576 if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel 577 else return false; 578 } 542 579 // This is only reachable if not closed or closed exception was handled 580 return true; 543 581 } 544 582
Note:
See TracChangeset
for help on using the changeset viewer.