- File: 1 edited
Legend:
- Unmodified
- Added
- Removed
libcfa/src/concurrency/actor.hfa
r0794365 r70d8e2f2 30 30 #define __DEFAULT_EXECUTOR_BUFSIZE__ 10 31 31 32 #define __STEAL 0// workstealing toggle. Disjoint from toggles above32 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above 33 33 34 34 // workstealing heuristic selection (only set one to be 1) … … 46 46 enum allocation { Nodelete, Delete, Destroy, Finished }; // allocation status 47 47 48 typedef allocation (*__receive_fn)(actor &, message & );48 typedef allocation (*__receive_fn)(actor &, message &, actor **, message **); 49 49 struct request { 50 50 actor * receiver; 51 51 message * msg; 52 52 __receive_fn fn; 53 bool stop;54 53 }; 55 54 56 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel 55 struct a_msg { 56 int m; 57 }; 58 static inline void ?{}( request & this ) {} 57 59 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) { 58 60 this.receiver = receiver; 59 61 this.msg = msg; 60 62 this.fn = fn; 61 this.stop = false;62 63 } 63 64 static inline void ?{}( request & this, request & copy ) { … … 65 66 this.msg = copy.msg; 66 67 this.fn = copy.fn; 67 this.stop = copy.stop;68 68 } 69 69 … … 83 83 last_size = 0; 84 84 } 85 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); } 85 static inline void ^?{}( copy_queue & this ) with(this) { 86 DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" ); 87 adelete(buffer); 88 } 86 89 87 90 static inline void insert( copy_queue & this, request & elem ) with(this) { … … 117 120 } 118 121 119 static inline bool is Empty( copy_queue & this ) with(this) { return count == 0; }122 static inline bool is_empty( copy_queue & this ) with(this) { return count == 0; } 120 123 121 124 struct work_queue { … … 178 181 volatile unsigned long long stamp; 179 182 #ifdef ACTOR_STATS 180 size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;183 size_t stolen_from, try_steal, stolen, empty_stolen, 
failed_swaps, msgs_stolen; 181 184 unsigned long long processed; 182 185 size_t gulps; … … 191 194 this.gulps = 0; // number of gulps 192 195 this.failed_swaps = 0; // steal swap failures 196 this.empty_stolen = 0; // queues empty after steal 193 197 this.msgs_stolen = 0; // number of messages stolen 194 198 #endif … … 210 214 #ifdef ACTOR_STATS 211 215 // aggregate counters for statistics 212 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, 216 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0, 213 217 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; 214 218 #endif … … 235 239 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 236 240 bool seperate_clus; // use same or separate cluster for executor 241 volatile bool is_shutdown; // flag to communicate shutdown to worker threads 237 242 }; // executor 238 243 … … 248 253 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST); 249 254 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST); 255 __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST); 250 256 251 257 // per worker steal stats (uncomment alongside the lock above this routine to print) … … 274 280 this.nrqueues = nrqueues; 275 281 this.seperate_clus = seperate_clus; 282 this.is_shutdown = false; 276 283 277 284 if ( nworkers == nrqueues ) … … 322 329 323 330 static inline void ^?{}( executor & this ) with(this) { 324 #ifdef __STEAL 325 request sentinels[nrqueues]; 326 for ( unsigned int i = 0; i < nrqueues; i++ ) { 327 insert( request_queues[i], sentinels[i] ); // force eventually termination 328 } // for 329 #else 330 request sentinels[nworkers]; 331 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 332 for ( unsigned int i = 0, step = 0, 
range; i < nworkers; i += 1, step += range ) { 333 range = reqPerWorker + ( i < extras ? 1 : 0 ); 334 insert( request_queues[step], sentinels[i] ); // force eventually termination 335 } // for 336 #endif 331 is_shutdown = true; 337 332 338 333 for ( i; nworkers ) … … 365 360 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps; 366 361 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses); 367 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\ n",368 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps );362 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n", 363 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen); 369 364 size_t avg_steal = __total_stolen == 0 ? 
0 : __all_msgs_stolen / __total_stolen; 370 365 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal); … … 449 444 static inline void check_message( message & this ) { 450 445 switch ( this.allocation_ ) { // analyze message status 451 case Nodelete: CFA_DEBUG( this.allocation_ = Finished); break;446 case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break; 452 447 case Delete: delete( &this ); break; 453 case Destroy: ^?{}( this); break;448 case Destroy: ^?{}( this ); break; 454 449 case Finished: break; 455 450 } // switch … … 461 456 static inline void deliver_request( request & this ) { 462 457 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 463 this.receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 464 check_message( *this.msg ); 465 check_actor( *this.receiver ); 458 actor * base_actor; 459 message * base_msg; 460 allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg ); 461 base_actor->allocation_ = temp; 462 check_message( *base_msg ); 463 check_actor( *base_actor ); 466 464 } 467 465 … … 513 511 curr_steal_queue = request_queues[ i + vic_start ]; 514 512 // avoid empty queues and queues that are being operated on 515 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is Empty( *curr_steal_queue->c_queue ) )513 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) ) 516 514 continue; 517 515 … … 521 519 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count; 522 520 executor_->w_infos[id].stolen++; 521 if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++; 523 522 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); 524 523 // replaced_queue[swap_idx]++; … … 560 559 } 561 560 561 #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break 
Exit 562 562 void main( worker & this ) with(this) { 563 563 // #ifdef ACTOR_STATS … … 581 581 582 582 // check if queue is empty before trying to gulp it 583 if ( is Empty( *curr_work_queue->c_queue ) ) {583 if ( is_empty( *curr_work_queue->c_queue ) ) { 584 584 #ifdef __STEAL 585 585 empty_count++; … … 594 594 #endif // ACTOR_STATS 595 595 #ifdef __STEAL 596 if ( is Empty( *current_queue ) ) {597 if ( unlikely( no_steal ) ) continue;596 if ( is_empty( *current_queue ) ) { 597 if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } 598 598 empty_count++; 599 599 if ( empty_count < steal_threshold ) continue; 600 600 empty_count = 0; 601 602 CHECK_TERMINATION; // check for termination 601 603 602 604 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); … … 610 612 } 611 613 #endif // __STEAL 612 while ( ! is Empty( *current_queue ) ) {614 while ( ! is_empty( *current_queue ) ) { 613 615 #ifdef ACTOR_STATS 614 616 executor_->w_infos[id].processed++; … … 616 618 &req = &remove( *current_queue ); 617 619 if ( !&req ) continue; 618 if ( req.stop ) break Exit;619 620 deliver_request( req ); 620 621 } … … 623 624 empty_count = 0; // we found work so reset empty counter 624 625 #endif 626 627 CHECK_TERMINATION; 625 628 626 629 // potentially reclaim some of the current queue's vector space if it is unused … … 644 647 __all_gulps = 0; 645 648 __total_failed_swaps = 0; 649 __total_empty_stolen = 0; 646 650 __all_processed = 0; 647 651 __num_actors_stats = 0; … … 657 661 } 658 662 659 // TODO: potentially revisit getting number of processors 660 // ( currently the value stored in active_cluster()->procs.total is often stale 661 // and doesn't reflect how many procs are allocated ) 662 // static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); } 663 static inline void start_actor_system() { start_actor_system( 1 ); } 663 static inline void start_actor_system() { start_actor_system( get_proc_count( 
*active_cluster() ) ); } 664 664 665 665 static inline void start_actor_system( executor & this ) { … … 671 671 672 672 static inline void stop_actor_system() { 673 park( ); // will receive signalwhen actor system is finished673 park( ); // will be unparked when actor system is finished 674 674 675 675 if ( !__actor_executor_passed ) delete( __actor_executor_ );
Note: See TracChangeset for help on using the changeset viewer.