Changeset e23169b for libcfa/src/concurrency
- Timestamp:
- Mar 7, 2023, 3:10:35 PM (21 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 9155026
- Parents:
- ab81e3b
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
rab81e3b re23169b 371 371 372 372 // this is a static field of executor but have to forward decl for get_next_ticket 373 static unsigned long int __next_ticket = 0; 374 375 static inline unsigned long int __get_next_ticket( executor & this ) with(this) { 376 unsigned long int temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 373 static size_t __next_ticket = 0; 374 375 static inline size_t __get_next_ticket( executor & this ) with(this) { 376 #ifdef __CFA_DEBUG__ 377 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 377 378 378 379 // reserve MAX for dead actors 379 if ( temp == MAX) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;380 if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 380 381 return temp; 382 #else 383 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues; 384 #endif 381 385 } // tickets 382 386 … … 384 388 static executor * __actor_executor_ = 0p; 385 389 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system 386 static unsigned long int __num_actors_ = 0; // number of actor objects in system390 static size_t __num_actors_ = 0; // number of actor objects in system 387 391 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish 388 392 struct actor { 389 unsigned long int ticket; // executor-queue handle393 size_t ticket; // executor-queue handle 390 394 Allocation allocation_; // allocation action 391 395 }; 392 396 393 static inline void ?{}( actor & this ) {397 static inline void ?{}( actor & this ) with(this) { 394 398 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 395 399 // member must be called to end it 396 400 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 397 this.allocation_ = Nodelete;398 t his.ticket = __get_next_ticket( *__actor_executor_ );399 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_ SEQ_CST);401 allocation_ = Nodelete; 402 ticket = __get_next_ticket( *__actor_executor_ ); 403 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED ); 400 404 #ifdef STATS 401 405 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); … … 418 422 } 419 423 420 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_ SEQ_CST) == 0 ) ) { // all actors have terminated424 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated 421 425 unpark( __actor_executor_thd ); 422 426 } … … 430 434 static inline void ?{}( message & this ) { this.allocation_ = Nodelete; } 431 435 static inline void ?{}( message & this, Allocation allocation ) { 432 this.allocation_ = allocation;436 memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor 433 437 verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n"); 434 438 } … … 449 453 450 454 static inline void deliver_request( request & this ) { 451 Allocation actor_allocation = this.fn( *this.receiver, *this.msg ); 452 this.receiver->allocation_ = actor_allocation; 455 this.receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 453 456 check_actor( *this.receiver ); 454 457 check_message( *this.msg ); … … 563 566 unsigned int empty_count = 0; 564 567 request & req; 565 unsigned int curr_idx;566 568 work_queue * curr_work_queue; 567 569 568 570 Exit: 569 571 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 570 curr_idx = i + start; 571 curr_work_queue = request_queues[curr_idx]; 572 curr_work_queue = request_queues[i + start]; 572 573 573 574 // check if queue is empty before trying to gulp it
Note: See TracChangeset
for help on using the changeset viewer.