Changeset 1e940de0 for libcfa


Ignore:
Timestamp:
Jun 14, 2023, 4:44:00 PM (13 months ago)
Author:
caparsons <caparson@…>
Branches:
master
Children:
8d6786b
Parents:
7e4bd9b6
Message:

cleanup/bugfix actors and fix virtual dtor bug

Location:
libcfa/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    r7e4bd9b6 r1e940de0  
    3030#define __DEFAULT_EXECUTOR_BUFSIZE__ 10
    3131
    32 #define __STEAL 0 // workstealing toggle. Disjoint from toggles above
     32#define __STEAL 1 // workstealing toggle. Disjoint from toggles above
    3333
    3434// workstealing heuristic selection (only set one to be 1)
     
    4848typedef allocation (*__receive_fn)(actor &, message &);
    4949struct request {
     50    actor * base_receiver;
    5051    actor * receiver;
     52    message * base_msg;
    5153    message * msg;
    5254    __receive_fn fn;
    53     bool stop;
     55    // bool stop; // commented from change to termination flag from sentinels C_TODO: remove after confirming no performance degradation
    5456};
    5557
    56 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
    57 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
     58struct a_msg {
     59    int m;
     60};
     61static inline void ?{}( request & this ) {
     62    // this.stop = true; // default ctor makes a sentinel
     63}
     64static inline void ?{}( request & this, actor * base_receiver, actor * receiver, message * base_msg, message * msg, __receive_fn fn ) {
     65    this.base_receiver = base_receiver;
    5866    this.receiver = receiver;
     67    this.base_msg = base_msg;
    5968    this.msg = msg;
    6069    this.fn = fn;
    61     this.stop = false;
     70    // this.stop = false;
    6271}
    6372static inline void ?{}( request & this, request & copy ) {
     
    6574    this.msg = copy.msg;
    6675    this.fn = copy.fn;
    67     this.stop = copy.stop;
     76    // this.stop = copy.stop;
    6877}
    6978
     
    8392    last_size = 0;
    8493}
    85 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
    86 
    87 static inline void insert( copy_queue & this, request & elem ) with(this) {
     94static inline void ^?{}( copy_queue & this ) with(this) {
     95    DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
     96    adelete(buffer);
     97}
     98
     99static inline void insert( copy_queue & this, request & elem ) with(this) { // C_TODO: remove redundant send/insert once decision is made on emplace/copy
    88100    if ( count >= buffer_size ) { // increase arr size
    89101        last_size = buffer_size;
     
    117129}
    118130
    119 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
     131static inline bool is_empty( copy_queue & this ) with(this) { return count == 0; }
    120132
    121133struct work_queue {
     
    178190    volatile unsigned long long stamp;
    179191    #ifdef ACTOR_STATS
    180     size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
     192    size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    181193    unsigned long long processed;
    182194    size_t gulps;
     
    191203    this.gulps = 0;                                 // number of gulps
    192204    this.failed_swaps = 0;                          // steal swap failures
     205    this.empty_stolen = 0;                          // queues empty after steal
    193206    this.msgs_stolen = 0;                           // number of messages stolen
    194207    #endif
     
    210223#ifdef ACTOR_STATS
    211224// aggregate counters for statistics
    212 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
     225size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    213226    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    214227#endif
     
    235248        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    236249        bool seperate_clus;                                                             // use same or separate cluster for executor
     250    volatile bool is_shutdown;                      // flag to communicate shutdown to worker threads
    237251}; // executor
    238252
     
    248262    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    249263    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
     264    __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
    250265
    251266    // per worker steal stats (uncomment alongside the lock above this routine to print)
     
    274289    this.nrqueues = nrqueues;
    275290    this.seperate_clus = seperate_clus;
     291    this.is_shutdown = false;
    276292
    277293    if ( nworkers == nrqueues )
     
    322338
    323339static inline void ^?{}( executor & this ) with(this) {
    324     #ifdef __STEAL
    325     request sentinels[nrqueues];
    326     for ( unsigned int i = 0; i < nrqueues; i++ ) {
    327         insert( request_queues[i], sentinels[i] );              // force eventually termination
    328     } // for
    329     #else
    330     request sentinels[nworkers];
    331     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    332     for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
    333         range = reqPerWorker + ( i < extras ? 1 : 0 );
    334         insert( request_queues[step], sentinels[i] );           // force eventually termination
    335     } // for
    336     #endif
     340    // #ifdef __STEAL // commented from change to termination flag from sentinels C_TODO: remove after confirming no performance degradation
     341    // request sentinels[nrqueues];
     342    // for ( unsigned int i = 0; i < nrqueues; i++ ) {
     343    //     insert( request_queues[i], sentinels[i] );           // force eventually termination
     344    // } // for
     345    // #else
     346    // request sentinels[nworkers];
     347    // unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     348    // for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
     349    //     range = reqPerWorker + ( i < extras ? 1 : 0 );
     350    //     insert( request_queues[step], sentinels[i] );                // force eventually termination
     351    // } // for
     352    // #endif
     353    is_shutdown = true;
    337354
    338355    for ( i; nworkers )
     
    365382    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    366383    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    367     printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
    368         __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);
     384    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
     385        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    369386    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    370387    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
     
    449466static inline void check_message( message & this ) {
    450467    switch ( this.allocation_ ) {                                               // analyze message status
    451         case Nodelete: CFA_DEBUG(this.allocation_ = Finished); break;
     468        case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break;
    452469        case Delete: delete( &this ); break;
    453         case Destroy: ^?{}(this); break;
     470        case Destroy: ^?{}( this ); break;
    454471        case Finished: break;
    455472    } // switch
     
    461478static inline void deliver_request( request & this ) {
    462479    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    463     this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
    464     check_message( *this.msg );
    465     check_actor( *this.receiver );
     480    this.base_receiver->allocation_ = this.fn( *this.receiver, *this.msg );
     481    check_message( *this.base_msg );
     482    check_actor( *this.base_receiver );
    466483}
    467484
     
    513530        curr_steal_queue = request_queues[ i + vic_start ];
    514531        // avoid empty queues and queues that are being operated on
    515         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
     532        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
    516533            continue;
    517534
     
    521538            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
    522539            executor_->w_infos[id].stolen++;
     540            if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
    523541            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    524542            // replaced_queue[swap_idx]++;
     
    560578}
    561579
     580#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
    562581void main( worker & this ) with(this) {
    563582    // #ifdef ACTOR_STATS
     
    581600       
    582601        // check if queue is empty before trying to gulp it
    583         if ( isEmpty( *curr_work_queue->c_queue ) ) {
     602        if ( is_empty( *curr_work_queue->c_queue ) ) {
    584603            #ifdef __STEAL
    585604            empty_count++;
     
    594613        #endif // ACTOR_STATS
    595614        #ifdef __STEAL
    596         if ( isEmpty( *current_queue ) ) {
    597             if ( unlikely( no_steal ) ) continue;
     615        if ( is_empty( *current_queue ) ) {
     616            if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } // C_TODO: if this impacts static/dynamic perf refactor check
    598617            empty_count++;
    599618            if ( empty_count < steal_threshold ) continue;
    600619            empty_count = 0;
     620
     621            CHECK_TERMINATION; // check for termination
    601622
    602623            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
     
    610631        }
    611632        #endif // __STEAL
    612         while ( ! isEmpty( *current_queue ) ) {
     633        while ( ! is_empty( *current_queue ) ) {
    613634            #ifdef ACTOR_STATS
    614635            executor_->w_infos[id].processed++;
     
    616637            &req = &remove( *current_queue );
    617638            if ( !&req ) continue;
    618             if ( req.stop ) break Exit;
     639            // if ( req.stop ) break Exit;
    619640            deliver_request( req );
    620641        }
     
    644665    __all_gulps = 0;
    645666    __total_failed_swaps = 0;
     667    __total_empty_stolen = 0;
    646668    __all_processed = 0;
    647669    __num_actors_stats = 0;
  • libcfa/src/virtual_dtor.hfa

    r7e4bd9b6 r1e940de0  
    2525    __virtual_obj_start = &this;
    2626}
    27 static inline void __CFA_dtor_shutdown( virtual_dtor & this ) with(this) {
     27static inline bool __CFA_dtor_shutdown( virtual_dtor & this ) with(this) {
     28    if ( __virtual_dtor_ptr == 1p ) return true; // stop base dtors from being called twice
    2829    if ( __virtual_dtor_ptr ) {
    2930        void (*dtor_ptr)(virtual_dtor &) = __virtual_dtor_ptr;
    3031        __virtual_dtor_ptr = 0p;
    31         dtor_ptr(*((virtual_dtor *)__virtual_obj_start)); // replace actor with base type
    32         return;
     32        dtor_ptr(*((virtual_dtor *)__virtual_obj_start)); // call most derived dtor
     33        __virtual_dtor_ptr = 1p; // stop base dtors from being called twice
     34        return true;
    3335    }
     36    return false;
    3437}
    3538static inline void __CFA_virt_free( virtual_dtor & this ) { free( this.__virtual_obj_start ); }
Note: See TracChangeset for help on using the changeset viewer.