- File:
-
- 1 edited
-
libcfa/src/concurrency/actor.hfa (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
re23169b r99fb52c 371 371 372 372 // this is a static field of executor but have to forward decl for get_next_ticket 373 static size_t __next_ticket = 0; 374 375 static inline size_t __get_next_ticket( executor & this ) with(this) { 376 #ifdef __CFA_DEBUG__ 377 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 373 static unsigned long int __next_ticket = 0; 374 375 static inline unsigned long int __get_next_ticket( executor & this ) with(this) { 376 unsigned long int temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 378 377 379 378 // reserve MAX for dead actors 380 if ( unlikely( temp == MAX )) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;379 if ( temp == MAX ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 381 380 return temp; 382 #else383 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;384 #endif385 381 } // tickets 386 382 … … 388 384 static executor * __actor_executor_ = 0p; 389 385 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system 390 static size_t __num_actors_ = 0; // number of actor objects in system386 static unsigned long int __num_actors_ = 0; // number of actor objects in system 391 387 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish 392 388 struct actor { 393 size_t ticket; // executor-queue handle389 unsigned long int ticket; // executor-queue handle 394 390 Allocation allocation_; // allocation action 395 391 }; 396 392 397 static inline void ?{}( actor & this ) with(this){393 static inline void ?{}( actor & this ) { 398 394 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 399 395 // member must be called to end it 400 396 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 401 allocation_ = Nodelete;402 t icket = __get_next_ticket( *__actor_executor_ );403 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_ RELAXED);397 this.allocation_ = Nodelete; 398 this.ticket = __get_next_ticket( *__actor_executor_ ); 399 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST ); 404 400 #ifdef STATS 405 401 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); … … 422 418 } 423 419 424 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_ RELAXED) == 0 ) ) { // all actors have terminated420 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated 425 421 unpark( __actor_executor_thd ); 426 422 } … … 434 430 static inline void ?{}( message & this ) { this.allocation_ = Nodelete; } 435 431 static inline void ?{}( message & this, Allocation allocation ) { 436 memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor432 this.allocation_ = allocation; 437 433 verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n"); 438 434 } … … 442 438 443 439 static inline void check_message( message & this ) { 444 CFA_DEBUG( this.allocation_ = Finished; ) 445 switch ( this.allocation_ ) { // analyze message status 440 #ifdef __CFA_DEBUG__ 441 Allocation temp = this.allocation_; 442 this.allocation_ = Finished; 443 switch ( temp ) 444 #else 445 switch ( this.allocation_ ) 446 #endif 447 { // analyze message status 446 448 case Nodelete: break; 447 449 case Delete: delete( &this ); break; … … 453 455 454 456 static inline void deliver_request( request & this ) { 455 this.receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 457 Allocation actor_allocation = this.fn( *this.receiver, *this.msg ); 458 this.receiver->allocation_ = actor_allocation; 456 459 check_actor( *this.receiver ); 457 460 check_message( *this.msg ); … … 566 569 unsigned int empty_count = 0; 567 570 request & req; 571 unsigned int curr_idx; 568 572 work_queue * curr_work_queue; 569 573 570 574 Exit: 571 575 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 572 curr_work_queue = request_queues[i + start]; 576 curr_idx = i + start; 577 curr_work_queue = request_queues[curr_idx]; 573 578 574 579 // check if queue is empty before trying to gulp it
Note:
See TracChangeset
for help on using the changeset viewer.