Changes in / [2dfdae3:508671e]


Ignore:
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    r2dfdae3 r508671e  
    4848typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
    4949struct request {
    50         actor * receiver;
    51         message * msg;
    52         __receive_fn fn;
     50    actor * receiver;
     51    message * msg;
     52    __receive_fn fn;
    5353};
    5454
    5555struct a_msg {
    56         int m;
     56    int m;
    5757};
    5858static inline void ?{}( request & this ) {}
    5959static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    60         this.receiver = receiver;
    61         this.msg = msg;
    62         this.fn = fn;
     60    this.receiver = receiver;
     61    this.msg = msg;
     62    this.fn = fn;
    6363}
    6464static inline void ?{}( request & this, request & copy ) {
    65         this.receiver = copy.receiver;
    66         this.msg = copy.msg;
    67         this.fn = copy.fn;
     65    this.receiver = copy.receiver;
     66    this.msg = copy.msg;
     67    this.fn = copy.fn;
    6868}
    6969
     
    7171// assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert)
    7272struct copy_queue {
    73         request * buffer;
    74         size_t count, buffer_size, index, utilized, last_size;
     73    request * buffer;
     74    size_t count, buffer_size, index, utilized, last_size;
    7575};
    7676static inline void ?{}( copy_queue & this ) {}
    7777static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    78         buffer_size = buf_size;
    79         buffer = aalloc( buffer_size );
    80         count = 0;
    81         utilized = 0;
    82         index = 0;
    83         last_size = 0;
     78    buffer_size = buf_size;
     79    buffer = aalloc( buffer_size );
     80    count = 0;
     81    utilized = 0;
     82    index = 0;
     83    last_size = 0;
    8484}
    8585static inline void ^?{}( copy_queue & this ) with(this) {
    86         DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    87         adelete(buffer);
     86    DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
     87    adelete(buffer);
    8888}
    8989
    9090static inline void insert( copy_queue & this, request & elem ) with(this) {
    91         if ( count >= buffer_size ) { // increase arr size
    92                 last_size = buffer_size;
    93                 buffer_size = 2 * buffer_size;
    94                 buffer = realloc( buffer, sizeof( request ) * buffer_size );
    95                 /* paranoid */ verify( buffer );
    96         }
    97         memcpy( &buffer[count], &elem, sizeof(request) );
    98         count++;
     91    if ( count >= buffer_size ) { // increase arr size
     92        last_size = buffer_size;
     93        buffer_size = 2 * buffer_size;
     94        buffer = realloc( buffer, sizeof( request ) * buffer_size );
     95        /* paranoid */ verify( buffer );
     96    }
     97    memcpy( &buffer[count], &elem, sizeof(request) );
     98    count++;
    9999}
    100100
     
    102102// it is not supported to call insert() before the array is fully empty
    103103static inline request & remove( copy_queue & this ) with(this) {
    104         if ( count > 0 ) {
    105                 count--;
    106                 size_t old_idx = index;
    107                 index = count == 0 ? 0 : index + 1;
    108                 return buffer[old_idx];
    109         }
    110         request * ret = 0p;
    111         return *0p;
     104    if ( count > 0 ) {
     105        count--;
     106        size_t old_idx = index;
     107        index = count == 0 ? 0 : index + 1;
     108        return buffer[old_idx];
     109    }
     110    request * ret = 0p;
     111    return *0p;
    112112}
    113113
    114114// try to reclaim some memory if less than half of buffer is utilized
    115115static inline void reclaim( copy_queue & this ) with(this) {
    116         if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    117         utilized = 0;
    118         buffer_size--;
    119         buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
     116    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
     117    utilized = 0;
     118    buffer_size--;
     119    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
    120120}
    121121
     
    123123
    124124struct work_queue {
    125         __spinlock_t mutex_lock;
    126         copy_queue * owned_queue;                                                       // copy queue allocated and cleaned up by this work_queue
    127         copy_queue * c_queue;                                                           // current queue
    128         volatile bool being_processed;                                          // flag to prevent concurrent processing
    129         #ifdef ACTOR_STATS
    130         unsigned int id;
    131         size_t missed;                                                                          // transfers skipped due to being_processed flag being up
    132         #endif
     125    __spinlock_t mutex_lock;
     126    copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
     127    copy_queue * c_queue;           // current queue
     128    volatile bool being_processed;  // flag to prevent concurrent processing
     129    #ifdef ACTOR_STATS
     130    unsigned int id;
     131    size_t missed;                  // transfers skipped due to being_processed flag being up
     132    #endif
    133133}; // work_queue
    134134static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    135         owned_queue = alloc();                                                          // allocated separately to avoid false sharing
    136         (*owned_queue){ buf_size };
    137         c_queue = owned_queue;
    138         being_processed = false;
    139         #ifdef ACTOR_STATS
    140         id = i;
    141         missed = 0;
    142         #endif
     135    owned_queue = alloc();      // allocated separately to avoid false sharing
     136    (*owned_queue){ buf_size };
     137    c_queue = owned_queue;
     138    being_processed = false;
     139    #ifdef ACTOR_STATS
     140    id = i;
     141    missed = 0;
     142    #endif
    143143}
    144144
     
    147147
    148148static inline void insert( work_queue & this, request & elem ) with(this) {
    149         lock( mutex_lock __cfaabi_dbg_ctx2 );
    150         insert( *c_queue, elem );
    151         unlock( mutex_lock );
     149    lock( mutex_lock __cfaabi_dbg_ctx2 );
     150    insert( *c_queue, elem );
     151    unlock( mutex_lock );
    152152} // insert
    153153
    154154static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    155         lock( mutex_lock __cfaabi_dbg_ctx2 );
    156         #ifdef __STEAL
    157 
    158         // check if queue is being processed elsewhere
    159         if ( unlikely( being_processed ) ) {
    160                 #ifdef ACTOR_STATS
    161                 missed++;
    162                 #endif
    163                 unlock( mutex_lock );
    164                 return;
    165         }
    166 
    167         being_processed = c_queue->count != 0;
    168         #endif // __STEAL
    169 
    170         c_queue->utilized = c_queue->count;
    171 
    172         // swap copy queue ptrs
    173         copy_queue * temp = *transfer_to;
    174         *transfer_to = c_queue;
    175         c_queue = temp;
    176         unlock( mutex_lock );
     155    lock( mutex_lock __cfaabi_dbg_ctx2 );
     156    #ifdef __STEAL
     157
     158    // check if queue is being processed elsewhere
     159    if ( unlikely( being_processed ) ) {
     160        #ifdef ACTOR_STATS
     161        missed++;
     162        #endif
     163        unlock( mutex_lock );
     164        return;
     165    }
     166
     167    being_processed = c_queue->count != 0;
     168    #endif // __STEAL
     169
     170    c_queue->utilized = c_queue->count;
     171
     172    // swap copy queue ptrs
     173    copy_queue * temp = *transfer_to;
     174    *transfer_to = c_queue;
     175    c_queue = temp;
     176    unlock( mutex_lock );
    177177} // transfer
    178178
    179179// needed since some info needs to persist past worker lifetimes
    180180struct worker_info {
    181         volatile unsigned long long stamp;
    182         #ifdef ACTOR_STATS
    183         size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    184         unsigned long long processed;
    185         size_t gulps;
    186         #endif
     181    volatile unsigned long long stamp;
     182    #ifdef ACTOR_STATS
     183    size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
     184    unsigned long long processed;
     185    size_t gulps;
     186    #endif
    187187};
    188188static inline void ?{}( worker_info & this ) {
    189         #ifdef ACTOR_STATS
    190         this.stolen_from = 0;
    191         this.try_steal = 0;                                                                     // attempts to steal
    192         this.stolen = 0;                                                                        // successful steals
    193         this.processed = 0;                                                                     // requests processed
    194         this.gulps = 0;                                                                         // number of gulps
    195         this.failed_swaps = 0;                                                          // steal swap failures
    196         this.empty_stolen = 0;                                                          // queues empty after steal
    197         this.msgs_stolen = 0;                                                           // number of messages stolen
    198         #endif
    199         this.stamp = rdtscl();
     189    #ifdef ACTOR_STATS
     190    this.stolen_from = 0;
     191    this.try_steal = 0;                             // attempts to steal
     192    this.stolen = 0;                                // successful steals
     193    this.processed = 0;                             // requests processed
     194    this.gulps = 0;                                 // number of gulps
     195    this.failed_swaps = 0;                          // steal swap failures
     196    this.empty_stolen = 0;                          // queues empty after steal
     197    this.msgs_stolen = 0;                           // number of messages stolen
     198    #endif
     199    this.stamp = rdtscl();
    200200}
    201201
     
    205205// #endif
    206206thread worker {
    207         work_queue ** request_queues;
    208         copy_queue * current_queue;
    209         executor * executor_;
    210         unsigned int start, range;
    211         int id;
     207    work_queue ** request_queues;
     208    copy_queue * current_queue;
     209    executor * executor_;
     210    unsigned int start, range;
     211    int id;
    212212};
    213213
     
    215215// aggregate counters for statistics
    216216size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    217         __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
     217    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
    218218#endif
    219219static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
    220         unsigned int start, unsigned int range, int id ) {
    221         ((thread &)this){ clu };
    222         this.request_queues = request_queues;                           // array of all queues
    223         this.current_queue = current_queue;                                     // currently gulped queue (start with empty queue to use in swap later)
    224         this.executor_ = executor_;                                                     // pointer to current executor
    225         this.start = start;                                                                     // start of worker's subrange of request_queues
    226         this.range = range;                                                                     // size of worker's subrange of request_queues
    227         this.id = id;                                                                           // worker's id and index in array of workers
     220    unsigned int start, unsigned int range, int id ) {
     221    ((thread &)this){ clu };
     222    this.request_queues = request_queues;           // array of all queues
     223    this.current_queue = current_queue;             // currently gulped queue (start with empty queue to use in swap later)
     224    this.executor_ = executor_;                     // pointer to current executor
     225    this.start = start;                             // start of worker's subrange of request_queues
     226    this.range = range;                             // size of worker's subrange of request_queues
     227    this.id = id;                                   // worker's id and index in array of workers
    228228}
    229229
    230230static bool no_steal = false;
    231231struct executor {
    232         cluster * cluster;                                                                      // if workers execute on separate cluster
    233         processor ** processors;                                                        // array of virtual processors adding parallelism for workers
    234         work_queue * request_queues;                                            // master array of work request queues
    235         copy_queue * local_queues;                                                      // array of all worker local queues to avoid deletion race
    236         work_queue ** worker_req_queues;                                        // secondary array of work queues to allow for swapping
    237         worker ** workers;                                                                      // array of workers executing work requests
    238         worker_info * w_infos;                                                          // array of info about each worker
    239         unsigned int nprocessors, nworkers, nrqueues;           // number of processors/threads/request queues
    240         bool seperate_clus;                                                                     // use same or separate cluster for executor
    241         volatile bool is_shutdown;                                                      // flag to communicate shutdown to worker threads
     232    cluster * cluster;                                                      // if workers execute on separate cluster
     233        processor ** processors;                                            // array of virtual processors adding parallelism for workers
     234        work_queue * request_queues;                                // master array of work request queues
     235    copy_queue * local_queues;                      // array of all worker local queues to avoid deletion race
     236        work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
     237    worker ** workers;                                                          // array of workers executing work requests
     238    worker_info * w_infos;                          // array of info about each worker
     239        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
     240        bool seperate_clus;                                                             // use same or separate cluster for executor
     241    volatile bool is_shutdown;                      // flag to communicate shutdown to worker threads
    242242}; // executor
    243243
     
    246246// #endif
    247247static inline void ^?{}( worker & mutex this ) with(this) {
    248         #ifdef ACTOR_STATS
    249         __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    250         __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    251         __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    252         __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    253         __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    254         __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    255         __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
    256 
    257         // per worker steal stats (uncomment alongside the lock above this routine to print)
    258         // lock( out_lock __cfaabi_dbg_ctx2 );
    259         // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    260         // int count = 0;
    261         // int count2 = 0;
    262         // for ( i; range ) {
    263         //      if ( replaced_queue[start + i] > 0 ){
    264         //              count++;
    265         //              // printf("%d: %u, ",i, replaced_queue[i]);
    266         //      }
    267         //      if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    268         //              count2++;
    269         // }
    270         // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    271         // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    272         // unlock( out_lock );
    273         #endif
     248    #ifdef ACTOR_STATS
     249    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
     250    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
     251    __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
     252    __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
     253    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
     254    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
     255    __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
     256
     257    // per worker steal stats (uncomment alongside the lock above this routine to print)
     258    // lock( out_lock __cfaabi_dbg_ctx2 );
     259    // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
     260    // int count = 0;
     261    // int count2 = 0;
     262    // for ( i; range ) {
     263    //    if ( replaced_queue[start + i] > 0 ){
     264    //        count++;
     265    //        // printf("%d: %u, ",i, replaced_queue[i]);
     266    //    }
     267    //    if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
     268    //        count2++;
     269    // }
     270    // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
     271    // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
     272    // unlock( out_lock );
     273    #endif
    274274}
    275275
    276276static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    277         if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    278         this.nprocessors = nprocessors;
    279         this.nworkers = nworkers;
    280         this.nrqueues = nrqueues;
    281         this.seperate_clus = seperate_clus;
    282         this.is_shutdown = false;
    283 
    284         if ( nworkers == nrqueues )
    285                 no_steal = true;
    286        
    287         #ifdef ACTOR_STATS
    288         // stolen_arr = aalloc( nrqueues );
    289         // replaced_queue = aalloc( nrqueues );
    290         __total_workers = nworkers;
    291         #endif
    292 
    293         if ( seperate_clus ) {
    294                 cluster = alloc();
    295                 (*cluster){};
    296         } else cluster = active_cluster();
    297 
    298         request_queues = aalloc( nrqueues );
    299         worker_req_queues = aalloc( nrqueues );
    300         for ( i; nrqueues ) {
    301                 request_queues[i]{ buf_size, i };
    302                 worker_req_queues[i] = &request_queues[i];
    303         }
    304        
    305         processors = aalloc( nprocessors );
    306         for ( i; nprocessors )
    307                 (*(processors[i] = alloc())){ *cluster };
    308 
    309         local_queues = aalloc( nworkers );
    310         workers = aalloc( nworkers );
    311         w_infos = aalloc( nworkers );
    312         unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    313 
    314         for ( i; nworkers ) {
    315                 w_infos[i]{};
    316                 local_queues[i]{ buf_size };
    317         }
    318 
    319         for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
    320                 range = reqPerWorker + ( i < extras ? 1 : 0 );
    321                 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    322         } // for
     277    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
     278    this.nprocessors = nprocessors;
     279    this.nworkers = nworkers;
     280    this.nrqueues = nrqueues;
     281    this.seperate_clus = seperate_clus;
     282    this.is_shutdown = false;
     283
     284    if ( nworkers == nrqueues )
     285        no_steal = true;
     286   
     287    #ifdef ACTOR_STATS
     288    // stolen_arr = aalloc( nrqueues );
     289    // replaced_queue = aalloc( nrqueues );
     290    __total_workers = nworkers;
     291    #endif
     292
     293    if ( seperate_clus ) {
     294        cluster = alloc();
     295        (*cluster){};
     296    } else cluster = active_cluster();
     297
     298    request_queues = aalloc( nrqueues );
     299    worker_req_queues = aalloc( nrqueues );
     300    for ( i; nrqueues ) {
     301        request_queues[i]{ buf_size, i };
     302        worker_req_queues[i] = &request_queues[i];
     303    }
     304   
     305    processors = aalloc( nprocessors );
     306    for ( i; nprocessors )
     307        (*(processors[i] = alloc())){ *cluster };
     308
     309    local_queues = aalloc( nworkers );
     310    workers = aalloc( nworkers );
     311    w_infos = aalloc( nworkers );
     312    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     313
     314    for ( i; nworkers ) {
     315        w_infos[i]{};
     316        local_queues[i]{ buf_size };
     317    }
     318
     319    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
     320        range = reqPerWorker + ( i < extras ? 1 : 0 );
     321        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
     322    } // for
    323323}
    324324static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
     
    329329
    330330static inline void ^?{}( executor & this ) with(this) {
    331         is_shutdown = true;
    332 
    333         for ( i; nworkers )
    334                 delete( workers[i] );
    335 
    336         for ( i; nprocessors ) {
    337                 delete( processors[i] );
    338         } // for
    339 
    340         #ifdef ACTOR_STATS
    341         size_t misses = 0;
    342         for ( i; nrqueues ) {
    343                 misses += worker_req_queues[i]->missed;
    344         }
    345         // adelete( stolen_arr );
    346         // adelete( replaced_queue );
    347         #endif
    348 
    349         adelete( workers );
    350         adelete( w_infos );
    351         adelete( local_queues );
    352         adelete( request_queues );
    353         adelete( worker_req_queues );
    354         adelete( processors );
    355         if ( seperate_clus ) delete( cluster );
    356 
    357         #ifdef ACTOR_STATS // print formatted stats
    358         printf("        Actor System Stats:\n");
    359         printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
    360         size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    361         printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    362         printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
    363                 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    364         size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    365         printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    366         #endif
    367                
     331    is_shutdown = true;
     332
     333    for ( i; nworkers )
     334        delete( workers[i] );
     335
     336    for ( i; nprocessors ) {
     337        delete( processors[i] );
     338    } // for
     339
     340    #ifdef ACTOR_STATS
     341    size_t misses = 0;
     342    for ( i; nrqueues ) {
     343        misses += worker_req_queues[i]->missed;
     344    }
     345    // adelete( stolen_arr );
     346    // adelete( replaced_queue );
     347    #endif
     348
     349    adelete( workers );
     350    adelete( w_infos );
     351    adelete( local_queues );
     352    adelete( request_queues );
     353    adelete( worker_req_queues );
     354    adelete( processors );
     355    if ( seperate_clus ) delete( cluster );
     356
     357    #ifdef ACTOR_STATS // print formatted stats
     358    printf("    Actor System Stats:\n");
     359    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
     360    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
     361    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
     362    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
     363        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
     364    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
     365    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
     366    #endif
     367       
    368368}
    369369
     
    372372
    373373static inline size_t __get_next_ticket( executor & this ) with(this) {
    374         #ifdef __CFA_DEBUG__
    375         size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    376 
    377         // reserve MAX for dead actors
    378         if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    379         return temp;
    380         #else
    381         return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    382         #endif
     374    #ifdef __CFA_DEBUG__
     375    size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     376
     377    // reserve MAX for dead actors
     378    if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
     379    return temp;
     380    #else
     381    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
     382    #endif
    383383} // tickets
    384384
    385385// TODO: update globals in this file to be static fields once the static fields project is done
    386386static executor * __actor_executor_ = 0p;
    387 static bool __actor_executor_passed = false;                    // was an executor passed to start_actor_system
    388 static size_t __num_actors_ = 0;                                                // number of actor objects in system
     387static bool __actor_executor_passed = false;            // was an executor passed to start_actor_system
     388static size_t __num_actors_ = 0;                                        // number of actor objects in system
    389389static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
    390390struct actor {
    391         size_t ticket;                                                                          // executor-queue handle
    392         allocation alloc;                                                                       // allocation action
    393         inline virtual_dtor;
     391    size_t ticket;                                          // executor-queue handle
     392    allocation alloc;                                       // allocation action
     393    inline virtual_dtor;
    394394};
    395395
    396396static inline void ?{}( actor & this ) with(this) {
    397         // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    398         // member must be called to end it
    399         DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    400         alloc = Nodelete;
    401         ticket = __get_next_ticket( *__actor_executor_ );
    402         __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    403         #ifdef ACTOR_STATS
    404         __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    405         #endif
     397    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
     398    // member must be called to end it
     399    DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
     400    alloc = Nodelete;
     401    ticket = __get_next_ticket( *__actor_executor_ );
     402    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
     403    #ifdef ACTOR_STATS
     404    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
     405    #endif
    406406}
    407407
    408408static inline void check_actor( actor & this ) {
    409         if ( this.alloc != Nodelete ) {
    410                 switch( this.alloc ) {
    411                         case Delete: delete( &this ); break;
    412                         case Destroy:
    413                                 CFA_DEBUG( this.ticket = MAX; );                // mark as terminated
    414                                 ^?{}(this);
    415                                 break;
    416                         case Finished:
    417                                 CFA_DEBUG( this.ticket = MAX; );                // mark as terminated
    418                                 break;
    419                         default: ;                                                                      // stop warning
    420                 }
    421 
    422                 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
    423                         unpark( __actor_executor_thd );
    424                 }
    425         }
     409    if ( this.alloc != Nodelete ) {
     410        switch( this.alloc ) {
     411            case Delete: delete( &this ); break;
     412            case Destroy:
     413                CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
     414                ^?{}(this);
     415                break;
     416            case Finished:
     417                CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
     418                break;
     419            default: ;                                                          // stop warning
     420        }
     421
     422        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
     423            unpark( __actor_executor_thd );
     424        }
     425    }
    426426}
    427427
    428428struct message {
    429         allocation alloc;                                                                       // allocation action
    430         inline virtual_dtor;
     429    allocation alloc;                   // allocation action
     430    inline virtual_dtor;
    431431};
    432432
    433433static inline void ?{}( message & this ) {
    434         this.alloc = Nodelete;
     434    this.alloc = Nodelete;
    435435}
    436436static inline void ?{}( message & this, allocation alloc ) {
    437         memcpy( &this.alloc, &alloc, sizeof(allocation) );      // optimization to elide ctor
    438         CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
     437    memcpy( &this.alloc, &alloc, sizeof(allocation) ); // optimization to elide ctor
     438    CFA_DEBUG( if( this.alloc == Finished ) this.alloc = Nodelete; )
    439439}
    440440static inline void ^?{}( message & this ) with(this) {
    441         CFA_DEBUG(
     441    CFA_DEBUG(
    442442                if ( alloc == Nodelete ) {
    443443                        printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",
     
    448448
    449449static inline void check_message( message & this ) {
    450         switch ( this.alloc ) {                                         // analyze message status
    451                 case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
    452                 case Delete: delete( &this ); break;
    453                 case Destroy: ^?{}( this ); break;
    454                 case Finished: break;
    455         } // switch
    456 }
    457 static inline allocation set_allocation( message & this, allocation state ) {
    458         CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
    459         allocation prev = this.alloc;
    460         this.alloc = state;
    461         return prev;
    462 }
    463 static inline allocation get_allocation( message & this ) {
    464         return this.alloc;
     450    switch ( this.alloc ) {                                             // analyze message status
     451        case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
     452        case Delete: delete( &this ); break;
     453        case Destroy: ^?{}( this ); break;
     454        case Finished: break;
     455    } // switch
     456}
     457static inline void set_allocation( message & this, allocation state ) {
     458    CFA_DEBUG( if ( state == Nodelete ) state = Finished; )
     459    this.alloc = state;
    465460}
    466461
    467462static inline void deliver_request( request & this ) {
    468         DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    469         actor * base_actor;
    470         message * base_msg;
    471         allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
    472         memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor
    473         check_message( *base_msg );
    474         check_actor( *base_actor );
     463    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     464    actor * base_actor;
     465    message * base_msg;
     466    allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
     467    memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor
     468    check_message( *base_msg );
     469    check_actor( *base_actor );
    475470}
    476471
     
    478473// returns ptr to newly owned queue if swap succeeds
    479474static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    480         work_queue * my_queue = request_queues[my_idx];
    481         work_queue * other_queue = request_queues[victim_idx];
    482 
    483         // if either queue is 0p then they are in the process of being stolen
    484         if ( other_queue == 0p ) return 0p;
    485 
    486         // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    487         if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
    488                 return 0p;
    489 
    490         // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    491         if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
    492                 /* paranoid */ verify( request_queues[my_idx] == 0p );
    493                 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
    494                 return 0p;
    495         }
    496 
    497         // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    498         request_queues[my_idx] = other_queue; // last write does not need to be atomic
    499         return other_queue;
     475    work_queue * my_queue = request_queues[my_idx];
     476    work_queue * other_queue = request_queues[victim_idx];
     477
     478    // if either queue is 0p then they are in the process of being stolen
     479    if ( other_queue == 0p ) return 0p;
     480
     481    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
     482    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
     483        return 0p;
     484
     485    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
     486    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     487        /* paranoid */ verify( request_queues[my_idx] == 0p );
     488        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
     489        return 0p;
     490    }
     491
     492    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
     493    request_queues[my_idx] = other_queue; // last write does not need to be atomic
     494    return other_queue;
    500495}
    501496
    502497// once a worker to steal from has been chosen, choose queue to steal from
    503498static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    504         // have to calculate victim start and range since victim may be deleted before us in shutdown
    505         const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    506         const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    507         unsigned int vic_start, vic_range;
    508         if ( extras > victim_id  ) {
    509                 vic_range = queues_per_worker + 1;
    510                 vic_start = vic_range * victim_id;
    511         } else {
    512                 vic_start = extras + victim_id * queues_per_worker;
    513                 vic_range = queues_per_worker;
    514         }
    515         unsigned int start_idx = prng( vic_range );
    516 
    517         unsigned int tries = 0;
    518         work_queue * curr_steal_queue;
    519 
    520         for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
    521                 tries++;
    522                 curr_steal_queue = request_queues[ i + vic_start ];
    523                 // avoid empty queues and queues that are being operated on
    524                 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
    525                         continue;
    526 
    527                 #ifdef ACTOR_STATS
    528                 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    529                 if ( curr_steal_queue ) {
    530                         executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
    531                         executor_->w_infos[id].stolen++;
    532                         if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
    533                         // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
    534                         // replaced_queue[swap_idx]++;
    535                         // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
    536                 } else {
    537                         executor_->w_infos[id].failed_swaps++;
    538                 }
    539                 #else
    540                 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    541                 #endif // ACTOR_STATS
    542 
    543                 return;
    544         }
    545 
    546         return;
     499    // have to calculate victim start and range since victim may be deleted before us in shutdown
     500    const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
     501    const unsigned int extras = executor_->nrqueues % executor_->nworkers;
     502    unsigned int vic_start, vic_range;
     503    if ( extras > victim_id  ) {
     504        vic_range = queues_per_worker + 1;
     505        vic_start = vic_range * victim_id;
     506    } else {
     507        vic_start = extras + victim_id * queues_per_worker;
     508        vic_range = queues_per_worker;
     509    }
     510    unsigned int start_idx = prng( vic_range );
     511
     512    unsigned int tries = 0;
     513    work_queue * curr_steal_queue;
     514
     515    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
     516        tries++;
     517        curr_steal_queue = request_queues[ i + vic_start ];
     518        // avoid empty queues and queues that are being operated on
     519        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
     520            continue;
     521
     522        #ifdef ACTOR_STATS
     523        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     524        if ( curr_steal_queue ) {
     525            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
     526            executor_->w_infos[id].stolen++;
     527            if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
     528            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
     529            // replaced_queue[swap_idx]++;
     530            // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
     531        } else {
     532            executor_->w_infos[id].failed_swaps++;
     533        }
     534        #else
     535        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
     536        #endif // ACTOR_STATS
     537
     538        return;
     539    }
     540
     541    return;
    547542}
    548543
    549544// choose a worker to steal from
    550545static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    551         #if RAND
    552         unsigned int victim = prng( executor_->nworkers );
    553         if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    554         choose_queue( this, victim, swap_idx );
    555         #elif SEARCH
    556         unsigned long long min = MAX; // smaller timestamp means longer since service
    557         int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
    558         int n_workers = executor_->nworkers;
    559         unsigned long long curr_stamp;
    560         int scount = 1;
    561         for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
    562                 curr_stamp = executor_->w_infos[i].stamp;
    563                 if ( curr_stamp < min ) {
    564                         min = curr_stamp;
    565                         min_id = i;
    566                 }
    567         }
    568         choose_queue( this, min_id, swap_idx );
    569         #endif
     546    #if RAND
     547    unsigned int victim = prng( executor_->nworkers );
     548    if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
     549    choose_queue( this, victim, swap_idx );
     550    #elif SEARCH
     551    unsigned long long min = MAX; // smaller timestamp means longer since service
     552    int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
     553    int n_workers = executor_->nworkers;
     554    unsigned long long curr_stamp;
     555    int scount = 1;
     556    for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
     557        curr_stamp = executor_->w_infos[i].stamp;
     558        if ( curr_stamp < min ) {
     559            min = curr_stamp;
     560            min_id = i;
     561        }
     562    }
     563    choose_queue( this, min_id, swap_idx );
     564    #endif
    570565}
    571566
    572567#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
    573568void main( worker & this ) with(this) {
    574         // #ifdef ACTOR_STATS
    575         // for ( i; executor_->nrqueues ) {
    576         //      replaced_queue[i] = 0;
    577         //      __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    578         // }
    579         // #endif
    580 
    581         // threshold of empty queues we see before we go stealing
    582         const unsigned int steal_threshold = 2 * range;
    583 
    584         // Store variable data here instead of worker struct to avoid any potential false sharing
    585         unsigned int empty_count = 0;
    586         request & req;
    587         work_queue * curr_work_queue;
    588 
    589         Exit:
    590         for ( unsigned int i = 0;; i = (i + 1) % range ) {      // cycle through set of request buffers
    591                 curr_work_queue = request_queues[i + start];
     569    // #ifdef ACTOR_STATS
     570    // for ( i; executor_->nrqueues ) {
     571    //    replaced_queue[i] = 0;
     572    //    __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
     573    // }
     574    // #endif
     575
     576    // threshold of empty queues we see before we go stealing
     577    const unsigned int steal_threshold = 2 * range;
     578
     579    // Store variable data here instead of worker struct to avoid any potential false sharing
     580    unsigned int empty_count = 0;
     581    request & req;
     582    work_queue * curr_work_queue;
     583
     584    Exit:
     585    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
     586        curr_work_queue = request_queues[i + start];
    592587
    593588        #ifndef __STEAL
    594589        CHECK_TERMINATION;
    595590        #endif
    596                
    597                 // check if queue is empty before trying to gulp it
    598                 if ( is_empty( *curr_work_queue->c_queue ) ) {
    599                         #ifdef __STEAL
    600                         empty_count++;
    601                         if ( empty_count < steal_threshold ) continue;
    602                         #else
    603                         continue;
    604                         #endif
    605                 }
    606                 transfer( *curr_work_queue, &current_queue );
    607                 #ifdef ACTOR_STATS
    608                 executor_->w_infos[id].gulps++;
    609                 #endif // ACTOR_STATS
    610                 #ifdef __STEAL
    611                 if ( is_empty( *current_queue ) ) {
    612                         if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
    613                         empty_count++;
    614                         if ( empty_count < steal_threshold ) continue;
    615                         empty_count = 0;
    616 
    617                         CHECK_TERMINATION; // check for termination
    618 
    619                         __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
    620                        
    621                         #ifdef ACTOR_STATS
    622                         executor_->w_infos[id].try_steal++;
    623                         #endif // ACTOR_STATS
    624                        
    625                         steal_work( this, start + prng( range ) );
    626                         continue;
    627                 }
    628                 #endif // __STEAL
    629                 while ( ! is_empty( *current_queue ) ) {
    630                         #ifdef ACTOR_STATS
    631                         executor_->w_infos[id].processed++;
    632                         #endif
    633                         &req = &remove( *current_queue );
    634                         if ( !&req ) continue;
    635                         deliver_request( req );
    636                 }
    637                 #ifdef __STEAL
    638                 curr_work_queue->being_processed = false;               // set done processing
    639                 empty_count = 0; // we found work so reset empty counter
    640                 #endif
    641 
    642                 CHECK_TERMINATION;
    643                
    644                 // potentially reclaim some of the current queue's vector space if it is unused
    645                 reclaim( *current_queue );
    646         } // for
     591
     592        // check if queue is empty before trying to gulp it
     593        if ( is_empty( *curr_work_queue->c_queue ) ) {
     594            #ifdef __STEAL
     595            empty_count++;
     596            if ( empty_count < steal_threshold ) continue;
     597            #else
     598            continue;
     599            #endif
     600        }
     601        transfer( *curr_work_queue, &current_queue );
     602        #ifdef ACTOR_STATS
     603        executor_->w_infos[id].gulps++;
     604        #endif // ACTOR_STATS
     605        #ifdef __STEAL
     606        if ( is_empty( *current_queue ) ) {
     607            if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
     608            empty_count++;
     609            if ( empty_count < steal_threshold ) continue;
     610            empty_count = 0;
     611
     612            CHECK_TERMINATION; // check for termination
     613
     614            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
     615           
     616            #ifdef ACTOR_STATS
     617            executor_->w_infos[id].try_steal++;
     618            #endif // ACTOR_STATS
     619           
     620            steal_work( this, start + prng( range ) );
     621            continue;
     622        }
     623        #endif // __STEAL
     624        while ( ! is_empty( *current_queue ) ) {
     625            #ifdef ACTOR_STATS
     626            executor_->w_infos[id].processed++;
     627            #endif
     628            &req = &remove( *current_queue );
     629            if ( !&req ) continue;
     630            deliver_request( req );
     631        }
     632        #ifdef __STEAL
     633        curr_work_queue->being_processed = false; // set done processing
     634        empty_count = 0; // we found work so reset empty counter
     635        #endif
     636
     637        CHECK_TERMINATION;
     638       
     639        // potentially reclaim some of the current queue's vector space if it is unused
     640        reclaim( *current_queue );
     641    } // for
    647642}
    648643
    649644static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    650         insert( request_queues[ticket], req);
     645    insert( request_queues[ticket], req);
    651646}
    652647
    653648static inline void send( actor & this, request & req ) {
    654         DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    655         send( *__actor_executor_, req, this.ticket );
     649    DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     650    send( *__actor_executor_, req, this.ticket );
    656651}
    657652
    658653static inline void __reset_stats() {
    659         #ifdef ACTOR_STATS
    660         __total_tries = 0;
    661         __total_stolen = 0;
    662         __all_gulps = 0;
    663         __total_failed_swaps = 0;
    664         __total_empty_stolen = 0;
    665         __all_processed = 0;
    666         __num_actors_stats = 0;
    667         __all_msgs_stolen = 0;
    668         #endif
     654    #ifdef ACTOR_STATS
     655    __total_tries = 0;
     656    __total_stolen = 0;
     657    __all_gulps = 0;
     658    __total_failed_swaps = 0;
     659    __total_empty_stolen = 0;
     660    __all_processed = 0;
     661    __num_actors_stats = 0;
     662    __all_msgs_stolen = 0;
     663    #endif
    669664}
    670665
    671666static inline void start_actor_system( size_t num_thds ) {
    672         __reset_stats();
    673         __actor_executor_thd = active_thread();
    674         __actor_executor_ = alloc();
    675         (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
     667    __reset_stats();
     668    __actor_executor_thd = active_thread();
     669    __actor_executor_ = alloc();
     670    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
    676671}
    677672
     
    679674
    680675static inline void start_actor_system( executor & this ) {
    681         __reset_stats();
    682         __actor_executor_thd = active_thread();
    683         __actor_executor_ = &this;
    684         __actor_executor_passed = true;
     676    __reset_stats();
     677    __actor_executor_thd = active_thread();
     678    __actor_executor_ = &this;
     679    __actor_executor_passed = true;
    685680}
    686681
    687682static inline void stop_actor_system() {
    688         park();                                                                                         // unparked when actor system is finished
    689 
    690         if ( !__actor_executor_passed ) delete( __actor_executor_ );
    691         __actor_executor_ = 0p;
    692         __actor_executor_thd = 0p;
    693         __next_ticket = 0;
    694         __actor_executor_passed = false;
     683    park( ); // unparked when actor system is finished
     684
     685    if ( !__actor_executor_passed ) delete( __actor_executor_ );
     686    __actor_executor_ = 0p;
     687    __actor_executor_thd = 0p;
     688    __next_ticket = 0;
     689    __actor_executor_passed = false;
    695690}
    696691
     
    698693// assigned at creation to __base_msg_finished to avoid unused message warning
    699694message __base_msg_finished @= { .alloc : Finished };
    700 struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
     695struct delete_message_t { inline message; } delete_msg = __base_msg_finished;
    701696struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
    702697struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;
    703698
    704 allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
     699allocation receive( actor & this, delete_message_t & msg ) { return Delete; }
    705700allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
    706701allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
     702
  • src/Parser/parser.yy

    r2dfdae3 r508671e  
    1010// Created On       : Sat Sep  1 20:22:55 2001
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Tue Jun 20 22:10:31 2023
    13 // Update Count     : 6348
     12// Last Modified On : Sat Jun 17 18:53:24 2023
     13// Update Count     : 6347
    1414//
    1515
     
    689689        // | RESUME '(' comma_expression ')' compound_statement
    690690        //      { SemanticError( yylloc, "Resume expression is currently unimplemented." ); $$ = nullptr; }
    691         | IDENTIFIER IDENTIFIER                                                         // invalid syntax rule
     691        | IDENTIFIER IDENTIFIER                                                         // invalid syntax rules
    692692                { IdentifierBeforeIdentifier( *$1.str, *$2.str, "n expression" ); $$ = nullptr; }
    693         | IDENTIFIER type_qualifier                                                     // invalid syntax rule
     693        | IDENTIFIER type_qualifier                                                     // invalid syntax rules
    694694                { IdentifierBeforeType( *$1.str, "type qualifier" ); $$ = nullptr; }
    695         | IDENTIFIER storage_class                                                      // invalid syntax rule
     695        | IDENTIFIER storage_class                                                      // invalid syntax rules
    696696                { IdentifierBeforeType( *$1.str, "storage class" ); $$ = nullptr; }
    697         | IDENTIFIER basic_type_name                                            // invalid syntax rule
     697        | IDENTIFIER basic_type_name                                            // invalid syntax rules
    698698                { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    699         | IDENTIFIER TYPEDEFname                                                        // invalid syntax rule
     699        | IDENTIFIER TYPEDEFname                                                        // invalid syntax rules
    700700                { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    701         | IDENTIFIER TYPEGENname                                                        // invalid syntax rule
     701        | IDENTIFIER TYPEGENname                                                        // invalid syntax rules
    702702                { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    703703        ;
     
    12751275        | DEFAULT ':'                                                           { $$ = new ClauseNode( build_default( yylloc ) ); }
    12761276                // A semantic check is required to ensure only one default clause per switch/choose statement.
    1277         | DEFAULT error                                                                         //  invalid syntax rule
     1277        | DEFAULT error                                                                         //  invalid syntax rules
    12781278                { SemanticError( yylloc, "syntax error, colon missing after default." ); $$ = nullptr; }
    12791279        ;
     
    14051405                        else { SemanticError( yylloc, MISSING_HIGH ); $$ = nullptr; }
    14061406                }
    1407         | comma_expression updowneq comma_expression '~' '@' // CFA, invalid syntax rule
     1407        | comma_expression updowneq comma_expression '~' '@' // CFA, invalid syntax rules
    14081408                { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    1409         | '@' updowneq '@'                                                                      // CFA, invalid syntax rule
     1409        | '@' updowneq '@'                                                                      // CFA, invalid syntax rules
    14101410                { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    1411         | '@' updowneq comma_expression '~' '@'                         // CFA, invalid syntax rule
     1411        | '@' updowneq comma_expression '~' '@'                         // CFA, invalid syntax rules
    14121412                { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    1413         | comma_expression updowneq '@' '~' '@'                         // CFA, invalid syntax rule
     1413        | comma_expression updowneq '@' '~' '@'                         // CFA, invalid syntax rules
    14141414                { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    1415         | '@' updowneq '@' '~' '@'                                                      // CFA, invalid syntax rule
     1415        | '@' updowneq '@' '~' '@'                                                      // CFA, invalid syntax rules
    14161416                { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    14171417
     
    14341434                        else $$ = forCtrl( yylloc, $3, $1, $3->clone(), $4, nullptr, NEW_ONE );
    14351435                }
    1436         | comma_expression ';' '@' updowneq '@'                         // CFA, invalid syntax rule
     1436        | comma_expression ';' '@' updowneq '@'                         // CFA, invalid syntax rules
    14371437                { SemanticError( yylloc, "syntax error, missing low/high value for up/down-to range so index is uninitialized." ); $$ = nullptr; }
    14381438
    14391439        | comma_expression ';' comma_expression updowneq comma_expression '~' comma_expression // CFA
    14401440                { $$ = forCtrl( yylloc, $3, $1, UPDOWN( $4, $3->clone(), $5 ), $4, UPDOWN( $4, $5->clone(), $3->clone() ), $7 ); }
    1441         | comma_expression ';' '@' updowneq comma_expression '~' comma_expression // CFA, invalid syntax rule
     1441        | comma_expression ';' '@' updowneq comma_expression '~' comma_expression // CFA, invalid syntax rules
    14421442                {
    14431443                        if ( $4 == OperKinds::LThan || $4 == OperKinds::LEThan ) { SemanticError( yylloc, MISSING_LOW ); $$ = nullptr; }
     
    14521452        | comma_expression ';' comma_expression updowneq comma_expression '~' '@' // CFA
    14531453                { $$ = forCtrl( yylloc, $3, $1, UPDOWN( $4, $3->clone(), $5 ), $4, UPDOWN( $4, $5->clone(), $3->clone() ), nullptr ); }
    1454         | comma_expression ';' '@' updowneq comma_expression '~' '@' // CFA, invalid syntax rule
     1454        | comma_expression ';' '@' updowneq comma_expression '~' '@' // CFA, invalid syntax rules
    14551455                {
    14561456                        if ( $4 == OperKinds::LThan || $4 == OperKinds::LEThan ) { SemanticError( yylloc, MISSING_LOW ); $$ = nullptr; }
     
    15111511                        else $$ = forCtrl( yylloc, $1, $2, $3, nullptr, nullptr );
    15121512                }
    1513         | declaration '@' updowneq '@' '~' '@'                          // CFA, invalid syntax rule
     1513        | declaration '@' updowneq '@' '~' '@'                          // CFA, invalid syntax rules
    15141514                { SemanticError( yylloc, "syntax error, missing low/high value for up/down-to range so index is uninitialized." ); $$ = nullptr; }
    15151515
     
    16661666                { $$ = build_waitfor_timeout( yylloc, $1, $3, $4, maybe_build_compound( yylloc, $5 ) ); }
    16671667        // "else" must be conditional after timeout or timeout is never triggered (i.e., it is meaningless)
    1668         | wor_waitfor_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rule
     1668        | wor_waitfor_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rules
    16691669                { SemanticError( yylloc, "syntax error, else clause must be conditional after timeout or timeout never triggered." ); $$ = nullptr; }
    16701670        | wor_waitfor_clause wor when_clause_opt timeout statement wor when_clause ELSE statement
     
    17111711                { $$ = new ast::WaitUntilStmt::ClauseNode( ast::WaitUntilStmt::ClauseNode::Op::LEFT_OR, $1, build_waituntil_timeout( yylloc, $3, $4, maybe_build_compound( yylloc, $5 ) ) ); }
    17121712        // "else" must be conditional after timeout or timeout is never triggered (i.e., it is meaningless)
    1713         | wor_waituntil_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rule
     1713        | wor_waituntil_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rules
    17141714                { SemanticError( yylloc, "syntax error, else clause must be conditional after timeout or timeout never triggered." ); $$ = nullptr; }
    17151715        | wor_waituntil_clause wor when_clause_opt timeout statement wor when_clause ELSE statement
     
    31753175        | IDENTIFIER IDENTIFIER
    31763176                { IdentifierBeforeIdentifier( *$1.str, *$2.str, " declaration" ); $$ = nullptr; }
    3177         | IDENTIFIER type_qualifier                                                     // invalid syntax rule
     3177        | IDENTIFIER type_qualifier                                                     // invalid syntax rules
    31783178                { IdentifierBeforeType( *$1.str, "type qualifier" ); $$ = nullptr; }
    3179         | IDENTIFIER storage_class                                                      // invalid syntax rule
     3179        | IDENTIFIER storage_class                                                      // invalid syntax rules
    31803180                { IdentifierBeforeType( *$1.str, "storage class" ); $$ = nullptr; }
    3181         | IDENTIFIER basic_type_name                                            // invalid syntax rule
     3181        | IDENTIFIER basic_type_name                                            // invalid syntax rules
    31823182                { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    3183         | IDENTIFIER TYPEDEFname                                                        // invalid syntax rule
     3183        | IDENTIFIER TYPEDEFname                                                        // invalid syntax rules
    31843184                { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    3185         | IDENTIFIER TYPEGENname                                                        // invalid syntax rule
     3185        | IDENTIFIER TYPEGENname                                                        // invalid syntax rules
    31863186                { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    31873187        | external_function_definition
Note: See TracChangeset for help on using the changeset viewer.