Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    re23169b r99fb52c  
    371371
    372372// this is a static field of executor but have to forward decl for get_next_ticket
    373 static size_t __next_ticket = 0;
    374 
    375 static inline size_t __get_next_ticket( executor & this ) with(this) {
    376     #ifdef __CFA_DEBUG__
    377     size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     373static unsigned long int __next_ticket = 0;
     374
     375static inline unsigned long int __get_next_ticket( executor & this ) with(this) {
     376    unsigned long int temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    378377
    379378    // reserve MAX for dead actors
    380     if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     379    if ( temp == MAX ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    381380    return temp;
    382     #else
    383     return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    384     #endif
    385381} // tickets
    386382
     
    388384static executor * __actor_executor_ = 0p;
    389385static bool __actor_executor_passed = false;            // was an executor passed to start_actor_system
    390 static size_t __num_actors_ = 0;                                // number of actor objects in system
     386static unsigned long int __num_actors_ = 0;                             // number of actor objects in system
    391387static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
    392388struct actor {
    393     size_t ticket;                              // executor-queue handle
     389    unsigned long int ticket;                           // executor-queue handle
    394390    Allocation allocation_;                                         // allocation action
    395391};
    396392
    397 static inline void ?{}( actor & this ) with(this) {
     393static inline void ?{}( actor & this ) {
    398394    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    399395    // member must be called to end it
    400396    verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    401     allocation_ = Nodelete;
    402     ticket = __get_next_ticket( *__actor_executor_ );
    403     __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
     397    this.allocation_ = Nodelete;
     398    this.ticket = __get_next_ticket( *__actor_executor_ );
     399    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
    404400    #ifdef STATS
    405401    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
     
    422418        }
    423419
    424         if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
     420        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
    425421            unpark( __actor_executor_thd );
    426422        }
     
    434430static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
    435431static inline void ?{}( message & this, Allocation allocation ) {
    436     memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor
     432    this.allocation_ = allocation;
    437433    verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");
    438434}
     
    442438
    443439static inline void check_message( message & this ) {
    444     CFA_DEBUG( this.allocation_ = Finished; )
    445     switch ( this.allocation_ ) {                                               // analyze message status
     440    #ifdef __CFA_DEBUG__
     441    Allocation temp = this.allocation_;
     442    this.allocation_ = Finished;
     443    switch ( temp )
     444    #else
     445    switch ( this.allocation_ )
     446    #endif
     447    {                                           // analyze message status
    446448        case Nodelete: break;
    447449        case Delete: delete( &this ); break;
     
    453455
    454456static inline void deliver_request( request & this ) {
    455     this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
     457    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
     458    this.receiver->allocation_ = actor_allocation;
    456459    check_actor( *this.receiver );
    457460    check_message( *this.msg );
     
    566569    unsigned int empty_count = 0;
    567570    request & req;
     571    unsigned int curr_idx;
    568572    work_queue * curr_work_queue;
    569573
    570574    Exit:
    571575    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    572         curr_work_queue = request_queues[i + start];
     576        curr_idx = i + start;
     577        curr_work_queue = request_queues[curr_idx];
    573578       
    574579        // check if queue is empty before trying to gulp it
Note: See TracChangeset for help on using the changeset viewer.