Changeset 8a930c03 for libcfa/src/concurrency
- Timestamp:
- Jun 12, 2023, 12:05:58 PM (3 years ago)
- Branches:
- master
- Children:
- fec8bd1
- Parents:
- 2b78949 (diff), 38e266ca (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- libcfa/src/concurrency
- Files:
-
- 1 added
- 7 edited
-
actor.hfa (modified) (9 diffs)
-
atomic.hfa (added)
-
channel.hfa (modified) (8 diffs)
-
future.hfa (modified) (2 diffs)
-
locks.cfa (modified) (1 diff)
-
locks.hfa (modified) (4 diffs)
-
select.cfa (modified) (1 diff)
-
select.hfa (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
r2b78949 r8a930c03 13 13 #endif // CFA_DEBUG 14 14 15 #define DEBUG_ABORT( cond, string ) CFA_DEBUG( if ( cond ) abort( string ) ) 16 15 17 // Define the default number of processors created in the executor. Must be greater than 0. 16 18 #define __DEFAULT_EXECUTOR_PROCESSORS__ 2 … … 42 44 struct executor; 43 45 44 enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status45 46 typedef Allocation (*__receive_fn)(actor &, message &);46 enum allocation { Nodelete, Delete, Destroy, Finished }; // allocation status 47 48 typedef allocation (*__receive_fn)(actor &, message &); 47 49 struct request { 48 50 actor * receiver; … … 393 395 struct actor { 394 396 size_t ticket; // executor-queue handle 395 Allocation allocation_; // allocation action397 allocation allocation_; // allocation action 396 398 inline virtual_dtor; 397 399 }; … … 400 402 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 401 403 // member must be called to end it 402 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );404 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 403 405 allocation_ = Nodelete; 404 406 ticket = __get_next_ticket( *__actor_executor_ ); … … 430 432 431 433 struct message { 432 Allocation allocation_; // allocation action434 allocation allocation_; // allocation action 433 435 inline virtual_dtor; 434 436 }; … … 437 439 this.allocation_ = Nodelete; 438 440 } 439 static inline void ?{}( message & this, Allocation allocation) {440 memcpy( &this.allocation_, &alloc ation, sizeof(allocation) ); // optimization to elide ctor441 verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");441 static inline void ?{}( message & this, allocation alloc ) { 442 memcpy( &this.allocation_, &alloc, sizeof(allocation) ); // optimization to elide ctor 443 DEBUG_ABORT( this.allocation_ == Finished, "The Finished allocation status is not supported for message types.\n" ); 442 444 } 443 445 static inline void ^?{}( message & this ) with(this) { … … 453 455 } // switch 454 456 } 455 static inline void set_allocation( message & this, Allocation state ) {457 static inline void set_allocation( message & this, allocation state ) { 456 458 this.allocation_ = state; 457 459 } 458 460 459 461 static inline void deliver_request( request & this ) { 462 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 460 463 this.receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 461 464 check_message( *this.msg ); … … 631 634 632 635 static inline void send( actor & this, request & req ) { 633 verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );636 DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 634 637 send( *__actor_executor_, req, this.ticket ); 635 638 } … … 680 683 // assigned at creation to __base_msg_finished to avoid unused message warning 681 684 message __base_msg_finished @= { .allocation_ : Finished }; 682 struct __ DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;683 struct __ DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;684 struct __ FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;685 686 Allocation receive( actor & this, __DeleteMsg& msg ) { return Delete; }687 Allocation receive( actor & this, __DestroyMsg& msg ) { return Destroy; }688 Allocation receive( actor & this, __FinishedMsg& msg ) { return Finished; }689 685 struct __delete_msg_t { inline message; } delete_msg = __base_msg_finished; 686 struct __destroy_msg_t { inline message; } destroy_msg = __base_msg_finished; 687 struct __finished_msg_t { inline message; } finished_msg = __base_msg_finished; 688 689 allocation receive( actor & this, __delete_msg_t & msg ) { return Delete; } 690 allocation receive( actor & this, __destroy_msg_t & msg ) { return Destroy; } 691 allocation receive( actor & this, __finished_msg_t & msg ) { return Finished; } 692 -
libcfa/src/concurrency/channel.hfa
r2b78949 r8a930c03 51 51 vtable(channel_closed) channel_closed_vt; 52 52 53 static inline bool is_insert( channel_closed & e ) { return e.elem != 0p; } 54 static inline bool is_remove( channel_closed & e ) { return e.elem == 0p; } 55 53 56 // #define CHAN_STATS // define this to get channel stats printed in dtor 54 57 … … 341 344 } 342 345 346 // special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case 347 static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) { 348 while ( !queue`isEmpty ) { 349 // if node not a special OR case or if we win the special OR case race break 350 if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) ) 351 return true; 352 353 // our node lost the race when toggling in __pending_set_other 354 if ( *mine.clause_status != __SELECT_PENDING ) 355 return false; 356 357 // otherwise we lost the special OR race so discard node 358 try_pop_front( queue ); 359 } 360 return false; 361 } 362 343 363 // type used by select statement to capture a chan read as the selected operation 344 364 struct chan_read { … … 374 394 return false; 375 395 } 376 377 if ( __handle_ waituntil_OR( prods) ) {396 397 if ( __handle_pending( prods, node ) ) { 378 398 __prods_handoff( chan, ret ); 379 399 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node … … 381 401 return true; 382 402 } 383 __make_select_node_unsat( node ); 403 if ( *node.clause_status == __SELECT_PENDING ) 404 __make_select_node_unsat( node ); 384 405 } 385 406 // check if we can complete operation. If so race to establish winner in special OR case … … 423 444 } 424 445 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 425 static inline boolon_selected( chan_read(T) & this, select_node & node ) with(this) {446 static inline void on_selected( chan_read(T) & this, select_node & node ) with(this) { 426 447 if ( node.extra == 0p ) // check if woken up due to closed channel 427 448 __closed_remove( chan, ret ); 428 449 // This is only reachable if not closed or closed exception was handled 429 return true;430 450 } 431 451 … … 464 484 return false; 465 485 } 466 467 if ( __handle_ waituntil_OR( cons) ) {486 487 if ( __handle_pending( cons, node ) ) { 468 488 __cons_handoff( chan, elem ); 469 489 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node … … 471 491 return true; 472 492 } 473 __make_select_node_unsat( node ); 493 if ( *node.clause_status == __SELECT_PENDING ) 494 __make_select_node_unsat( node ); 474 495 } 475 496 // check if we can complete operation. If so race to establish winner in special OR case … … 515 536 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 516 537 517 static inline boolon_selected( chan_write(T) & this, select_node & node ) with(this) {538 static inline void on_selected( chan_write(T) & this, select_node & node ) with(this) { 518 539 if ( node.extra == 0p ) // check if woken up due to closed channel 519 540 __closed_insert( chan, elem ); 520 541 521 542 // This is only reachable if not closed or closed exception was handled 522 return true;523 543 } 524 544 -
libcfa/src/concurrency/future.hfa
r2b78949 r8a930c03 70 70 // check if the future is available 71 71 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected 72 bool available( future(T) & this ) { return this.state; }72 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); } 73 73 74 74 … … 180 180 } 181 181 182 bool on_selected( future(T) & this, select_node & node ) { return true;}182 void on_selected( future(T) & this, select_node & node ) {} 183 183 } 184 184 } 185 185 186 186 //-------------------------------------------------------------------------------------------------------- 187 // These futures below do not support select statements so they may not be as usefulas 'future'187 // These futures below do not support select statements so they may not have as many features as 'future' 188 188 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 189 189 // since it uses raw atomics and no locks -
libcfa/src/concurrency/locks.cfa
r2b78949 r8a930c03 239 239 } 240 240 241 bool on_selected( blocking_lock & this, select_node & node ) { return true;}241 void on_selected( blocking_lock & this, select_node & node ) {} 242 242 243 243 //----------------------------------------------------------------------------- -
libcfa/src/concurrency/locks.hfa
r2b78949 r8a930c03 32 32 #include "select.hfa" 33 33 34 #include <fstream.hfa>35 36 34 // futex headers 37 35 #include <linux/futex.h> /* Definition of FUTEX_* constants */ … … 114 112 static inline bool register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 115 113 static inline bool unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 116 static inline bool on_selected( single_acquisition_lock & this, select_node & node ) { returnon_selected( (blocking_lock &)this, node ); }114 static inline void on_selected( single_acquisition_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); } 117 115 118 116 //---------- … … 131 129 static inline bool register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 132 130 static inline bool unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 133 static inline bool on_selected( owner_lock & this, select_node & node ) { returnon_selected( (blocking_lock &)this, node ); }131 static inline void on_selected( owner_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); } 134 132 135 133 //----------------------------------------------------------------------------- … … 621 619 } 622 620 623 static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true;}621 static inline void on_selected( simple_owner_lock & this, select_node & node ) {} 624 622 625 623 -
libcfa/src/concurrency/select.cfa
r2b78949 r8a930c03 49 49 return false; 50 50 } 51 bool on_selected( select_timeout_node & this, select_node & node ) { return true;}51 void on_selected( select_timeout_node & this, select_node & node ) {} 52 52 53 53 // Gateway routine to wait on duration -
libcfa/src/concurrency/select.hfa
r2b78949 r8a930c03 91 91 // For unregistering a select stmt on a selectable concurrency primitive 92 92 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN) 93 bool unregister_select( T &, select_node & );93 bool unregister_select( T &, select_node & ); 94 94 95 95 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 96 96 // passed as an arg to this routine 97 97 // If on_selected returns false, the statement is not run, if it returns true it is run. 98 boolon_selected( T &, select_node & );98 void on_selected( T &, select_node & ); 99 99 }; 100 100 … … 102 102 // Waituntil Helpers 103 103 //============================================================================================= 104 105 static inline void __make_select_node_unsat( select_node & this ) with( this ) { 106 __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST ); 107 } 108 static inline void __make_select_node_sat( select_node & this ) with( this ) { 109 __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST ); 110 } 104 111 105 112 // used for the 2-stage avail needed by the special OR case … … 116 123 } 117 124 118 static inline void __make_select_node_unsat( select_node & this ) with( this ) { 119 __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST ); 120 } 121 static inline void __make_select_node_sat( select_node & this ) with( this ) { 122 __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST ); 125 // used for the 2-stage avail by the thread who owns a pending node 126 static inline bool __pending_set_other( select_node & other, select_node & mine, unsigned long int val ) with( other ) { 127 /* paranoid */ verify( park_counter == 0p ); 128 /* paranoid */ verify( clause_status != 0p ); 129 130 unsigned long int cmp_status = __SELECT_UNSAT; 131 while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { 132 if ( cmp_status != __SELECT_PENDING ) 133 return false; 134 135 // toggle current status flag to avoid starvation/deadlock 136 __make_select_node_unsat( mine ); 137 cmp_status = __SELECT_UNSAT; 138 if ( !__atomic_compare_exchange_n( mine.clause_status, &cmp_status, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) 139 return false; 140 cmp_status = __SELECT_UNSAT; 141 } 142 return true; 123 143 } 124 144 … … 188 208 bool register_select( select_timeout_node & this, select_node & node ); 189 209 bool unregister_select( select_timeout_node & this, select_node & node ); 190 boolon_selected( select_timeout_node & this, select_node & node );210 void on_selected( select_timeout_node & this, select_node & node ); 191 211 192 212 // Gateway routines to waituntil on duration
Note:
See TracChangeset
for help on using the changeset viewer.