Changeset f23d34db for libcfa/src/concurrency
- Timestamp:
- Mar 5, 2023, 12:34:45 PM (21 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 5adf4f4, 99fb52c
- Parents:
- e54b4e9
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
re54b4e9 rf23d34db 175 175 volatile unsigned long long stamp; 176 176 #ifdef STATS 177 size_t stolen_from; 177 size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen; 178 unsigned long long processed; 179 size_t gulps; 178 180 #endif 179 181 }; … … 181 183 #ifdef STATS 182 184 this.stolen_from = 0; 185 this.try_steal = 0; // attempts to steal 186 this.stolen = 0; // successful steals 187 this.processed = 0; // requests processed 188 this.gulps = 0; // number of gulps 189 this.failed_swaps = 0; // steal swap failures 190 this.msgs_stolen = 0; // number of messages stolen 183 191 #endif 184 192 this.stamp = rdtscl(); 185 193 } 186 194 187 #ifdef STATS188 unsigned int * stolen_arr;189 unsigned int * replaced_queue;190 #endif195 // #ifdef STATS 196 // unsigned int * stolen_arr; 197 // unsigned int * replaced_queue; 198 // #endif 191 199 thread worker { 192 200 work_queue ** request_queues; … … 195 203 unsigned int start, range; 196 204 int id; 197 #ifdef STATS198 size_t try_steal, stolen, failed_swaps, msgs_stolen;199 unsigned long long processed;200 size_t gulps;201 #endif202 205 }; 203 206 204 207 #ifdef STATS 205 208 // aggregate counters for statistics 206 size_t total_tries = 0, total_stolen = 0, total_workers,all_gulps = 0,207 total_failed_swaps = 0, all_processed = 0, __num_actors_stats = 0,all_msgs_stolen = 0;209 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, 210 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; 208 211 #endif 209 212 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_, … … 216 219 this.range = range; // size of worker's subrange of request_queues 217 220 this.id = id; // worker's id and index in array of workers 218 #ifdef STATS219 this.try_steal = 0; // attempts to steal220 this.stolen = 0; // successful steals221 this.processed = 0; // requests processed222 this.gulps = 0; // number of gulps223 this.failed_swaps = 0; // steal swap failures224 this.msgs_stolen = 0; // number of messages stolen225 #endif226 221 } 227 222 … … 244 239 static inline void ^?{}( worker & mutex this ) with(this) { 245 240 #ifdef STATS 246 __atomic_add_fetch(& all_gulps,gulps,__ATOMIC_SEQ_CST);247 __atomic_add_fetch(& all_processed,processed,__ATOMIC_SEQ_CST);248 __atomic_add_fetch(& all_msgs_stolen,msgs_stolen,__ATOMIC_SEQ_CST);249 __atomic_add_fetch(& total_tries,try_steal, __ATOMIC_SEQ_CST);250 __atomic_add_fetch(& total_stolen,stolen, __ATOMIC_SEQ_CST);251 __atomic_add_fetch(& total_failed_swaps,failed_swaps, __ATOMIC_SEQ_CST);241 __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST); 242 __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST); 243 __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST); 244 __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST); 245 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST); 246 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST); 252 247 253 248 // per worker steal stats (uncomment alongside the lock above this routine to print) … … 281 276 282 277 #ifdef STATS 283 stolen_arr = aalloc( nrqueues );284 replaced_queue = aalloc( nrqueues );285 total_workers = nworkers;278 // stolen_arr = aalloc( nrqueues ); 279 // replaced_queue = aalloc( nrqueues ); 280 __total_workers = nworkers; 286 281 #endif 287 282 … … 350 345 misses += worker_req_queues[i]->missed; 351 346 } 352 adelete( stolen_arr );353 adelete( replaced_queue );347 // adelete( stolen_arr ); 348 // adelete( replaced_queue ); 354 349 #endif 355 350 … … 364 359 #ifdef STATS 365 360 printf(" Actor System Stats:\n"); 366 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, all_processed);367 size_t avg_gulps = all_gulps == 0 ? 0 : all_processed /all_gulps;368 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", all_gulps, avg_gulps, misses);361 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed); 362 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps; 363 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses); 369 364 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n", 370 total_tries, total_stolen, total_tries - total_stolen - total_failed_swaps,total_failed_swaps);371 size_t avg_steal = total_stolen == 0 ? 0 : all_msgs_stolen /total_stolen;372 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", all_msgs_stolen, avg_steal);365 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps); 366 size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen; 367 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal); 373 368 #endif 374 369 … … 443 438 444 439 static inline void check_message( message & this ) { 440 CFA_DEBUG( this.allocation_ = Finished; ) 445 441 switch ( this.allocation_ ) { // analyze message status 446 case Nodelete: CFA_DEBUG( this.allocation_ = Finished; )break;442 case Nodelete: break; 447 443 case Delete: delete( &this ); break; 448 444 case Destroy: ^?{}(this); break; … … 512 508 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 513 509 if ( curr_steal_queue ) { 514 msgs_stolen += curr_steal_queue->c_queue->count;515 stolen++;516 __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);517 replaced_queue[swap_idx]++;518 __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);510 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count; 511 executor_->w_infos[id].stolen++; 512 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); 513 // replaced_queue[swap_idx]++; 514 // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED); 519 515 } else { 520 failed_swaps++;516 executor_->w_infos[id].failed_swaps++; 521 517 } 522 518 #else … … 554 550 555 551 void main( worker & this ) with(this) { 556 #ifdef STATS557 for ( i; executor_->nrqueues ) {558 replaced_queue[i] = 0;559 __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );560 }561 #endif552 // #ifdef STATS 553 // for ( i; executor_->nrqueues ) { 554 // replaced_queue[i] = 0; 555 // __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST ); 556 // } 557 // #endif 562 558 563 559 // threshold of empty queues we see before we go stealing … … 586 582 transfer( *curr_work_queue, ¤t_queue ); 587 583 #ifdef STATS 588 gulps++;584 executor_->w_infos[id].gulps++; 589 585 #endif // STATS 590 586 #ifdef __STEAL … … 598 594 599 595 #ifdef STATS 600 try_steal++;596 executor_->w_infos[id].try_steal++; 601 597 #endif // STATS 602 598 … … 607 603 while ( ! isEmpty( *current_queue ) ) { 608 604 #ifdef STATS 609 processed++;605 executor_->w_infos[id].processed++; 610 606 #endif 611 607 &req = &remove( *current_queue ); … … 633 629 } 634 630 631 static inline void __reset_stats() { 632 #ifdef STATS 633 __total_tries = 0; 634 __total_stolen = 0; 635 __all_gulps = 0; 636 __total_failed_swaps = 0; 637 __all_processed = 0; 638 __num_actors_stats = 0; 639 __all_msgs_stolen = 0; 640 #endif 641 } 642 635 643 static inline void start_actor_system( size_t num_thds ) { 644 __reset_stats(); 636 645 __actor_executor_thd = active_thread(); 637 646 __actor_executor_ = alloc(); … … 639 648 } 640 649 641 // TODO: potentially getting revisitnumber of processors650 // TODO: potentially revisit getting number of processors 642 651 // ( currently the value stored in active_cluster()->procs.total is often stale 643 652 // and doesn't reflect how many procs are allocated ) … … 646 655 647 656 static inline void start_actor_system( executor & this ) { 657 __reset_stats(); 648 658 __actor_executor_thd = active_thread(); 649 659 __actor_executor_ = &this;
Note: See TracChangeset
for help on using the changeset viewer.