Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    r508671e r9235192c  
    3939// #define ACTOR_STATS
    4040
     41// used to run and only track missed queue gulps
     42#ifdef ACTOR_STATS
     43#define ACTOR_STATS_QUEUE_MISSED
     44#endif
     45
    4146// forward decls
    4247struct actor;
     
    4853typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
    4954struct request {
    50     actor * receiver;
    51     message * msg;
    52     __receive_fn fn;
     55        actor * receiver;
     56        message * msg;
     57        __receive_fn fn;
    5358};
    5459
    5560struct a_msg {
    56     int m;
     61        int m;
    5762};
    5863static inline void ?{}( request & this ) {}
    5964static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    60     this.receiver = receiver;
    61     this.msg = msg;
    62     this.fn = fn;
     65        this.receiver = receiver;
     66        this.msg = msg;
     67        this.fn = fn;
    6368}
    6469static inline void ?{}( request & this, request & copy ) {
    65     this.receiver = copy.receiver;
    66     this.msg = copy.msg;
    67     this.fn = copy.fn;
     70        this.receiver = copy.receiver;
     71        this.msg = copy.msg;
     72        this.fn = copy.fn;
    6873}
    6974
     
    7176// assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert)
    7277struct copy_queue {
    73     request * buffer;
    74     size_t count, buffer_size, index, utilized, last_size;
     78        request * buffer;
     79        size_t count, buffer_size, index, utilized, last_size;
    7580};
    7681static inline void ?{}( copy_queue & this ) {}
    7782static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    78     buffer_size = buf_size;
    79     buffer = aalloc( buffer_size );
    80     count = 0;
    81     utilized = 0;
    82     index = 0;
    83     last_size = 0;
     83        buffer_size = buf_size;
     84        buffer = aalloc( buffer_size );
     85        count = 0;
     86        utilized = 0;
     87        index = 0;
     88        last_size = 0;
    8489}
    8590static inline void ^?{}( copy_queue & this ) with(this) {
    86     DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    87     adelete(buffer);
     91        DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
     92        adelete(buffer);
    8893}
    8994
    9095static inline void insert( copy_queue & this, request & elem ) with(this) {
    91     if ( count >= buffer_size ) { // increase arr size
    92         last_size = buffer_size;
    93         buffer_size = 2 * buffer_size;
    94         buffer = realloc( buffer, sizeof( request ) * buffer_size );
    95         /* paranoid */ verify( buffer );
    96     }
    97     memcpy( &buffer[count], &elem, sizeof(request) );
    98     count++;
     96        if ( count >= buffer_size ) { // increase arr size
     97                last_size = buffer_size;
     98                buffer_size = 2 * buffer_size;
     99                buffer = realloc( buffer, sizeof( request ) * buffer_size );
     100                /* paranoid */ verify( buffer );
     101        }
     102        memcpy( &buffer[count], &elem, sizeof(request) );
     103        count++;
    99104}
    100105
     
    102107// it is not supported to call insert() before the array is fully empty
    103108static inline request & remove( copy_queue & this ) with(this) {
    104     if ( count > 0 ) {
    105         count--;
    106         size_t old_idx = index;
    107         index = count == 0 ? 0 : index + 1;
    108         return buffer[old_idx];
    109     }
    110     request * ret = 0p;
    111     return *0p;
     109        if ( count > 0 ) {
     110                count--;
     111                size_t old_idx = index;
     112                index = count == 0 ? 0 : index + 1;
     113                return buffer[old_idx];
     114        }
     115        request * ret = 0p;
     116        return *0p;
    112117}
    113118
    114119// try to reclaim some memory if less than half of buffer is utilized
    115120static inline void reclaim( copy_queue & this ) with(this) {
    116     if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    117     utilized = 0;
    118     buffer_size--;
    119     buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
     121        if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
     122        utilized = 0;
     123        buffer_size--;
     124        buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
    120125}
    121126
     
    123128
    124129struct work_queue {
    125     __spinlock_t mutex_lock;
    126     copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
    127     copy_queue * c_queue;           // current queue
    128     volatile bool being_processed;  // flag to prevent concurrent processing
    129     #ifdef ACTOR_STATS
    130     unsigned int id;
    131     size_t missed;                  // transfers skipped due to being_processed flag being up
     130        __spinlock_t mutex_lock;
     131        copy_queue * owned_queue;                                                       // copy queue allocated and cleaned up by this work_queue
     132        copy_queue * c_queue;                                                           // current queue
     133        volatile bool being_processed;                                          // flag to prevent concurrent processing
     134        #ifdef ACTOR_STATS
     135        unsigned int id;
    132136    #endif
     137    #ifdef ACTOR_STATS_QUEUE_MISSED
     138        size_t missed;                                                                          // transfers skipped due to being_processed flag being up
     139        #endif
    133140}; // work_queue
    134141static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    135     owned_queue = alloc();      // allocated separately to avoid false sharing
    136     (*owned_queue){ buf_size };
    137     c_queue = owned_queue;
    138     being_processed = false;
    139     #ifdef ACTOR_STATS
    140     id = i;
    141     missed = 0;
    142     #endif
     142        owned_queue = alloc();                                                          // allocated separately to avoid false sharing
     143        (*owned_queue){ buf_size };
     144        c_queue = owned_queue;
     145        being_processed = false;
     146        #ifdef ACTOR_STATS
     147        id = i;
     148        missed = 0;
     149        #endif
    143150}
    144151
     
    147154
    148155static inline void insert( work_queue & this, request & elem ) with(this) {
    149     lock( mutex_lock __cfaabi_dbg_ctx2 );
    150     insert( *c_queue, elem );
    151     unlock( mutex_lock );
     156        lock( mutex_lock __cfaabi_dbg_ctx2 );
     157        insert( *c_queue, elem );
     158        unlock( mutex_lock );
    152159} // insert
    153160
    154161static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    155     lock( mutex_lock __cfaabi_dbg_ctx2 );
    156     #ifdef __STEAL
    157 
    158     // check if queue is being processed elsewhere
    159     if ( unlikely( being_processed ) ) {
    160         #ifdef ACTOR_STATS
    161         missed++;
    162         #endif
    163         unlock( mutex_lock );
    164         return;
    165     }
    166 
    167     being_processed = c_queue->count != 0;
    168     #endif // __STEAL
    169 
    170     c_queue->utilized = c_queue->count;
    171 
    172     // swap copy queue ptrs
    173     copy_queue * temp = *transfer_to;
    174     *transfer_to = c_queue;
    175     c_queue = temp;
    176     unlock( mutex_lock );
     162        lock( mutex_lock __cfaabi_dbg_ctx2 );
     163        #ifdef __STEAL
     164
     165        // check if queue is being processed elsewhere
     166        if ( unlikely( being_processed ) ) {
     167                #ifdef ACTOR_STATS
     168                missed++;
     169                #endif
     170                unlock( mutex_lock );
     171                return;
     172        }
     173
     174        being_processed = c_queue->count != 0;
     175        #endif // __STEAL
     176
     177        c_queue->utilized = c_queue->count;
     178
     179        // swap copy queue ptrs
     180        copy_queue * temp = *transfer_to;
     181        *transfer_to = c_queue;
     182        c_queue = temp;
     183        unlock( mutex_lock );
    177184} // transfer
    178185
    179186// needed since some info needs to persist past worker lifetimes
    180187struct worker_info {
    181     volatile unsigned long long stamp;
    182     #ifdef ACTOR_STATS
    183     size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    184     unsigned long long processed;
    185     size_t gulps;
    186     #endif
     188        volatile unsigned long long stamp;
     189        #ifdef ACTOR_STATS
     190        size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
     191        unsigned long long processed;
     192        size_t gulps;
     193        #endif
    187194};
    188195static inline void ?{}( worker_info & this ) {
    189     #ifdef ACTOR_STATS
    190     this.stolen_from = 0;
    191     this.try_steal = 0;                             // attempts to steal
    192     this.stolen = 0;                                // successful steals
    193     this.processed = 0;                             // requests processed
    194     this.gulps = 0;                                 // number of gulps
    195     this.failed_swaps = 0;                          // steal swap failures
    196     this.empty_stolen = 0;                          // queues empty after steal
    197     this.msgs_stolen = 0;                           // number of messages stolen
    198     #endif
    199     this.stamp = rdtscl();
     196        #ifdef ACTOR_STATS
     197        this.stolen_from = 0;
     198        this.try_steal = 0;                                                                     // attempts to steal
     199        this.stolen = 0;                                                                        // successful steals
     200        this.processed = 0;                                                                     // requests processed
     201        this.gulps = 0;                                                                         // number of gulps
     202        this.failed_swaps = 0;                                                          // steal swap failures
     203        this.empty_stolen = 0;                                                          // queues empty after steal
     204        this.msgs_stolen = 0;                                                           // number of messages stolen
     205        #endif
     206        this.stamp = rdtscl();
    200207}
    201208
     
    205212// #endif
    206213thread worker {
    207     work_queue ** request_queues;
    208     copy_queue * current_queue;
    209     executor * executor_;
    210     unsigned int start, range;
    211     int id;
     214        work_queue ** request_queues;
     215        copy_queue * current_queue;
     216        executor * executor_;
     217        unsigned int start, range;
     218        int id;
    212219};
    213220
     
    215222// aggregate counters for statistics
    216223size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    217     __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
     224        __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    218225#endif
    219226static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
    220     unsigned int start, unsigned int range, int id ) {
    221     ((thread &)this){ clu };
    222     this.request_queues = request_queues;           // array of all queues
    223     this.current_queue = current_queue;             // currently gulped queue (start with empty queue to use in swap later)
    224     this.executor_ = executor_;                     // pointer to current executor
    225     this.start = start;                             // start of worker's subrange of request_queues
    226     this.range = range;                             // size of worker's subrange of request_queues
    227     this.id = id;                                   // worker's id and index in array of workers
     227        unsigned int start, unsigned int range, int id ) {
     228        ((thread &)this){ clu };
     229        this.request_queues = request_queues;                           // array of all queues
     230        this.current_queue = current_queue;                                     // currently gulped queue (start with empty queue to use in swap later)
     231        this.executor_ = executor_;                                                     // pointer to current executor
     232        this.start = start;                                                                     // start of worker's subrange of request_queues
     233        this.range = range;                                                                     // size of worker's subrange of request_queues
     234        this.id = id;                                                                           // worker's id and index in array of workers
    228235}
    229236
    230237static bool no_steal = false;
    231238struct executor {
    232     cluster * cluster;                                                      // if workers execute on separate cluster
    233         processor ** processors;                                            // array of virtual processors adding parallelism for workers
    234         work_queue * request_queues;                                // master array of work request queues
    235     copy_queue * local_queues;                      // array of all worker local queues to avoid deletion race
    236         work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
    237     worker ** workers;                                                          // array of workers executing work requests
    238     worker_info * w_infos;                          // array of info about each worker
    239         unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    240         bool seperate_clus;                                                             // use same or separate cluster for executor
    241     volatile bool is_shutdown;                      // flag to communicate shutdown to worker threads
     239        cluster * cluster;                                                                      // if workers execute on separate cluster
     240        processor ** processors;                                                        // array of virtual processors adding parallelism for workers
     241        work_queue * request_queues;                                            // master array of work request queues
     242        copy_queue * local_queues;                                                      // array of all worker local queues to avoid deletion race
     243        work_queue ** worker_req_queues;                                        // secondary array of work queues to allow for swapping
     244        worker ** workers;                                                                      // array of workers executing work requests
     245        worker_info * w_infos;                                                          // array of info about each worker
     246        unsigned int nprocessors, nworkers, nrqueues;           // number of processors/threads/request queues
     247        bool seperate_clus;                                                                     // use same or separate cluster for executor
     248        volatile bool is_shutdown;                                                      // flag to communicate shutdown to worker threads
    242249}; // executor
    243250
     
    246253// #endif
    247254static inline void ^?{}( worker & mutex this ) with(this) {
    248     #ifdef ACTOR_STATS
    249     __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    250     __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    251     __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    252     __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    253     __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    254     __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    255     __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
    256 
    257     // per worker steal stats (uncomment alongside the lock above this routine to print)
    258     // lock( out_lock __cfaabi_dbg_ctx2 );
    259     // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    260     // int count = 0;
    261     // int count2 = 0;
    262     // for ( i; range ) {
    263     //    if ( replaced_queue[start + i] > 0 ){
    264     //        count++;
    265     //        // printf("%d: %u, ",i, replaced_queue[i]);
    266     //    }
    267     //    if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    268     //        count2++;
    269     // }
    270     // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    271     // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    272     // unlock( out_lock );
    273     #endif
     255        #ifdef ACTOR_STATS
     256        __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
     257        __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
     258        __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
     259        __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
     260        __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
     261        __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
     262        __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
     263
     264        // per worker steal stats (uncomment alongside the lock above this routine to print)
     265        // lock( out_lock __cfaabi_dbg_ctx2 );
     266        // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
     267        // int count = 0;
     268        // int count2 = 0;
     269        // for ( i; range ) {
     270        //      if ( replaced_queue[start + i] > 0 ){
     271        //              count++;
     272        //              // printf("%d: %u, ",i, replaced_queue[i]);
     273        //      }
     274        //      if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
     275        //              count2++;
     276        // }
     277        // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
     278        // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
     279        // unlock( out_lock );
     280        #endif
    274281}
    275282
    276283static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    277     if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    278     this.nprocessors = nprocessors;
    279     this.nworkers = nworkers;
    280     this.nrqueues = nrqueues;
    281     this.seperate_clus = seperate_clus;
    282     this.is_shutdown = false;
    283 
    284     if ( nworkers == nrqueues )
    285         no_steal = true;
    286    
    287     #ifdef ACTOR_STATS
    288     // stolen_arr = aalloc( nrqueues );
    289     // replaced_queue = aalloc( nrqueues );
    290     __total_workers = nworkers;
    291     #endif
    292 
    293     if ( seperate_clus ) {
    294         cluster = alloc();
    295         (*cluster){};
    296     } else cluster = active_cluster();
    297 
    298     request_queues = aalloc( nrqueues );
    299     worker_req_queues = aalloc( nrqueues );
    300     for ( i; nrqueues ) {
    301         request_queues[i]{ buf_size, i };
    302         worker_req_queues[i] = &request_queues[i];
    303     }
    304    
    305     processors = aalloc( nprocessors );
    306     for ( i; nprocessors )
    307         (*(processors[i] = alloc())){ *cluster };
    308 
    309     local_queues = aalloc( nworkers );
    310     workers = aalloc( nworkers );
    311     w_infos = aalloc( nworkers );
    312     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    313 
    314     for ( i; nworkers ) {
    315         w_infos[i]{};
    316         local_queues[i]{ buf_size };
    317     }
    318 
    319     for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
    320         range = reqPerWorker + ( i < extras ? 1 : 0 );
    321         (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    322     } // for
     284        if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
     285        this.nprocessors = nprocessors;
     286        this.nworkers = nworkers;
     287        this.nrqueues = nrqueues;
     288        this.seperate_clus = seperate_clus;
     289        this.is_shutdown = false;
     290
     291        if ( nworkers == nrqueues )
     292                no_steal = true;
     293       
     294        #ifdef ACTOR_STATS
     295        // stolen_arr = aalloc( nrqueues );
     296        // replaced_queue = aalloc( nrqueues );
     297        __total_workers = nworkers;
     298        #endif
     299
     300        if ( seperate_clus ) {
     301                cluster = alloc();
     302                (*cluster){};
     303        } else cluster = active_cluster();
     304
     305        request_queues = aalloc( nrqueues );
     306        worker_req_queues = aalloc( nrqueues );
     307        for ( i; nrqueues ) {
     308                request_queues[i]{ buf_size, i };
     309                worker_req_queues[i] = &request_queues[i];
     310        }
     311       
     312        processors = aalloc( nprocessors );
     313        for ( i; nprocessors )
     314                (*(processors[i] = alloc())){ *cluster };
     315
     316        local_queues = aalloc( nworkers );
     317        workers = aalloc( nworkers );
     318        w_infos = aalloc( nworkers );
     319        unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     320
     321        for ( i; nworkers ) {
     322                w_infos[i]{};
     323                local_queues[i]{ buf_size };
     324        }
     325
     326        for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
     327                range = reqPerWorker + ( i < extras ? 1 : 0 );
     328                (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
     329        } // for
    323330}
    324331static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
     
    329336
    330337static inline void ^?{}( executor & this ) with(this) {
    331     is_shutdown = true;
    332 
    333     for ( i; nworkers )
    334         delete( workers[i] );
    335 
    336     for ( i; nprocessors ) {
    337         delete( processors[i] );
    338     } // for
    339 
    340     #ifdef ACTOR_STATS
    341     size_t misses = 0;
    342     for ( i; nrqueues ) {
    343         misses += worker_req_queues[i]->missed;
    344     }
    345     // adelete( stolen_arr );
    346     // adelete( replaced_queue );
     338        is_shutdown = true;
     339
     340        for ( i; nworkers )
     341                delete( workers[i] );
     342
     343        for ( i; nprocessors ) {
     344                delete( processors[i] );
     345        } // for
     346
     347        #ifdef ACTOR_STATS_QUEUE_MISSED
     348        size_t misses = 0;
     349        for ( i; nrqueues ) {
     350                misses += worker_req_queues[i]->missed;
     351        }
     352        // adelete( stolen_arr );
     353        // adelete( replaced_queue );
     354        #endif
     355
     356        adelete( workers );
     357        adelete( w_infos );
     358        adelete( local_queues );
     359        adelete( request_queues );
     360        adelete( worker_req_queues );
     361        adelete( processors );
     362        if ( seperate_clus ) delete( cluster );
     363
     364        #ifdef ACTOR_STATS // print formatted stats
     365        printf("        Actor System Stats:\n");
     366        printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
     367        size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
     368        printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
     369        printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
     370                __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
     371        size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
     372        printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
     373        #endif
     374
     375    #ifndef ACTOR_STATS
     376    #ifdef ACTOR_STATS_QUEUE_MISSED
     377    printf("\t%lu", misses);
    347378    #endif
    348 
    349     adelete( workers );
    350     adelete( w_infos );
    351     adelete( local_queues );
    352     adelete( request_queues );
    353     adelete( worker_req_queues );
    354     adelete( processors );
    355     if ( seperate_clus ) delete( cluster );
    356 
    357     #ifdef ACTOR_STATS // print formatted stats
    358     printf("    Actor System Stats:\n");
    359     printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
    360     size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    361     printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    362     printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
    363         __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    364     size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    365     printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    366379    #endif
    367        
     380               
    368381}
    369382
     
    372385
    373386static inline size_t __get_next_ticket( executor & this ) with(this) {
    374     #ifdef __CFA_DEBUG__
    375     size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    376 
    377     // reserve MAX for dead actors
    378     if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    379     return temp;
    380     #else
    381     return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    382     #endif
     387        #ifdef __CFA_DEBUG__
     388        size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     389
     390        // reserve MAX for dead actors
     391        if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     392        return temp;
     393        #else
     394        return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
     395        #endif
    383396} // tickets
    384397
    385398// TODO: update globals in this file to be static fields once the static fields project is done
    386399static executor * __actor_executor_ = 0p;
    387 static bool __actor_executor_passed = false;            // was an executor passed to start_actor_system
    388 static size_t __num_actors_ = 0;                                        // number of actor objects in system
     400static bool __actor_executor_passed = false;                    // was an executor passed to start_actor_system
     401static size_t __num_actors_ = 0;                                                // number of actor objects in system
    389402static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
    390403struct actor {
    391     size_t ticket;                                          // executor-queue handle
    392     allocation alloc;                                       // allocation action
    393     inline virtual_dtor;
     404        size_t ticket;                                                                          // executor-queue handle
     405        allocation alloc;                                                                       // allocation action
     406        inline virtual_dtor;
    394407};
    395408
    396409static inline void ?{}( actor & this ) with(this) {
    397     // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    398     // member must be called to end it
    399     DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    400     alloc = Nodelete;
    401     ticket = __get_next_ticket( *__actor_executor_ );
    402     __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    403     #ifdef ACTOR_STATS
    404     __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    405     #endif
     410        // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
     411        // member must be called to end it
     412        DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
     413        alloc = Nodelete;
     414        ticket = __get_next_ticket( *__actor_executor_ );
     415        __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
     416        #ifdef ACTOR_STATS
     417        __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
     418        #endif
    406419}
    407420
    408421static inline void check_actor( actor & this ) {
    409     if ( this.alloc != Nodelete ) {
    410         switch( this.alloc ) {
    411             case Delete: delete( &this ); break;
    412             case Destroy:
    413                 CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
    414                 ^?{}(this);
    415                 break;
    416             case Finished:
    417                 CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
    418                 break;
    419             default: ;                                                          // stop warning
    420         }
    421 
    422         if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
    423             unpark( __actor_executor_thd );
    424         }
    425     }
     422        if ( this.alloc != Nodelete ) {
     423                switch( this.alloc ) {
     424                        case Delete: delete( &this ); break;
     425                        case Destroy:
     426                                CFA_DEBUG( this.ticket = MAX; );                // mark as terminated
     427                                ^?{}(this);
     428                                break;
     429                        case Finished:
     430                                CFA_DEBUG( this.ticket = MAX; );                // mark as terminated
     431                                break;
     432                        default: ;                                                                      // stop warning
     433                }
     434
     435                if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
     436                        unpark( __actor_executor_thd );
     437                }
     438        }
    426439}
    427440
    428441struct message {
    429     allocation alloc;                   // allocation action
    430     inline virtual_dtor;
     442        allocation alloc;                                                                       // allocation action
     443        inline virtual_dtor;
    431444};
    432445
    433446static inline void ?{}( message & this ) {
    434     this.alloc = Nodelete;
     447        this.alloc = Nodelete;
    435448}
    436449static inline void ?{}( message & this, allocation alloc ) {
    437     memcpy( &this.alloc, &alloc, sizeof(allocation) ); // optimization to elide ctor
    438     CFA_DEBUG( if( this.alloc == Finished ) this.alloc = Nodelete; )
     450        memcpy( &this.alloc, &alloc, sizeof(allocation) );      // optimization to elide ctor
     451        CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
    439452}
    440453static inline void ^?{}( message & this ) with(this) {
    441     CFA_DEBUG(
     454        CFA_DEBUG(
    442455                if ( alloc == Nodelete ) {
    443456                        printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",
     
    448461
    449462static inline void check_message( message & this ) {
    450     switch ( this.alloc ) {                                             // analyze message status
    451         case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
    452         case Delete: delete( &this ); break;
    453         case Destroy: ^?{}( this ); break;
    454         case Finished: break;
    455     } // switch
    456 }
    457 static inline void set_allocation( message & this, allocation state ) {
    458     CFA_DEBUG( if ( state == Nodelete ) state = Finished; )
    459     this.alloc = state;
     463        switch ( this.alloc ) {                                         // analyze message status
     464                case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
     465                case Delete: delete( &this ); break;
     466                case Destroy: ^?{}( this ); break;
     467                case Finished: break;
     468        } // switch
     469}
     470static inline allocation set_allocation( message & this, allocation state ) {
     471        CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
     472        allocation prev = this.alloc;
     473        this.alloc = state;
     474        return prev;
     475}
     476static inline allocation get_allocation( message & this ) {
     477        return this.alloc;
    460478}
    461479
    462480static inline void deliver_request( request & this ) {
    463     DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    464     actor * base_actor;
    465     message * base_msg;
    466     allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
    467     memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor
    468     check_message( *base_msg );
    469     check_actor( *base_actor );
     481        DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     482        actor * base_actor;
     483        message * base_msg;
     484        allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
     485        memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor
     486        check_message( *base_msg );
     487        check_actor( *base_actor );
    470488}
    471489
     
    473491// returns ptr to newly owned queue if swap succeeds
    474492static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    475     work_queue * my_queue = request_queues[my_idx];
    476     work_queue * other_queue = request_queues[victim_idx];
    477 
    478     // if either queue is 0p then they are in the process of being stolen
    479     if ( other_queue == 0p ) return 0p;
    480 
    481     // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    482     if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
    483         return 0p;
    484 
    485     // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    486     if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
    487         /* paranoid */ verify( request_queues[my_idx] == 0p );
    488         request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
    489         return 0p;
    490     }
    491 
    492     // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    493     request_queues[my_idx] = other_queue; // last write does not need to be atomic
    494     return other_queue;
     493        work_queue * my_queue = request_queues[my_idx];
     494        work_queue * other_queue = request_queues[victim_idx];
     495
     496        // if either queue is 0p then they are in the process of being stolen
     497        if ( other_queue == 0p ) return 0p;
     498
     499        // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
     500        if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
     501                return 0p;
     502
     503        // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
     504        if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     505                /* paranoid */ verify( request_queues[my_idx] == 0p );
     506                request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
     507                return 0p;
     508        }
     509
     510        // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
     511        request_queues[my_idx] = other_queue; // last write does not need to be atomic
     512        return other_queue;
    495513}
    496514
    497515// once a worker to steal from has been chosen, choose queue to steal from
    498516static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    499     // have to calculate victim start and range since victim may be deleted before us in shutdown
    500     const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    501     const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    502     unsigned int vic_start, vic_range;
    503     if ( extras > victim_id  ) {
    504         vic_range = queues_per_worker + 1;
    505         vic_start = vic_range * victim_id;
    506     } else {
    507         vic_start = extras + victim_id * queues_per_worker;
    508         vic_range = queues_per_worker;
    509     }
    510     unsigned int start_idx = prng( vic_range );
    511 
    512     unsigned int tries = 0;
    513     work_queue * curr_steal_queue;
    514 
    515     for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
    516         tries++;
    517         curr_steal_queue = request_queues[ i + vic_start ];
    518         // avoid empty queues and queues that are being operated on
    519         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
    520             continue;
    521 
    522         #ifdef ACTOR_STATS
    523         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    524         if ( curr_steal_queue ) {
    525             executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
    526             executor_->w_infos[id].stolen++;
    527             if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
    528             // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    529             // replaced_queue[swap_idx]++;
    530             // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
    531         } else {
    532             executor_->w_infos[id].failed_swaps++;
    533         }
    534         #else
    535         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    536         #endif // ACTOR_STATS
    537 
    538         return;
    539     }
    540 
    541     return;
     517        // have to calculate victim start and range since victim may be deleted before us in shutdown
     518        const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
     519        const unsigned int extras = executor_->nrqueues % executor_->nworkers;
     520        unsigned int vic_start, vic_range;
     521        if ( extras > victim_id  ) {
     522                vic_range = queues_per_worker + 1;
     523                vic_start = vic_range * victim_id;
     524        } else {
     525                vic_start = extras + victim_id * queues_per_worker;
     526                vic_range = queues_per_worker;
     527        }
     528        unsigned int start_idx = prng( vic_range );
     529
     530        unsigned int tries = 0;
     531        work_queue * curr_steal_queue;
     532
     533        for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
     534                tries++;
     535                curr_steal_queue = request_queues[ i + vic_start ];
     536                // avoid empty queues and queues that are being operated on
     537                if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
     538                        continue;
     539
     540                #ifdef ACTOR_STATS
     541                curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     542                if ( curr_steal_queue ) {
     543                        executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
     544                        executor_->w_infos[id].stolen++;
     545                        if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
     546                        // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
     547                        // replaced_queue[swap_idx]++;
     548                        // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
     549                } else {
     550                        executor_->w_infos[id].failed_swaps++;
     551                }
     552                #else
     553                curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     554                #endif // ACTOR_STATS
     555
     556                return;
     557        }
     558
     559        return;
    542560}
    543561
    544562// choose a worker to steal from
    545563static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    546     #if RAND
    547     unsigned int victim = prng( executor_->nworkers );
    548     if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    549     choose_queue( this, victim, swap_idx );
    550     #elif SEARCH
    551     unsigned long long min = MAX; // smaller timestamp means longer since service
    552     int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
    553     int n_workers = executor_->nworkers;
    554     unsigned long long curr_stamp;
    555     int scount = 1;
    556     for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
    557         curr_stamp = executor_->w_infos[i].stamp;
    558         if ( curr_stamp < min ) {
    559             min = curr_stamp;
    560             min_id = i;
    561         }
    562     }
    563     choose_queue( this, min_id, swap_idx );
    564     #endif
     564        #if RAND
     565        unsigned int victim = prng( executor_->nworkers );
     566        if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
     567        choose_queue( this, victim, swap_idx );
     568        #elif SEARCH
     569        unsigned long long min = MAX; // smaller timestamp means longer since service
     570        int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
     571        int n_workers = executor_->nworkers;
     572        unsigned long long curr_stamp;
     573        int scount = 1;
     574        for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
     575                curr_stamp = executor_->w_infos[i].stamp;
     576                if ( curr_stamp < min ) {
     577                        min = curr_stamp;
     578                        min_id = i;
     579                }
     580        }
     581        choose_queue( this, min_id, swap_idx );
     582        #endif
    565583}
    566584
    567585#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
    568586void main( worker & this ) with(this) {
    569     // #ifdef ACTOR_STATS
    570     // for ( i; executor_->nrqueues ) {
    571     //    replaced_queue[i] = 0;
    572     //    __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    573     // }
    574     // #endif
    575 
    576     // threshold of empty queues we see before we go stealing
    577     const unsigned int steal_threshold = 2 * range;
    578 
    579     // Store variable data here instead of worker struct to avoid any potential false sharing
    580     unsigned int empty_count = 0;
    581     request & req;
    582     work_queue * curr_work_queue;
    583 
    584     Exit:
    585     for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    586         curr_work_queue = request_queues[i + start];
     587        // #ifdef ACTOR_STATS
     588        // for ( i; executor_->nrqueues ) {
     589        //      replaced_queue[i] = 0;
     590        //      __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
     591        // }
     592        // #endif
     593
     594        // threshold of empty queues we see before we go stealing
     595        const unsigned int steal_threshold = 2 * range;
     596
     597        // Store variable data here instead of worker struct to avoid any potential false sharing
     598        unsigned int empty_count = 0;
     599        request & req;
     600        work_queue * curr_work_queue;
     601
     602        Exit:
     603        for ( unsigned int i = 0;; i = (i + 1) % range ) {      // cycle through set of request buffers
     604                curr_work_queue = request_queues[i + start];
    587605
    588606        #ifndef __STEAL
    589607        CHECK_TERMINATION;
    590608        #endif
    591 
    592         // check if queue is empty before trying to gulp it
    593         if ( is_empty( *curr_work_queue->c_queue ) ) {
    594             #ifdef __STEAL
    595             empty_count++;
    596             if ( empty_count < steal_threshold ) continue;
    597             #else
    598             continue;
    599             #endif
    600         }
    601         transfer( *curr_work_queue, &current_queue );
    602         #ifdef ACTOR_STATS
    603         executor_->w_infos[id].gulps++;
    604         #endif // ACTOR_STATS
    605         #ifdef __STEAL
    606         if ( is_empty( *current_queue ) ) {
    607             if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
    608             empty_count++;
    609             if ( empty_count < steal_threshold ) continue;
    610             empty_count = 0;
    611 
    612             CHECK_TERMINATION; // check for termination
    613 
    614             __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
    615            
    616             #ifdef ACTOR_STATS
    617             executor_->w_infos[id].try_steal++;
    618             #endif // ACTOR_STATS
    619            
    620             steal_work( this, start + prng( range ) );
    621             continue;
    622         }
    623         #endif // __STEAL
    624         while ( ! is_empty( *current_queue ) ) {
    625             #ifdef ACTOR_STATS
    626             executor_->w_infos[id].processed++;
    627             #endif
    628             &req = &remove( *current_queue );
    629             if ( !&req ) continue;
    630             deliver_request( req );
    631         }
    632         #ifdef __STEAL
    633         curr_work_queue->being_processed = false; // set done processing
    634         empty_count = 0; // we found work so reset empty counter
    635         #endif
    636 
    637         CHECK_TERMINATION;
    638        
    639         // potentially reclaim some of the current queue's vector space if it is unused
    640         reclaim( *current_queue );
    641     } // for
     609               
     610                // check if queue is empty before trying to gulp it
     611                if ( is_empty( *curr_work_queue->c_queue ) ) {
     612                        #ifdef __STEAL
     613                        empty_count++;
     614                        if ( empty_count < steal_threshold ) continue;
     615                        #else
     616                        continue;
     617                        #endif
     618                }
     619                transfer( *curr_work_queue, &current_queue );
     620                #ifdef ACTOR_STATS
     621                executor_->w_infos[id].gulps++;
     622                #endif // ACTOR_STATS
     623                #ifdef __STEAL
     624                if ( is_empty( *current_queue ) ) {
     625                        if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
     626                        empty_count++;
     627                        if ( empty_count < steal_threshold ) continue;
     628                        empty_count = 0;
     629
     630                        CHECK_TERMINATION; // check for termination
     631
     632                        __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
     633                       
     634                        #ifdef ACTOR_STATS
     635                        executor_->w_infos[id].try_steal++;
     636                        #endif // ACTOR_STATS
     637                       
     638                        steal_work( this, start + prng( range ) );
     639                        continue;
     640                }
     641                #endif // __STEAL
     642                while ( ! is_empty( *current_queue ) ) {
     643                        #ifdef ACTOR_STATS
     644                        executor_->w_infos[id].processed++;
     645                        #endif
     646                        &req = &remove( *current_queue );
     647                        if ( !&req ) continue;
     648                        deliver_request( req );
     649                }
     650                #ifdef __STEAL
     651                curr_work_queue->being_processed = false;               // set done processing
     652                empty_count = 0; // we found work so reset empty counter
     653                #endif
     654
     655                CHECK_TERMINATION;
     656               
     657                // potentially reclaim some of the current queue's vector space if it is unused
     658                reclaim( *current_queue );
     659        } // for
    642660}
    643661
    644662static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    645     insert( request_queues[ticket], req);
     663        insert( request_queues[ticket], req);
    646664}
    647665
    648666static inline void send( actor & this, request & req ) {
    649     DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    650     send( *__actor_executor_, req, this.ticket );
     667        DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     668        send( *__actor_executor_, req, this.ticket );
    651669}
    652670
    653671static inline void __reset_stats() {
    654     #ifdef ACTOR_STATS
    655     __total_tries = 0;
    656     __total_stolen = 0;
    657     __all_gulps = 0;
    658     __total_failed_swaps = 0;
    659     __total_empty_stolen = 0;
    660     __all_processed = 0;
    661     __num_actors_stats = 0;
    662     __all_msgs_stolen = 0;
    663     #endif
     672        #ifdef ACTOR_STATS
     673        __total_tries = 0;
     674        __total_stolen = 0;
     675        __all_gulps = 0;
     676        __total_failed_swaps = 0;
     677        __total_empty_stolen = 0;
     678        __all_processed = 0;
     679        __num_actors_stats = 0;
     680        __all_msgs_stolen = 0;
     681        #endif
    664682}
    665683
    666684static inline void start_actor_system( size_t num_thds ) {
    667     __reset_stats();
    668     __actor_executor_thd = active_thread();
    669     __actor_executor_ = alloc();
    670     (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
     685        __reset_stats();
     686        __actor_executor_thd = active_thread();
     687        __actor_executor_ = alloc();
     688        (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
    671689}
    672690
     
    674692
    675693static inline void start_actor_system( executor & this ) {
    676     __reset_stats();
    677     __actor_executor_thd = active_thread();
    678     __actor_executor_ = &this;
    679     __actor_executor_passed = true;
     694        __reset_stats();
     695        __actor_executor_thd = active_thread();
     696        __actor_executor_ = &this;
     697        __actor_executor_passed = true;
    680698}
    681699
    682700static inline void stop_actor_system() {
    683     park( ); // unparked when actor system is finished
    684 
    685     if ( !__actor_executor_passed ) delete( __actor_executor_ );
    686     __actor_executor_ = 0p;
    687     __actor_executor_thd = 0p;
    688     __next_ticket = 0;
    689     __actor_executor_passed = false;
     701        park();                                                                                         // unparked when actor system is finished
     702
     703        if ( !__actor_executor_passed ) delete( __actor_executor_ );
     704        __actor_executor_ = 0p;
     705        __actor_executor_thd = 0p;
     706        __next_ticket = 0;
     707        __actor_executor_passed = false;
    690708}
    691709
     
    693711// assigned at creation to __base_msg_finished to avoid unused message warning
    694712message __base_msg_finished @= { .alloc : Finished };
    695 struct delete_message_t { inline message; } delete_msg = __base_msg_finished;
     713struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
    696714struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
    697715struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;
    698716
    699 allocation receive( actor & this, delete_message_t & msg ) { return Delete; }
     717allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
    700718allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
    701719allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
    702 
Note: See TracChangeset for help on using the changeset viewer.