Changeset b7b3e41 for libcfa/src/concurrency
- Timestamp:
- Jun 19, 2023, 1:57:11 PM (2 years ago)
- Branches:
- master
- Children:
- adc73a5
- Parents:
- fa5e1aa5 (diff), 33d4bc8 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
- Location:
- libcfa/src/concurrency
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
rfa5e1aa5 rb7b3e41 13 13 #endif // CFA_DEBUG 14 14 15 #define DEBUG_ABORT( cond, string ) CFA_DEBUG( if ( cond ) abort( string ) ) 16 15 17 // Define the default number of processors created in the executor. Must be greater than 0. 16 18 #define __DEFAULT_EXECUTOR_PROCESSORS__ 2 … … 28 30 #define __DEFAULT_EXECUTOR_BUFSIZE__ 10 29 31 30 #define __STEAL 0// workstealing toggle. Disjoint from toggles above32 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above 31 33 32 34 // workstealing heuristic selection (only set one to be 1) … … 42 44 struct executor; 43 45 44 enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status45 46 typedef Allocation (*__receive_fn)(actor &, message &);46 enum allocation { Nodelete, Delete, Destroy, Finished }; // allocation status 47 48 typedef allocation (*__receive_fn)(actor &, message &); 47 49 struct request { 50 actor * base_receiver; 48 51 actor * receiver; 52 message * base_msg; 49 53 message * msg; 50 54 __receive_fn fn; 51 bool stop;52 55 }; 53 56 54 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel 55 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) { 57 struct a_msg { 58 int m; 59 }; 60 static inline void ?{}( request & this ) {} 61 static inline void ?{}( request & this, actor * base_receiver, actor * receiver, message * base_msg, message * msg, __receive_fn fn ) { 62 this.base_receiver = base_receiver; 56 63 this.receiver = receiver; 64 this.base_msg = base_msg; 57 65 this.msg = msg; 58 66 this.fn = fn; 59 this.stop = false;60 67 } 61 68 static inline void ?{}( request & this, request & copy ) { … … 63 70 this.msg = copy.msg; 64 71 this.fn = copy.fn; 65 this.stop = copy.stop;66 72 } 67 73 … … 81 87 last_size = 0; 82 88 } 83 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); } 89 static inline void ^?{}( copy_queue & this ) with(this) { 90 DEBUG_ABORT( count != 0, "Actor system 
terminated with messages sent but not received\n" ); 91 adelete(buffer); 92 } 84 93 85 94 static inline void insert( copy_queue & this, request & elem ) with(this) { … … 115 124 } 116 125 117 static inline bool is Empty( copy_queue & this ) with(this) { return count == 0; }126 static inline bool is_empty( copy_queue & this ) with(this) { return count == 0; } 118 127 119 128 struct work_queue { … … 176 185 volatile unsigned long long stamp; 177 186 #ifdef ACTOR_STATS 178 size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;187 size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen; 179 188 unsigned long long processed; 180 189 size_t gulps; … … 189 198 this.gulps = 0; // number of gulps 190 199 this.failed_swaps = 0; // steal swap failures 200 this.empty_stolen = 0; // queues empty after steal 191 201 this.msgs_stolen = 0; // number of messages stolen 192 202 #endif … … 208 218 #ifdef ACTOR_STATS 209 219 // aggregate counters for statistics 210 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, 220 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0, 211 221 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; 212 222 #endif … … 233 243 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 234 244 bool seperate_clus; // use same or separate cluster for executor 245 volatile bool is_shutdown; // flag to communicate shutdown to worker threads 235 246 }; // executor 236 247 … … 246 257 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST); 247 258 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST); 259 __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST); 248 260 249 261 // per worker steal stats (uncomment alongside the lock above this routine to print) … 
… 272 284 this.nrqueues = nrqueues; 273 285 this.seperate_clus = seperate_clus; 286 this.is_shutdown = false; 274 287 275 288 if ( nworkers == nrqueues ) … … 320 333 321 334 static inline void ^?{}( executor & this ) with(this) { 322 #ifdef __STEAL 323 request sentinels[nrqueues]; 324 for ( unsigned int i = 0; i < nrqueues; i++ ) { 325 insert( request_queues[i], sentinels[i] ); // force eventually termination 326 } // for 327 #else 328 request sentinels[nworkers]; 329 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 330 for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) { 331 range = reqPerWorker + ( i < extras ? 1 : 0 ); 332 insert( request_queues[step], sentinels[i] ); // force eventually termination 333 } // for 334 #endif 335 is_shutdown = true; 335 336 336 337 for ( i; nworkers ) … … 363 364 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps; 364 365 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses); 365 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\ n",366 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps );366 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n", 367 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen); 367 368 size_t avg_steal = __total_stolen == 0 ? 
0 : __all_msgs_stolen / __total_stolen; 368 369 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal); … … 393 394 struct actor { 394 395 size_t ticket; // executor-queue handle 395 Allocation allocation_; // allocation action396 allocation allocation_; // allocation action 396 397 inline virtual_dtor; 397 398 }; … … 400 401 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 401 402 // member must be called to end it 402 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );403 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 403 404 allocation_ = Nodelete; 404 405 ticket = __get_next_ticket( *__actor_executor_ ); … … 430 431 431 432 struct message { 432 Allocation allocation_; // allocation action433 allocation allocation_; // allocation action 433 434 inline virtual_dtor; 434 435 }; … … 437 438 this.allocation_ = Nodelete; 438 439 } 439 static inline void ?{}( message & this, Allocation allocation) {440 memcpy( &this.allocation_, &alloc ation, sizeof(allocation) ); // optimization to elide ctor441 verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");440 static inline void ?{}( message & this, allocation alloc ) { 441 memcpy( &this.allocation_, &alloc, sizeof(allocation) ); // optimization to elide ctor 442 DEBUG_ABORT( this.allocation_ == Finished, "The Finished allocation status is not supported for message types.\n" ); 442 443 } 443 444 static inline void ^?{}( message & this ) with(this) { … … 447 448 static inline void check_message( message & this ) { 448 449 switch ( this.allocation_ ) { // analyze message status 449 case Nodelete: CFA_DEBUG( this.allocation_ = Finished); break;450 case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break; 450 451 case 
Delete: delete( &this ); break; 451 case Destroy: ^?{}( this); break;452 case Destroy: ^?{}( this ); break; 452 453 case Finished: break; 453 454 } // switch 454 455 } 455 static inline void set_allocation( message & this, Allocation state ) {456 static inline void set_allocation( message & this, allocation state ) { 456 457 this.allocation_ = state; 457 458 } 458 459 459 460 static inline void deliver_request( request & this ) { 460 this.receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 461 check_message( *this.msg ); 462 check_actor( *this.receiver ); 461 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 462 this.base_receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 463 check_message( *this.base_msg ); 464 check_actor( *this.base_receiver ); 463 465 } 464 466 … … 510 512 curr_steal_queue = request_queues[ i + vic_start ]; 511 513 // avoid empty queues and queues that are being operated on 512 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is Empty( *curr_steal_queue->c_queue ) )514 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) ) 513 515 continue; 514 516 … … 518 520 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count; 519 521 executor_->w_infos[id].stolen++; 522 if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++; 520 523 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); 521 524 // replaced_queue[swap_idx]++; … … 557 560 } 558 561 562 #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit 559 563 void main( worker & this ) with(this) { 560 564 // #ifdef ACTOR_STATS … … 578 582 579 583 // check if queue is empty before trying to gulp it 580 if ( is Empty( *curr_work_queue->c_queue ) ) {584 if ( is_empty( *curr_work_queue->c_queue ) ) { 581 585 #ifdef __STEAL 582 586 empty_count++; … 
… 591 595 #endif // ACTOR_STATS 592 596 #ifdef __STEAL 593 if ( is Empty( *current_queue ) ) {594 if ( unlikely( no_steal ) ) continue;597 if ( is_empty( *current_queue ) ) { 598 if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } 595 599 empty_count++; 596 600 if ( empty_count < steal_threshold ) continue; 597 601 empty_count = 0; 602 603 CHECK_TERMINATION; // check for termination 598 604 599 605 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); … … 607 613 } 608 614 #endif // __STEAL 609 while ( ! is Empty( *current_queue ) ) {615 while ( ! is_empty( *current_queue ) ) { 610 616 #ifdef ACTOR_STATS 611 617 executor_->w_infos[id].processed++; … … 613 619 &req = &remove( *current_queue ); 614 620 if ( !&req ) continue; 615 if ( req.stop ) break Exit;616 621 deliver_request( req ); 617 622 } … … 631 636 632 637 static inline void send( actor & this, request & req ) { 633 verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );638 DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 634 639 send( *__actor_executor_, req, this.ticket ); 635 640 } … … 641 646 __all_gulps = 0; 642 647 __total_failed_swaps = 0; 648 __total_empty_stolen = 0; 643 649 __all_processed = 0; 644 650 __num_actors_stats = 0; … … 654 660 } 655 661 656 // TODO: potentially revisit getting number of processors 657 // ( currently the value stored in active_cluster()->procs.total is often stale 658 // and doesn't reflect how many procs are allocated ) 659 // static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); } 660 static inline void start_actor_system() { start_actor_system( 1 ); } 662 static inline void start_actor_system() { start_actor_system( get_proc_count( *active_cluster() ) ); } 661 663 662 664 static inline void start_actor_system( executor & this ) { … … 668 670 669 671 static inline void stop_actor_system() { 
670 park( ); // will receive signalwhen actor system is finished672 park( ); // will be unparked when actor system is finished 671 673 672 674 if ( !__actor_executor_passed ) delete( __actor_executor_ ); … … 680 682 // assigned at creation to __base_msg_finished to avoid unused message warning 681 683 message __base_msg_finished @= { .allocation_ : Finished }; 682 struct __ DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;683 struct __ DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;684 struct __ FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;685 686 Allocation receive( actor & this, __DeleteMsg& msg ) { return Delete; }687 Allocation receive( actor & this, __DestroyMsg& msg ) { return Destroy; }688 Allocation receive( actor & this, __FinishedMsg& msg ) { return Finished; }689 684 struct __delete_msg_t { inline message; } delete_msg = __base_msg_finished; 685 struct __destroy_msg_t { inline message; } destroy_msg = __base_msg_finished; 686 struct __finished_msg_t { inline message; } finished_msg = __base_msg_finished; 687 688 allocation receive( actor & this, __delete_msg_t & msg ) { return Delete; } 689 allocation receive( actor & this, __destroy_msg_t & msg ) { return Destroy; } 690 allocation receive( actor & this, __finished_msg_t & msg ) { return Finished; } 691 -
libcfa/src/concurrency/atomic.hfa
rfa5e1aa5 rb7b3e41 10 10 // Created On : Thu May 25 15:22:46 2023 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Thu May 25 15:24:45202313 // Update Count : 112 // Last Modified On : Wed Jun 14 07:48:57 2023 13 // Update Count : 52 14 14 // 15 15 16 #define LOAD( lock ) (__atomic_load_n( &(lock), __ATOMIC_SEQ_CST )) 17 #define LOADM( lock, memorder ) (__atomic_load_n( &(lock), memorder )) 18 #define STORE( lock, assn ) (__atomic_store_n( &(lock), assn, __ATOMIC_SEQ_CST )) 19 #define STOREM( lock, assn, memorder ) (__atomic_store_n( &(lock), assn, memorder )) 20 #define CLR( lock ) (__atomic_clear( &(lock), __ATOMIC_RELEASE )) 21 #define CLRM( lock, memorder ) (__atomic_clear( &(lock), memorder )) 22 #define TAS( lock ) (__atomic_test_and_set( &(lock), __ATOMIC_ACQUIRE )) 23 #define TASM( lock, memorder ) (__atomic_test_and_set( &(lock), memorder )) 24 #define CAS( change, comp, assn ) ({typeof(comp) __temp = (comp); __atomic_compare_exchange_n( &(change), &(__temp), (assn), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); }) 25 #define CASM( change, comp, assn, memorder... ) ({typeof(comp) * __temp = &(comp); __atomic_compare_exchange_n( &(change), &(__temp), (assn), false, memorder, memorder ); }) 26 #define CASV( change, comp, assn ) (__atomic_compare_exchange_n( &(change), &(comp), (assn), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST )) 27 #define CASVM( change, comp, assn, memorder... 
) (__atomic_compare_exchange_n( &(change), &(comp), (assn), false, memorder, memorder )) 28 #define FAS( change, assn ) (__atomic_exchange_n( &(change), (assn), __ATOMIC_SEQ_CST )) 29 #define FASM( change, assn, memorder ) (__atomic_exchange_n( &(change), (assn), memorder )) 30 #define FAI( change, Inc ) (__atomic_fetch_add( &(change), (Inc), __ATOMIC_SEQ_CST )) 31 #define FAIM( change, Inc, memorder ) (__atomic_fetch_add( &(change), (Inc), memorder )) 16 #define LOAD( val ) (LOADM( val, __ATOMIC_SEQ_CST)) 17 #define LOADM( val, memorder ) (__atomic_load_n( &(val), memorder)) 18 19 #define STORE( val, assn ) (STOREM( val, assn, __ATOMIC_SEQ_CST)) 20 #define STOREM( val, assn, memorder ) (__atomic_store_n( &(val), assn, memorder)) 21 22 #define TAS( lock ) (TASM( lock, __ATOMIC_ACQUIRE)) 23 #define TASM( lock, memorder ) (__atomic_test_and_set( &(lock), memorder)) 24 25 #define TASCLR( lock ) (TASCLRM( lock, __ATOMIC_RELEASE)) 26 #define TASCLRM( lock, memorder ) (__atomic_clear( &(lock), memorder)) 27 28 #define FAS( assn, replace ) (FASM(assn, replace, __ATOMIC_SEQ_CST)) 29 #define FASM( assn, replace, memorder ) (__atomic_exchange_n( &(assn), (replace), memorder)) 30 31 #define FAI( assn, Inc ) (__atomic_fetch_add( &(assn), (Inc), __ATOMIC_SEQ_CST)) 32 #define FAIM( assn, Inc, memorder ) (__atomic_fetch_add( &(assn), (Inc), memorder)) 33 34 // Use __sync because __atomic with 128-bit CAA can result in calls to pthread_mutex_lock. 35 36 // #define CAS( assn, comp, replace ) (CASM( assn, comp, replace, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) 37 // #define CASM( assn, comp, replace, memorder... ) ({ typeof(comp) __temp = (comp); __atomic_compare_exchange_n( &(assn), &(__temp), (replace), false, memorder ); }) 38 #define CAS( assn, comp, replace ) (__sync_bool_compare_and_swap( &assn, comp, replace)) 39 #define CASM( assn, comp, replace, memorder... 
) _Static_assert( false, "memory order unsupported for CAS macro" ); 40 41 // #define CASV( assn, comp, replace ) (__atomic_compare_exchange_n( &(assn), &(comp), (replace), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST )) 42 // #define CASVM( assn, comp, replace, memorder... ) (__atomic_compare_exchange_n( &(assn), &(comp), (replace), false, memorder, memorder )) 43 #define CASV( assn, comp, replace ) ({ \ 44 typeof(comp) temp = comp; \ 45 typeof(comp) old = __sync_val_compare_and_swap( &(assn), (comp), (replace) ); \ 46 old == temp ? true : (comp = old, false); \ 47 }) 48 #define CASVM( assn, comp, replace, memorder... ) _Static_assert( false, "memory order unsupported for CASV macro" ); -
libcfa/src/concurrency/kernel.hfa
rfa5e1aa5 rb7b3e41 195 195 // Total number of processors 196 196 unsigned total; 197 198 // Number of processors constructed 199 // incremented at construction time, total is incremented once the processor asyncronously registers 200 unsigned constructed; 197 201 198 202 // Total number of idle processors … … 297 301 static inline struct cluster * active_cluster () { return publicTLS_get( this_processor )->cltr; } 298 302 303 // gets the number of constructed processors on the cluster 304 static inline unsigned get_proc_count( cluster & this ) { return this.procs.constructed; } 305 299 306 // set the number of internal processors 300 307 // these processors are in addition to any explicitly declared processors -
libcfa/src/concurrency/kernel/cluster.hfa
rfa5e1aa5 rb7b3e41 40 40 41 41 // convert to log2 scale but using double 42 static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2( intsc); }42 static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2((__readyQ_avg_t)intsc); } 43 43 44 44 #define warn_large_before warnf( !strict || old_avg < 35.0, "Suspiciously large previous average: %'lf, %'" PRId64 "ms \n", old_avg, program()`ms ) -
libcfa/src/concurrency/kernel/startup.cfa
rfa5e1aa5 rb7b3e41 528 528 this.name = name; 529 529 this.cltr = &_cltr; 530 __atomic_add_fetch( &_cltr.procs.constructed, 1u, __ATOMIC_RELAXED ); 530 531 this.rdq.its = 0; 531 532 this.rdq.itr = 0; … … 595 596 __cfadbg_print_safe(runtime_core, "Kernel : core %p signaling termination\n", &this); 596 597 598 __atomic_sub_fetch( &this.cltr->procs.constructed, 1u, __ATOMIC_RELAXED ); 599 597 600 __atomic_store_n(&do_terminate, true, __ATOMIC_RELAXED); 598 601 __disable_interrupts_checked(); … … 615 618 this.fdw = 0p; 616 619 this.idle = 0; 620 this.constructed = 0; 617 621 this.total = 0; 618 622 } -
libcfa/src/concurrency/locks.hfa
rfa5e1aa5 rb7b3e41 32 32 #include "select.hfa" 33 33 34 #include <fstream.hfa>35 36 34 // futex headers 37 35 #include <linux/futex.h> /* Definition of FUTEX_* constants */
Note:
See TracChangeset
for help on using the changeset viewer.