Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    refdd18c rccf1d99  
    33#include <locks.hfa>
    44#include <limits.hfa>
     5#include <list.hfa>
    56#include <kernel.hfa>
    6 #include <iofwd.hfa>
    7 #include <virtual_dtor.hfa>
    87
    98#ifdef __CFA_DEBUG__
     
    2120// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
    2221// actor-executor threads. Must be greater than 0.
    23 #define __DEFAULT_EXECUTOR_RQUEUES__ 4
     22#define __DEFAULT_EXECUTOR_RQUEUES__ 2
    2423
    2524// Define if executor is created in a separate cluster
    2625#define __DEFAULT_EXECUTOR_SEPCLUS__ false
    2726
    28 #define __DEFAULT_EXECUTOR_BUFSIZE__ 10
    29 
    30 #define __STEAL 0 // workstealing toggle. Disjoint from toggles above
    31 
    32 // workstealing heuristic selection (only set one to be 1)
    33 // #define RAND 0
    34 #define SEARCH 1
    35 
    36 // show stats
    37 // #define ACTOR_STATS
     27// when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
     28#define __ALLOC 0
    3829
    3930// forward decls
    4031struct actor;
    4132struct message;
    42 struct executor;
    4333
    4434enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
     
    5040    __receive_fn fn;
    5141    bool stop;
    52 };
     42    inline dlink(request);
     43};
     44P9_EMBEDDED( request, dlink(request) )
    5345
    5446static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
     
    6658}
    6759
    68 // Vector-like data structure that supports O(1) queue operations with no bound on size
    69 // assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert)
     60// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
    7061struct copy_queue {
     62    dlist( request ) list;
     63    #if ! __ALLOC
    7164    request * buffer;
    72     size_t count, buffer_size, index, utilized, last_size;
     65    size_t count, buffer_size, index;
     66    #endif
    7367};
    7468static inline void ?{}( copy_queue & this ) {}
    7569static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
     70    list{};
     71    #if ! __ALLOC
    7672    buffer_size = buf_size;
    7773    buffer = aalloc( buffer_size );
    7874    count = 0;
    79     utilized = 0;
    8075    index = 0;
    81     last_size = 0;
    82 }
    83 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
     76    #endif
     77}
     78static inline void ^?{}( copy_queue & this ) with(this) {
     79    #if ! __ALLOC
     80    adelete(buffer);
     81    #endif
     82}
    8483
    8584static inline void insert( copy_queue & this, request & elem ) with(this) {
    86     if ( count >= buffer_size ) { // increase arr size
    87         last_size = buffer_size;
    88         buffer_size = 2 * buffer_size;
    89         buffer = realloc( buffer, sizeof( request ) * buffer_size );
    90         /* paranoid */ verify( buffer );
     85    #if ! __ALLOC
     86    if ( count < buffer_size ) { // fast path ( no alloc )
     87        buffer[count]{ elem };
     88        count++;
     89        return;
    9190    }
    92     memcpy( &buffer[count], &elem, sizeof(request) );
    93     count++;
     91    request * new_elem = alloc();
     92    (*new_elem){ elem };
     93    insert_last( list, *new_elem );
     94    #else
     95    insert_last( list, elem );
     96    #endif
    9497}
    9598
    9699// once you start removing you need to remove all elements
    97 // it is not supported to call insert() before the array is fully empty
    98 static inline request & remove( copy_queue & this ) with(this) {
     100// it is not supported to call insert() before the list is fully empty
     101// should_delete is an output param
     102static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
     103    #if ! __ALLOC
    99104    if ( count > 0 ) {
    100105        count--;
     106        should_delete = false;
    101107        size_t old_idx = index;
    102108        index = count == 0 ? 0 : index + 1;
    103109        return buffer[old_idx];
    104110    }
    105     request * ret = 0p;
    106     return *0p;
    107 }
    108 
    109 // try to reclaim some memory if less than half of buffer is utilized
    110 static inline void reclaim( copy_queue & this ) with(this) {
    111     if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    112     utilized = 0;
    113     buffer_size--;
    114     buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
    115 }
    116 
    117 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
    118 
     111    #endif
     112    should_delete = true;
     113    return try_pop_front( list );
     114}
     115
     116static inline bool isEmpty( copy_queue & this ) with(this) {
     117    #if ! __ALLOC
     118    return count == 0 && list`isEmpty;
     119    #else
     120    return list`isEmpty;
     121    #endif
     122}
     123
     124static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
    119125struct work_queue {
    120126    __spinlock_t mutex_lock;
    121     copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
    122     copy_queue * c_queue;           // current queue
    123     volatile bool being_processed;  // flag to prevent concurrent processing
    124     #ifdef ACTOR_STATS
    125     unsigned int id;
    126     size_t missed;                  // transfers skipped due to being_processed flag being up
    127     #endif
     127    copy_queue owned_queue;
     128    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
     129
    128130}; // work_queue
    129 static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    130     owned_queue = alloc();      // allocated separately to avoid false sharing
    131     (*owned_queue){ buf_size };
    132     c_queue = owned_queue;
    133     being_processed = false;
    134     #ifdef ACTOR_STATS
    135     id = i;
    136     missed = 0;
    137     #endif
    138 }
    139 
    140 // clean up copy_queue owned by this work_queue
    141 static inline void ^?{}( work_queue & this ) with(this) { delete( owned_queue ); }
     131static inline void ?{}( work_queue & this ) with(this) {
     132    // c_queue = alloc();
     133    // (*c_queue){ __buffer_size };
     134    owned_queue{ __buffer_size };
     135    c_queue = &owned_queue;
     136}
     137// static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
    142138
    143139static inline void insert( work_queue & this, request & elem ) with(this) {
     
    149145static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    150146    lock( mutex_lock __cfaabi_dbg_ctx2 );
    151     #ifdef __STEAL
    152 
    153     // check if queue is being processed elsewhere
    154     if ( unlikely( being_processed ) ) {
    155         #ifdef ACTOR_STATS
    156         missed++;
    157         #endif
    158         unlock( mutex_lock );
    159         return;
    160     }
    161 
    162     being_processed = c_queue->count != 0;
    163     #endif // __STEAL
    164 
    165     c_queue->utilized = c_queue->count;
    166 
    167147    // swap copy queue ptrs
    168148    copy_queue * temp = *transfer_to;
     
    172152} // transfer
    173153
    174 // needed since some info needs to persist past worker lifetimes
    175 struct worker_info {
    176     volatile unsigned long long stamp;
    177     #ifdef ACTOR_STATS
    178     size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
    179     unsigned long long processed;
    180     size_t gulps;
    181     #endif
    182 };
    183 static inline void ?{}( worker_info & this ) {
    184     #ifdef ACTOR_STATS
    185     this.stolen_from = 0;
    186     this.try_steal = 0;                             // attempts to steal
    187     this.stolen = 0;                                // successful steals
    188     this.processed = 0;                             // requests processed
    189     this.gulps = 0;                                 // number of gulps
    190     this.failed_swaps = 0;                          // steal swap failures
    191     this.msgs_stolen = 0;                           // number of messages stolen
    192     #endif
    193     this.stamp = rdtscl();
    194 }
    195 
    196 // #ifdef ACTOR_STATS
    197 // unsigned int * stolen_arr;
    198 // unsigned int * replaced_queue;
    199 // #endif
    200154thread worker {
    201     work_queue ** request_queues;
     155    copy_queue owned_queue;
     156    work_queue * request_queues;
    202157    copy_queue * current_queue;
    203     executor * executor_;
     158        request & req;
    204159    unsigned int start, range;
    205     int id;
    206 };
    207 
    208 #ifdef ACTOR_STATS
    209 // aggregate counters for statistics
    210 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
    211     __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    212 #endif
    213 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
    214     unsigned int start, unsigned int range, int id ) {
     160};
     161
     162static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
    215163    ((thread &)this){ clu };
    216     this.request_queues = request_queues;           // array of all queues
    217     this.current_queue = current_queue;             // currently gulped queue (start with empty queue to use in swap later)
    218     this.executor_ = executor_;                     // pointer to current executor
    219     this.start = start;                             // start of worker's subrange of request_queues
    220     this.range = range;                             // size of worker's subrange of request_queues
    221     this.id = id;                                   // worker's id and index in array of workers
    222 }
    223 
    224 static bool no_steal = false;
     164    this.request_queues = request_queues;
     165    // this.current_queue = alloc();
     166    // (*this.current_queue){ __buffer_size };
     167    this.owned_queue{ __buffer_size };
     168    this.current_queue = &this.owned_queue;
     169    this.start = start;
     170    this.range = range;
     171}
     172// static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
     173
    225174struct executor {
    226175    cluster * cluster;                                                      // if workers execute on separate cluster
    227176        processor ** processors;                                            // array of virtual processors adding parallelism for workers
    228         work_queue * request_queues;                                // master array of work request queues
    229     copy_queue * local_queues;                      // array of all worker local queues to avoid deletion race
    230         work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
    231     worker ** workers;                                                          // array of workers executing work requests
    232     worker_info * w_infos;                          // array of info about each worker
     177        work_queue * request_queues;                                // master list of work request queues
     178        worker ** workers;                                                              // array of workers executing work requests
    233179        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    234180        bool seperate_clus;                                                             // use same or separate cluster for executor
    235181}; // executor
    236182
    237 // #ifdef ACTOR_STATS
    238 // __spinlock_t out_lock;
    239 // #endif
    240 static inline void ^?{}( worker & mutex this ) with(this) {
    241     #ifdef ACTOR_STATS
    242     __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    243     __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    244     __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    245     __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    246     __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    247     __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    248 
    249     // per worker steal stats (uncomment alongside the lock above this routine to print)
    250     // lock( out_lock __cfaabi_dbg_ctx2 );
    251     // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    252     // int count = 0;
    253     // int count2 = 0;
    254     // for ( i; range ) {
    255     //     if ( replaced_queue[start + i] > 0 ){
    256     //         count++;
    257     //         // printf("%d: %u, ",i, replaced_queue[i]);
    258     //     }
    259     //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    260     //         count2++;
    261     // }
    262     // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    263     // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    264     // unlock( out_lock );
    265     #endif
    266 }
    267 
    268183static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    269184    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
     185    __buffer_size = buf_size;
    270186    this.nprocessors = nprocessors;
    271187    this.nworkers = nworkers;
     
    273189    this.seperate_clus = seperate_clus;
    274190
    275     if ( nworkers == nrqueues )
    276         no_steal = true;
    277    
    278     #ifdef ACTOR_STATS
    279     // stolen_arr = aalloc( nrqueues );
    280     // replaced_queue = aalloc( nrqueues );
    281     __total_workers = nworkers;
    282     #endif
    283 
    284191    if ( seperate_clus ) {
    285192        cluster = alloc();
     
    288195
    289196    request_queues = aalloc( nrqueues );
    290     worker_req_queues = aalloc( nrqueues );
    291     for ( i; nrqueues ) {
    292         request_queues[i]{ buf_size, i };
    293         worker_req_queues[i] = &request_queues[i];
    294     }
     197    for ( i; nrqueues )
     198        request_queues[i]{};
    295199   
    296200    processors = aalloc( nprocessors );
     
    298202        (*(processors[i] = alloc())){ *cluster };
    299203
    300     local_queues = aalloc( nworkers );
    301     workers = aalloc( nworkers );
    302     w_infos = aalloc( nworkers );
     204    workers = alloc( nworkers );
    303205    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    304 
    305     for ( i; nworkers ) {
    306         w_infos[i]{};
    307         local_queues[i]{ buf_size };
    308     }
    309 
    310206    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
    311207        range = reqPerWorker + ( i < extras ? 1 : 0 );
    312         (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
     208        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
    313209    } // for
    314210}
    315 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
     211static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
    316212static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
    317213static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
     
    320216
    321217static inline void ^?{}( executor & this ) with(this) {
    322     #ifdef __STEAL
    323     request sentinels[nrqueues];
    324     for ( unsigned int i = 0; i < nrqueues; i++ ) {
    325         insert( request_queues[i], sentinels[i] );              // force eventually termination
    326     } // for
    327     #else
    328218    request sentinels[nworkers];
    329     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    330     for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
    331         range = reqPerWorker + ( i < extras ? 1 : 0 );
     219    unsigned int reqPerWorker = nrqueues / nworkers;
     220    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
    332221        insert( request_queues[step], sentinels[i] );           // force eventually termination
    333222    } // for
    334     #endif
    335223
    336224    for ( i; nworkers )
     
    341229    } // for
    342230
    343     #ifdef ACTOR_STATS
    344     size_t misses = 0;
    345     for ( i; nrqueues ) {
    346         misses += worker_req_queues[i]->missed;
    347     }
    348     // adelete( stolen_arr );
    349     // adelete( replaced_queue );
    350     #endif
    351 
    352231    adelete( workers );
    353     adelete( w_infos );
    354     adelete( local_queues );
    355232    adelete( request_queues );
    356     adelete( worker_req_queues );
    357233    adelete( processors );
    358234    if ( seperate_clus ) delete( cluster );
    359 
    360     #ifdef ACTOR_STATS // print formatted stats
    361     printf("    Actor System Stats:\n");
    362     printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
    363     size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    364     printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    365     printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
    366         __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);
    367     size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    368     printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    369     #endif
    370        
    371235}
    372236
    373237// this is a static field of executor but have to forward decl for get_next_ticket
    374 static size_t __next_ticket = 0;
    375 
    376 static inline size_t __get_next_ticket( executor & this ) with(this) {
    377     #ifdef __CFA_DEBUG__
    378     size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    379 
    380     // reserve MAX for dead actors
    381     if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    382     return temp;
    383     #else
    384     return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    385     #endif
     238static unsigned int __next_ticket = 0;
     239
     240static inline unsigned int get_next_ticket( executor & this ) with(this) {
     241    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    386242} // tickets
    387243
    388 // TODO: update globals in this file to be static fields once the static fields project is done
     244// C_TODO: update globals in this file to be static fields once the project is done
    389245static executor * __actor_executor_ = 0p;
    390 static bool __actor_executor_passed = false;            // was an executor passed to start_actor_system
    391 static size_t __num_actors_ = 0;                                        // number of actor objects in system
     246static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
     247static unsigned long int __num_actors_;                         // number of actor objects in system
    392248static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
    393249struct actor {
    394     size_t ticket;                                          // executor-queue handle
    395     Allocation allocation_;                                         // allocation action
    396     inline virtual_dtor;
    397 };
    398 
    399 static inline void ?{}( actor & this ) with(this) {
     250    unsigned long int ticket;           // executor-queue handle to provide FIFO message execution
     251    Allocation allocation_;                     // allocation action
     252};
     253
     254static inline void ?{}( actor & this ) {
    400255    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    401256    // member must be called to end it
    402     verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    403     allocation_ = Nodelete;
    404     ticket = __get_next_ticket( *__actor_executor_ );
    405     __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    406     #ifdef ACTOR_STATS
    407     __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    408     #endif
    409 }
     257    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
     258    this.allocation_ = Nodelete;
     259    this.ticket = get_next_ticket( *__actor_executor_ );
     260    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
     261}
     262static inline void ^?{}( actor & this ) {}
    410263
    411264static inline void check_actor( actor & this ) {
     
    423276        }
    424277
    425         if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
     278        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
    426279            unpark( __actor_executor_thd );
    427280        }
     
    431284struct message {
    432285    Allocation allocation_;                     // allocation action
    433     inline virtual_dtor;
    434 };
    435 
    436 static inline void ?{}( message & this ) {
    437     this.allocation_ = Nodelete;
    438 }
    439 static inline void ?{}( message & this, Allocation allocation ) {
    440     memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor
    441     verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");
    442 }
    443 static inline void ^?{}( message & this ) with(this) {
    444     CFA_DEBUG( if ( allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); )
    445 }
     286};
     287
     288static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
     289static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
     290static inline void ^?{}( message & this ) {}
    446291
    447292static inline void check_message( message & this ) {
    448293    switch ( this.allocation_ ) {                                               // analyze message status
    449         case Nodelete: CFA_DEBUG(this.allocation_ = Finished); break;
     294        case Nodelete: break;
    450295        case Delete: delete( &this ); break;
    451296        case Destroy: ^?{}(this); break;
     
    453298    } // switch
    454299}
    455 static inline void set_allocation( message & this, Allocation state ) {
    456     this.allocation_ = state;
    457 }
    458300
    459301static inline void deliver_request( request & this ) {
    460     this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
     302    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
     303    this.receiver->allocation_ = actor_allocation;
     304    check_actor( *this.receiver );
    461305    check_message( *this.msg );
    462     check_actor( *this.receiver );
    463 }
    464 
    465 // tries to atomically swap two queues and returns 0p if the swap failed
    466 // returns ptr to newly owned queue if swap succeeds
    467 static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    468     work_queue * my_queue = request_queues[my_idx];
    469     work_queue * other_queue = request_queues[victim_idx];
    470 
    471     // if either queue is 0p then they are in the process of being stolen
    472     if ( other_queue == 0p ) return 0p;
    473 
    474     // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    475     if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
    476         return 0p;
    477 
    478     // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    479     if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
    480         /* paranoid */ verify( request_queues[my_idx] == 0p );
    481         request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
    482         return 0p;
    483     }
    484 
    485     // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    486     request_queues[my_idx] = other_queue; // last write does not need to be atomic
    487     return other_queue;
    488 }
    489 
    490 // once a worker to steal from has been chosen, choose queue to steal from
    491 static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    492     // have to calculate victim start and range since victim may be deleted before us in shutdown
    493     const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    494     const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    495     unsigned int vic_start, vic_range;
    496     if ( extras > victim_id  ) {
    497         vic_range = queues_per_worker + 1;
    498         vic_start = vic_range * victim_id;
    499     } else {
    500         vic_start = extras + victim_id * queues_per_worker;
    501         vic_range = queues_per_worker;
    502     }
    503     unsigned int start_idx = prng( vic_range );
    504 
    505     unsigned int tries = 0;
    506     work_queue * curr_steal_queue;
    507 
    508     for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
    509         tries++;
    510         curr_steal_queue = request_queues[ i + vic_start ];
    511         // avoid empty queues and queues that are being operated on
    512         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
    513             continue;
    514 
    515         #ifdef ACTOR_STATS
    516         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    517         if ( curr_steal_queue ) {
    518             executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
    519             executor_->w_infos[id].stolen++;
    520             // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    521             // replaced_queue[swap_idx]++;
    522             // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
    523         } else {
    524             executor_->w_infos[id].failed_swaps++;
    525         }
    526         #else
    527         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    528         #endif // ACTOR_STATS
    529 
    530         return;
    531     }
    532 
    533     return;
    534 }
    535 
    536 // choose a worker to steal from
    537 static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    538     #if RAND
    539     unsigned int victim = prng( executor_->nworkers );
    540     if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    541     choose_queue( this, victim, swap_idx );
    542     #elif SEARCH
    543     unsigned long long min = MAX; // smaller timestamp means longer since service
    544     int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
    545     int n_workers = executor_->nworkers;
    546     unsigned long long curr_stamp;
    547     int scount = 1;
    548     for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
    549         curr_stamp = executor_->w_infos[i].stamp;
    550         if ( curr_stamp < min ) {
    551             min = curr_stamp;
    552             min_id = i;
    553         }
    554     }
    555     choose_queue( this, min_id, swap_idx );
    556     #endif
    557306}
    558307
    559308void main( worker & this ) with(this) {
    560     // #ifdef ACTOR_STATS
    561     // for ( i; executor_->nrqueues ) {
    562     //     replaced_queue[i] = 0;
    563     //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    564     // }
    565     // #endif
    566 
    567     // threshold of empty queues we see before we go stealing
    568     const unsigned int steal_threshold = 2 * range;
    569 
    570     // Store variable data here instead of worker struct to avoid any potential false sharing
    571     unsigned int empty_count = 0;
    572     request & req;
    573     work_queue * curr_work_queue;
    574 
     309    bool should_delete;
    575310    Exit:
    576311    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    577         curr_work_queue = request_queues[i + start];
    578        
    579         // check if queue is empty before trying to gulp it
    580         if ( isEmpty( *curr_work_queue->c_queue ) ) {
    581             #ifdef __STEAL
    582             empty_count++;
    583             if ( empty_count < steal_threshold ) continue;
    584             #else
    585             continue;
    586             #endif
    587         }
    588         transfer( *curr_work_queue, &current_queue );
    589         #ifdef ACTOR_STATS
    590         executor_->w_infos[id].gulps++;
    591         #endif // ACTOR_STATS
    592         #ifdef __STEAL
    593         if ( isEmpty( *current_queue ) ) {
    594             if ( unlikely( no_steal ) ) continue;
    595             empty_count++;
    596             if ( empty_count < steal_threshold ) continue;
    597             empty_count = 0;
    598 
    599             __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
    600            
    601             #ifdef ACTOR_STATS
    602             executor_->w_infos[id].try_steal++;
    603             #endif // ACTOR_STATS
    604            
    605             steal_work( this, start + prng( range ) );
    606             continue;
    607         }
    608         #endif // __STEAL
     312        // C_TODO: potentially check queue count instead of immediately trying to transfer
     313        transfer( request_queues[i + start], &current_queue );
    609314        while ( ! isEmpty( *current_queue ) ) {
    610             #ifdef ACTOR_STATS
    611             executor_->w_infos[id].processed++;
    612             #endif
    613             &req = &remove( *current_queue );
    614             if ( !&req ) continue;
     315            &req = &remove( *current_queue, should_delete );
     316            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
    615317            if ( req.stop ) break Exit;
    616318            deliver_request( req );
    617         }
    618         #ifdef __STEAL
    619         curr_work_queue->being_processed = false; // set done processing
    620         empty_count = 0; // we found work so reset empty counter
    621         #endif
    622        
    623         // potentially reclaim some of the current queue's vector space if it is unused
    624         reclaim( *current_queue );
     319
     320            if ( should_delete ) delete( &req );
     321        } // while
    625322    } // for
    626323}
     
    631328
    632329static inline void send( actor & this, request & req ) {
    633     verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    634330    send( *__actor_executor_, req, this.ticket );
    635331}
    636332
    637 static inline void __reset_stats() {
    638     #ifdef ACTOR_STATS
    639     __total_tries = 0;
    640     __total_stolen = 0;
    641     __all_gulps = 0;
    642     __total_failed_swaps = 0;
    643     __all_processed = 0;
    644     __num_actors_stats = 0;
    645     __all_msgs_stolen = 0;
    646     #endif
    647 }
    648 
    649333static inline void start_actor_system( size_t num_thds ) {
    650     __reset_stats();
    651334    __actor_executor_thd = active_thread();
    652335    __actor_executor_ = alloc();
     
    654337}
    655338
    656 // TODO: potentially revisit getting number of processors
    657 //  ( currently the value stored in active_cluster()->procs.total is often stale
    658 //  and doesn't reflect how many procs are allocated )
    659 // static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
    660 static inline void start_actor_system() { start_actor_system( 1 ); }
     339static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
    661340
    662341static inline void start_actor_system( executor & this ) {
    663     __reset_stats();
    664342    __actor_executor_thd = active_thread();
    665343    __actor_executor_ = &this;
     
    676354    __actor_executor_passed = false;
    677355}
    678 
    679 // Default messages to send to any actor to change status
    680 // assigned at creation to __base_msg_finished to avoid unused message warning
    681 message __base_msg_finished @= { .allocation_ : Finished };
    682 struct __DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;
    683 struct __DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;
    684 struct __FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;
    685 
    686 Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; }
    687 Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; }
    688 Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }
    689 
Note: See TracChangeset for help on using the changeset viewer.