Ignore:
Timestamp:
Jun 19, 2023, 1:57:11 PM (2 years ago)
Author:
caparson <caparson@…>
Branches:
master
Children:
adc73a5
Parents:
fa5e1aa5 (diff), 33d4bc8 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    rfa5e1aa5 rb7b3e41  
    1313#endif // CFA_DEBUG
    1414
     15#define DEBUG_ABORT( cond, string ) CFA_DEBUG( if ( cond ) abort( string ) )
     16
    1517// Define the default number of processors created in the executor. Must be greater than 0.
    1618#define __DEFAULT_EXECUTOR_PROCESSORS__ 2
     
    2830#define __DEFAULT_EXECUTOR_BUFSIZE__ 10
    2931
    30 #define __STEAL 0 // workstealing toggle. Disjoint from toggles above
     32#define __STEAL 1 // workstealing toggle. Disjoint from toggles above
    3133
    3234// workstealing heuristic selection (only set one to be 1)
     
    4244struct executor;
    4345
    44 enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
    45 
    46 typedef Allocation (*__receive_fn)(actor &, message &);
     46enum allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
     47
     48typedef allocation (*__receive_fn)(actor &, message &);
    4749struct request {
     50    actor * base_receiver;
    4851    actor * receiver;
     52    message * base_msg;
    4953    message * msg;
    5054    __receive_fn fn;
    51     bool stop;
    5255};
    5356
    54 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
    55 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
     57struct a_msg {
     58    int m;
     59};
     60static inline void ?{}( request & this ) {}
     61static inline void ?{}( request & this, actor * base_receiver, actor * receiver, message * base_msg, message * msg, __receive_fn fn ) {
     62    this.base_receiver = base_receiver;
    5663    this.receiver = receiver;
     64    this.base_msg = base_msg;
    5765    this.msg = msg;
    5866    this.fn = fn;
    59     this.stop = false;
    6067}
    6168static inline void ?{}( request & this, request & copy ) {
     
    6370    this.msg = copy.msg;
    6471    this.fn = copy.fn;
    65     this.stop = copy.stop;
    6672}
    6773
     
    8187    last_size = 0;
    8288}
    83 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
     89static inline void ^?{}( copy_queue & this ) with(this) {
     90    DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
     91    adelete(buffer);
     92}
    8493
    8594static inline void insert( copy_queue & this, request & elem ) with(this) {
     
    115124}
    116125
    117 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
     126static inline bool is_empty( copy_queue & this ) with(this) { return count == 0; }
    118127
    119128struct work_queue {
     
    176185    volatile unsigned long long stamp;
    177186    #ifdef ACTOR_STATS
    178     size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
     187    size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    179188    unsigned long long processed;
    180189    size_t gulps;
     
    189198    this.gulps = 0;                                 // number of gulps
    190199    this.failed_swaps = 0;                          // steal swap failures
     200    this.empty_stolen = 0;                          // queues empty after steal
    191201    this.msgs_stolen = 0;                           // number of messages stolen
    192202    #endif
     
    208218#ifdef ACTOR_STATS
    209219// aggregate counters for statistics
    210 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
     220size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    211221    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    212222#endif
     
    233243        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    234244        bool seperate_clus;                                                             // use same or separate cluster for executor
     245    volatile bool is_shutdown;                      // flag to communicate shutdown to worker threads
    235246}; // executor
    236247
     
    246257    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    247258    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
     259    __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
    248260
    249261    // per worker steal stats (uncomment alongside the lock above this routine to print)
     
    272284    this.nrqueues = nrqueues;
    273285    this.seperate_clus = seperate_clus;
     286    this.is_shutdown = false;
    274287
    275288    if ( nworkers == nrqueues )
     
    320333
    321334static inline void ^?{}( executor & this ) with(this) {
    322     #ifdef __STEAL
    323     request sentinels[nrqueues];
    324     for ( unsigned int i = 0; i < nrqueues; i++ ) {
    325         insert( request_queues[i], sentinels[i] );              // force eventually termination
    326     } // for
    327     #else
    328     request sentinels[nworkers];
    329     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    330     for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
    331         range = reqPerWorker + ( i < extras ? 1 : 0 );
    332         insert( request_queues[step], sentinels[i] );           // force eventually termination
    333     } // for
    334     #endif
     335    is_shutdown = true;
    335336
    336337    for ( i; nworkers )
     
    363364    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    364365    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    365     printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
    366         __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);
     366    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
     367        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    367368    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    368369    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
     
    393394struct actor {
    394395    size_t ticket;                                          // executor-queue handle
    395     Allocation allocation_;                                         // allocation action
     396    allocation allocation_;                                         // allocation action
    396397    inline virtual_dtor;
    397398};
     
    400401    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    401402    // member must be called to end it
    402     verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
     403    DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    403404    allocation_ = Nodelete;
    404405    ticket = __get_next_ticket( *__actor_executor_ );
     
    430431
    431432struct message {
    432     Allocation allocation_;                     // allocation action
     433    allocation allocation_;                     // allocation action
    433434    inline virtual_dtor;
    434435};
     
    437438    this.allocation_ = Nodelete;
    438439}
    439 static inline void ?{}( message & this, Allocation allocation ) {
    440     memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor
    441     verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");
     440static inline void ?{}( message & this, allocation alloc ) {
     441    memcpy( &this.allocation_, &alloc, sizeof(allocation) ); // optimization to elide ctor
     442    DEBUG_ABORT( this.allocation_ == Finished, "The Finished allocation status is not supported for message types.\n" );
    442443}
    443444static inline void ^?{}( message & this ) with(this) {
     
    447448static inline void check_message( message & this ) {
    448449    switch ( this.allocation_ ) {                                               // analyze message status
    449         case Nodelete: CFA_DEBUG(this.allocation_ = Finished); break;
     450        case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break;
    450451        case Delete: delete( &this ); break;
    451         case Destroy: ^?{}(this); break;
     452        case Destroy: ^?{}( this ); break;
    452453        case Finished: break;
    453454    } // switch
    454455}
    455 static inline void set_allocation( message & this, Allocation state ) {
     456static inline void set_allocation( message & this, allocation state ) {
    456457    this.allocation_ = state;
    457458}
    458459
    459460static inline void deliver_request( request & this ) {
    460     this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
    461     check_message( *this.msg );
    462     check_actor( *this.receiver );
     461    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     462    this.base_receiver->allocation_ = this.fn( *this.receiver, *this.msg );
     463    check_message( *this.base_msg );
     464    check_actor( *this.base_receiver );
    463465}
    464466
     
    510512        curr_steal_queue = request_queues[ i + vic_start ];
    511513        // avoid empty queues and queues that are being operated on
    512         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
     514        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
    513515            continue;
    514516
     
    518520            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
    519521            executor_->w_infos[id].stolen++;
     522            if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
    520523            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    521524            // replaced_queue[swap_idx]++;
     
    557560}
    558561
     562#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
    559563void main( worker & this ) with(this) {
    560564    // #ifdef ACTOR_STATS
     
    578582       
    579583        // check if queue is empty before trying to gulp it
    580         if ( isEmpty( *curr_work_queue->c_queue ) ) {
     584        if ( is_empty( *curr_work_queue->c_queue ) ) {
    581585            #ifdef __STEAL
    582586            empty_count++;
     
    591595        #endif // ACTOR_STATS
    592596        #ifdef __STEAL
    593         if ( isEmpty( *current_queue ) ) {
    594             if ( unlikely( no_steal ) ) continue;
     597        if ( is_empty( *current_queue ) ) {
     598            if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
    595599            empty_count++;
    596600            if ( empty_count < steal_threshold ) continue;
    597601            empty_count = 0;
     602
     603            CHECK_TERMINATION; // check for termination
    598604
    599605            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
     
    607613        }
    608614        #endif // __STEAL
    609         while ( ! isEmpty( *current_queue ) ) {
     615        while ( ! is_empty( *current_queue ) ) {
    610616            #ifdef ACTOR_STATS
    611617            executor_->w_infos[id].processed++;
     
    613619            &req = &remove( *current_queue );
    614620            if ( !&req ) continue;
    615             if ( req.stop ) break Exit;
    616621            deliver_request( req );
    617622        }
     
    631636
    632637static inline void send( actor & this, request & req ) {
    633     verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     638    DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    634639    send( *__actor_executor_, req, this.ticket );
    635640}
     
    641646    __all_gulps = 0;
    642647    __total_failed_swaps = 0;
     648    __total_empty_stolen = 0;
    643649    __all_processed = 0;
    644650    __num_actors_stats = 0;
     
    654660}
    655661
    656 // TODO: potentially revisit getting number of processors
    657 //  ( currently the value stored in active_cluster()->procs.total is often stale
    658 //  and doesn't reflect how many procs are allocated )
    659 // static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
    660 static inline void start_actor_system() { start_actor_system( 1 ); }
     662static inline void start_actor_system() { start_actor_system( get_proc_count( *active_cluster() ) ); }
    661663
    662664static inline void start_actor_system( executor & this ) {
     
    668670
    669671static inline void stop_actor_system() {
    670     park( ); // will receive signal when actor system is finished
     672    park( ); // will be unparked when actor system is finished
    671673
    672674    if ( !__actor_executor_passed ) delete( __actor_executor_ );
     
    680682// assigned at creation to __base_msg_finished to avoid unused message warning
    681683message __base_msg_finished @= { .allocation_ : Finished };
    682 struct __DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;
    683 struct __DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;
    684 struct __FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;
    685 
    686 Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; }
    687 Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; }
    688 Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }
    689 
     684struct __delete_msg_t { inline message; } delete_msg = __base_msg_finished;
     685struct __destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
     686struct __finished_msg_t { inline message; } finished_msg = __base_msg_finished;
     687
     688allocation receive( actor & this, __delete_msg_t & msg ) { return Delete; }
     689allocation receive( actor & this, __destroy_msg_t & msg ) { return Destroy; }
     690allocation receive( actor & this, __finished_msg_t & msg ) { return Finished; }
     691
  • libcfa/src/concurrency/atomic.hfa

    rfa5e1aa5 rb7b3e41  
    1010// Created On       : Thu May 25 15:22:46 2023
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Thu May 25 15:24:45 2023
    13 // Update Count     : 1
     12// Last Modified On : Wed Jun 14 07:48:57 2023
     13// Update Count     : 52
    1414//
    1515
    16 #define LOAD( lock ) (__atomic_load_n( &(lock), __ATOMIC_SEQ_CST ))
    17 #define LOADM( lock, memorder ) (__atomic_load_n( &(lock), memorder ))
    18 #define STORE( lock, assn ) (__atomic_store_n( &(lock), assn, __ATOMIC_SEQ_CST ))
    19 #define STOREM( lock, assn, memorder ) (__atomic_store_n( &(lock), assn, memorder ))
    20 #define CLR( lock ) (__atomic_clear( &(lock), __ATOMIC_RELEASE ))
    21 #define CLRM( lock, memorder ) (__atomic_clear( &(lock), memorder ))
    22 #define TAS( lock ) (__atomic_test_and_set( &(lock), __ATOMIC_ACQUIRE ))
    23 #define TASM( lock, memorder ) (__atomic_test_and_set( &(lock), memorder ))
    24 #define CAS( change, comp, assn ) ({typeof(comp) __temp = (comp); __atomic_compare_exchange_n( &(change), &(__temp), (assn), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); })
    25 #define CASM( change, comp, assn, memorder... ) ({typeof(comp) * __temp = &(comp); __atomic_compare_exchange_n( &(change), &(__temp), (assn), false, memorder, memorder ); })
    26 #define CASV( change, comp, assn ) (__atomic_compare_exchange_n( &(change), &(comp), (assn), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ))
    27 #define CASVM( change, comp, assn, memorder... ) (__atomic_compare_exchange_n( &(change), &(comp), (assn), false, memorder, memorder ))
    28 #define FAS( change, assn ) (__atomic_exchange_n( &(change), (assn), __ATOMIC_SEQ_CST ))
    29 #define FASM( change, assn, memorder ) (__atomic_exchange_n( &(change), (assn), memorder ))
    30 #define FAI( change, Inc ) (__atomic_fetch_add( &(change), (Inc), __ATOMIC_SEQ_CST ))
    31 #define FAIM( change, Inc, memorder ) (__atomic_fetch_add( &(change), (Inc), memorder ))
     16#define LOAD( val ) (LOADM( val, __ATOMIC_SEQ_CST))
     17#define LOADM( val, memorder ) (__atomic_load_n( &(val), memorder))
     18
     19#define STORE( val, assn ) (STOREM( val, assn, __ATOMIC_SEQ_CST))
     20#define STOREM( val, assn, memorder ) (__atomic_store_n( &(val), assn, memorder))
     21
     22#define TAS( lock ) (TASM( lock, __ATOMIC_ACQUIRE))
     23#define TASM( lock, memorder ) (__atomic_test_and_set( &(lock), memorder))
     24
     25#define TASCLR( lock ) (TASCLRM( lock, __ATOMIC_RELEASE))
     26#define TASCLRM( lock, memorder ) (__atomic_clear( &(lock), memorder))
     27
     28#define FAS( assn, replace ) (FASM(assn, replace, __ATOMIC_SEQ_CST))
     29#define FASM( assn, replace, memorder ) (__atomic_exchange_n( &(assn), (replace), memorder))
     30
     31#define FAI( assn, Inc ) (__atomic_fetch_add( &(assn), (Inc), __ATOMIC_SEQ_CST))
     32#define FAIM( assn, Inc, memorder ) (__atomic_fetch_add( &(assn), (Inc), memorder))
     33
     34// Use __sync because __atomic with 128-bit CAA can result in calls to pthread_mutex_lock.
     35
     36// #define CAS( assn, comp, replace ) (CASM( assn, comp, replace, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
     37// #define CASM( assn, comp, replace, memorder... ) ({ typeof(comp) __temp = (comp); __atomic_compare_exchange_n( &(assn), &(__temp), (replace), false, memorder ); })
     38#define CAS( assn, comp, replace ) (__sync_bool_compare_and_swap( &assn, comp, replace))
     39#define CASM( assn, comp, replace, memorder... ) _Static_assert( false, "memory order unsupported for CAS macro" );
     40
     41// #define CASV( assn, comp, replace ) (__atomic_compare_exchange_n( &(assn), &(comp), (replace), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ))
     42// #define CASVM( assn, comp, replace, memorder... ) (__atomic_compare_exchange_n( &(assn), &(comp), (replace), false, memorder, memorder ))
     43#define CASV( assn, comp, replace ) ({ \
     44        typeof(comp) temp = comp; \
     45        typeof(comp) old = __sync_val_compare_and_swap( &(assn), (comp), (replace) ); \
     46        old == temp ? true : (comp = old, false); \
     47})
     48#define CASVM( assn, comp, replace, memorder... ) _Static_assert( false, "memory order unsupported for CASV macro" );
  • libcfa/src/concurrency/kernel.hfa

    rfa5e1aa5 rb7b3e41  
    195195        // Total number of processors
    196196        unsigned total;
     197
     198    // Number of processors constructed
     199    //  incremented at construction time; total is incremented once the processor asynchronously registers
     200        unsigned constructed;
    197201
    198202        // Total number of idle processors
     
    297301static inline struct cluster   * active_cluster  () { return publicTLS_get( this_processor )->cltr; }
    298302
     303// gets the number of constructed processors on the cluster
     304static inline unsigned get_proc_count( cluster & this ) { return this.procs.constructed; }
     305
    299306// set the number of internal processors
    300307// these processors are in addition to any explicitly declared processors
  • libcfa/src/concurrency/kernel/cluster.hfa

    rfa5e1aa5 rb7b3e41  
    4040
    4141// convert to log2 scale but using double
    42 static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2(intsc); }
     42static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2((__readyQ_avg_t)intsc); }
    4343
    4444#define warn_large_before warnf( !strict || old_avg < 35.0, "Suspiciously large previous average: %'lf, %'" PRId64 "ms \n", old_avg, program()`ms )
  • libcfa/src/concurrency/kernel/startup.cfa

    rfa5e1aa5 rb7b3e41  
    528528        this.name = name;
    529529        this.cltr = &_cltr;
     530    __atomic_add_fetch( &_cltr.procs.constructed, 1u, __ATOMIC_RELAXED );
    530531        this.rdq.its = 0;
    531532        this.rdq.itr = 0;
     
    595596        __cfadbg_print_safe(runtime_core, "Kernel : core %p signaling termination\n", &this);
    596597
     598    __atomic_sub_fetch( &this.cltr->procs.constructed, 1u, __ATOMIC_RELAXED );
     599
    597600        __atomic_store_n(&do_terminate, true, __ATOMIC_RELAXED);
    598601        __disable_interrupts_checked();
     
    615618        this.fdw   = 0p;
    616619        this.idle  = 0;
     620    this.constructed = 0;
    617621        this.total = 0;
    618622}
  • libcfa/src/concurrency/locks.hfa

    rfa5e1aa5 rb7b3e41  
    3232#include "select.hfa"
    3333
    34 #include <fstream.hfa>
    35 
    3634// futex headers
    3735#include <linux/futex.h>      /* Definition of FUTEX_* constants */
Note: See TracChangeset for help on using the changeset viewer.