Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified libcfa/src/concurrency/actor.hfa

    r70d8e2f2 r0794365  
    3030#define __DEFAULT_EXECUTOR_BUFSIZE__ 10
    3131
    32 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above
     32#define __STEAL 0 // workstealing toggle. Disjoint from toggles above
    3333
    3434// workstealing heuristic selection (only set one to be 1)
     
    4646enum allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
    4747
    48 typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
     48typedef allocation (*__receive_fn)(actor &, message &);
    4949struct request {
    5050    actor * receiver;
    5151    message * msg;
    5252    __receive_fn fn;
     53    bool stop;
    5354};
    5455
    55 struct a_msg {
    56     int m;
    57 };
    58 static inline void ?{}( request & this ) {}
     56static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
    5957static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    6058    this.receiver = receiver;
    6159    this.msg = msg;
    6260    this.fn = fn;
     61    this.stop = false;
    6362}
    6463static inline void ?{}( request & this, request & copy ) {
     
    6665    this.msg = copy.msg;
    6766    this.fn = copy.fn;
     67    this.stop = copy.stop;
    6868}
    6969
     
    8383    last_size = 0;
    8484}
    85 static inline void ^?{}( copy_queue & this ) with(this) {
    86     DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    87     adelete(buffer);
    88 }
     85static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
    8986
    9087static inline void insert( copy_queue & this, request & elem ) with(this) {
     
    120117}
    121118
    122 static inline bool is_empty( copy_queue & this ) with(this) { return count == 0; }
     119static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
    123120
    124121struct work_queue {
     
    181178    volatile unsigned long long stamp;
    182179    #ifdef ACTOR_STATS
    183     size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
     180    size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
    184181    unsigned long long processed;
    185182    size_t gulps;
     
    194191    this.gulps = 0;                                 // number of gulps
    195192    this.failed_swaps = 0;                          // steal swap failures
    196     this.empty_stolen = 0;                          // queues empty after steal
    197193    this.msgs_stolen = 0;                           // number of messages stolen
    198194    #endif
     
    214210#ifdef ACTOR_STATS
    215211// aggregate counters for statistics
    216 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
     212size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
    217213    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    218214#endif
     
    239235        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    240236        bool seperate_clus;                                                             // use same or separate cluster for executor
    241     volatile bool is_shutdown;                      // flag to communicate shutdown to worker threads
    242237}; // executor
    243238
     
    253248    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    254249    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    255     __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
    256250
    257251    // per worker steal stats (uncomment alongside the lock above this routine to print)
     
    280274    this.nrqueues = nrqueues;
    281275    this.seperate_clus = seperate_clus;
    282     this.is_shutdown = false;
    283276
    284277    if ( nworkers == nrqueues )
     
    329322
    330323static inline void ^?{}( executor & this ) with(this) {
    331     is_shutdown = true;
     324    #ifdef __STEAL
     325    request sentinels[nrqueues];
     326    for ( unsigned int i = 0; i < nrqueues; i++ ) {
     327        insert( request_queues[i], sentinels[i] );              // force eventually termination
     328    } // for
     329    #else
     330    request sentinels[nworkers];
     331    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     332    for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
     333        range = reqPerWorker + ( i < extras ? 1 : 0 );
     334        insert( request_queues[step], sentinels[i] );           // force eventually termination
     335    } // for
     336    #endif
    332337
    333338    for ( i; nworkers )
     
    360365    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    361366    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    362     printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
    363         __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
     367    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
     368        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);
    364369    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    365370    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
     
    444449static inline void check_message( message & this ) {
    445450    switch ( this.allocation_ ) {                                               // analyze message status
    446         case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break;
     451        case Nodelete: CFA_DEBUG(this.allocation_ = Finished); break;
    447452        case Delete: delete( &this ); break;
    448         case Destroy: ^?{}( this ); break;
     453        case Destroy: ^?{}(this); break;
    449454        case Finished: break;
    450455    } // switch
     
    456461static inline void deliver_request( request & this ) {
    457462    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    458     actor * base_actor;
    459     message * base_msg;
    460     allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
    461     base_actor->allocation_ = temp;
    462     check_message( *base_msg );
    463     check_actor( *base_actor );
     463    this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
     464    check_message( *this.msg );
     465    check_actor( *this.receiver );
    464466}
    465467
     
    511513        curr_steal_queue = request_queues[ i + vic_start ];
    512514        // avoid empty queues and queues that are being operated on
    513         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
     515        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
    514516            continue;
    515517
     
    519521            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
    520522            executor_->w_infos[id].stolen++;
    521             if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
    522523            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    523524            // replaced_queue[swap_idx]++;
     
    559560}
    560561
    561 #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
    562562void main( worker & this ) with(this) {
    563563    // #ifdef ACTOR_STATS
     
    581581       
    582582        // check if queue is empty before trying to gulp it
    583         if ( is_empty( *curr_work_queue->c_queue ) ) {
     583        if ( isEmpty( *curr_work_queue->c_queue ) ) {
    584584            #ifdef __STEAL
    585585            empty_count++;
     
    594594        #endif // ACTOR_STATS
    595595        #ifdef __STEAL
    596         if ( is_empty( *current_queue ) ) {
    597             if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
     596        if ( isEmpty( *current_queue ) ) {
     597            if ( unlikely( no_steal ) ) continue;
    598598            empty_count++;
    599599            if ( empty_count < steal_threshold ) continue;
    600600            empty_count = 0;
    601 
    602             CHECK_TERMINATION; // check for termination
    603601
    604602            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
     
    612610        }
    613611        #endif // __STEAL
    614         while ( ! is_empty( *current_queue ) ) {
     612        while ( ! isEmpty( *current_queue ) ) {
    615613            #ifdef ACTOR_STATS
    616614            executor_->w_infos[id].processed++;
     
    618616            &req = &remove( *current_queue );
    619617            if ( !&req ) continue;
     618            if ( req.stop ) break Exit;
    620619            deliver_request( req );
    621620        }
     
    624623        empty_count = 0; // we found work so reset empty counter
    625624        #endif
    626 
    627         CHECK_TERMINATION;
    628625       
    629626        // potentially reclaim some of the current queue's vector space if it is unused
     
    647644    __all_gulps = 0;
    648645    __total_failed_swaps = 0;
    649     __total_empty_stolen = 0;
    650646    __all_processed = 0;
    651647    __num_actors_stats = 0;
     
    661657}
    662658
    663 static inline void start_actor_system() { start_actor_system( get_proc_count( *active_cluster() ) ); }
     659// TODO: potentially revisit getting number of processors
     660//  ( currently the value stored in active_cluster()->procs.total is often stale
     661//  and doesn't reflect how many procs are allocated )
     662// static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
     663static inline void start_actor_system() { start_actor_system( 1 ); }
    664664
    665665static inline void start_actor_system( executor & this ) {
     
    671671
    672672static inline void stop_actor_system() {
    673     park( ); // will be unparked when actor system is finished
     673    park( ); // will receive signal when actor system is finished
    674674
    675675    if ( !__actor_executor_passed ) delete( __actor_executor_ );
Note: See TracChangeset for help on using the changeset viewer.