Changeset f23d34db for libcfa/src


Ignore:
Timestamp:
Mar 5, 2023, 12:34:45 PM (15 months ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
5adf4f4, 99fb52c
Parents:
e54b4e9
Message:

changed some stat collection for actors and attempted to fix matrix.cfa related bug

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    re54b4e9 rf23d34db  
    175175    volatile unsigned long long stamp;
    176176    #ifdef STATS
    177     size_t stolen_from;
     177    size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
     178    unsigned long long processed;
     179    size_t gulps;
    178180    #endif
    179181};
     
    181183    #ifdef STATS
    182184    this.stolen_from = 0;
     185    this.try_steal = 0;                             // attempts to steal
     186    this.stolen = 0;                                // successful steals
     187    this.processed = 0;                             // requests processed
     188    this.gulps = 0;                                 // number of gulps
     189    this.failed_swaps = 0;                          // steal swap failures
     190    this.msgs_stolen = 0;                           // number of messages stolen
    183191    #endif
    184192    this.stamp = rdtscl();
    185193}
    186194
    187 #ifdef STATS
    188 unsigned int * stolen_arr;
    189 unsigned int * replaced_queue;
    190 #endif
     195// #ifdef STATS
     196// unsigned int * stolen_arr;
     197// unsigned int * replaced_queue;
     198// #endif
    191199thread worker {
    192200    work_queue ** request_queues;
     
    195203    unsigned int start, range;
    196204    int id;
    197     #ifdef STATS
    198     size_t try_steal, stolen, failed_swaps, msgs_stolen;
    199     unsigned long long processed;
    200     size_t gulps;
    201     #endif
    202205};
    203206
    204207#ifdef STATS
    205208// aggregate counters for statistics
    206 size_t total_tries = 0, total_stolen = 0, total_workers, all_gulps = 0,
    207     total_failed_swaps = 0, all_processed = 0, __num_actors_stats = 0, all_msgs_stolen = 0;
     209size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
     210    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    208211#endif
    209212static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
     
    216219    this.range = range;                             // size of worker's subrange of request_queues
    217220    this.id = id;                                   // worker's id and index in array of workers
    218     #ifdef STATS
    219     this.try_steal = 0;                             // attempts to steal
    220     this.stolen = 0;                                // successful steals
    221     this.processed = 0;                             // requests processed
    222     this.gulps = 0;                                 // number of gulps
    223     this.failed_swaps = 0;                          // steal swap failures
    224     this.msgs_stolen = 0;                           // number of messages stolen
    225     #endif
    226221}
    227222
     
    244239static inline void ^?{}( worker & mutex this ) with(this) {
    245240    #ifdef STATS
    246     __atomic_add_fetch(&all_gulps, gulps,__ATOMIC_SEQ_CST);
    247     __atomic_add_fetch(&all_processed, processed,__ATOMIC_SEQ_CST);
    248     __atomic_add_fetch(&all_msgs_stolen, msgs_stolen,__ATOMIC_SEQ_CST);
    249     __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST);
    250     __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST);
    251     __atomic_add_fetch(&total_failed_swaps, failed_swaps, __ATOMIC_SEQ_CST);
     241    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
     242    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
     243    __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
     244    __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
     245    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
     246    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    252247
    253248    // per worker steal stats (uncomment alongside the lock above this routine to print)
     
    281276   
    282277    #ifdef STATS
    283     stolen_arr = aalloc( nrqueues );
    284     replaced_queue = aalloc( nrqueues );
    285     total_workers = nworkers;
     278    // stolen_arr = aalloc( nrqueues );
     279    // replaced_queue = aalloc( nrqueues );
     280    __total_workers = nworkers;
    286281    #endif
    287282
     
    350345        misses += worker_req_queues[i]->missed;
    351346    }
    352     adelete( stolen_arr );
    353     adelete( replaced_queue );
     347    // adelete( stolen_arr );
     348    // adelete( replaced_queue );
    354349    #endif
    355350
     
    364359    #ifdef STATS
    365360    printf("    Actor System Stats:\n");
    366     printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, all_processed);
    367     size_t avg_gulps = all_gulps == 0 ? 0 : all_processed / all_gulps;
    368     printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", all_gulps, avg_gulps, misses);
     361    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
     362    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
     363    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    369364    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
    370         total_tries, total_stolen, total_tries - total_stolen - total_failed_swaps, total_failed_swaps);
    371     size_t avg_steal = total_stolen == 0 ? 0 : all_msgs_stolen / total_stolen;
    372     printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", all_msgs_stolen, avg_steal);
     365        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);
     366    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
     367    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    373368    #endif
    374369       
     
    443438
    444439static inline void check_message( message & this ) {
     440    CFA_DEBUG( this.allocation_ = Finished; )
    445441    switch ( this.allocation_ ) {                                               // analyze message status
    446         case Nodelete: CFA_DEBUG( this.allocation_ = Finished; ) break;
     442        case Nodelete: break;
    447443        case Delete: delete( &this ); break;
    448444        case Destroy: ^?{}(this); break;
     
    512508        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    513509        if ( curr_steal_queue ) {
    514             msgs_stolen += curr_steal_queue->c_queue->count;
    515             stolen++;
    516             __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    517             replaced_queue[swap_idx]++;
    518             __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
     510            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
     511            executor_->w_infos[id].stolen++;
     512            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
     513            // replaced_queue[swap_idx]++;
     514            // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
    519515        } else {
    520             failed_swaps++;
     516            executor_->w_infos[id].failed_swaps++;
    521517        }
    522518        #else
     
    554550
    555551void main( worker & this ) with(this) {
    556     #ifdef STATS
    557     for ( i; executor_->nrqueues ) {
    558         replaced_queue[i] = 0;
    559         __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    560     }
    561     #endif
     552    // #ifdef STATS
     553    // for ( i; executor_->nrqueues ) {
     554    //     replaced_queue[i] = 0;
     555    //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
     556    // }
     557    // #endif
    562558
    563559    // threshold of empty queues we see before we go stealing
     
    586582        transfer( *curr_work_queue, &current_queue );
    587583        #ifdef STATS
    588         gulps++;
     584        executor_->w_infos[id].gulps++;
    589585        #endif // STATS
    590586        #ifdef __STEAL
     
    598594           
    599595            #ifdef STATS
    600             try_steal++;
     596            executor_->w_infos[id].try_steal++;
    601597            #endif // STATS
    602598           
     
    607603        while ( ! isEmpty( *current_queue ) ) {
    608604            #ifdef STATS
    609             processed++;
     605            executor_->w_infos[id].processed++;
    610606            #endif
    611607            &req = &remove( *current_queue );
     
    633629}
    634630
     631static inline void __reset_stats() {
     632    #ifdef STATS
     633    __total_tries = 0;
     634    __total_stolen = 0;
     635    __all_gulps = 0;
     636    __total_failed_swaps = 0;
     637    __all_processed = 0;
     638    __num_actors_stats = 0;
     639    __all_msgs_stolen = 0;
     640    #endif
     641}
     642
    635643static inline void start_actor_system( size_t num_thds ) {
     644    __reset_stats();
    636645    __actor_executor_thd = active_thread();
    637646    __actor_executor_ = alloc();
     
    639648}
    640649
    641 // TODO: potentially getting revisit number of processors
     650// TODO: potentially revisit getting number of processors
    642651//  ( currently the value stored in active_cluster()->procs.total is often stale
    643652//  and doesn't reflect how many procs are allocated )
     
    646655
    647656static inline void start_actor_system( executor & this ) {
     657    __reset_stats();
    648658    __actor_executor_thd = active_thread();
    649659    __actor_executor_ = &this;
Note: See TracChangeset for help on using the changeset viewer.