Timestamp:
Mar 4, 2023, 1:35:11 PM
Author:
caparson <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
13f066d
Parents:
601bd9e
Message:

added some safety/productivity features and some stats

File:
1 edited

  • libcfa/src/concurrency/actor.hfa

    r601bd9e r1e38178  
    33#include <locks.hfa>
    44#include <limits.hfa>
    5 #include <list.hfa>
    65#include <kernel.hfa>
    7 #include <vector2.hfa>
     6#include <time_t.hfa>
     7#include <time.hfa>
     8#include <iofwd.hfa>
    89
    910#ifdef __CFA_DEBUG__
     
    2122// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
    2223// actor-executor threads. Must be greater than 0.
    23 #define __DEFAULT_EXECUTOR_RQUEUES__ 2
     24#define __DEFAULT_EXECUTOR_RQUEUES__ 4
    2425
    2526// Define if executor is created in a separate cluster
    2627#define __DEFAULT_EXECUTOR_SEPCLUS__ false
    2728
    28 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above
    29 
    30 // whether to steal work or to steal a queue Only applicable if __STEAL == 1
    31 #define __STEAL_WORK 0
    32 
    33 // heuristic selection (only set one to be 1)
    34 #define __RAND_QUEUE 1
    35 #define __RAND_WORKER 0
    36 
    37 // show stealing stats
    38 // #define __STEAL_STATS
     29#define __DEFAULT_EXECUTOR_BUFSIZE__ 10
     30
     31// #define __STEAL 0 // workstealing toggle. Disjoint from toggles above
     32
     33// workstealing heuristic selection (only set one to be 1)
     34// #define RAND 0
     35// #define SEARCH 0
     36
     37// show stats
     38// #define STATS
    3939
    4040// forward decls
    4141struct actor;
    4242struct message;
     43struct executor;
    4344
    4445enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
     
    6667}
    6768
    68 // hybrid data structure. Copies until buffer is full and then allocates for intrusive list
     69// Vector-like data structure that supports O(1) queue operations with no bound on size
     70// assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
    6971struct copy_queue {
    7072    request * buffer;
     
    8991        /* paranoid */ verify( buffer );
    9092    }
    91     buffer[count]{ elem }; // C_TODO: change to memcpy
    92     // memcpy( &buffer[count], &elem, sizeof(request) );
     93    memcpy( &buffer[count], &elem, sizeof(request) );
    9394    count++;
    9495}
    9596
    9697// once you start removing you need to remove all elements
    97 // it is not supported to call insert() before the list is fully empty
     98// it is not supported to call insert() before the array is fully empty
    9899static inline request & remove( copy_queue & this ) with(this) {
    99100    if ( count > 0 ) {
     
    107108}
    108109
    109 // try to reclaim some memory
     110// try to reclaim some memory if less than half of buffer is utilized
    110111static inline void reclaim( copy_queue & this ) with(this) {
    111112    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
     
    117118static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
    118119
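
Aside, not part of the changeset: the comments above describe a vector-like queue that relies on "gulping" — a consumer takes the whole buffer and drains it before any further insert, so removal is just an index walk with no shifting. A minimal plain-C sketch of that idea, with hypothetical names (req_t, gulp_queue) rather than the actual CFA types:

#include <stdlib.h>
#include <string.h>

typedef struct { int payload; } req_t;                 // stand-in for the CFA request type

typedef struct {
    req_t * buffer;
    size_t  count, index, capacity;
} gulp_queue;

static void gq_init( gulp_queue * q, size_t cap ) {
    q->buffer = malloc( cap * sizeof(req_t) );
    q->count = q->index = 0;
    q->capacity = cap;
}

static void gq_insert( gulp_queue * q, req_t elem ) {  // only legal once the queue is fully drained
    if ( q->count == q->capacity ) {                   // grow like a vector, no upper bound
        q->capacity *= 2;
        q->buffer = realloc( q->buffer, q->capacity * sizeof(req_t) );
    }
    memcpy( &q->buffer[q->count++], &elem, sizeof(req_t) );
}

static req_t * gq_remove( gulp_queue * q ) {           // O(1): walk an index, no element movement
    if ( q->index == q->count ) {                      // drained: reset for the next batch of inserts
        q->count = q->index = 0;
        return NULL;
    }
    return &q->buffer[q->index++];
}

Because the consumer owns the buffer until it is empty, no per-element locking is needed and the indices simply reset once a batch is drained.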
    119 static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
    120120struct work_queue {
    121121    __spinlock_t mutex_lock;
    122     copy_queue owned_queue;
    123     copy_queue * c_queue;
    124     volatile bool being_processed;
     122    copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
     123    copy_queue * c_queue;           // current queue
     124    volatile bool being_processed;  // flag to prevent concurrent processing
     125    #ifdef STATS
     126    unsigned int id;
     127    size_t missed;                  // transfers skipped due to being_processed flag being up
     128    #endif
    125129}; // work_queue
    126 static inline void ?{}( work_queue & this ) with(this) {
    127     owned_queue{ __buffer_size };
    128     c_queue = &owned_queue;
     130static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
     131    owned_queue = alloc();      // allocated separately to avoid false sharing
     132    (*owned_queue){ buf_size };
     133    c_queue = owned_queue;
    129134    being_processed = false;
    130 }
     135    #ifdef STATS
     136    id = i;
     137    missed = 0;
     138    #endif
     139}
     140
     141// clean up copy_queue owned by this work_queue
     142static inline void ^?{}( work_queue & this ) with(this) { delete( owned_queue ); }
    131143
    132144static inline void insert( work_queue & this, request & elem ) with(this) {
     
    136148} // insert
    137149
    138 static inline void transfer( work_queue & this, copy_queue ** transfer_to, work_queue ** queue_arr, unsigned int idx ) with(this) {
     150static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    139151    lock( mutex_lock __cfaabi_dbg_ctx2 );
    140     #if __STEAL
    141 
    142     #if __STEAL_WORK
    143     if (  unlikely( being_processed ) )
    144     #else
    145     // check if queue has been stolen out from under us between
    146     // transfer() call and lock acquire C_TODO: maybe just use new queue!
    147     if ( unlikely( being_processed || queue_arr[idx] != &this ) )
    148     #endif // __STEAL_WORK
    149     {
     152    #ifdef __STEAL
     153
     154    // check if queue is being processed elsewhere
     155    if ( unlikely( being_processed ) ) {
     156        #ifdef STATS
     157        missed++;
     158        #endif
    150159        unlock( mutex_lock );
    151160        return;
     
    164173} // transfer
    165174
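
The transfer() above gulps a mailbox: under the mailbox lock the worker swaps its own empty copy_queue with the mailbox's current one, and backs off (counting a miss when stats are enabled) if another worker still has the contents flagged as being processed. A rough plain-C sketch of that exchange using pthreads and hypothetical names (mailbox, gulp_queue), not the CFA lock types:

#include <pthread.h>
#include <stdbool.h>

typedef struct gulp_queue gulp_queue;           // opaque; see the queue sketch earlier

typedef struct {
    pthread_mutex_t lock;
    gulp_queue * current;                       // buffer actors currently insert into
    volatile bool being_processed;              // set while some worker drains the gulped buffer
} mailbox;

// On success the worker's empty buffer and the mailbox's full buffer trade places,
// so the worker can drain the full one without holding the lock.
static bool mailbox_transfer( mailbox * mb, gulp_queue ** workers_empty ) {
    pthread_mutex_lock( &mb->lock );
    if ( mb->being_processed ) {                // contents already owned by another worker: miss
        pthread_mutex_unlock( &mb->lock );
        return false;
    }
    gulp_queue * full = mb->current;
    mb->current      = *workers_empty;          // mailbox keeps accepting inserts into the empty buffer
    *workers_empty   = full;
    mb->being_processed = true;                 // cleared by the worker once the buffer is drained
    pthread_mutex_unlock( &mb->lock );
    return true;
}

Swapping buffer pointers instead of copying elements keeps the critical section down to a couple of pointer writes.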
     175// needed since some info needs to persist past worker lifetimes
     176struct worker_info {
     177    volatile unsigned long long stamp;
     178    #ifdef STATS
     179    size_t stolen_from;
     180    #endif
     181};
     182static inline void ?{}( worker_info & this ) {
     183    #ifdef STATS
     184    this.stolen_from = 0;
     185    #endif
     186    this.stamp = rdtscl();
     187}
     188
     189#ifdef STATS
     190unsigned int * stolen_arr;
     191unsigned int * replaced_queue;
     192#endif
    166193thread worker {
    167194    work_queue ** request_queues;
    168195    copy_queue * current_queue;
    169     worker ** worker_arr; // C_TODO: change n_workers, n_queues,worker_arr to just be pulled from ptr to executor
    170         request & req;
    171     unsigned int start, range, empty_count, n_workers, n_queues, id;
    172     #ifdef __STEAL_STATS
    173     unsigned int try_steal, stolen;
     196    executor * executor_;
     197    unsigned int start, range;
     198    int id;
     199    #ifdef STATS
     200    size_t try_steal, stolen, failed_swaps, msgs_stolen;
     201    unsigned long long processed;
     202    size_t gulps;
    174203    #endif
    175204};
    176205
    177 #ifdef __STEAL_STATS
    178 unsigned int total_tries = 0, total_stolen = 0, total_workers;
     206#ifdef STATS
     207// aggregate counters for statistics
     208size_t total_tries = 0, total_stolen = 0, total_workers, all_gulps = 0,
     209    total_failed_swaps = 0, all_processed = 0, __num_actors_stats = 0, all_msgs_stolen = 0;
    179210#endif
    180 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start,
    181         unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsigned int id ) {
     211static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
     212    unsigned int start, unsigned int range, int id ) {
    182213    ((thread &)this){ clu };
    183     this.request_queues = request_queues;
    184     this.current_queue = current_queue;
    185     this.start = start;
    186     this.range = range;
    187     this.empty_count = 0;
    188     this.n_workers = n_workers;
    189     this.worker_arr = worker_arr;
    190     this.n_queues = n_queues;
    191     this.id = id;
    192     #ifdef __STEAL_STATS
    193     this.try_steal = 0;
    194     this.stolen = 0;
    195     total_workers = n_workers;
    196     #endif
    197 }
    198 static inline void ^?{}( worker & mutex this ) with(this) {
    199     // delete( current_queue );
    200     #ifdef __STEAL_STATS
    201     __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST);
    202     __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST);
    203     if (__atomic_sub_fetch(&total_workers, 1, __ATOMIC_SEQ_CST) == 0)
    204         printf("steal attempts: %u, steals: %u\n", total_tries, total_stolen);
    205     #endif
    206 }
    207 
     214    this.request_queues = request_queues;           // array of all queues
     215    this.current_queue = current_queue;             // currently gulped queue (start with empty queue to use in swap later)
     216    this.executor_ = executor_;                     // pointer to current executor
     217    this.start = start;                             // start of worker's subrange of request_queues
     218    this.range = range;                             // size of worker's subrange of request_queues
     219    this.id = id;                                   // worker's id and index in array of workers
     220    #ifdef STATS
     221    this.try_steal = 0;                             // attempts to steal
     222    this.stolen = 0;                                // successful steals
     223    this.processed = 0;                             // requests processed
     224    this.gulps = 0;                                 // number of gulps
     225    this.failed_swaps = 0;                          // steal swap failures
     226    this.msgs_stolen = 0;                           // number of messages stolen
     227    #endif
     228}
     229
     230static bool no_steal = false;
    208231struct executor {
    209232    cluster * cluster;                                                      // if workers execute on separate cluster
     
    213236        work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
    214237    worker ** workers;                                                          // array of workers executing work requests
     238    worker_info * w_infos;                          // array of info about each worker
    215239        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    216240        bool seperate_clus;                                                             // use same or separate cluster for executor
    217241}; // executor
    218242
     243// #ifdef STATS
     244// __spinlock_t out_lock;
     245// #endif
     246static inline void ^?{}( worker & mutex this ) with(this) {
     247    #ifdef STATS
     248    __atomic_add_fetch(&all_gulps, gulps,__ATOMIC_SEQ_CST);
     249    __atomic_add_fetch(&all_processed, processed,__ATOMIC_SEQ_CST);
     250    __atomic_add_fetch(&all_msgs_stolen, msgs_stolen,__ATOMIC_SEQ_CST);
     251    __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST);
     252    __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST);
     253    __atomic_add_fetch(&total_failed_swaps, failed_swaps, __ATOMIC_SEQ_CST);
     254
     255    // per worker steal stats (uncomment alongside the lock above this routine to print)
     256    // lock( out_lock __cfaabi_dbg_ctx2 );
     257    // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
     258    // int count = 0;
     259    // int count2 = 0;
     260    // for ( i; range ) {
     261    //     if ( replaced_queue[start + i] > 0 ){
     262    //         count++;
     263    //         // printf("%d: %u, ",i, replaced_queue[i]);
     264    //     }
     265    //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
     266    //         count2++;
     267    // }
     268    // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
     269    // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
     270    // unlock( out_lock );
     271    #endif
     272}
     273
    219274static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    220275    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    221     __buffer_size = buf_size;
    222276    this.nprocessors = nprocessors;
    223277    this.nworkers = nworkers;
     
    225279    this.seperate_clus = seperate_clus;
    226280
     281    if ( nworkers == nrqueues )
     282        no_steal = true;
     283   
     284    #ifdef STATS
     285    stolen_arr = aalloc( nrqueues );
     286    replaced_queue = aalloc( nrqueues );
     287    total_workers = nworkers;
     288    #endif
     289
    227290    if ( seperate_clus ) {
    228291        cluster = alloc();
     
    233296    worker_req_queues = aalloc( nrqueues );
    234297    for ( i; nrqueues ) {
    235         request_queues[i]{};
     298        request_queues[i]{ buf_size, i };
    236299        worker_req_queues[i] = &request_queues[i];
    237300    }
     
    242305
    243306    local_queues = aalloc( nworkers );
    244     workers = alloc( nworkers );
     307    workers = aalloc( nworkers );
     308    w_infos = aalloc( nworkers );
    245309    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     310
     311    for ( i; nworkers ) {
     312        w_infos[i]{};
     313        local_queues[i]{ buf_size };
     314    }
     315
    246316    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
    247         local_queues[i]{ buf_size };
    248317        range = reqPerWorker + ( i < extras ? 1 : 0 );
    249         (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i };
     318        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    250319    } // for
    251320}
    252 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
     321static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
    253322static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
    254323static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
     
    256325static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
    257326
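
The executor constructor above hands each worker a contiguous subrange of the request queues: nrqueues / nworkers each, with the remainder spread one-per-worker over the first nrqueues % nworkers workers. A small self-contained C example of the same start/range arithmetic (the sizes are made up):

#include <stdio.h>

int main( void ) {
    unsigned int nrqueues = 10, nworkers = 4;   // hypothetical sizes
    unsigned int per = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = per + ( i < extras ? 1 : 0 );   // first 'extras' workers take one extra queue
        printf( "worker %u: queues [%u, %u)\n", i, start, start + range );
    }
    return 0;
}

With 10 queues and 4 workers this prints the subranges [0, 3), [3, 6), [6, 8), [8, 10), covering every queue exactly once.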
    258 // C_TODO: once stealing is implemented make sure shutdown still works
    259327static inline void ^?{}( executor & this ) with(this) {
    260     #if __STEAL
     328    #ifdef __STEAL
    261329    request sentinels[nrqueues];
    262330    for ( unsigned int i = 0; i < nrqueues; i++ ) {
     
    265333    #else
    266334    request sentinels[nworkers];
    267     unsigned int reqPerWorker = nrqueues / nworkers;
    268     for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
     335    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     336    for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
     337        range = reqPerWorker + ( i < extras ? 1 : 0 );
    269338        insert( request_queues[step], sentinels[i] );           // force eventual termination
    270339    } // for
     
    278347    } // for
    279348
     349    #ifdef STATS
     350    size_t misses = 0;
     351    for ( i; nrqueues ) {
     352        misses += worker_req_queues[i]->missed;
     353    }
     354    adelete( stolen_arr );
     355    adelete( replaced_queue );
     356    #endif
     357
    280358    adelete( workers );
     359    adelete( w_infos );
    281360    adelete( local_queues );
    282361    adelete( request_queues );
     
    284363    adelete( processors );
    285364    if ( seperate_clus ) delete( cluster );
     365
     366    #ifdef STATS
     367    printf("    Actor System Stats:\n");
     368    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, all_processed);
     369    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", all_gulps, all_processed / all_gulps, misses);
     370    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
     371        total_tries, total_stolen, total_tries - total_stolen - total_failed_swaps, total_failed_swaps);
     372    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", all_msgs_stolen, all_msgs_stolen/total_stolen);
     373    #endif
     374       
    286375}
    287376
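
Shutdown in the destructor above works by inserting sentinel requests whose stop flag is set, so each worker eventually gulps one and breaks out of its processing loop. A plain-C sketch of how such a sentinel is told apart from real work (request_t and handle are illustrative names, not the CFA request type):

#include <stdbool.h>
#include <stddef.h>

typedef struct {
    bool stop;                                  // true only for shutdown sentinels
    void (*fn)( void * );                       // the work to run for a normal request
    void * arg;
} request_t;

// Returns false when a sentinel is seen, telling the worker loop to exit.
static bool handle( request_t * req ) {
    if ( req == NULL || req->stop ) return false;
    req->fn( req->arg );                        // deliver a real request
    return true;
}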
    288377// this is a static field of executor but have to forward decl for get_next_ticket
    289 static unsigned int __next_ticket = 0;
    290 
    291 static inline unsigned int get_next_ticket( executor & this ) with(this) {
    292     return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     378static unsigned long int __next_ticket = 0;
     379
     380static inline unsigned long int __get_next_ticket( executor & this ) with(this) {
     381    unsigned long int temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     382
     383    // reserve MAX for dead actors
     384    if ( temp == MAX ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     385    return temp;
    293386} // tickets
    294387
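
__get_next_ticket() above assigns each new actor a mailbox round-robin with an atomic increment, and keeps the value MAX reserved so it can later mark dead actors (see the verify added to send()). A plain-C sketch of the same pattern using GCC atomic builtins; DEAD_TICKET is a hypothetical stand-in for the reserved value:

#include <limits.h>

#define DEAD_TICKET ULONG_MAX                   // hypothetical stand-in for the reserved MAX value

static unsigned long next_ticket = 0;

static unsigned long get_ticket( unsigned long nrqueues ) {
    unsigned long t = __atomic_fetch_add( &next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues;
    if ( t == DEAD_TICKET )                     // never hand out the value reserved for dead actors
        t = __atomic_fetch_add( &next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues;
    return t;
}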
    295 // C_TODO: update globals in this file to be static fields once the project is done
     388// TODO: update globals in this file to be static fields once the static fields project is done
    296389static executor * __actor_executor_ = 0p;
    297 static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
    298 static unsigned long int __num_actors_;                         // number of actor objects in system
     390static bool __actor_executor_passed = false;            // was an executor passed to start_actor_system
     391static unsigned long int __num_actors_ = 0;                             // number of actor objects in system
    299392static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
    300393struct actor {
    301     unsigned long int ticket;           // executor-queue handle to provide FIFO message execution
    302     Allocation allocation_;                     // allocation action
     394    unsigned long int ticket;                           // executor-queue handle
     395    Allocation allocation_;                                         // allocation action
    303396};
    304397
     
    306399    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    307400    // member must be called to end it
    308     verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
     401    verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    309402    this.allocation_ = Nodelete;
    310     this.ticket = get_next_ticket( *__actor_executor_ );
     403    this.ticket = __get_next_ticket( *__actor_executor_ );
    311404    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
     405    #ifdef STATS
     406    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
     407    #endif
    312408}
    313409static inline void ^?{}( actor & this ) {}
     
    338434
    339435static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
    340 static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
    341 static inline void ^?{}( message & this ) {}
     436static inline void ?{}( message & this, Allocation allocation ) {
     437    this.allocation_ = allocation;
     438    verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");
     439}
     440static inline void ^?{}( message & this ) {
     441    CFA_DEBUG( if ( this.allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); )
     442}
    342443
    343444static inline void check_message( message & this ) {
    344445    switch ( this.allocation_ ) {                                               // analyze message status
    345         case Nodelete: break;
     446        case Nodelete: CFA_DEBUG( this.allocation_ = Finished; ) break;
    346447        case Delete: delete( &this ); break;
    347448        case Destroy: ^?{}(this); break;
     
    349450    } // switch
    350451}
     452static inline void set_allocation( message & this, Allocation state ) { this.allocation_ = state; }
    351453
    352454static inline void deliver_request( request & this ) {
     
    357459}
    358460
    359 // Couple of ways to approach work stealing
    360 // 1: completely worker agnostic, just find a big queue and steal it
    361 // 2: track some heuristic of worker's load and focus on that and then pick a queue from that worker
    362 //   worker heuristics:
    363 //     - how many queues have work?
    364 //     - size of largest queue
    365 //     - total # of messages
    366 //     - messages currently servicing
    367 //     - pick randomly
    368 //     - pick from closer threads/workers (this can be combined with others)
    369 
    370 // lock free or global lock for queue stealing
    371 #define __LOCK_SWP 0
    372 
    373 __spinlock_t swp_lock;
    374 
    375 // tries to atomically swap two queues and returns a bool indicating if the swap failed
    376 static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    377     #if __LOCK_SWP
    378 
    379     lock( swp_lock __cfaabi_dbg_ctx2 );
    380     work_queue * temp = request_queues[my_idx];
    381     request_queues[my_idx] = request_queues[victim_idx];
    382     request_queues[victim_idx] = temp;
    383     unlock( swp_lock );
    384    
    385     return true;
    386 
    387     #else // __LOCK_SWP else
     461// tries to atomically swap two queues and returns 0p if the swap failed
     462// returns ptr to newly owned queue if swap succeeds
     463static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    388464    work_queue * my_queue = request_queues[my_idx];
    389465    work_queue * other_queue = request_queues[victim_idx];
    390     if ( other_queue == 0p || my_queue == 0p ) return false;
     466
     467    // if either queue is 0p then they are in the process of being stolen
     468    if ( other_queue == 0p ) return 0p;
    391469
    392470    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    393471    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
    394         return false;
     472        return 0p;
    395473
    396474    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
     
    398476        /* paranoid */ verify( request_queues[my_idx] == 0p );
    399477        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
    400         return false;
     478        return 0p;
    401479    }
    402480
    403481    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    404482    request_queues[my_idx] = other_queue; // last write does not need to be atomic
    405     return true;
    406 
    407     #endif // __LOCK_SWP
     483    return other_queue;
    408484}
    409485
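
try_swap_queues() above steals a whole mailbox by swapping two slots of the shared queue-pointer array: the thief first claims its own slot by compare-exchanging it to null, then tries to move the victim's pointer into that slot, restoring its own pointer if the victim slot changed in the meantime. A plain-C sketch of the two-step swap with GCC atomic builtins (the slot and function names are illustrative):

#include <stddef.h>

typedef struct work_queue work_queue;           // opaque mailbox type

static work_queue * try_swap_slots( work_queue ** slots, size_t victim, size_t mine ) {
    work_queue * my_q    = slots[mine];
    work_queue * other_q = slots[victim];
    if ( other_q == NULL ) return NULL;         // victim slot is mid-steal elsewhere

    // step 1: take my own slot out of circulation by nulling it
    if ( !__atomic_compare_exchange_n( &slots[mine], &my_q, NULL,
                                       false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return NULL;                            // someone swapped my slot first

    // step 2: move the victim's queue into my slot
    if ( !__atomic_compare_exchange_n( &slots[victim], &other_q, my_q,
                                       false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        slots[mine] = my_q;                     // victim moved underneath us: restore and give up
        return NULL;
    }
    slots[mine] = other_q;                      // no one touches a nulled slot, so a plain store suffices
    return other_q;                             // caller now owns the stolen queue
}

Returning the stolen queue pointer rather than a bool is what lets the caller immediately count the stolen messages for the stats path.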
    410486// once a worker to steal from has been chosen, choose queue to steal from
    411 static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) {
    412     #if __RAND_QUEUE
    413     unsigned int tries = 0;
    414     const unsigned int start_idx = prng( n_queues );
    415     work_queue * curr_steal_queue;
    416 
    417     for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) {
    418         tries++;
    419         curr_steal_queue = request_queues[i];
    420         #if __STEAL_WORK
    421 
    422         // avoid empty queues and queues that are being operated on
    423         if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
    424             continue;
    425        
    426         // in this case we just return from transfer if this doesn't work
    427         transfer( *curr_steal_queue, &current_queue, request_queues, i );
    428         if ( isEmpty( *current_queue ) ) continue;
    429         last_idx = i;
    430 
    431         #ifdef __STEAL_STATS
    432         stolen++;
    433         #endif // __STEAL_STATS
    434 
    435         #else // __STEAL_WORK else
    436 
    437         // avoid empty queues and queues that are being operated on
    438         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
    439             continue;
    440 
    441         #ifdef __STEAL_STATS
    442         bool success = try_swap_queues( this, i, last_idx );
    443         if ( success ) stolen++;
    444         #else
    445         try_swap_queues( this, i, last_idx );
    446         #endif // __STEAL_STATS
    447 
    448         // C_TODO: try transfer immediately
    449         // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
    450         // if ( isEmpty( *current_queue ) ) return false;
    451         return false;
    452 
    453         #endif // __STEAL_WORK
    454 
    455         return true;
    456     } // for
    457     return false;
    458 
    459     #elif __RAND_WORKER
    460 
     487static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    461488    // have to calculate victim start and range since victim may be deleted before us in shutdown
    462     const unsigned int queues_per_worker = n_queues / n_workers;
    463     const unsigned int extras = n_queues % n_workers;
     489    const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
     490    const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    464491    unsigned int vic_start, vic_range;
    465492    if ( extras > victim_id  ) {
     
    471498    }
    472499    unsigned int start_idx = prng( vic_range );
     500
    473501    unsigned int tries = 0;
    474502    work_queue * curr_steal_queue;
     
    481509            continue;
    482510
    483         try_swap_queues( this, i, last_idx );
    484 
    485         #ifdef __STEAL_STATS
    486         bool success = try_swap_queues( this, i, last_idx );
    487         if ( success ) stolen++;
     511        #ifdef STATS
     512        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     513        if ( curr_steal_queue ) {
     514            msgs_stolen += curr_steal_queue->c_queue->count;
     515            stolen++;
     516            __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
     517            replaced_queue[swap_idx]++;
     518            __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
     519        } else {
     520            failed_swaps++;
     521        }
    488522        #else
    489         try_swap_queues( this, i, last_idx );
    490         #endif // __STEAL_STATS
    491 
    492         // C_TODO: try transfer immediately
    493         // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
    494         // if ( isEmpty( *current_queue ) ) return false;
    495         return false;
    496     }
    497     #endif
     523        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     524        #endif // STATS
     525
     526        return;
     527    }
     528
     529    return;
    498530}
    499531
    500532// choose a worker to steal from
    501 static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) {
    502     #if __RAND_WORKER
    503     unsigned int victim = prng( n_workers );
    504     if ( victim == id ) victim = ( victim + 1 ) % n_workers;
    505     return choose_queue( this, victim, last_idx );
    506     #else
    507     return choose_queue( this, 0, last_idx );
    508     #endif
    509 }
    510 
    511 // look for work to steal
    512 // returns a bool: true => a queue was stolen, false => no work was stolen
    513 static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) { // C_TODO: add debug tracking of how many steals occur
    514     // to steal queue acquire both queue's locks in address ordering (maybe can do atomic swap)
    515     // maybe a flag to hint which queue is being processed
    516     // look at count to see if queue is worth stealing (dont steal empty queues)
    517     // if steal and then flag is up then dont process and just continue looking at own queues
    518     // (best effort approach) its ok if stealing isn't fruitful
    519     //          -> more important to not delay busy threads
    520 
    521     return choose_victim( this, last_idx );
     533static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
     534    #if RAND
     535    unsigned int victim = prng( executor_->nworkers );
     536    if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
     537    choose_queue( this, victim, swap_idx );
     538    #elif SEARCH
     539    unsigned long long min = MAX; // smaller timestamp means longer since service
     540    int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
     541    int n_workers = executor_->nworkers;
     542    unsigned long long curr_stamp;
     543    int scount = 1;
     544    for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
     545        curr_stamp = executor_->w_infos[i].stamp;
     546        if ( curr_stamp < min ) {
     547            min = curr_stamp;
     548            min_id = i;
     549        }
     550    }
     551    choose_queue( this, min_id, swap_idx );
     552    #endif
    522553}
    523554
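
Under the SEARCH heuristic above, the thief scans every other worker's timestamp (w_infos[i].stamp, written with rdtscl()) and picks the smallest, which per the in-code comment is the worker that has gone longest since service. A plain-C sketch of that victim selection (worker_info_t and pick_victim are illustrative names):

#include <limits.h>

typedef struct {
    volatile unsigned long long stamp;          // last service timestamp for one worker
} worker_info_t;

// Returns the index of the worker with the smallest (oldest) stamp, skipping self.
static int pick_victim( worker_info_t * infos, int n_workers, int self ) {
    unsigned long long min = ULLONG_MAX;
    int victim = (self + 1) % n_workers;
    for ( int seen = 1, i = (self + 1) % n_workers; seen < n_workers;
          i = (i + 1) % n_workers, seen++ ) {
        if ( infos[i].stamp < min ) {
            min = infos[i].stamp;
            victim = i;
        }
    }
    return victim;
}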
    524555void main( worker & this ) with(this) {
     556    #ifdef STATS
     557    for ( i; executor_->nrqueues ) {
     558        replaced_queue[i] = 0;
     559        __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
     560    }
     561    #endif
     562
    525563    // threshold of empty queues we see before we go stealing
    526     const unsigned int steal_threshold = 2 * n_queues;
     564    const unsigned int steal_threshold = 2 * range;
     565
     566    // Store variable data here instead of worker struct to avoid any potential false sharing
     567    unsigned int empty_count = 0;
     568    request & req;
    527569    unsigned int curr_idx;
    528570    work_queue * curr_work_queue;
     571
    529572    Exit:
    530573    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    531         // C_TODO: potentially check queue count instead of immediately trying to transfer
    532574        curr_idx = i + start;
    533575        curr_work_queue = request_queues[curr_idx];
    534         transfer( *curr_work_queue, &current_queue, request_queues, curr_idx );
    535         if ( isEmpty( *current_queue ) ) {
    536             #if __STEAL
     576       
     577        // check if queue is empty before trying to gulp it
     578        if ( isEmpty( *curr_work_queue->c_queue ) ) {
     579            #ifdef __STEAL
    537580            empty_count++;
    538581            if ( empty_count < steal_threshold ) continue;
    539             empty_count = 0; // C_TODO: look into stealing backoff schemes
    540             #ifdef __STEAL_STATS
     582            #else
     583            continue;
     584            #endif
     585        }
     586        transfer( *curr_work_queue, &current_queue );
     587        #ifdef STATS
     588        gulps++;
     589        #endif // STATS
     590        #ifdef __STEAL
     591        if ( isEmpty( *current_queue ) ) {
     592            if ( unlikely( no_steal ) ) continue;
     593            empty_count++;
     594            if ( empty_count < steal_threshold ) continue;
     595            empty_count = 0;
     596
     597            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
     598           
     599            #ifdef STATS
    541600            try_steal++;
    542             #endif // __STEAL_STATS
    543 
    544             if ( ! steal_work( this, curr_idx ) ) continue;
    545 
    546             #else // __STEAL else
    547 
     601            #endif // STATS
     602           
     603            steal_work( this, start + prng( range ) );
    548604            continue;
    549            
    550             #endif // __STEAL
    551605        }
     606        #endif // __STEAL
    552607        while ( ! isEmpty( *current_queue ) ) {
     608            #ifdef STATS
     609            processed++;
     610            #endif
    553611            &req = &remove( *current_queue );
    554             if ( !&req ) continue; // possibly add some work stealing/idle sleep here
     612            if ( !&req ) continue;
    555613            if ( req.stop ) break Exit;
    556614            deliver_request( req );
    557615        }
    558         #if __STEAL
     616        #ifdef __STEAL
    559617        curr_work_queue->being_processed = false; // set done processing
     618        empty_count = 0; // we found work so reset empty counter
    560619        #endif
    561         empty_count = 0; // we found work so reset empty counter
     620       
     621        // potentially reclaim some of the current queue's vector space if it is unused
    562622        reclaim( *current_queue );
    563623    } // for
     
    569629
    570630static inline void send( actor & this, request & req ) {
     631    verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    571632    send( *__actor_executor_, req, this.ticket );
    572633}
     
    578639}
    579640
    580 static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
     641// TODO: potentially revisit how the number of processors is obtained
     642//  ( currently the value stored in active_cluster()->procs.total is often stale
     643//  and doesn't reflect how many procs are allocated )
     644// static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
     645static inline void start_actor_system() { start_actor_system( 1 ); }
    581646
    582647static inline void start_actor_system( executor & this ) {
     
    595660    __actor_executor_passed = false;
    596661}
     662
     663// Default messages to send to any actor to change status
     664// struct __DeleteMsg { inline message; } DeleteMsg;
     665// void ?{}( __DeleteMsg & this ) { ((message &) this){ Finished }; }
     666// struct __DestroyMsg { inline message; } DestroyMsg;
     667// void ?{}( __DestroyMsg & this ) { ((message &) this){ Finished }; }
     668// struct __FinishedMsg { inline message; } FinishedMsg;
     669// void ?{}( __FinishedMsg & this ) { ((message &) this){ Finished }; }
     670
     671// Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; }
     672// Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; }
     673// Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }