Timestamp:
Apr 21, 2023, 5:36:12 PM (3 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT, master
Children:
28f8f15, 6e4c44d
Parents:
2ed94a9 (diff), 699a97d (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

  ' ' Unmodified
  '+' Added
  '-' Removed
  • libcfa/src/concurrency/actor.hfa

--- r2ed94a9
+++ rb110bcc

 #include <locks.hfa>
 #include <limits.hfa>
-#include <list.hfa>
 #include <kernel.hfa>
+#include <iofwd.hfa>
+#include <virtual_dtor.hfa>

 #ifdef __CFA_DEBUG__
…
 // Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
 // actor-executor threads. Must be greater than 0.
-#define __DEFAULT_EXECUTOR_RQUEUES__ 2
+#define __DEFAULT_EXECUTOR_RQUEUES__ 4

 // Define if executor is created in a separate cluster
 #define __DEFAULT_EXECUTOR_SEPCLUS__ false

-// when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
-#define __ALLOC 0
+#define __DEFAULT_EXECUTOR_BUFSIZE__ 10
+
+#define __STEAL 0 // workstealing toggle. Disjoint from toggles above
+
+// workstealing heuristic selection (only set one to be 1)
+// #define RAND 0
+#define SEARCH 1
+
+// show stats
+// #define ACTOR_STATS

 // forward decls
 struct actor;
 struct message;
+struct executor;

 enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
…
     __receive_fn fn;
     bool stop;
-    inline dlink(request);
 };
-P9_EMBEDDED( request, dlink(request) )

 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
…
 }

-// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
+// Vector-like data structure that supports O(1) queue operations with no bound on size
+// assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
 struct copy_queue {
-    dlist( request ) list;
-    #if ! __ALLOC
     request * buffer;
-    size_t count, buffer_size, index;
-    #endif
+    size_t count, buffer_size, index, utilized, last_size;
 };
 static inline void ?{}( copy_queue & this ) {}
 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
-    list{};
-    #if ! __ALLOC
     buffer_size = buf_size;
     buffer = aalloc( buffer_size );
     count = 0;
+    utilized = 0;
     index = 0;
-    #endif
-}
-static inline void ^?{}( copy_queue & this ) with(this) {
-    #if ! __ALLOC
-    adelete(buffer);
-    #endif
-}
+    last_size = 0;
+}
+static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }

 static inline void insert( copy_queue & this, request & elem ) with(this) {
-    #if ! __ALLOC
-    if ( count < buffer_size ) { // fast path ( no alloc )
-        buffer[count]{ elem };
-        count++;
-        return;
-    }
-    request * new_elem = alloc();
-    (*new_elem){ elem };
-    insert_last( list, *new_elem );
-    #else
-    insert_last( list, elem );
-    #endif
+    if ( count >= buffer_size ) { // increase arr size
+        last_size = buffer_size;
+        buffer_size = 2 * buffer_size;
+        buffer = realloc( buffer, sizeof( request ) * buffer_size );
+        /* paranoid */ verify( buffer );
+    }
+    memcpy( &buffer[count], &elem, sizeof(request) );
+    count++;
 }

 // once you start removing you need to remove all elements
-// it is not supported to call insert() before the list is fully empty
-// should_delete is an output param
-static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
-    #if ! __ALLOC
+// it is not supported to call insert() before the array is fully empty
+static inline request & remove( copy_queue & this ) with(this) {
     if ( count > 0 ) {
         count--;
-        should_delete = false;
         size_t old_idx = index;
         index = count == 0 ? 0 : index + 1;
         return buffer[old_idx];
     }
-    #endif
-    should_delete = true;
-    return try_pop_front( list );
-}
-
-static inline bool isEmpty( copy_queue & this ) with(this) {
-    #if ! __ALLOC
-    return count == 0 && list`isEmpty;
-    #else
-    return list`isEmpty;
-    #endif
-}
-
-static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
+    request * ret = 0p;
+    return *0p;
+}
+
+// try to reclaim some memory if less than half of buffer is utilized
+static inline void reclaim( copy_queue & this ) with(this) {
+    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
+    utilized = 0;
+    buffer_size--;
+    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
+}
+
+static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
+
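The new copy_queue replaces the old hybrid list with a flat array that doubles on overflow, and remove() leans on the gulping discipline (drain completely before the next insert) so a single moving index suffices. Below is a rough plain-C sketch of the idea; req_t and the cq_* names are hypothetical stand-ins for the CFA types above, and the utilized/last_size bookkeeping that feeds reclaim() is omitted.

    #include <assert.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    typedef struct { int payload; } req_t;        // stand-in for CFA's request

    typedef struct {
        req_t * buffer;
        size_t count, buffer_size, index;
    } cq_t;

    void cq_init( cq_t * q, size_t n ) {
        q->buffer = malloc( n * sizeof(req_t) );
        q->count = 0; q->buffer_size = n; q->index = 0;
    }

    // amortized O(1) insert: double the array when full, then copy the element in
    void cq_insert( cq_t * q, req_t elem ) {
        if ( q->count >= q->buffer_size ) {
            q->buffer_size *= 2;
            q->buffer = realloc( q->buffer, q->buffer_size * sizeof(req_t) );
            assert( q->buffer );
        }
        memcpy( &q->buffer[q->count], &elem, sizeof(req_t) );
        q->count++;
    }

    // gulping consumer: removes run until empty, so a moving index works
    // and resets to 0 once the queue drains
    req_t * cq_remove( cq_t * q ) {
        if ( q->count == 0 ) return NULL;
        q->count--;
        size_t old = q->index;
        q->index = ( q->count == 0 ) ? 0 : q->index + 1;
        return &q->buffer[old];
    }

    int main() {
        cq_t q; cq_init( &q, 2 );
        for ( int i = 0; i < 5; i++ ) cq_insert( &q, (req_t){ i } ); // forces doubling
        for ( req_t * r; ( r = cq_remove( &q ) ); ) printf( "%d\n", r->payload );
        free( q.buffer );
    }
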
 struct work_queue {
     __spinlock_t mutex_lock;
-    copy_queue owned_queue;
-    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
-
+    copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
+    copy_queue * c_queue;           // current queue
+    volatile bool being_processed;  // flag to prevent concurrent processing
+    #ifdef ACTOR_STATS
+    unsigned int id;
+    size_t missed;                  // transfers skipped due to being_processed flag being up
+    #endif
 }; // work_queue
-static inline void ?{}( work_queue & this ) with(this) {
-    // c_queue = alloc();
-    // (*c_queue){ __buffer_size };
-    owned_queue{ __buffer_size };
-    c_queue = &owned_queue;
-}
-// static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
+static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
+    owned_queue = alloc();      // allocated separately to avoid false sharing
+    (*owned_queue){ buf_size };
+    c_queue = owned_queue;
+    being_processed = false;
+    #ifdef ACTOR_STATS
+    id = i;
+    missed = 0;
+    #endif
+}
+
+// clean up copy_queue owned by this work_queue
+static inline void ^?{}( work_queue & this ) with(this) { delete( owned_queue ); }

 static inline void insert( work_queue & this, request & elem ) with(this) {
…
 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
     lock( mutex_lock __cfaabi_dbg_ctx2 );
+    #ifdef __STEAL
+
+    // check if queue is being processed elsewhere
+    if ( unlikely( being_processed ) ) {
+        #ifdef ACTOR_STATS
+        missed++;
+        #endif
+        unlock( mutex_lock );
+        return;
+    }
+
+    being_processed = c_queue->count != 0;
+    #endif // __STEAL
+
+    c_queue->utilized = c_queue->count;
+
     // swap copy queue ptrs
     copy_queue * temp = *transfer_to;
…
 } // transfer

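The transfer (a "gulp") hands a worker an entire mailbox at once by swapping buffer pointers under the lock, leaving producers a fresh empty buffer, while being_processed keeps two workers from draining the same buffer. A minimal C sketch of the swap, assuming a pthread mutex in place of CFA's __spinlock_t and toy types throughout:

    #include <pthread.h>
    #include <stdio.h>

    typedef struct { int data[16]; int count; } cq_t;

    typedef struct {
        pthread_mutex_t mutex_lock;
        cq_t * c_queue;          // buffer producers currently insert into
        int being_processed;     // set while some worker drains a gulped buffer
    } wq_t;

    // swap the producer buffer with the worker's (empty) buffer
    void transfer( wq_t * wq, cq_t ** transfer_to ) {
        pthread_mutex_lock( &wq->mutex_lock );
        if ( wq->being_processed ) {                   // already being drained elsewhere
            pthread_mutex_unlock( &wq->mutex_lock );
            return;
        }
        wq->being_processed = wq->c_queue->count != 0; // mark non-empty queue as in use
        cq_t * temp = *transfer_to;                    // worker's empty buffer
        *transfer_to = wq->c_queue;                    // worker now owns the full buffer
        wq->c_queue = temp;                            // producers refill the empty one
        pthread_mutex_unlock( &wq->mutex_lock );
    }

    int main() {
        cq_t a = { .count = 3 }, b = { .count = 0 };
        wq_t wq = { PTHREAD_MUTEX_INITIALIZER, &a, 0 };
        cq_t * mine = &b;
        transfer( &wq, &mine );                        // gulp: mine == &a, producers see &b
        printf( "gulped %d requests\n", mine->count );
    }
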
+// needed since some info needs to persist past worker lifetimes
+struct worker_info {
+    volatile unsigned long long stamp;
+    #ifdef ACTOR_STATS
+    size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
+    unsigned long long processed;
+    size_t gulps;
+    #endif
+};
+static inline void ?{}( worker_info & this ) {
+    #ifdef ACTOR_STATS
+    this.stolen_from = 0;
+    this.try_steal = 0;                             // attempts to steal
+    this.stolen = 0;                                // successful steals
+    this.processed = 0;                             // requests processed
+    this.gulps = 0;                                 // number of gulps
+    this.failed_swaps = 0;                          // steal swap failures
+    this.msgs_stolen = 0;                           // number of messages stolen
+    #endif
+    this.stamp = rdtscl();
+}
+
+// #ifdef ACTOR_STATS
+// unsigned int * stolen_arr;
+// unsigned int * replaced_queue;
+// #endif
 thread worker {
-    copy_queue owned_queue;
-    work_queue * request_queues;
+    work_queue ** request_queues;
     copy_queue * current_queue;
-    request & req;
+    executor * executor_;
     unsigned int start, range;
+    int id;
 };

-static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
+#ifdef ACTOR_STATS
+// aggregate counters for statistics
+size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
+    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
+#endif
+static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
+    unsigned int start, unsigned int range, int id ) {
     ((thread &)this){ clu };
-    this.request_queues = request_queues;
-    // this.current_queue = alloc();
-    // (*this.current_queue){ __buffer_size };
-    this.owned_queue{ __buffer_size };
-    this.current_queue = &this.owned_queue;
-    this.start = start;
-    this.range = range;
-}
-// static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
-
+    this.request_queues = request_queues;           // array of all queues
+    this.current_queue = current_queue;             // currently gulped queue (start with empty queue to use in swap later)
+    this.executor_ = executor_;                     // pointer to current executor
+    this.start = start;                             // start of worker's subrange of request_queues
+    this.range = range;                             // size of worker's subrange of request_queues
+    this.id = id;                                   // worker's id and index in array of workers
+}
+
+static bool no_steal = false;
 struct executor {
     cluster * cluster;                              // if workers execute on separate cluster
     processor ** processors;                        // array of virtual processors adding parallelism for workers
-    work_queue * request_queues;                    // master list of work request queues
-    worker ** workers;                              // array of workers executing work requests
+    work_queue * request_queues;                    // master array of work request queues
+    copy_queue * local_queues;                      // array of all worker local queues to avoid deletion race
+    work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
+    worker ** workers;                              // array of workers executing work requests
+    worker_info * w_infos;                          // array of info about each worker
     unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
     bool seperate_clus;                             // use same or separate cluster for executor
 }; // executor

+// #ifdef ACTOR_STATS
+// __spinlock_t out_lock;
+// #endif
+static inline void ^?{}( worker & mutex this ) with(this) {
+    #ifdef ACTOR_STATS
+    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
+    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
+    __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
+    __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
+    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
+    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
+
+    // per worker steal stats (uncomment alongside the lock above this routine to print)
+    // lock( out_lock __cfaabi_dbg_ctx2 );
+    // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
+    // int count = 0;
+    // int count2 = 0;
+    // for ( i; range ) {
+    //     if ( replaced_queue[start + i] > 0 ){
+    //         count++;
+    //         // printf("%d: %u, ",i, replaced_queue[i]);
+    //     }
+    //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
+    //         count2++;
+    // }
+    // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
+    // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
+    // unlock( out_lock );
+    #endif
+}
+
 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
     if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
-    __buffer_size = buf_size;
     this.nprocessors = nprocessors;
     this.nworkers = nworkers;
…
     this.seperate_clus = seperate_clus;

+    if ( nworkers == nrqueues )
+        no_steal = true;
+
+    #ifdef ACTOR_STATS
+    // stolen_arr = aalloc( nrqueues );
+    // replaced_queue = aalloc( nrqueues );
+    __total_workers = nworkers;
+    #endif
+
     if ( seperate_clus ) {
         cluster = alloc();
…

     request_queues = aalloc( nrqueues );
-    for ( i; nrqueues )
-        request_queues[i]{};
+    worker_req_queues = aalloc( nrqueues );
+    for ( i; nrqueues ) {
+        request_queues[i]{ buf_size, i };
+        worker_req_queues[i] = &request_queues[i];
+    }

     processors = aalloc( nprocessors );
…
         (*(processors[i] = alloc())){ *cluster };

-    workers = alloc( nworkers );
+    local_queues = aalloc( nworkers );
+    workers = aalloc( nworkers );
+    w_infos = aalloc( nworkers );
     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
+
+    for ( i; nworkers ) {
+        w_infos[i]{};
+        local_queues[i]{ buf_size };
+    }
+
     for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
         range = reqPerWorker + ( i < extras ? 1 : 0 );
-        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
+        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
     } // for
 }
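The constructor splits the queues among workers so the first nrqueues % nworkers workers take one extra queue, covering every queue exactly once. A tiny C check of that arithmetic, mirroring the loop above (values chosen only for illustration):

    #include <stdio.h>

    int main() {
        unsigned int nrqueues = 10, nworkers = 4;
        unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
        for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
            range = reqPerWorker + ( i < extras ? 1 : 0 );
            printf( "worker %u: queues [%u, %u)\n", i, start, start + range );
        }
        // prints [0,3) [3,6) [6,8) [8,10): every queue covered exactly once
    }
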
-static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
+static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
…

 static inline void ^?{}( executor & this ) with(this) {
+    #ifdef __STEAL
+    request sentinels[nrqueues];
+    for ( unsigned int i = 0; i < nrqueues; i++ ) {
+        insert( request_queues[i], sentinels[i] );              // force eventual termination
+    } // for
+    #else
     request sentinels[nworkers];
-    unsigned int reqPerWorker = nrqueues / nworkers;
-    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
+    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
+    for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
+        range = reqPerWorker + ( i < extras ? 1 : 0 );
         insert( request_queues[step], sentinels[i] );           // force eventual termination
     } // for
+    #endif

     for ( i; nworkers )
…
     } // for

+    #ifdef ACTOR_STATS
+    size_t misses = 0;
+    for ( i; nrqueues ) {
+        misses += worker_req_queues[i]->missed;
+    }
+    // adelete( stolen_arr );
+    // adelete( replaced_queue );
+    #endif
+
     adelete( workers );
+    adelete( w_infos );
+    adelete( local_queues );
     adelete( request_queues );
+    adelete( worker_req_queues );
     adelete( processors );
     if ( seperate_clus ) delete( cluster );
+
+    #ifdef ACTOR_STATS // print formatted stats
+    printf("    Actor System Stats:\n");
+    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
+    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
+    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
+    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
+        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);
+    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
+    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
+    #endif
+
 }

 // this is a static field of executor but has to be forward declared for get_next_ticket
-static unsigned int __next_ticket = 0;
-
-static inline unsigned int get_next_ticket( executor & this ) with(this) {
-    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
+static size_t __next_ticket = 0;
+
+static inline size_t __get_next_ticket( executor & this ) with(this) {
+    #ifdef __CFA_DEBUG__
+    size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
+
+    // reserve MAX for dead actors
+    if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
+    return temp;
+    #else
+    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
+    #endif
 } // tickets

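Tickets pin each actor to a single mailbox for its lifetime, which preserves per-actor FIFO execution; assignment is a single atomic fetch-and-add taken modulo the queue count (the debug path above additionally reserves MAX as a dead-actor marker). A plain-C11 sketch of the round-robin scheme, all names hypothetical:

    #include <stdatomic.h>
    #include <stdio.h>

    static atomic_size_t next_ticket = 0;

    // each new actor gets the next queue index, wrapping around
    size_t get_next_ticket( size_t nrqueues ) {
        return atomic_fetch_add_explicit( &next_ticket, 1, memory_order_relaxed ) % nrqueues;
    }

    int main() {
        for ( int i = 0; i < 6; i++ )
            printf( "actor %d -> mailbox %zu\n", i, get_next_ticket( 4 ) ); // 0 1 2 3 0 1
    }
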
-// C_TODO: update globals in this file to be static fields once the project is done
+// TODO: update globals in this file to be static fields once the static fields project is done
 static executor * __actor_executor_ = 0p;
-static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
-static unsigned long int __num_actors_;             // number of actor objects in system
+static bool __actor_executor_passed = false;            // was an executor passed to start_actor_system
+static size_t __num_actors_ = 0;                        // number of actor objects in system
 static struct thread$ * __actor_executor_thd = 0p;      // used to wake executor after actors finish
 struct actor {
-    unsigned long int ticket;                           // executor-queue handle to provide FIFO message execution
-    Allocation allocation_;                             // allocation action
+    size_t ticket;                                      // executor-queue handle
+    Allocation allocation_;                             // allocation action
+    inline virtual_dtor;
 };

-static inline void ?{}( actor & this ) {
+static inline void ?{}( actor & this ) with(this) {
     // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
     // member must be called to end it
-    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
-    this.allocation_ = Nodelete;
-    this.ticket = get_next_ticket( *__actor_executor_ );
-    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
-}
-static inline void ^?{}( actor & this ) {}
+    verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
+    allocation_ = Nodelete;
+    ticket = __get_next_ticket( *__actor_executor_ );
+    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
+    #ifdef ACTOR_STATS
+    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
+    #endif
+}

 static inline void check_actor( actor & this ) {
…
         }

-        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
+        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
             unpark( __actor_executor_thd );
         }
…
 struct message {
     Allocation allocation_;                             // allocation action
+    inline virtual_dtor;
 };

-static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
-static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
-static inline void ^?{}( message & this ) {}
+static inline void ?{}( message & this ) {
+    this.allocation_ = Nodelete;
+}
+static inline void ?{}( message & this, Allocation allocation ) {
+    memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor
+    verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");
+}
+static inline void ^?{}( message & this ) with(this) {
+    CFA_DEBUG( if ( allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); )
+}

 static inline void check_message( message & this ) {
     switch ( this.allocation_ ) {                       // analyze message status
-        case Nodelete: break;
+        case Nodelete: CFA_DEBUG(this.allocation_ = Finished); break;
         case Delete: delete( &this ); break;
         case Destroy: ^?{}(this); break;
…
     } // switch
 }
+static inline void set_allocation( message & this, Allocation state ) {
+    this.allocation_ = state;
+}

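The Allocation status returned by a receive routine tells the runtime what to do with the message (and, via check_actor, the actor) once delivery finishes. A hedged C rendering of the dispatch, with free() and an explicit dtor call standing in for CFA's delete and ^?{}:

    #include <stdlib.h>

    typedef enum { Nodelete, Delete, Destroy, Finished } Allocation;

    typedef struct { Allocation allocation_; } message;

    // run the destructor only; the caller keeps/reuses the storage
    void message_dtor( message * m ) { (void)m; }

    void check_message( message * m ) {
        switch ( m->allocation_ ) {
            case Nodelete: break;                     // sender keeps ownership
            case Delete:   free( m ); break;          // destruct and free storage
            case Destroy:  message_dtor( m ); break;  // destruct, keep storage
            case Finished: break;                     // runtime stops tracking it
        }
    }

    int main() {
        message * m = malloc( sizeof(message) );
        m->allocation_ = Delete;
        check_message( m );                           // m reclaimed after delivery
    }
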
 static inline void deliver_request( request & this ) {
-    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
-    this.receiver->allocation_ = actor_allocation;
+    this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
+    check_message( *this.msg );
     check_actor( *this.receiver );
-    check_message( *this.msg );
+}
+
+// tries to atomically swap two queues and returns 0p if the swap failed
+// returns ptr to newly owned queue if swap succeeds
+static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
+    work_queue * my_queue = request_queues[my_idx];
+    work_queue * other_queue = request_queues[victim_idx];
+
+    // if either queue is 0p then they are in the process of being stolen
+    if ( other_queue == 0p ) return 0p;
+
+    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
+    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
+        return 0p;
+
+    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
+    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
+        /* paranoid */ verify( request_queues[my_idx] == 0p );
+        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
+        return 0p;
+    }
+
+    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
+    request_queues[my_idx] = other_queue; // last write does not need to be atomic
+    return other_queue;
+}
+
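The steal itself swaps two slots of the shared worker_req_queues array with two compare-and-swaps, using the null pointer as an "in flight" marker so concurrent thieves back off rather than block. A single-threaded C11 sketch of the same protocol; the array, types, and indices are illustrative only:

    #include <stdatomic.h>
    #include <stdio.h>

    typedef struct { int id; } work_queue;
    #define NQUEUES 4
    static work_queue * _Atomic request_queues[NQUEUES];

    // returns the newly owned queue, or NULL if another thief interfered
    work_queue * try_swap_queues( unsigned victim_idx, unsigned my_idx ) {
        work_queue * my_queue = atomic_load( &request_queues[my_idx] );
        work_queue * other_queue = atomic_load( &request_queues[victim_idx] );

        if ( other_queue == NULL ) return NULL;   // victim slot already mid-steal

        // claim our own slot by nulling it; failure means someone moved our queue
        if ( !atomic_compare_exchange_strong( &request_queues[my_idx], &my_queue, NULL ) )
            return NULL;

        // install our queue in the victim slot; on failure, restore and give up
        if ( !atomic_compare_exchange_strong( &request_queues[victim_idx], &other_queue, my_queue ) ) {
            atomic_store( &request_queues[my_idx], my_queue );
            return NULL;
        }

        // our slot is NULL so nobody else touches it: plain store-back is safe
        atomic_store( &request_queues[my_idx], other_queue );
        return other_queue;
    }

    int main() {
        work_queue a = { 0 }, b = { 1 };
        request_queues[0] = &a; request_queues[1] = &b;
        work_queue * got = try_swap_queues( 1, 0 );
        printf( "stole queue %d\n", got ? got->id : -1 ); // stole queue 1
    }
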
+// once a worker to steal from has been chosen, choose queue to steal from
+static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
+    // have to calculate victim start and range since victim may be deleted before us in shutdown
+    const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
+    const unsigned int extras = executor_->nrqueues % executor_->nworkers;
+    unsigned int vic_start, vic_range;
+    if ( extras > victim_id ) {
+        vic_range = queues_per_worker + 1;
+        vic_start = vic_range * victim_id;
+    } else {
+        vic_start = extras + victim_id * queues_per_worker;
+        vic_range = queues_per_worker;
+    }
+    unsigned int start_idx = prng( vic_range );
+
+    unsigned int tries = 0;
+    work_queue * curr_steal_queue;
+
+    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
+        tries++;
+        curr_steal_queue = request_queues[ i + vic_start ];
+        // avoid empty queues and queues that are being operated on
+        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
+            continue;
+
+        #ifdef ACTOR_STATS
+        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
+        if ( curr_steal_queue ) {
+            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
+            executor_->w_infos[id].stolen++;
+            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
+            // replaced_queue[swap_idx]++;
+            // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
+        } else {
+            executor_->w_infos[id].failed_swaps++;
+        }
+        #else
+        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
+        #endif // ACTOR_STATS
+
+        return;
+    }
+
+    return;
+}
+
+// choose a worker to steal from
+static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
+    #if RAND
+    unsigned int victim = prng( executor_->nworkers );
+    if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
+    choose_queue( this, victim, swap_idx );
+    #elif SEARCH
+    unsigned long long min = MAX; // smaller timestamp means longer since service
+    int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
+    int n_workers = executor_->nworkers;
+    unsigned long long curr_stamp;
+    int scount = 1;
+    for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
+        curr_stamp = executor_->w_infos[i].stamp;
+        if ( curr_stamp < min ) {
+            min = curr_stamp;
+            min_id = i;
+        }
+    }
+    choose_queue( this, min_id, swap_idx );
+    #endif
 }

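Under SEARCH, the thief scans all other workers starting just past itself and picks the one with the smallest timestamp, i.e. the longest time since it last gulped work. A small C sketch of the victim selection; the stamps are made-up numbers, whereas the real code reads rdtscl() timestamps from w_infos:

    #include <stdio.h>

    // pick the worker (other than self) with the oldest service timestamp
    int choose_victim( int id, int n_workers, const unsigned long long stamps[] ) {
        unsigned long long min = ~0ULL;   // smaller timestamp = longer since service
        int min_id = 0;
        int scount = 1;
        for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
            if ( stamps[i] < min ) { min = stamps[i]; min_id = i; }
        }
        return min_id;
    }

    int main() {
        unsigned long long stamps[] = { 400, 100, 300, 200 };
        printf( "worker 0 steals from %d\n", choose_victim( 0, 4, stamps ) ); // worker 1
    }
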
 void main( worker & this ) with(this) {
-    bool should_delete;
+    // #ifdef ACTOR_STATS
+    // for ( i; executor_->nrqueues ) {
+    //     replaced_queue[i] = 0;
+    //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
+    // }
+    // #endif
+
+    // threshold of empty queues we see before we go stealing
+    const unsigned int steal_threshold = 2 * range;
+
+    // Store variable data here instead of worker struct to avoid any potential false sharing
+    unsigned int empty_count = 0;
+    request & req;
+    work_queue * curr_work_queue;
+
     Exit:
     for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
-        // C_TODO: potentially check queue count instead of immediately trying to transfer
-        transfer( request_queues[i + start], &current_queue );
+        curr_work_queue = request_queues[i + start];
+
+        // check if queue is empty before trying to gulp it
+        if ( isEmpty( *curr_work_queue->c_queue ) ) {
+            #ifdef __STEAL
+            empty_count++;
+            if ( empty_count < steal_threshold ) continue;
+            #else
+            continue;
+            #endif
+        }
+        transfer( *curr_work_queue, &current_queue );
+        #ifdef ACTOR_STATS
+        executor_->w_infos[id].gulps++;
+        #endif // ACTOR_STATS
+        #ifdef __STEAL
+        if ( isEmpty( *current_queue ) ) {
+            if ( unlikely( no_steal ) ) continue;
+            empty_count++;
+            if ( empty_count < steal_threshold ) continue;
+            empty_count = 0;
+
+            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
+
+            #ifdef ACTOR_STATS
+            executor_->w_infos[id].try_steal++;
+            #endif // ACTOR_STATS
+
+            steal_work( this, start + prng( range ) );
+            continue;
+        }
+        #endif // __STEAL
         while ( ! isEmpty( *current_queue ) ) {
-            &req = &remove( *current_queue, should_delete );
-            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
+            #ifdef ACTOR_STATS
+            executor_->w_infos[id].processed++;
+            #endif
+            &req = &remove( *current_queue );
+            if ( !&req ) continue;
             if ( req.stop ) break Exit;
             deliver_request( req );
-
-            if ( should_delete ) delete( &req );
-        } // while
+        }
+        #ifdef __STEAL
+        curr_work_queue->being_processed = false; // set done processing
+        empty_count = 0; // we found work so reset empty counter
+        #endif
+
+        // potentially reclaim some of the current queue's vector space if it is unused
+        reclaim( *current_queue );
     } // for
 }
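The worker only goes stealing after steal_threshold (2 * range) consecutive empty queues, keeping steals off the fast path while local mailboxes have work. A simplified single-threaded C sketch of that control flow, with stub gulp/steal functions standing in for transfer and steal_work:

    #include <stdio.h>

    #define RANGE 4

    int counts[RANGE] = { 0, 0, 0, 0 };          // pretend mailbox sizes

    int gulp( int i ) { int c = counts[i]; counts[i] = 0; return c; }
    void steal_work( void ) { printf( "stealing...\n" ); counts[2] = 3; }

    int main() {
        const unsigned steal_threshold = 2 * RANGE;
        unsigned empty_count = 0;
        int processed = 0;
        for ( unsigned iter = 0, i = 0; iter < 32; iter++, i = (i + 1) % RANGE ) {
            if ( counts[i] == 0 ) {              // empty queue: maybe go steal
                if ( ++empty_count < steal_threshold ) continue;
                empty_count = 0;
                steal_work();
                continue;
            }
            processed += gulp( i );              // found work: drain the whole queue
            empty_count = 0;
        }
        printf( "processed %d requests\n", processed );
    }
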
…

 static inline void send( actor & this, request & req ) {
+    verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     send( *__actor_executor_, req, this.ticket );
 }

+static inline void __reset_stats() {
+    #ifdef ACTOR_STATS
+    __total_tries = 0;
+    __total_stolen = 0;
+    __all_gulps = 0;
+    __total_failed_swaps = 0;
+    __all_processed = 0;
+    __num_actors_stats = 0;
+    __all_msgs_stolen = 0;
+    #endif
+}
+
 static inline void start_actor_system( size_t num_thds ) {
+    __reset_stats();
     __actor_executor_thd = active_thread();
     __actor_executor_ = alloc();
…
 }

-static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
+// TODO: potentially revisit getting number of processors
+//  ( currently the value stored in active_cluster()->procs.total is often stale
+//  and doesn't reflect how many procs are allocated )
+// static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
+static inline void start_actor_system() { start_actor_system( 1 ); }

 static inline void start_actor_system( executor & this ) {
+    __reset_stats();
     __actor_executor_thd = active_thread();
     __actor_executor_ = &this;
…
     __actor_executor_passed = false;
 }
+
+// Default messages to send to any actor to change status
+// assigned at creation to __base_msg_finished to avoid unused message warning
+message __base_msg_finished @= { .allocation_ : Finished };
+struct __DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;
+struct __DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;
+struct __FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;
+
+Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; }
+Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; }
+Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }