Changeset 1e940de0
- Timestamp:
- Jun 14, 2023, 4:44:00 PM (21 months ago)
- Branches:
- master
- Children:
- 8d6786b
- Parents:
- 7e4bd9b6
- Location:
- libcfa/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
r7e4bd9b6 r1e940de0 30 30 #define __DEFAULT_EXECUTOR_BUFSIZE__ 10 31 31 32 #define __STEAL 0// workstealing toggle. Disjoint from toggles above32 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above 33 33 34 34 // workstealing heuristic selection (only set one to be 1) … … 48 48 typedef allocation (*__receive_fn)(actor &, message &); 49 49 struct request { 50 actor * base_receiver; 50 51 actor * receiver; 52 message * base_msg; 51 53 message * msg; 52 54 __receive_fn fn; 53 bool stop;55 // bool stop; // commented from change to termination flag from sentinels C_TODO: remove after confirming no performance degradation 54 56 }; 55 57 56 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel 57 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) { 58 struct a_msg { 59 int m; 60 }; 61 static inline void ?{}( request & this ) { 62 // this.stop = true; // default ctor makes a sentinel 63 } 64 static inline void ?{}( request & this, actor * base_receiver, actor * receiver, message * base_msg, message * msg, __receive_fn fn ) { 65 this.base_receiver = base_receiver; 58 66 this.receiver = receiver; 67 this.base_msg = base_msg; 59 68 this.msg = msg; 60 69 this.fn = fn; 61 this.stop = false;70 // this.stop = false; 62 71 } 63 72 static inline void ?{}( request & this, request & copy ) { … … 65 74 this.msg = copy.msg; 66 75 this.fn = copy.fn; 67 this.stop = copy.stop;76 // this.stop = copy.stop; 68 77 } 69 78 … … 83 92 last_size = 0; 84 93 } 85 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); } 86 87 static inline void insert( copy_queue & this, request & elem ) with(this) { 94 static inline void ^?{}( copy_queue & this ) with(this) { 95 DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" ); 96 adelete(buffer); 97 } 98 99 static inline void insert( copy_queue & this, request & elem ) with(this) { // C_TODO: remove redundant send/insert once decision is made on emplace/copy 88 100 if ( count >= buffer_size ) { // increase arr size 89 101 last_size = buffer_size; … … 117 129 } 118 130 119 static inline bool is Empty( copy_queue & this ) with(this) { return count == 0; }131 static inline bool is_empty( copy_queue & this ) with(this) { return count == 0; } 120 132 121 133 struct work_queue { … … 178 190 volatile unsigned long long stamp; 179 191 #ifdef ACTOR_STATS 180 size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;192 size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen; 181 193 unsigned long long processed; 182 194 size_t gulps; … … 191 203 this.gulps = 0; // number of gulps 192 204 this.failed_swaps = 0; // steal swap failures 205 this.empty_stolen = 0; // queues empty after steal 193 206 this.msgs_stolen = 0; // number of messages stolen 194 207 #endif … … 210 223 #ifdef ACTOR_STATS 211 224 // aggregate counters for statistics 212 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, 225 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0, 213 226 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; 214 227 #endif … … 235 248 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 236 249 bool seperate_clus; // use same or separate cluster for executor 250 volatile bool is_shutdown; // flag to communicate shutdown to worker threads 237 251 }; // executor 238 252 … … 248 262 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST); 249 263 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST); 264 __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST); 250 265 251 266 // per worker steal stats (uncomment alongside the lock above this routine to print) … … 274 289 this.nrqueues = nrqueues; 275 290 this.seperate_clus = seperate_clus; 291 this.is_shutdown = false; 276 292 277 293 if ( nworkers == nrqueues ) … … 322 338 323 339 static inline void ^?{}( executor & this ) with(this) { 324 #ifdef __STEAL 325 request sentinels[nrqueues]; 326 for ( unsigned int i = 0; i < nrqueues; i++ ) { 327 insert( request_queues[i], sentinels[i] ); // force eventually termination 328 } // for 329 #else 330 request sentinels[nworkers]; 331 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 332 for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) { 333 range = reqPerWorker + ( i < extras ? 1 : 0 ); 334 insert( request_queues[step], sentinels[i] ); // force eventually termination 335 } // for 336 #endif 340 // #ifdef __STEAL // commented from change to termination flag from sentinels C_TODO: remove after confirming no performance degradation 341 // request sentinels[nrqueues]; 342 // for ( unsigned int i = 0; i < nrqueues; i++ ) { 343 // insert( request_queues[i], sentinels[i] ); // force eventually termination 344 // } // for 345 // #else 346 // request sentinels[nworkers]; 347 // unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 348 // for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) { 349 // range = reqPerWorker + ( i < extras ? 1 : 0 ); 350 // insert( request_queues[step], sentinels[i] ); // force eventually termination 351 // } // for 352 // #endif 353 is_shutdown = true; 337 354 338 355 for ( i; nworkers ) … … 365 382 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps; 366 383 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses); 367 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\ n",368 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps );384 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n", 385 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen); 369 386 size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen; 370 387 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal); … … 449 466 static inline void check_message( message & this ) { 450 467 switch ( this.allocation_ ) { // analyze message status 451 case Nodelete: CFA_DEBUG( this.allocation_ = Finished); break;468 case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break; 452 469 case Delete: delete( &this ); break; 453 case Destroy: ^?{}( this); break;470 case Destroy: ^?{}( this ); break; 454 471 case Finished: break; 455 472 } // switch … … 461 478 static inline void deliver_request( request & this ) { 462 479 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 463 this. receiver->allocation_ = this.fn( *this.receiver, *this.msg );464 check_message( *this. msg );465 check_actor( *this. receiver );480 this.base_receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 481 check_message( *this.base_msg ); 482 check_actor( *this.base_receiver ); 466 483 } 467 484 … … 513 530 curr_steal_queue = request_queues[ i + vic_start ]; 514 531 // avoid empty queues and queues that are being operated on 515 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is Empty( *curr_steal_queue->c_queue ) )532 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) ) 516 533 continue; 517 534 … … 521 538 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count; 522 539 executor_->w_infos[id].stolen++; 540 if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++; 523 541 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); 524 542 // replaced_queue[swap_idx]++; … … 560 578 } 561 579 580 #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit 562 581 void main( worker & this ) with(this) { 563 582 // #ifdef ACTOR_STATS … … 581 600 582 601 // check if queue is empty before trying to gulp it 583 if ( is Empty( *curr_work_queue->c_queue ) ) {602 if ( is_empty( *curr_work_queue->c_queue ) ) { 584 603 #ifdef __STEAL 585 604 empty_count++; … … 594 613 #endif // ACTOR_STATS 595 614 #ifdef __STEAL 596 if ( is Empty( *current_queue ) ) {597 if ( unlikely( no_steal ) ) continue;615 if ( is_empty( *current_queue ) ) { 616 if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } // C_TODO: if this impacts static/dynamic perf refactor check 598 617 empty_count++; 599 618 if ( empty_count < steal_threshold ) continue; 600 619 empty_count = 0; 620 621 CHECK_TERMINATION; // check for termination 601 622 602 623 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); … … 610 631 } 611 632 #endif // __STEAL 612 while ( ! is Empty( *current_queue ) ) {633 while ( ! is_empty( *current_queue ) ) { 613 634 #ifdef ACTOR_STATS 614 635 executor_->w_infos[id].processed++; … … 616 637 &req = &remove( *current_queue ); 617 638 if ( !&req ) continue; 618 if ( req.stop ) break Exit;639 // if ( req.stop ) break Exit; 619 640 deliver_request( req ); 620 641 } … … 644 665 __all_gulps = 0; 645 666 __total_failed_swaps = 0; 667 __total_empty_stolen = 0; 646 668 __all_processed = 0; 647 669 __num_actors_stats = 0; -
libcfa/src/virtual_dtor.hfa
r7e4bd9b6 r1e940de0 25 25 __virtual_obj_start = &this; 26 26 } 27 static inline void __CFA_dtor_shutdown( virtual_dtor & this ) with(this) { 27 static inline bool __CFA_dtor_shutdown( virtual_dtor & this ) with(this) { 28 if ( __virtual_dtor_ptr == 1p ) return true; // stop base dtors from being called twice 28 29 if ( __virtual_dtor_ptr ) { 29 30 void (*dtor_ptr)(virtual_dtor &) = __virtual_dtor_ptr; 30 31 __virtual_dtor_ptr = 0p; 31 dtor_ptr(*((virtual_dtor *)__virtual_obj_start)); // replace actor with base type 32 return; 32 dtor_ptr(*((virtual_dtor *)__virtual_obj_start)); // call most derived dtor 33 __virtual_dtor_ptr = 1p; // stop base dtors from being called twice 34 return true; 33 35 } 36 return false; 34 37 } 35 38 static inline void __CFA_virt_free( virtual_dtor & this ) { free( this.__virtual_obj_start ); }
Note: See TracChangeset
for help on using the changeset viewer.