- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
r99fb52c re23169b 371 371 372 372 // this is a static field of executor but have to forward decl for get_next_ticket 373 static unsigned long int __next_ticket = 0; 374 375 static inline unsigned long int __get_next_ticket( executor & this ) with(this) { 376 unsigned long int temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 373 static size_t __next_ticket = 0; 374 375 static inline size_t __get_next_ticket( executor & this ) with(this) { 376 #ifdef __CFA_DEBUG__ 377 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 377 378 378 379 // reserve MAX for dead actors 379 if ( temp == MAX) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;380 if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 380 381 return temp; 382 #else 383 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues; 384 #endif 381 385 } // tickets 382 386 … … 384 388 static executor * __actor_executor_ = 0p; 385 389 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system 386 static unsigned long int __num_actors_ = 0; // number of actor objects in system390 static size_t __num_actors_ = 0; // number of actor objects in system 387 391 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish 388 392 struct actor { 389 unsigned long int ticket; // executor-queue handle393 size_t ticket; // executor-queue handle 390 394 Allocation allocation_; // allocation action 391 395 }; 392 396 393 static inline void ?{}( actor & this ) {397 static inline void ?{}( actor & this ) with(this) { 394 398 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 395 399 // member must be called to end it 396 400 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 397 this.allocation_ = Nodelete;398 t his.ticket = __get_next_ticket( *__actor_executor_ );399 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_ SEQ_CST);401 allocation_ = Nodelete; 402 ticket = __get_next_ticket( *__actor_executor_ ); 403 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED ); 400 404 #ifdef STATS 401 405 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); … … 418 422 } 419 423 420 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_ SEQ_CST) == 0 ) ) { // all actors have terminated424 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated 421 425 unpark( __actor_executor_thd ); 422 426 } … … 430 434 static inline void ?{}( message & this ) { this.allocation_ = Nodelete; } 431 435 static inline void ?{}( message & this, Allocation allocation ) { 432 this.allocation_ = allocation;436 memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor 433 437 verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n"); 434 438 } … … 438 442 439 443 static inline void check_message( message & this ) { 440 #ifdef __CFA_DEBUG__ 441 Allocation temp = this.allocation_; 442 this.allocation_ = Finished; 443 switch ( temp ) 444 #else 445 switch ( this.allocation_ ) 446 #endif 447 { // analyze message status 444 CFA_DEBUG( this.allocation_ = Finished; ) 445 switch ( this.allocation_ ) { // analyze message status 448 446 case Nodelete: break; 449 447 case Delete: delete( &this ); break; … … 455 453 456 454 static inline void deliver_request( request & this ) { 457 Allocation actor_allocation = this.fn( *this.receiver, *this.msg ); 458 this.receiver->allocation_ = actor_allocation; 455 this.receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 459 456 check_actor( *this.receiver ); 460 457 check_message( *this.msg ); … … 569 566 unsigned int empty_count = 0; 570 567 request & req; 571 unsigned int curr_idx;572 568 work_queue * curr_work_queue; 573 569 574 570 Exit: 575 571 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 576 curr_idx = i + start; 577 curr_work_queue = request_queues[curr_idx]; 572 curr_work_queue = request_queues[i + start]; 578 573 579 574 // check if queue is empty before trying to gulp it
Note:
See TracChangeset
for help on using the changeset viewer.