Changeset b9c06b98 for libcfa/src


Ignore:
Timestamp:
Jun 26, 2023, 11:01:39 PM (12 months ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
master
Children:
2dfdae3
Parents:
c4497e3
Message:

formatting, change set_allocation to return previous value, add get_allocation

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    rc4497e3 rb9c06b98  
    4848typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
    4949struct request {
    50     actor * receiver;
    51     message * msg;
    52     __receive_fn fn;
     50        actor * receiver;
     51        message * msg;
     52        __receive_fn fn;
    5353};
    5454
    5555struct a_msg {
    56     int m;
     56        int m;
    5757};
    5858static inline void ?{}( request & this ) {}
    5959static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    60     this.receiver = receiver;
    61     this.msg = msg;
    62     this.fn = fn;
     60        this.receiver = receiver;
     61        this.msg = msg;
     62        this.fn = fn;
    6363}
    6464static inline void ?{}( request & this, request & copy ) {
    65     this.receiver = copy.receiver;
    66     this.msg = copy.msg;
    67     this.fn = copy.fn;
     65        this.receiver = copy.receiver;
     66        this.msg = copy.msg;
     67        this.fn = copy.fn;
    6868}
    6969
     
    7171// assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
    7272struct copy_queue {
    73     request * buffer;
    74     size_t count, buffer_size, index, utilized, last_size;
     73        request * buffer;
     74        size_t count, buffer_size, index, utilized, last_size;
    7575};
    7676static inline void ?{}( copy_queue & this ) {}
    7777static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    78     buffer_size = buf_size;
    79     buffer = aalloc( buffer_size );
    80     count = 0;
    81     utilized = 0;
    82     index = 0;
    83     last_size = 0;
     78        buffer_size = buf_size;
     79        buffer = aalloc( buffer_size );
     80        count = 0;
     81        utilized = 0;
     82        index = 0;
     83        last_size = 0;
    8484}
    8585static inline void ^?{}( copy_queue & this ) with(this) {
    86     DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    87     adelete(buffer);
     86        DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
     87        adelete(buffer);
    8888}
    8989
    9090static inline void insert( copy_queue & this, request & elem ) with(this) {
    91     if ( count >= buffer_size ) { // increase arr size
    92         last_size = buffer_size;
    93         buffer_size = 2 * buffer_size;
    94         buffer = realloc( buffer, sizeof( request ) * buffer_size );
    95         /* paranoid */ verify( buffer );
    96     }
    97     memcpy( &buffer[count], &elem, sizeof(request) );
    98     count++;
     91        if ( count >= buffer_size ) { // increase arr size
     92                last_size = buffer_size;
     93                buffer_size = 2 * buffer_size;
     94                buffer = realloc( buffer, sizeof( request ) * buffer_size );
     95                /* paranoid */ verify( buffer );
     96        }
     97        memcpy( &buffer[count], &elem, sizeof(request) );
     98        count++;
    9999}
    100100
     
    102102// insert() must not be called again until the array has been fully emptied
    103103static inline request & remove( copy_queue & this ) with(this) {
    104     if ( count > 0 ) {
    105         count--;
    106         size_t old_idx = index;
    107         index = count == 0 ? 0 : index + 1;
    108         return buffer[old_idx];
    109     }
    110     request * ret = 0p;
    111     return *0p;
     104        if ( count > 0 ) {
     105                count--;
     106                size_t old_idx = index;
     107                index = count == 0 ? 0 : index + 1;
     108                return buffer[old_idx];
     109        }
     110        request * ret = 0p;
     111        return *0p;
    112112}
    113113
    114114// try to reclaim some memory if less than half of buffer is utilized
    115115static inline void reclaim( copy_queue & this ) with(this) {
    116     if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    117     utilized = 0;
    118     buffer_size--;
    119     buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
     116        if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
     117        utilized = 0;
     118        buffer_size--;
     119        buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
    120120}
    121121
     
    123123
    124124struct work_queue {
    125     __spinlock_t mutex_lock;
    126     copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
    127     copy_queue * c_queue;           // current queue
    128     volatile bool being_processed;  // flag to prevent concurrent processing
    129     #ifdef ACTOR_STATS
    130     unsigned int id;
    131     size_t missed;                  // transfers skipped due to being_processed flag being up
    132     #endif
     125        __spinlock_t mutex_lock;
     126        copy_queue * owned_queue;                                                       // copy queue allocated and cleaned up by this work_queue
     127        copy_queue * c_queue;                                                           // current queue
     128        volatile bool being_processed;                                          // flag to prevent concurrent processing
     129        #ifdef ACTOR_STATS
     130        unsigned int id;
     131        size_t missed;                                                                          // transfers skipped due to being_processed flag being up
     132        #endif
    133133}; // work_queue
    134134static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    135     owned_queue = alloc();      // allocated separately to avoid false sharing
    136     (*owned_queue){ buf_size };
    137     c_queue = owned_queue;
    138     being_processed = false;
    139     #ifdef ACTOR_STATS
    140     id = i;
    141     missed = 0;
    142     #endif
     135        owned_queue = alloc();                                                          // allocated separately to avoid false sharing
     136        (*owned_queue){ buf_size };
     137        c_queue = owned_queue;
     138        being_processed = false;
     139        #ifdef ACTOR_STATS
     140        id = i;
     141        missed = 0;
     142        #endif
    143143}
    144144
     
    147147
    148148static inline void insert( work_queue & this, request & elem ) with(this) {
    149     lock( mutex_lock __cfaabi_dbg_ctx2 );
    150     insert( *c_queue, elem );
    151     unlock( mutex_lock );
     149        lock( mutex_lock __cfaabi_dbg_ctx2 );
     150        insert( *c_queue, elem );
     151        unlock( mutex_lock );
    152152} // insert
    153153
    154154static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    155     lock( mutex_lock __cfaabi_dbg_ctx2 );
    156     #ifdef __STEAL
    157 
    158     // check if queue is being processed elsewhere
    159     if ( unlikely( being_processed ) ) {
    160         #ifdef ACTOR_STATS
    161         missed++;
    162         #endif
    163         unlock( mutex_lock );
    164         return;
    165     }
    166 
    167     being_processed = c_queue->count != 0;
    168     #endif // __STEAL
    169 
    170     c_queue->utilized = c_queue->count;
    171 
    172     // swap copy queue ptrs
    173     copy_queue * temp = *transfer_to;
    174     *transfer_to = c_queue;
    175     c_queue = temp;
    176     unlock( mutex_lock );
     155        lock( mutex_lock __cfaabi_dbg_ctx2 );
     156        #ifdef __STEAL
     157
     158        // check if queue is being processed elsewhere
     159        if ( unlikely( being_processed ) ) {
     160                #ifdef ACTOR_STATS
     161                missed++;
     162                #endif
     163                unlock( mutex_lock );
     164                return;
     165        }
     166
     167        being_processed = c_queue->count != 0;
     168        #endif // __STEAL
     169
     170        c_queue->utilized = c_queue->count;
     171
     172        // swap copy queue ptrs
     173        copy_queue * temp = *transfer_to;
     174        *transfer_to = c_queue;
     175        c_queue = temp;
     176        unlock( mutex_lock );
    177177} // transfer
    178178
    179179// needed since some info needs to persist past worker lifetimes
    180180struct worker_info {
    181     volatile unsigned long long stamp;
    182     #ifdef ACTOR_STATS
    183     size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    184     unsigned long long processed;
    185     size_t gulps;
    186     #endif
     181        volatile unsigned long long stamp;
     182        #ifdef ACTOR_STATS
     183        size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
     184        unsigned long long processed;
     185        size_t gulps;
     186        #endif
    187187};
    188188static inline void ?{}( worker_info & this ) {
    189     #ifdef ACTOR_STATS
    190     this.stolen_from = 0;
    191     this.try_steal = 0;                             // attempts to steal
    192     this.stolen = 0;                                // successful steals
    193     this.processed = 0;                             // requests processed
    194     this.gulps = 0;                                 // number of gulps
    195     this.failed_swaps = 0;                          // steal swap failures
    196     this.empty_stolen = 0;                          // queues empty after steal
    197     this.msgs_stolen = 0;                           // number of messages stolen
    198     #endif
    199     this.stamp = rdtscl();
     189        #ifdef ACTOR_STATS
     190        this.stolen_from = 0;
     191        this.try_steal = 0;                                                                     // attempts to steal
     192        this.stolen = 0;                                                                        // successful steals
     193        this.processed = 0;                                                                     // requests processed
     194        this.gulps = 0;                                                                         // number of gulps
     195        this.failed_swaps = 0;                                                          // steal swap failures
     196        this.empty_stolen = 0;                                                          // queues empty after steal
     197        this.msgs_stolen = 0;                                                           // number of messages stolen
     198        #endif
     199        this.stamp = rdtscl();
    200200}
    201201
     
    205205// #endif
    206206thread worker {
    207     work_queue ** request_queues;
    208     copy_queue * current_queue;
    209     executor * executor_;
    210     unsigned int start, range;
    211     int id;
     207        work_queue ** request_queues;
     208        copy_queue * current_queue;
     209        executor * executor_;
     210        unsigned int start, range;
     211        int id;
    212212};
    213213
     
    215215// aggregate counters for statistics
    216216size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    217     __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
     217        __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    218218#endif
    219219static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
    220     unsigned int start, unsigned int range, int id ) {
    221     ((thread &)this){ clu };
    222     this.request_queues = request_queues;           // array of all queues
    223     this.current_queue = current_queue;             // currently gulped queue (start with empty queue to use in swap later)
    224     this.executor_ = executor_;                     // pointer to current executor
    225     this.start = start;                             // start of worker's subrange of request_queues
    226     this.range = range;                             // size of worker's subrange of request_queues
    227     this.id = id;                                   // worker's id and index in array of workers
     220        unsigned int start, unsigned int range, int id ) {
     221        ((thread &)this){ clu };
     222        this.request_queues = request_queues;                           // array of all queues
     223        this.current_queue = current_queue;                                     // currently gulped queue (start with empty queue to use in swap later)
     224        this.executor_ = executor_;                                                     // pointer to current executor
     225        this.start = start;                                                                     // start of worker's subrange of request_queues
     226        this.range = range;                                                                     // size of worker's subrange of request_queues
     227        this.id = id;                                                                           // worker's id and index in array of workers
    228228}
    229229
    230230static bool no_steal = false;
    231231struct executor {
    232     cluster * cluster;                                                      // if workers execute on separate cluster
    233         processor ** processors;                                            // array of virtual processors adding parallelism for workers
    234         work_queue * request_queues;                                // master array of work request queues
    235     copy_queue * local_queues;                      // array of all worker local queues to avoid deletion race
    236         work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
    237     worker ** workers;                                                          // array of workers executing work requests
    238     worker_info * w_infos;                          // array of info about each worker
    239         unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    240         bool seperate_clus;                                                             // use same or separate cluster for executor
    241     volatile bool is_shutdown;                      // flag to communicate shutdown to worker threads
     232        cluster * cluster;                                                                      // if workers execute on separate cluster
     233        processor ** processors;                                                        // array of virtual processors adding parallelism for workers
     234        work_queue * request_queues;                                            // master array of work request queues
     235        copy_queue * local_queues;                                                      // array of all worker local queues to avoid deletion race
     236        work_queue ** worker_req_queues;                                        // secondary array of work queues to allow for swapping
     237        worker ** workers;                                                                      // array of workers executing work requests
     238        worker_info * w_infos;                                                          // array of info about each worker
     239        unsigned int nprocessors, nworkers, nrqueues;           // number of processors/threads/request queues
     240        bool seperate_clus;                                                                     // use same or separate cluster for executor
     241        volatile bool is_shutdown;                                                      // flag to communicate shutdown to worker threads
    242242}; // executor
    243243
     
    246246// #endif
    247247static inline void ^?{}( worker & mutex this ) with(this) {
    248     #ifdef ACTOR_STATS
    249     __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    250     __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    251     __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    252     __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    253     __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    254     __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    255     __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
    256 
    257     // per worker steal stats (uncomment alongside the lock above this routine to print)
    258     // lock( out_lock __cfaabi_dbg_ctx2 );
    259     // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    260     // int count = 0;
    261     // int count2 = 0;
    262     // for ( i; range ) {
    263     //    if ( replaced_queue[start + i] > 0 ){
    264     //        count++;
    265     //        // printf("%d: %u, ",i, replaced_queue[i]);
    266     //    }
    267     //    if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    268     //        count2++;
    269     // }
    270     // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    271     // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    272     // unlock( out_lock );
    273     #endif
     248        #ifdef ACTOR_STATS
     249        __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
     250        __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
     251        __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
     252        __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
     253        __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
     254        __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
     255        __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
     256
     257        // per worker steal stats (uncomment alongside the lock above this routine to print)
     258        // lock( out_lock __cfaabi_dbg_ctx2 );
     259        // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
     260        // int count = 0;
     261        // int count2 = 0;
     262        // for ( i; range ) {
     263        //      if ( replaced_queue[start + i] > 0 ){
     264        //              count++;
     265        //              // printf("%d: %u, ",i, replaced_queue[i]);
     266        //      }
     267        //      if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
     268        //              count2++;
     269        // }
     270        // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
     271        // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
     272        // unlock( out_lock );
     273        #endif
    274274}
    275275
    276276static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    277     if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    278     this.nprocessors = nprocessors;
    279     this.nworkers = nworkers;
    280     this.nrqueues = nrqueues;
    281     this.seperate_clus = seperate_clus;
    282     this.is_shutdown = false;
    283 
    284     if ( nworkers == nrqueues )
    285         no_steal = true;
    286    
    287     #ifdef ACTOR_STATS
    288     // stolen_arr = aalloc( nrqueues );
    289     // replaced_queue = aalloc( nrqueues );
    290     __total_workers = nworkers;
    291     #endif
    292 
    293     if ( seperate_clus ) {
    294         cluster = alloc();
    295         (*cluster){};
    296     } else cluster = active_cluster();
    297 
    298     request_queues = aalloc( nrqueues );
    299     worker_req_queues = aalloc( nrqueues );
    300     for ( i; nrqueues ) {
    301         request_queues[i]{ buf_size, i };
    302         worker_req_queues[i] = &request_queues[i];
    303     }
    304    
    305     processors = aalloc( nprocessors );
    306     for ( i; nprocessors )
    307         (*(processors[i] = alloc())){ *cluster };
    308 
    309     local_queues = aalloc( nworkers );
    310     workers = aalloc( nworkers );
    311     w_infos = aalloc( nworkers );
    312     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    313 
    314     for ( i; nworkers ) {
    315         w_infos[i]{};
    316         local_queues[i]{ buf_size };
    317     }
    318 
    319     for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
    320         range = reqPerWorker + ( i < extras ? 1 : 0 );
    321         (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    322     } // for
     277        if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
     278        this.nprocessors = nprocessors;
     279        this.nworkers = nworkers;
     280        this.nrqueues = nrqueues;
     281        this.seperate_clus = seperate_clus;
     282        this.is_shutdown = false;
     283
     284        if ( nworkers == nrqueues )
     285                no_steal = true;
     286       
     287        #ifdef ACTOR_STATS
     288        // stolen_arr = aalloc( nrqueues );
     289        // replaced_queue = aalloc( nrqueues );
     290        __total_workers = nworkers;
     291        #endif
     292
     293        if ( seperate_clus ) {
     294                cluster = alloc();
     295                (*cluster){};
     296        } else cluster = active_cluster();
     297
     298        request_queues = aalloc( nrqueues );
     299        worker_req_queues = aalloc( nrqueues );
     300        for ( i; nrqueues ) {
     301                request_queues[i]{ buf_size, i };
     302                worker_req_queues[i] = &request_queues[i];
     303        }
     304       
     305        processors = aalloc( nprocessors );
     306        for ( i; nprocessors )
     307                (*(processors[i] = alloc())){ *cluster };
     308
     309        local_queues = aalloc( nworkers );
     310        workers = aalloc( nworkers );
     311        w_infos = aalloc( nworkers );
     312        unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     313
     314        for ( i; nworkers ) {
     315                w_infos[i]{};
     316                local_queues[i]{ buf_size };
     317        }
     318
     319        for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
     320                range = reqPerWorker + ( i < extras ? 1 : 0 );
     321                (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
     322        } // for
    323323}
    324324static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
     
    329329
    330330static inline void ^?{}( executor & this ) with(this) {
    331     is_shutdown = true;
    332 
    333     for ( i; nworkers )
    334         delete( workers[i] );
    335 
    336     for ( i; nprocessors ) {
    337         delete( processors[i] );
    338     } // for
    339 
    340     #ifdef ACTOR_STATS
    341     size_t misses = 0;
    342     for ( i; nrqueues ) {
    343         misses += worker_req_queues[i]->missed;
    344     }
    345     // adelete( stolen_arr );
    346     // adelete( replaced_queue );
    347     #endif
    348 
    349     adelete( workers );
    350     adelete( w_infos );
    351     adelete( local_queues );
    352     adelete( request_queues );
    353     adelete( worker_req_queues );
    354     adelete( processors );
    355     if ( seperate_clus ) delete( cluster );
    356 
    357     #ifdef ACTOR_STATS // print formatted stats
    358     printf("    Actor System Stats:\n");
    359     printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
    360     size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    361     printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    362     printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
    363         __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    364     size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    365     printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    366     #endif
    367        
     331        is_shutdown = true;
     332
     333        for ( i; nworkers )
     334                delete( workers[i] );
     335
     336        for ( i; nprocessors ) {
     337                delete( processors[i] );
     338        } // for
     339
     340        #ifdef ACTOR_STATS
     341        size_t misses = 0;
     342        for ( i; nrqueues ) {
     343                misses += worker_req_queues[i]->missed;
     344        }
     345        // adelete( stolen_arr );
     346        // adelete( replaced_queue );
     347        #endif
     348
     349        adelete( workers );
     350        adelete( w_infos );
     351        adelete( local_queues );
     352        adelete( request_queues );
     353        adelete( worker_req_queues );
     354        adelete( processors );
     355        if ( seperate_clus ) delete( cluster );
     356
     357        #ifdef ACTOR_STATS // print formatted stats
     358        printf("        Actor System Stats:\n");
     359        printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
     360        size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
     361        printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
     362        printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
     363                __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
     364        size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
     365        printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
     366        #endif
     367               
    368368}
    369369
     
    372372
    373373static inline size_t __get_next_ticket( executor & this ) with(this) {
    374     #ifdef __CFA_DEBUG__
    375     size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    376 
    377     // reserve MAX for dead actors
    378     if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    379     return temp;
    380     #else
    381     return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    382     #endif
     374        #ifdef __CFA_DEBUG__
     375        size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     376
     377        // reserve MAX for dead actors
     378        if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     379        return temp;
     380        #else
     381        return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
     382        #endif
    383383} // tickets
    384384
    385385// TODO: update globals in this file to be static fields once the static fields project is done
    386386static executor * __actor_executor_ = 0p;
    387 static bool __actor_executor_passed = false;            // was an executor passed to start_actor_system
    388 static size_t __num_actors_ = 0;                                        // number of actor objects in system
     387static bool __actor_executor_passed = false;                    // was an executor passed to start_actor_system
     388static size_t __num_actors_ = 0;                                                // number of actor objects in system
    389389static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
    390390struct actor {
    391     size_t ticket;                                          // executor-queue handle
    392     allocation alloc;                                       // allocation action
    393     inline virtual_dtor;
     391        size_t ticket;                                                                          // executor-queue handle
     392        allocation alloc;                                                                       // allocation action
     393        inline virtual_dtor;
    394394};
    395395
    396396static inline void ?{}( actor & this ) with(this) {
    397     // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    398     // member must be called to end it
    399     DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    400     alloc = Nodelete;
    401     ticket = __get_next_ticket( *__actor_executor_ );
    402     __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    403     #ifdef ACTOR_STATS
    404     __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    405     #endif
     397        // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
     398        // member must be called to end it
     399        DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
     400        alloc = Nodelete;
     401        ticket = __get_next_ticket( *__actor_executor_ );
     402        __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
     403        #ifdef ACTOR_STATS
     404        __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
     405        #endif
    406406}
    407407
    408408static inline void check_actor( actor & this ) {
    409     if ( this.alloc != Nodelete ) {
    410         switch( this.alloc ) {
    411             case Delete: delete( &this ); break;
    412             case Destroy:
    413                 CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
    414                 ^?{}(this);
    415                 break;
    416             case Finished:
    417                 CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
    418                 break;
    419             default: ;                                                          // stop warning
    420         }
    421 
    422         if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
    423             unpark( __actor_executor_thd );
    424         }
    425     }
     409        if ( this.alloc != Nodelete ) {
     410                switch( this.alloc ) {
     411                        case Delete: delete( &this ); break;
     412                        case Destroy:
     413                                CFA_DEBUG( this.ticket = MAX; );                // mark as terminated
     414                                ^?{}(this);
     415                                break;
     416                        case Finished:
     417                                CFA_DEBUG( this.ticket = MAX; );                // mark as terminated
     418                                break;
     419                        default: ;                                                                      // stop warning
     420                }
     421
     422                if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
     423                        unpark( __actor_executor_thd );
     424                }
     425        }
    426426}
    427427
    428428struct message {
    429     allocation alloc;                   // allocation action
    430     inline virtual_dtor;
     429        allocation alloc;                                                                       // allocation action
     430        inline virtual_dtor;
    431431};
    432432
    433433static inline void ?{}( message & this ) {
    434     this.alloc = Nodelete;
     434        this.alloc = Nodelete;
    435435}
    436436static inline void ?{}( message & this, allocation alloc ) {
    437     memcpy( &this.alloc, &alloc, sizeof(allocation) ); // optimization to elide ctor
    438     CFA_DEBUG( if( this.alloc == Finished ) this.alloc = Nodelete; )
     437        memcpy( &this.alloc, &alloc, sizeof(allocation) );      // optimization to elide ctor
     438        CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
    439439}
    440440static inline void ^?{}( message & this ) with(this) {
    441     CFA_DEBUG(
     441        CFA_DEBUG(
    442442                if ( alloc == Nodelete ) {
    443443                        printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",
     
    448448
    449449static inline void check_message( message & this ) {
    450     switch ( this.alloc ) {                                             // analyze message status
    451         case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
    452         case Delete: delete( &this ); break;
    453         case Destroy: ^?{}( this ); break;
    454         case Finished: break;
    455     } // switch
    456 }
    457 static inline void set_allocation( message & this, allocation state ) {
    458     CFA_DEBUG( if ( state == Nodelete ) state = Finished; )
    459     this.alloc = state;
     450        switch ( this.alloc ) {                                         // analyze message status
     451                case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
     452                case Delete: delete( &this ); break;
     453                case Destroy: ^?{}( this ); break;
     454                case Finished: break;
     455        } // switch
     456}
     457static inline allocation set_allocation( message & this, allocation state ) {
     458        CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
     459        allocation prev = this.alloc;
     460        this.alloc = state;
     461        return prev;
     462}
     463static inline allocation get_allocation( message & this ) {
     464        return this.alloc;
    460465}
    461466
    462467static inline void deliver_request( request & this ) {
    463     DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    464     actor * base_actor;
    465     message * base_msg;
    466     allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
    467     memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor
    468     check_message( *base_msg );
    469     check_actor( *base_actor );
     468        DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     469        actor * base_actor;
     470        message * base_msg;
     471        allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
     472        memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor
     473        check_message( *base_msg );
     474        check_actor( *base_actor );
    470475}
    471476
     
    473478// returns ptr to newly owned queue if swap succeeds
    474479static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    475     work_queue * my_queue = request_queues[my_idx];
    476     work_queue * other_queue = request_queues[victim_idx];
    477 
    478     // if either queue is 0p then they are in the process of being stolen
    479     if ( other_queue == 0p ) return 0p;
    480 
    481     // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    482     if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
    483         return 0p;
    484 
    485     // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    486     if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
    487         /* paranoid */ verify( request_queues[my_idx] == 0p );
    488         request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
    489         return 0p;
    490     }
    491 
    492     // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    493     request_queues[my_idx] = other_queue; // last write does not need to be atomic
    494     return other_queue;
     480        work_queue * my_queue = request_queues[my_idx];
     481        work_queue * other_queue = request_queues[victim_idx];
     482
     483        // if either queue is 0p then they are in the process of being stolen
     484        if ( other_queue == 0p ) return 0p;
     485
     486        // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
     487        if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
     488                return 0p;
     489
     490        // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
     491        if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     492                /* paranoid */ verify( request_queues[my_idx] == 0p );
     493                request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
     494                return 0p;
     495        }
     496
     497        // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
     498        request_queues[my_idx] = other_queue; // last write does not need to be atomic
     499        return other_queue;
    495500}
    496501
    497502// once a worker to steal from has been chosen, choose queue to steal from
    498503static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    499     // have to calculate victim start and range since victim may be deleted before us in shutdown
    500     const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    501     const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    502     unsigned int vic_start, vic_range;
    503     if ( extras > victim_id  ) {
    504         vic_range = queues_per_worker + 1;
    505         vic_start = vic_range * victim_id;
    506     } else {
    507         vic_start = extras + victim_id * queues_per_worker;
    508         vic_range = queues_per_worker;
    509     }
    510     unsigned int start_idx = prng( vic_range );
    511 
    512     unsigned int tries = 0;
    513     work_queue * curr_steal_queue;
    514 
    515     for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
    516         tries++;
    517         curr_steal_queue = request_queues[ i + vic_start ];
    518         // avoid empty queues and queues that are being operated on
    519         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
    520             continue;
    521 
    522         #ifdef ACTOR_STATS
    523         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    524         if ( curr_steal_queue ) {
    525             executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
    526             executor_->w_infos[id].stolen++;
    527             if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
    528             // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    529             // replaced_queue[swap_idx]++;
    530             // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
    531         } else {
    532             executor_->w_infos[id].failed_swaps++;
    533         }
    534         #else
    535         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    536         #endif // ACTOR_STATS
    537 
    538         return;
    539     }
    540 
    541     return;
     504        // have to calculate victim start and range since victim may be deleted before us in shutdown
     505        const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
     506        const unsigned int extras = executor_->nrqueues % executor_->nworkers;
     507        unsigned int vic_start, vic_range;
     508        if ( extras > victim_id  ) {
     509                vic_range = queues_per_worker + 1;
     510                vic_start = vic_range * victim_id;
     511        } else {
     512                vic_start = extras + victim_id * queues_per_worker;
     513                vic_range = queues_per_worker;
     514        }
     515        unsigned int start_idx = prng( vic_range );
     516
     517        unsigned int tries = 0;
     518        work_queue * curr_steal_queue;
     519
     520        for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
     521                tries++;
     522                curr_steal_queue = request_queues[ i + vic_start ];
     523                // avoid empty queues and queues that are being operated on
     524                if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
     525                        continue;
     526
     527                #ifdef ACTOR_STATS
     528                curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     529                if ( curr_steal_queue ) {
     530                        executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
     531                        executor_->w_infos[id].stolen++;
     532                        if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
     533                        // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
     534                        // replaced_queue[swap_idx]++;
     535                        // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
     536                } else {
     537                        executor_->w_infos[id].failed_swaps++;
     538                }
     539                #else
     540                curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     541                #endif // ACTOR_STATS
     542
     543                return;
     544        }
     545
     546        return;
    542547}
    543548
    544549// choose a worker to steal from
    545550static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    546     #if RAND
    547     unsigned int victim = prng( executor_->nworkers );
    548     if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    549     choose_queue( this, victim, swap_idx );
    550     #elif SEARCH
    551     unsigned long long min = MAX; // smaller timestamp means longer since service
    552     int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
    553     int n_workers = executor_->nworkers;
    554     unsigned long long curr_stamp;
    555     int scount = 1;
    556     for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
    557         curr_stamp = executor_->w_infos[i].stamp;
    558         if ( curr_stamp < min ) {
    559             min = curr_stamp;
    560             min_id = i;
    561         }
    562     }
    563     choose_queue( this, min_id, swap_idx );
    564     #endif
     551        #if RAND
     552        unsigned int victim = prng( executor_->nworkers );
     553        if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
     554        choose_queue( this, victim, swap_idx );
     555        #elif SEARCH
     556        unsigned long long min = MAX; // smaller timestamp means longer since service
     557        int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
     558        int n_workers = executor_->nworkers;
     559        unsigned long long curr_stamp;
     560        int scount = 1;
     561        for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
     562                curr_stamp = executor_->w_infos[i].stamp;
     563                if ( curr_stamp < min ) {
     564                        min = curr_stamp;
     565                        min_id = i;
     566                }
     567        }
     568        choose_queue( this, min_id, swap_idx );
     569        #endif
    565570}
    566571
    567572#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
    568573void main( worker & this ) with(this) {
    569     // #ifdef ACTOR_STATS
    570     // for ( i; executor_->nrqueues ) {
    571     //    replaced_queue[i] = 0;
    572     //    __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    573     // }
    574     // #endif
    575 
    576     // threshold of empty queues we see before we go stealing
    577     const unsigned int steal_threshold = 2 * range;
    578 
    579     // Store variable data here instead of worker struct to avoid any potential false sharing
    580     unsigned int empty_count = 0;
    581     request & req;
    582     work_queue * curr_work_queue;
    583 
    584     Exit:
    585     for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    586         curr_work_queue = request_queues[i + start];
    587        
    588         // check if queue is empty before trying to gulp it
    589         if ( is_empty( *curr_work_queue->c_queue ) ) {
    590             #ifdef __STEAL
    591             empty_count++;
    592             if ( empty_count < steal_threshold ) continue;
    593             #else
    594             continue;
    595             #endif
    596         }
    597         transfer( *curr_work_queue, &current_queue );
    598         #ifdef ACTOR_STATS
    599         executor_->w_infos[id].gulps++;
    600         #endif // ACTOR_STATS
    601         #ifdef __STEAL
    602         if ( is_empty( *current_queue ) ) {
    603             if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
    604             empty_count++;
    605             if ( empty_count < steal_threshold ) continue;
    606             empty_count = 0;
    607 
    608             CHECK_TERMINATION; // check for termination
    609 
    610             __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
    611            
    612             #ifdef ACTOR_STATS
    613             executor_->w_infos[id].try_steal++;
    614             #endif // ACTOR_STATS
    615            
    616             steal_work( this, start + prng( range ) );
    617             continue;
    618         }
    619         #endif // __STEAL
    620         while ( ! is_empty( *current_queue ) ) {
    621             #ifdef ACTOR_STATS
    622             executor_->w_infos[id].processed++;
    623             #endif
    624             &req = &remove( *current_queue );
    625             if ( !&req ) continue;
    626             deliver_request( req );
    627         }
    628         #ifdef __STEAL
    629         curr_work_queue->being_processed = false; // set done processing
    630         empty_count = 0; // we found work so reset empty counter
    631         #endif
    632 
    633         CHECK_TERMINATION;
    634        
    635         // potentially reclaim some of the current queue's vector space if it is unused
    636         reclaim( *current_queue );
    637     } // for
     574        // #ifdef ACTOR_STATS
     575        // for ( i; executor_->nrqueues ) {
     576        //      replaced_queue[i] = 0;
     577        //      __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
     578        // }
     579        // #endif
     580
     581        // threshold of empty queues we see before we go stealing
     582        const unsigned int steal_threshold = 2 * range;
     583
     584        // Store variable data here instead of worker struct to avoid any potential false sharing
     585        unsigned int empty_count = 0;
     586        request & req;
     587        work_queue * curr_work_queue;
     588
     589        Exit:
     590        for ( unsigned int i = 0;; i = (i + 1) % range ) {      // cycle through set of request buffers
     591                curr_work_queue = request_queues[i + start];
     592               
     593                // check if queue is empty before trying to gulp it
     594                if ( is_empty( *curr_work_queue->c_queue ) ) {
     595                        #ifdef __STEAL
     596                        empty_count++;
     597                        if ( empty_count < steal_threshold ) continue;
     598                        #else
     599                        continue;
     600                        #endif
     601                }
     602                transfer( *curr_work_queue, &current_queue );
     603                #ifdef ACTOR_STATS
     604                executor_->w_infos[id].gulps++;
     605                #endif // ACTOR_STATS
     606                #ifdef __STEAL
     607                if ( is_empty( *current_queue ) ) {
     608                        if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
     609                        empty_count++;
     610                        if ( empty_count < steal_threshold ) continue;
     611                        empty_count = 0;
     612
     613                        CHECK_TERMINATION; // check for termination
     614
     615                        __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
     616                       
     617                        #ifdef ACTOR_STATS
     618                        executor_->w_infos[id].try_steal++;
     619                        #endif // ACTOR_STATS
     620                       
     621                        steal_work( this, start + prng( range ) );
     622                        continue;
     623                }
     624                #endif // __STEAL
     625                while ( ! is_empty( *current_queue ) ) {
     626                        #ifdef ACTOR_STATS
     627                        executor_->w_infos[id].processed++;
     628                        #endif
     629                        &req = &remove( *current_queue );
     630                        if ( !&req ) continue;
     631                        deliver_request( req );
     632                }
     633                #ifdef __STEAL
     634                curr_work_queue->being_processed = false;               // set done processing
     635                empty_count = 0; // we found work so reset empty counter
     636                #endif
     637
     638                CHECK_TERMINATION;
     639               
     640                // potentially reclaim some of the current queue's vector space if it is unused
     641                reclaim( *current_queue );
     642        } // for
    638643}
    639644
    640645static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    641     insert( request_queues[ticket], req);
     646        insert( request_queues[ticket], req);
    642647}
    643648
    644649static inline void send( actor & this, request & req ) {
    645     DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    646     send( *__actor_executor_, req, this.ticket );
     650        DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     651        send( *__actor_executor_, req, this.ticket );
    647652}
    648653
    649654static inline void __reset_stats() {
    650     #ifdef ACTOR_STATS
    651     __total_tries = 0;
    652     __total_stolen = 0;
    653     __all_gulps = 0;
    654     __total_failed_swaps = 0;
    655     __total_empty_stolen = 0;
    656     __all_processed = 0;
    657     __num_actors_stats = 0;
    658     __all_msgs_stolen = 0;
    659     #endif
     655        #ifdef ACTOR_STATS
     656        __total_tries = 0;
     657        __total_stolen = 0;
     658        __all_gulps = 0;
     659        __total_failed_swaps = 0;
     660        __total_empty_stolen = 0;
     661        __all_processed = 0;
     662        __num_actors_stats = 0;
     663        __all_msgs_stolen = 0;
     664        #endif
    660665}
    661666
    662667static inline void start_actor_system( size_t num_thds ) {
    663     __reset_stats();
    664     __actor_executor_thd = active_thread();
    665     __actor_executor_ = alloc();
    666     (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
     668        __reset_stats();
     669        __actor_executor_thd = active_thread();
     670        __actor_executor_ = alloc();
     671        (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
    667672}
    668673
     
    670675
    671676static inline void start_actor_system( executor & this ) {
    672     __reset_stats();
    673     __actor_executor_thd = active_thread();
    674     __actor_executor_ = &this;
    675     __actor_executor_passed = true;
     677        __reset_stats();
     678        __actor_executor_thd = active_thread();
     679        __actor_executor_ = &this;
     680        __actor_executor_passed = true;
    676681}
    677682
    678683static inline void stop_actor_system() {
    679     park( ); // unparked when actor system is finished
    680 
    681     if ( !__actor_executor_passed ) delete( __actor_executor_ );
    682     __actor_executor_ = 0p;
    683     __actor_executor_thd = 0p;
    684     __next_ticket = 0;
    685     __actor_executor_passed = false;
     684        park();                                                                                         // unparked when actor system is finished
     685
     686        if ( !__actor_executor_passed ) delete( __actor_executor_ );
     687        __actor_executor_ = 0p;
     688        __actor_executor_thd = 0p;
     689        __next_ticket = 0;
     690        __actor_executor_passed = false;
    686691}
    687692
     
    689694// assigned at creation to __base_msg_finished to avoid unused message warning
    690695message __base_msg_finished @= { .alloc : Finished };
    691 struct delete_message_t { inline message; } delete_msg = __base_msg_finished;
     696struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
    692697struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
    693698struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;
    694699
    695 allocation receive( actor & this, delete_message_t & msg ) { return Delete; }
     700allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
    696701allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
    697702allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
    698 
Note: See TracChangeset for help on using the changeset viewer.