Timestamp: Aug 31, 2023, 11:31:15 PM (2 years ago)
Author: JiadaL <j82liang@…>
Branches: master
Children: 950c58e
Parents: 92355883 (diff), 686912c (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message: Resolve conflict
Location: libcfa/src/concurrency
Files: 17 edited

Legend:

Hunks are shown in unified-diff form: '-' lines were removed (r92355883), '+' lines were added
(r2a301ff), and lines prefixed with a space are unchanged context. Most of this merge re-indents
code from spaces to tabs; such whitespace-only changes are shown once, as context, rather than as
removed/added pairs.
  • libcfa/src/concurrency/actor.hfa

--- libcfa/src/concurrency/actor.hfa	(r92355883)
+++ libcfa/src/concurrency/actor.hfa	(r2a301ff)
@@ -39,4 +39,9 @@
 // #define ACTOR_STATS
 
+// used to run and only track missed queue gulps
+#ifdef ACTOR_STATS
+#define ACTOR_STATS_QUEUE_MISSED
+#endif
+
 // forward decls
 struct actor;
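
Note on the new flag: ACTOR_STATS_QUEUE_MISSED decouples the missed-gulp counter from the rest of the
statistics machinery. Defining ACTOR_STATS implies it, so the full report still includes missed gulps;
defining only ACTOR_STATS_QUEUE_MISSED compiles in just the work_queue missed field and the bare count
printed at executor teardown (the #ifndef ACTOR_STATS branch further down). Schematically:

    // ACTOR_STATS defined            => ACTOR_STATS_QUEUE_MISSED also defined; full formatted report
    // ACTOR_STATS_QUEUE_MISSED only  => only work_queue.missed and printf("\t%lu", misses) compiled in
    // neither defined                => no statistics code compiled in

Note that in this revision the missed++ in transfer() and the missed = 0 initialization appear to remain
guarded by ACTOR_STATS, so the counter only advances when full statistics are enabled.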
     
@@ -48,22 +53,22 @@
 typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
 struct request {
 	actor * receiver;
 	message * msg;
 	__receive_fn fn;
 };
 
 struct a_msg {
 	int m;
 };
 static inline void ?{}( request & this ) {}
 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
 	this.receiver = receiver;
 	this.msg = msg;
 	this.fn = fn;
 }
 static inline void ?{}( request & this, request & copy ) {
 	this.receiver = copy.receiver;
 	this.msg = copy.msg;
 	this.fn = copy.fn;
 }
 
     
@@ -71,30 +76,30 @@
 // assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
 struct copy_queue {
 	request * buffer;
 	size_t count, buffer_size, index, utilized, last_size;
 };
 static inline void ?{}( copy_queue & this ) {}
 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
 	buffer_size = buf_size;
 	buffer = aalloc( buffer_size );
 	count = 0;
 	utilized = 0;
 	index = 0;
 	last_size = 0;
 }
 static inline void ^?{}( copy_queue & this ) with(this) {
 	DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
 	adelete(buffer);
 }
 
 static inline void insert( copy_queue & this, request & elem ) with(this) {
 	if ( count >= buffer_size ) { // increase arr size
 		last_size = buffer_size;
 		buffer_size = 2 * buffer_size;
 		buffer = realloc( buffer, sizeof( request ) * buffer_size );
 		/* paranoid */ verify( buffer );
 	}
 	memcpy( &buffer[count], &elem, sizeof(request) );
 	count++;
 }
 
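
For orientation, the copy_queue above is a growable array of requests with gulp semantics: inserts double
the buffer when full, and once draining starts it must run to empty before the next insert. A minimal
usage sketch, assuming CFA constructor-call initialization (this is an internal API, not normally called
from user code):

    copy_queue q = { 4 };			// sizing ctor: room for 4 requests
    request r;					// default request
    for ( i; 10 ) insert( q, r );		// buffer doubles 4 -> 8 -> 16 as count hits capacity
    while ( q.count > 0 ) remove( q );		// gulped: removes run until empty before any new insert
    						// the dtor (in debug) aborts if requests remain unreceived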
     
@@ -102,20 +107,20 @@
 // it is not supported to call insert() before the array is fully empty
 static inline request & remove( copy_queue & this ) with(this) {
 	if ( count > 0 ) {
 		count--;
 		size_t old_idx = index;
 		index = count == 0 ? 0 : index + 1;
 		return buffer[old_idx];
 	}
 	request * ret = 0p;
 	return *0p;
 }
 
 // try to reclaim some memory if less than half of buffer is utilized
 static inline void reclaim( copy_queue & this ) with(this) {
 	if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
 	utilized = 0;
 	buffer_size--;
 	buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
 }
 
     
@@ -123,22 +128,24 @@
 
 struct work_queue {
 	__spinlock_t mutex_lock;
 	copy_queue * owned_queue;			// copy queue allocated and cleaned up by this work_queue
 	copy_queue * c_queue;				// current queue
 	volatile bool being_processed;			// flag to prevent concurrent processing
 	#ifdef ACTOR_STATS
 	unsigned int id;
-	size_t missed;					// transfers skipped due to being_processed flag being up
 	#endif
+	#ifdef ACTOR_STATS_QUEUE_MISSED
+	size_t missed;					// transfers skipped due to being_processed flag being up
+	#endif
 }; // work_queue
 static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
 	owned_queue = alloc();				// allocated separately to avoid false sharing
 	(*owned_queue){ buf_size };
 	c_queue = owned_queue;
 	being_processed = false;
 	#ifdef ACTOR_STATS
 	id = i;
 	missed = 0;
 	#endif
 }
 
     
@@ -147,55 +154,55 @@
 
 static inline void insert( work_queue & this, request & elem ) with(this) {
 	lock( mutex_lock __cfaabi_dbg_ctx2 );
 	insert( *c_queue, elem );
 	unlock( mutex_lock );
 } // insert
 
 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
 	lock( mutex_lock __cfaabi_dbg_ctx2 );
 	#ifdef __STEAL
 
 	// check if queue is being processed elsewhere
 	if ( unlikely( being_processed ) ) {
 		#ifdef ACTOR_STATS
 		missed++;
 		#endif
 		unlock( mutex_lock );
 		return;
 	}
 
 	being_processed = c_queue->count != 0;
 	#endif // __STEAL
 
 	c_queue->utilized = c_queue->count;
 
 	// swap copy queue ptrs
 	copy_queue * temp = *transfer_to;
 	*transfer_to = c_queue;
 	c_queue = temp;
 	unlock( mutex_lock );
 } // transfer
 
 // needed since some info needs to persist past worker lifetimes
 struct worker_info {
 	volatile unsigned long long stamp;
 	#ifdef ACTOR_STATS
 	size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
 	unsigned long long processed;
 	size_t gulps;
 	#endif
 };
 static inline void ?{}( worker_info & this ) {
 	#ifdef ACTOR_STATS
 	this.stolen_from = 0;
 	this.try_steal = 0;				// attempts to steal
 	this.stolen = 0;				// successful steals
 	this.processed = 0;				// requests processed
 	this.gulps = 0;					// number of gulps
 	this.failed_swaps = 0;				// steal swap failures
 	this.empty_stolen = 0;				// queues empty after steal
 	this.msgs_stolen = 0;				// number of messages stolen
 	#endif
 	this.stamp = rdtscl();
 }
 
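
The transfer() above implements the "gulp": under the queue's spinlock it swaps the producer-side buffer
with the caller's empty one, so the worker drains an entire batch without holding the lock. A schematic of
the caller side (my_empty_queue is a hypothetical worker-local buffer; is_empty is defined elsewhere in
actor.hfa):

    copy_queue * mine = &my_empty_queue;	// worker's empty buffer
    transfer( wq, &mine );			// swap: wq keeps the empty buffer, mine now holds the batch
    while ( ! is_empty( *mine ) ) {
    	request & r = remove( *mine );		// drain lock-free; no inserts happen to this buffer
    	deliver_request( r );
    }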
     
@@ -205,9 +212,9 @@
 // #endif
 thread worker {
 	work_queue ** request_queues;
 	copy_queue * current_queue;
 	executor * executor_;
 	unsigned int start, range;
 	int id;
 };
 
     
@@ -215,29 +222,29 @@
 // aggregate counters for statistics
 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
 	__total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
 #endif
 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
 	unsigned int start, unsigned int range, int id ) {
 	((thread &)this){ clu };
 	this.request_queues = request_queues;		// array of all queues
 	this.current_queue = current_queue;		// currently gulped queue (start with empty queue to use in swap later)
 	this.executor_ = executor_;			// pointer to current executor
 	this.start = start;				// start of worker's subrange of request_queues
 	this.range = range;				// size of worker's subrange of request_queues
 	this.id = id;					// worker's id and index in array of workers
 }
 
 static bool no_steal = false;
 struct executor {
 	cluster * cluster;				// if workers execute on separate cluster
 	processor ** processors;			// array of virtual processors adding parallelism for workers
 	work_queue * request_queues;			// master array of work request queues
 	copy_queue * local_queues;			// array of all worker local queues to avoid deletion race
 	work_queue ** worker_req_queues;		// secondary array of work queues to allow for swapping
 	worker ** workers;				// array of workers executing work requests
 	worker_info * w_infos;				// array of info about each worker
 	unsigned int nprocessors, nworkers, nrqueues;	// number of processors/threads/request queues
 	bool seperate_clus;				// use same or separate cluster for executor
 	volatile bool is_shutdown;			// flag to communicate shutdown to worker threads
 }; // executor
 
     
@@ -246,79 +253,79 @@
 // #endif
 static inline void ^?{}( worker & mutex this ) with(this) {
 	#ifdef ACTOR_STATS
 	__atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
 	__atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
 	__atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
 	__atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
 	__atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
 	__atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
 	__atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);
 
 	// per worker steal stats (uncomment alongside the lock above this routine to print)
 	// lock( out_lock __cfaabi_dbg_ctx2 );
 	// printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
 	// int count = 0;
 	// int count2 = 0;
 	// for ( i; range ) {
 	//	if ( replaced_queue[start + i] > 0 ){
 	//		count++;
 	//		// printf("%d: %u, ",i, replaced_queue[i]);
 	//	}
 	//	if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
 	//		count2++;
 	// }
 	// printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
 	// printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
 	// unlock( out_lock );
 	#endif
 }
 
 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
 	if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
 	this.nprocessors = nprocessors;
 	this.nworkers = nworkers;
 	this.nrqueues = nrqueues;
 	this.seperate_clus = seperate_clus;
 	this.is_shutdown = false;
 
 	if ( nworkers == nrqueues )
 		no_steal = true;
 
 	#ifdef ACTOR_STATS
 	// stolen_arr = aalloc( nrqueues );
 	// replaced_queue = aalloc( nrqueues );
 	__total_workers = nworkers;
 	#endif
 
 	if ( seperate_clus ) {
 		cluster = alloc();
 		(*cluster){};
 	} else cluster = active_cluster();
 
 	request_queues = aalloc( nrqueues );
 	worker_req_queues = aalloc( nrqueues );
 	for ( i; nrqueues ) {
 		request_queues[i]{ buf_size, i };
 		worker_req_queues[i] = &request_queues[i];
 	}
 
 	processors = aalloc( nprocessors );
 	for ( i; nprocessors )
 		(*(processors[i] = alloc())){ *cluster };
 
 	local_queues = aalloc( nworkers );
 	workers = aalloc( nworkers );
 	w_infos = aalloc( nworkers );
 	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
 
 	for ( i; nworkers ) {
 		w_infos[i]{};
 		local_queues[i]{ buf_size };
 	}
 
 	for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
 		range = reqPerWorker + ( i < extras ? 1 : 0 );
 		(*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
 	} // for
 }
 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
     
@@ -329,41 +336,47 @@
 
 static inline void ^?{}( executor & this ) with(this) {
 	is_shutdown = true;
 
 	for ( i; nworkers )
 		delete( workers[i] );
 
 	for ( i; nprocessors ) {
 		delete( processors[i] );
 	} // for
 
-	#ifdef ACTOR_STATS
+	#ifdef ACTOR_STATS_QUEUE_MISSED
 	size_t misses = 0;
 	for ( i; nrqueues ) {
 		misses += worker_req_queues[i]->missed;
 	}
 	// adelete( stolen_arr );
 	// adelete( replaced_queue );
 	#endif
 
 	adelete( workers );
 	adelete( w_infos );
 	adelete( local_queues );
 	adelete( request_queues );
 	adelete( worker_req_queues );
 	adelete( processors );
 	if ( seperate_clus ) delete( cluster );
 
 	#ifdef ACTOR_STATS // print formatted stats
 	printf("        Actor System Stats:\n");
 	printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
 	size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
 	printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
 	printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
 		__total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
 	size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
 	printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
 	#endif
 
+	#ifndef ACTOR_STATS
+	#ifdef ACTOR_STATS_QUEUE_MISSED
+	printf("\t%lu", misses);
+	#endif
+	#endif
 }
 
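
To obtain the formatted report above, uncomment the // #define ACTOR_STATS near the top of actor.hfa
(line 39 in the new revision); with only ACTOR_STATS_QUEUE_MISSED defined, the destructor instead prints
the raw miss count via the bare printf in the #ifndef ACTOR_STATS branch:

    // near the top of actor.hfa (r2a301ff):
    #define ACTOR_STATS				// enables the formatted statistics report at executor teardown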
     
@@ -372,94 +385,105 @@
 
 static inline size_t __get_next_ticket( executor & this ) with(this) {
 	#ifdef __CFA_DEBUG__
 	size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
 
 	// reserve MAX for dead actors
 	if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
 	return temp;
 	#else
 	return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
 	#endif
 } // tickets
 
 // TODO: update globals in this file to be static fields once the static fields project is done
 static executor * __actor_executor_ = 0p;
 static bool __actor_executor_passed = false;		// was an executor passed to start_actor_system
 static size_t __num_actors_ = 0;			// number of actor objects in system
 static struct thread$ * __actor_executor_thd = 0p;	// used to wake executor after actors finish
 struct actor {
 	size_t ticket;					// executor-queue handle
-	allocation allocation_;				// allocation action
+	allocation alloc;				// allocation action
 	inline virtual_dtor;
 };
 
 static inline void ?{}( actor & this ) with(this) {
 	// Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
 	// member must be called to end it
 	DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
-	allocation_ = Nodelete;
+	alloc = Nodelete;
 	ticket = __get_next_ticket( *__actor_executor_ );
 	__atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
 	#ifdef ACTOR_STATS
 	__atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
 	#endif
 }
 
 static inline void check_actor( actor & this ) {
-	if ( this.allocation_ != Nodelete ) {
-		switch( this.allocation_ ) {
+	if ( this.alloc != Nodelete ) {
+		switch( this.alloc ) {
 			case Delete: delete( &this ); break;
 			case Destroy:
 				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
 				^?{}(this);
 				break;
 			case Finished:
 				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
 				break;
 			default: ;				// stop warning
 		}
 
 		if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
 			unpark( __actor_executor_thd );
 		}
 	}
 }
 
 struct message {
-	allocation allocation_;				// allocation action
+	allocation alloc;				// allocation action
 	inline virtual_dtor;
 };
 
 static inline void ?{}( message & this ) {
-	this.allocation_ = Nodelete;
+	this.alloc = Nodelete;
 }
 static inline void ?{}( message & this, allocation alloc ) {
-	memcpy( &this.allocation_, &alloc, sizeof(allocation) );	// optimization to elide ctor
-	DEBUG_ABORT( this.allocation_ == Finished, "The Finished allocation status is not supported for message types.\n" );
+	memcpy( &this.alloc, &alloc, sizeof(allocation) );	// optimization to elide ctor
+	CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
 }
 static inline void ^?{}( message & this ) with(this) {
-	CFA_DEBUG( if ( allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); )
+	CFA_DEBUG(
+		if ( alloc == Nodelete ) {
+			printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",
+					(long int)getpid(), &this );
+		}
+	)
 }
 
 static inline void check_message( message & this ) {
-	switch ( this.allocation_ ) {				// analyze message status
-		case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break;
+	switch ( this.alloc ) {					// analyze message status
+		case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
 		case Delete: delete( &this ); break;
 		case Destroy: ^?{}( this ); break;
 		case Finished: break;
 	} // switch
 }
-static inline void set_allocation( message & this, allocation state ) {
-	this.allocation_ = state;
+static inline allocation set_allocation( message & this, allocation state ) {
+	CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
+	allocation prev = this.alloc;
+	this.alloc = state;
+	return prev;
+}
+static inline allocation get_allocation( message & this ) {
+	return this.alloc;
 }
 
 static inline void deliver_request( request & this ) {
 	DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
 	actor * base_actor;
 	message * base_msg;
 	allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
-	base_actor->allocation_ = temp;
+	memcpy( &base_actor->alloc, &temp, sizeof(allocation) );	// optimization to elide ctor
 	check_message( *base_msg );
 	check_actor( *base_actor );
 }
 
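
set_allocation() now returns the previous allocation action and, in debug builds, coerces Nodelete to
Finished (matching the new message constructor above). A hedged usage sketch; my_actor, my_msg, and this
receive routine are hypothetical:

    struct my_actor { inline actor; };
    struct my_msg { inline message; int i; };

    allocation receive( my_actor & this, my_msg & msg ) {
    	allocation prev = set_allocation( msg, Delete );	// msg is deleted after delivery; prev saved
    	if ( get_allocation( msg ) == Delete ) { /* ... */ }	// query the current action
    	return Nodelete;					// leave the actor alive
    }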
     
@@ -467,196 +491,200 @@
 // returns ptr to newly owned queue if swap succeeds
 static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
 	work_queue * my_queue = request_queues[my_idx];
 	work_queue * other_queue = request_queues[victim_idx];
 
 	// if either queue is 0p then they are in the process of being stolen
 	if ( other_queue == 0p ) return 0p;
 
 	// try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
 	if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
 		return 0p;
 
 	// try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
 	if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
 		/* paranoid */ verify( request_queues[my_idx] == 0p );
 		request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
 		return 0p;
 	}
 
 	// we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
 	request_queues[my_idx] = other_queue; // last write does not need to be atomic
 	return other_queue;
 }
 
 // once a worker to steal from has been chosen, choose queue to steal from
 static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
 	// have to calculate victim start and range since victim may be deleted before us in shutdown
 	const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
 	const unsigned int extras = executor_->nrqueues % executor_->nworkers;
 	unsigned int vic_start, vic_range;
 	if ( extras > victim_id  ) {
 		vic_range = queues_per_worker + 1;
 		vic_start = vic_range * victim_id;
 	} else {
 		vic_start = extras + victim_id * queues_per_worker;
 		vic_range = queues_per_worker;
 	}
 	unsigned int start_idx = prng( vic_range );
 
 	unsigned int tries = 0;
 	work_queue * curr_steal_queue;
 
 	for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
 		tries++;
 		curr_steal_queue = request_queues[ i + vic_start ];
 		// avoid empty queues and queues that are being operated on
 		if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
 			continue;
 
 		#ifdef ACTOR_STATS
 		curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
 		if ( curr_steal_queue ) {
 			executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
 			executor_->w_infos[id].stolen++;
 			if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
 			// __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
 			// replaced_queue[swap_idx]++;
 			// __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
 		} else {
 			executor_->w_infos[id].failed_swaps++;
 		}
 		#else
 		curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
 		#endif // ACTOR_STATS
 
 		return;
 	}
 
 	return;
 }
 
 // choose a worker to steal from
 static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
 	#if RAND
 	unsigned int victim = prng( executor_->nworkers );
 	if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
 	choose_queue( this, victim, swap_idx );
 	#elif SEARCH
 	unsigned long long min = MAX; // smaller timestamp means longer since service
 	int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
 	int n_workers = executor_->nworkers;
 	unsigned long long curr_stamp;
 	int scount = 1;
 	for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
 		curr_stamp = executor_->w_infos[i].stamp;
 		if ( curr_stamp < min ) {
 			min = curr_stamp;
 			min_id = i;
 		}
 	}
 	choose_queue( this, min_id, swap_idx );
 	#endif
 }
 
 #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
 void main( worker & this ) with(this) {
 	// #ifdef ACTOR_STATS
 	// for ( i; executor_->nrqueues ) {
 	//	replaced_queue[i] = 0;
 	//	__atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
 	// }
 	// #endif
 
 	// threshold of empty queues we see before we go stealing
 	const unsigned int steal_threshold = 2 * range;
 
 	// Store variable data here instead of worker struct to avoid any potential false sharing
 	unsigned int empty_count = 0;
 	request & req;
 	work_queue * curr_work_queue;
 
 	Exit:
 	for ( unsigned int i = 0;; i = (i + 1) % range ) {	// cycle through set of request buffers
 		curr_work_queue = request_queues[i + start];
 
+	#ifndef __STEAL
+	CHECK_TERMINATION;
+	#endif
+
 		// check if queue is empty before trying to gulp it
 		if ( is_empty( *curr_work_queue->c_queue ) ) {
 			#ifdef __STEAL
 			empty_count++;
 			if ( empty_count < steal_threshold ) continue;
 			#else
 			continue;
 			#endif
 		}
 		transfer( *curr_work_queue, &current_queue );
 		#ifdef ACTOR_STATS
 		executor_->w_infos[id].gulps++;
 		#endif // ACTOR_STATS
 		#ifdef __STEAL
 		if ( is_empty( *current_queue ) ) {
 			if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
 			empty_count++;
 			if ( empty_count < steal_threshold ) continue;
 			empty_count = 0;
 
 			CHECK_TERMINATION; // check for termination
 
 			__atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
 
 			#ifdef ACTOR_STATS
 			executor_->w_infos[id].try_steal++;
 			#endif // ACTOR_STATS
 
 			steal_work( this, start + prng( range ) );
 			continue;
 		}
 		#endif // __STEAL
 		while ( ! is_empty( *current_queue ) ) {
 			#ifdef ACTOR_STATS
 			executor_->w_infos[id].processed++;
 			#endif
 			&req = &remove( *current_queue );
 			if ( !&req ) continue;
 			deliver_request( req );
 		}
 		#ifdef __STEAL
 		curr_work_queue->being_processed = false;	// set done processing
 		empty_count = 0; // we found work so reset empty counter
 		#endif
 
 		CHECK_TERMINATION;
 
 		// potentially reclaim some of the current queue's vector space if it is unused
 		reclaim( *current_queue );
 	} // for
 }
 
 static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
 	insert( request_queues[ticket], req);
 }
 
 static inline void send( actor & this, request & req ) {
 	DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
 	send( *__actor_executor_, req, this.ticket );
 }
 
 static inline void __reset_stats() {
 	#ifdef ACTOR_STATS
 	__total_tries = 0;
 	__total_stolen = 0;
 	__all_gulps = 0;
 	__total_failed_swaps = 0;
 	__total_empty_stolen = 0;
 	__all_processed = 0;
 	__num_actors_stats = 0;
 	__all_msgs_stolen = 0;
 	#endif
 }
 
 static inline void start_actor_system( size_t num_thds ) {
 	__reset_stats();
 	__actor_executor_thd = active_thread();
 	__actor_executor_ = alloc();
 	(*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
 }
 
     
    664692
    665693static inline void start_actor_system( executor & this ) {
    666     __reset_stats();
    667     __actor_executor_thd = active_thread();
    668     __actor_executor_ = &this;
    669     __actor_executor_passed = true;
     694        __reset_stats();
     695        __actor_executor_thd = active_thread();
     696        __actor_executor_ = &this;
     697        __actor_executor_passed = true;
    670698}
    671699
    672700static inline void stop_actor_system() {
    673     park( ); // will be unparked when actor system is finished
    674 
    675     if ( !__actor_executor_passed ) delete( __actor_executor_ );
    676     __actor_executor_ = 0p;
    677     __actor_executor_thd = 0p;
    678     __next_ticket = 0;
    679     __actor_executor_passed = false;
     701        park();                                                                                         // unparked when actor system is finished
     702
     703        if ( !__actor_executor_passed ) delete( __actor_executor_ );
     704        __actor_executor_ = 0p;
     705        __actor_executor_thd = 0p;
     706        __next_ticket = 0;
     707        __actor_executor_passed = false;
    680708}
    681709
    682710// Default messages to send to any actor to change status
    683711// assigned at creation to __base_msg_finished to avoid unused message warning
    684 message __base_msg_finished @= { .allocation_ : Finished };
    685 struct __delete_msg_t { inline message; } delete_msg = __base_msg_finished;
    686 struct __destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
    687 struct __finished_msg_t { inline message; } finished_msg = __base_msg_finished;
    688 
    689 allocation receive( actor & this, __delete_msg_t & msg ) { return Delete; }
    690 allocation receive( actor & this, __destroy_msg_t & msg ) { return Destroy; }
    691 allocation receive( actor & this, __finished_msg_t & msg ) { return Finished; }
    692 
     712message __base_msg_finished @= { .alloc : Finished };
     713struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
     714struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
     715struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;
     716
     717allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
     718allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
     719allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
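The renamed default messages (delete_msg, destroy_msg, finished_msg) and the start/stop routines above combine into a short lifecycle. The following is a minimal usage sketch, not part of this changeset: the counter/tick types are hypothetical, and the '|' send operator and the Nodelete allocation status are assumed from the wider actor API rather than shown in this hunk.

    struct counter { inline actor; };                  // hypothetical actor type
    struct tick { inline message; int n; };            // hypothetical message type

    allocation receive( counter & this, tick & msg ) {
        msg.n++;
        return msg.n < 10 ? Nodelete : Finished;       // Nodelete (assumed) keeps actor and message alive
    }

    int main() {
        start_actor_system( 4 );                       // 4 worker threads; 4 * 16 request queues per the ctor above
        counter c;
        tick t;  t.n = 0;
        c | t;                                         // assumed send operator: routes t to c's ticketed queue
        c | finished_msg;                              // default message: retire c once t is processed
        stop_actor_system();                           // parks until the executor drains all queues
    }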
  • libcfa/src/concurrency/alarm.hfa

    r92355883 r2a301ff  
    1010// Created On       : Fri Jun 2 11:31:25 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Mon Mar 26 16:25:41 2018
    13 // Update Count     : 11
     12// Last Modified On : Wed Aug 30 21:27:40 2023
     13// Update Count     : 12
    1414//
    1515
     
    2323#include "time.hfa"
    2424
    25 #include "containers/list.hfa"
     25#include "collections/list.hfa"
    2626
    2727struct thread$;
  • libcfa/src/concurrency/channel.hfa

    r92355883 r2a301ff  
    6868    #endif
    6969};
     70static inline void ?{}( channel(T) & this, channel(T) this2 ) = void;
     71static inline void ?=?( channel(T) & this, channel(T) this2 ) = void;
    7072
    7173static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     
    326328    return retval;
    327329}
     330static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); }
     331
     332
     333///////////////////////////////////////////////////////////////////////////////////////////
     334// The following is Go-style operator support for channels
     335///////////////////////////////////////////////////////////////////////////////////////////
     336
     337static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); }
     338static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); }
    328339
    329340///////////////////////////////////////////////////////////////////////////////////////////
     
    340351    unlock( mutex_lock );
    341352
    342     // only return true when not special OR case, not exceptional calse and status is SAT
    343     return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
     353    // only return true when not special OR case and status is SAT
     354    return !node.park_counter ? false : *node.clause_status == __SELECT_SAT;
    344355}
    345356
     
    363374// type used by select statement to capture a chan read as the selected operation
    364375struct chan_read {
    365     T & ret;
    366     channel(T) & chan;
     376    T * ret;
     377    channel(T) * chan;
    367378};
    368 
    369 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
    370     &cr.chan = &chan;
    371     &cr.ret = &ret;
    372 }
    373 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
    374 
    375 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    376     __closed_remove( chan, ret );
     379__CFA_SELECT_GET_TYPE( chan_read(T) );
     380
     381static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) {
     382    cr.chan = chan;
     383    cr.ret = ret;
     384}
     385static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; }
     386
     387static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
     388    __closed_remove( *chan, *ret );
    377389    // if we get here then the insert succeeded
    378390    __make_select_node_available( node );
    379391}
    380392
    381 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
    382     lock( mutex_lock );
    383     node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     393static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) {
     394    lock( mutex_lock );
     395    node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
    384396
    385397    #ifdef CHAN_STATS
     
    396408
    397409            if ( __handle_pending( prods, node ) ) {
    398                 __prods_handoff( chan, ret );
     410                __prods_handoff( *chan, *ret );
    399411                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    400412                unlock( mutex_lock );
     
    422434    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
    423435        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
    424         __prods_handoff( chan, ret );
     436        __prods_handoff( *chan, *ret );
    425437        __set_avail_then_unlock( node, mutex_lock );
    426438        return true;
     
    439451
    440452    // Remove from buffer
    441     __do_remove( chan, ret );
     453    __do_remove( *chan, *ret );
    442454    __set_avail_then_unlock( node, mutex_lock );
    443455    return true;
    444456}
    445 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    446 static inline void on_selected( chan_read(T) & this, select_node & node ) with(this) {
    447     if ( node.extra == 0p ) // check if woken up due to closed channel
    448         __closed_remove( chan, ret );
     457static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
     458static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     459    if ( unlikely(node.extra == 0p) ) {
     460        if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel
     461        else return false;
     462    }
    449463    // This is only reachable if not closed or closed exception was handled
    450 }
     464    return true;
     465}
     466
      467// type used by select statement to capture a chan read as the selected operation that doesn't have a parameter to read into
     468struct chan_read_no_ret {
     469    T retval;
     470    chan_read( T ) c_read;
     471};
     472__CFA_SELECT_GET_TYPE( chan_read_no_ret(T) );
     473
     474static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) {
     475    this.c_read{ &chan, &this.retval };
     476}
     477
     478static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; }
     479static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) {
     480    this.c_read.ret = &this.retval;
     481    return register_select( this.c_read, node );
     482}
     483static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); }
     484static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); }
    451485
    452486// type used by select statement to capture a chan write as the selected operation
    453487struct chan_write {
    454488    T elem;
    455     channel(T) & chan;
     489    channel(T) * chan;
    456490};
    457 
    458 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
    459     &cw.chan = &chan;
     491__CFA_SELECT_GET_TYPE( chan_write(T) );
     492
     493static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) {
     494    cw.chan = chan;
    460495    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
    461496}
    462 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
    463 
    464 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
    465     __closed_insert( chan, elem );
     497static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; }
     498static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; }
     499
     500static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
     501    __closed_insert( *chan, elem );
    466502    // if we get here then the insert succeeded
    467503    __make_select_node_available( node );
    468504}
    469505
    470 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     506static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) {
    471507    lock( mutex_lock );
    472508    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     
    486522
    487523            if ( __handle_pending( cons, node ) ) {
    488                 __cons_handoff( chan, elem );
     524                __cons_handoff( *chan, elem );
     489525                __make_select_node_sat( node ); // need to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
    490526                unlock( mutex_lock );
     
    513549    ConsEmpty: if ( !cons`isEmpty ) {
    514550        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
    515         __cons_handoff( chan, elem );
     551        __cons_handoff( *chan, elem );
    516552        __set_avail_then_unlock( node, mutex_lock );
    517553        return true;
     
    530566
     531567    // otherwise carry out the write via a normal buffer insert
    532     __buf_insert( chan, elem );
     568    __buf_insert( *chan, elem );
    533569    __set_avail_then_unlock( node, mutex_lock );
    534570    return true;
    535571}
    536 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    537 
    538 static inline void on_selected( chan_write(T) & this, select_node & node ) with(this) {
    539     if ( node.extra == 0p ) // check if woken up due to closed channel
    540         __closed_insert( chan, elem );
    541 
     572static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); }
     573
     574static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     575    if ( unlikely(node.extra == 0p) ) {
     576        if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel
     577        else return false;
     578    }
    542579    // This is only reachable if not closed or closed exception was handled
     580    return true;
    543581}
    544582
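At the call site the Go-style operators added above read directionally: the channel on the left inserts, the channel on the right removes, and the new void remove( chan ) discards a buffered value. A small sketch, assuming a buffered channel(int); outside a waituntil the void overload of remove is assumed to be selected, while the same '<<' expressions double as waituntil clauses through the chan_read/chan_write nodes.

    int main() {
        channel( int ) ch{ 16 };       // buffered channel, capacity 16
        ch << 42;                      // insert: ?<<?( channel(T) &, T )
        ch << 7;
        int x;
        x << ch;                       // remove: ?<<?( T &, channel(T) & ); x == 42
        remove( ch );                  // discard the remaining 7 without reading it
    }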
  • libcfa/src/concurrency/coroutine.cfa

    r92355883 r2a301ff  
    2828#include "kernel/private.hfa"
    2929#include "exception.hfa"
     30#include "exception.h"
    3031#include "math.hfa"
    3132
     
    7778        free( desc->cancellation );
    7879        desc->cancellation = 0p;
     80}
     81
     82// helper for popping from coroutine's ehm buffer
     83inline nonlocal_exception * pop_ehm_head( coroutine$ * this ) {
     84    lock( this->ehm_state.buffer_lock __cfaabi_dbg_ctx2 );
     85    nonlocal_exception * nl_ex = pop_head( this->ehm_state.ehm_buffer );
     86    unlock( this->ehm_state.buffer_lock );
     87    return nl_ex;
    7988}
    8089
     
    121130        last = 0p;
    122131        cancellation = 0p;
     132    ehm_state.ehm_buffer{};
     133    ehm_state.buffer_lock{};
     134    ehm_state.ehm_enabled = false;
    123135}
    124136
    125137void ^?{}(coroutine$& this) libcfa_public {
     138    // handle any leftover pending non-local exceptions
     139    nonlocal_exception * nl_ex = pop_ehm_head( &this );
     140    unsigned unhandled_ex = 0;
     141   
      142    // free and count any leftover exceptions
     143    while ( nl_ex != 0p ){
     144        unhandled_ex++;
     145        free( nl_ex->the_exception );
     146        free( nl_ex );
     147        nl_ex = pop_ehm_head( &this );
     148    }
     149
     150    #ifdef __CFA_DEBUG__
     151    if ( unhandled_ex > 0 )
     152        printf( "Warning: Coroutine %p exited with %u pending nonlocal exceptions.\n", &this, unhandled_ex );
     153    #endif
     154
    126155        if(this.state != Halted && this.state != Start && this.state != Primed) {
    127156                coroutine$ * src = active_coroutine();
     
    283312}
    284313
     314
     315////////////////////////////////////////////////////////////////////////////////////////////////////
     316// non local ehm routines
     317
     318void defaultResumeAtHandler( exception_t * except ) {
     319    __cfaehm_allocate_exception( except );
     320    free( except );
     321    __cfaehm_begin_unwind( (void(*)(exception_t *))defaultTerminationHandler );
     322}
     323
     324bool poll( coroutine$ * cor ) libcfa_public {
     325    nonlocal_exception * nl_ex = pop_ehm_head( cor );
     326
     327    // if no exceptions return false
     328    if ( nl_ex == 0p ) return false;
     329   
     330    // otherwise loop and throwResume all pending exceptions
     331    while ( nl_ex != 0p ){
     332        exception_t * ex = nl_ex->the_exception;
     333        free( nl_ex );
     334        __cfaehm_throw_resume( ex, defaultResumeAtHandler );
     335       
     336        // only reached if resumption handled. other dealloc handled in defaultResumeAtHandler
     337        free( ex );
     338        nl_ex = pop_ehm_head( cor );
     339    }
     340   
     341    return true;
     342}
     343
     344bool poll() libcfa_public { return poll( active_coroutine() ); }
     345coroutine$ * resumer() libcfa_public { return active_coroutine()->last; }
     346
     347// user facing ehm operations
     348forall(T & | is_coroutine(T)) {
     349    // enable/disable non-local exceptions
     350    void enable_ehm( T & cor ) libcfa_public { get_coroutine( cor )->ehm_state.ehm_enabled = true; }
     351    void disable_ehm( T & cor ) libcfa_public { get_coroutine( cor )->ehm_state.ehm_enabled = false; }
     352
     353    // poll for non-local exceptions
     354    bool poll( T & cor ) libcfa_public { return poll( get_coroutine( cor ) ); }
     355
     356    // poll iff nonlocal ehm is enabled
     357    bool checked_poll( T & cor ) libcfa_public { return get_coroutine( cor )->ehm_state.ehm_enabled ? poll( cor ) : false; }
     358
     359    coroutine$ * resumer( T & cor ) libcfa_public { return get_coroutine( cor )->last; }
     360}
     361
     362// resume non local exception at receiver (i.e. enqueue in ehm buffer)
     363forall(exceptT *, T & | ehm_resume_at( exceptT, T ))
     364void resumeAt( T & receiver, exceptT & ex )  libcfa_public {
     365    coroutine$ * cor = get_coroutine( receiver );
     366    nonlocal_exception * nl_ex = alloc();
     367    exceptT * ex_copy = alloc();
     368    memcpy( ex_copy, &ex, sizeof(exceptT) );
     369    (*nl_ex){ (exception_t *)ex_copy };
     370    lock( cor->ehm_state.buffer_lock __cfaabi_dbg_ctx2 );
     371    append( cor->ehm_state.ehm_buffer, nl_ex );
     372    unlock( cor->ehm_state.buffer_lock );
     373}
     374
     375forall(exceptT * | { void $throwResume(exceptT &); })
     376void resumeAt( coroutine$ * receiver, exceptT & ex ) libcfa_public {
     377    nonlocal_exception * nl_ex = alloc();
     378    exceptT * ex_copy = alloc();
     379    memcpy( ex_copy, &ex, sizeof(exceptT) );
     380    (*nl_ex){ (exception_t *)ex_copy };
     381    lock( receiver->ehm_state.buffer_lock __cfaabi_dbg_ctx2 );
     382    append( receiver->ehm_state.ehm_buffer, nl_ex );
     383    unlock( receiver->ehm_state.buffer_lock );
     384}
     385
    285386// Local Variables: //
    286387// mode: c //
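Putting the new non-local EHM routines together: one side enqueues an exception with resumeAt, and the coroutine raises it at its next poll. A rough sketch under stated assumptions: interrupt_ex is a resumable CFA exception whose declaration and vtable setup are elided, and the worker's main suspends between units of work.

    coroutine worker {};
    void main( worker & w ) {
        for () {
            // ... one unit of work ...
            checked_poll( w );         // throwResumes queued exceptions, but only if ehm is enabled
            suspend;
        }
    }

    int main() {
        worker w;
        enable_ehm( w );               // delivery is opt-in per coroutine (off by default)
        resume( w );                   // run one unit of work
        interrupt_ex ex;               // assumed exception type; vtable setup elided
        resumeAt( w, ex );             // copies ex into w's ehm buffer under its lock
        resume( w );                   // the next checked_poll raises ex inside w
    }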
  • libcfa/src/concurrency/coroutine.hfa

    r92355883 r2a301ff  
    1919#include "invoke.h"
    2020#include "../exception.hfa"
     21
     22//-----------------------------------------------------------------------------
     23// Type used to store and queue nonlocal exceptions on coroutines
     24struct nonlocal_exception {
     25    exception_t * the_exception;
     26    nonlocal_exception * next;
     27};
     28static inline void ?{} ( nonlocal_exception & this, exception_t * ex ) with(this) {
     29    the_exception = ex;
     30    next = 0p;
     31}
     32
     33static inline nonlocal_exception *& get_next( nonlocal_exception & this ) __attribute__((const)) {
     34    return this.next;
     35}
    2136
    2237//-----------------------------------------------------------------------------
     
    203218}
    204219
     220// non local ehm and coroutine utility routines
     221bool poll( coroutine$ * cor );
     222bool poll();
     223coroutine$ * resumer();
     224
     225forall(T & | is_coroutine(T)) {
     226    void enable_ehm( T & cor );
     227    void disable_ehm( T & cor );
     228    bool poll( T & cor );
     229    bool checked_poll( T & cor );
     230    coroutine$ * resumer( T & cor );
     231}
     232
     233// trait for exceptions able to be resumed at another coroutine
     234forall(exceptT *, T & | is_coroutine(T))
     235trait ehm_resume_at { void $throwResume(exceptT &); };
     236
     237// general resumeAt
     238forall(exceptT *, T & | ehm_resume_at( exceptT, T ))
     239void resumeAt( T & receiver, exceptT & ex );
     240
     241// resumeAt for underlying coroutine$ type
     242forall(exceptT * | { void $throwResume(exceptT &); })
     243void resumeAt( coroutine$ * receiver, exceptT & ex );
     244
    205245// Local Variables: //
    206246// mode: c //
  • libcfa/src/concurrency/future.hfa

    r92355883 r2a301ff  
    3939        futex_mutex lock;
    4040        };
     41    __CFA_SELECT_GET_TYPE( future(T) );
    4142
    4243    struct future_node {
     
    180181        }
    181182               
    182         void on_selected( future(T) & this, select_node & node ) {}
     183        bool on_selected( future(T) & this, select_node & node ) { return true; }
    183184        }
    184185}
  • libcfa/src/concurrency/invoke.h

    r92355883 r2a301ff  
    1010// Created On       : Tue Jan 17 12:27:26 2016
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Tue Mar 14 13:39:31 2023
    13 // Update Count     : 59
     12// Last Modified On : Wed Aug 30 21:27:51 2023
     13// Update Count     : 60
    1414//
    1515
    1616// Do not use #pragma once as this file is included twice in some places. It has its own guard system.
    1717
    18 #include "bits/containers.hfa"
     18#include "bits/collections.hfa"
    1919#include "bits/defs.hfa"
    2020#include "bits/locks.hfa"
     
    2323
    2424#ifdef __cforall
    25 #include "containers/list.hfa"
     25#include "collections/list.hfa"
    2626extern "C" {
    2727#endif
     
    7474        };
    7575
     76    struct nonlocal_ehm {
     77        // list of pending nonlocal exceptions
     78        __queue_t(struct nonlocal_exception) ehm_buffer;
     79
     80        // lock to protect the buffer
     81        struct __spinlock_t buffer_lock;
     82
     83        // enable/disabled flag
     84        bool ehm_enabled;
     85    };
     86
    7687        enum __Coroutine_State { Halted, Start, Primed, Blocked, Ready, Active, Cancelled, Halting };
    7788
     
    98109                struct _Unwind_Exception * cancellation;
    99110
     111        // Non-local exception handling information
     112        struct nonlocal_ehm ehm_state;
    100113        };
    101114        // Wrapper for gdb
     
    242255        #ifdef __cforall
    243256        extern "Cforall" {
     257        static inline bool exception_in_flight() {
     258            return __get_stack( &active_thread()->self_cor )->exception_context.current_exception != 0p;
     259        }
     260
    244261                static inline thread$ * volatile & ?`next ( thread$ * this ) {
    245262                        return this->user_link.next;
  • libcfa/src/concurrency/iofwd.hfa

    r92355883 r2a301ff  
    1010// Created On       : Thu Apr 23 17:31:00 2020
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Mon Mar 13 23:54:57 2023
    13 // Update Count     : 1
     12// Last Modified On : Fri Jul 21 21:36:01 2023
     13// Update Count     : 3
    1414//
    1515
     
    1818#include <unistd.h>
    1919#include <sys/socket.h>
     20#include <string.h>                                                                             // memset
    2021
    2122extern "C" {
     
    151152#if CFA_HAVE_LINUX_IO_URING_H
    152153        static inline void zero_sqe(struct io_uring_sqe * sqe) {
    153                 sqe->flags = 0;
    154                 sqe->ioprio = 0;
    155                 sqe->fd = 0;
    156                 sqe->off = 0;
    157                 sqe->addr = 0;
    158                 sqe->len = 0;
    159                 sqe->fsync_flags = 0;
    160                 sqe->__pad2[0] = 0;
    161                 sqe->__pad2[1] = 0;
    162                 sqe->__pad2[2] = 0;
    163                 sqe->fd = 0;
    164                 sqe->off = 0;
    165                 sqe->addr = 0;
    166                 sqe->len = 0;
     154                memset( sqe, 0, sizeof( struct io_uring_sqe ) );
    167155        }
    168156#endif
  • libcfa/src/concurrency/kernel.cfa

    r92355883 r2a301ff  
    569569                returnToKernel();
    570570        __enable_interrupts_checked();
    571 
    572571}
    573572
  • libcfa/src/concurrency/kernel.hfa

    r92355883 r2a301ff  
    1010// Created On       : Tue Jan 17 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Tue Feb  4 12:29:26 2020
    13 // Update Count     : 22
     12// Last Modified On : Wed Aug 30 21:28:46 2023
     13// Update Count     : 23
    1414//
    1515
     
    2020#include "coroutine.hfa"
    2121
    22 #include "containers/list.hfa"
     22#include "collections/list.hfa"
    2323
    2424extern "C" {
  • libcfa/src/concurrency/kernel/startup.cfa

    r92355883 r2a301ff  
    487487        last = 0p;
    488488        cancellation = 0p;
     489    ehm_state.ehm_buffer{};
     490    ehm_state.buffer_lock{};
     491    ehm_state.ehm_enabled = false;
    489492}
    490493
  • libcfa/src/concurrency/locks.cfa

    r92355883 r2a301ff  
    239239}
    240240
    241 void on_selected( blocking_lock & this, select_node & node ) {}
     241bool on_selected( blocking_lock & this, select_node & node ) { return true; }
    242242
    243243//-----------------------------------------------------------------------------
  • libcfa/src/concurrency/locks.hfa

    r92355883 r2a301ff  
    2121
    2222#include "bits/weakso_locks.hfa"
    23 #include "containers/lockfree.hfa"
    24 #include "containers/list.hfa"
     23#include "collections/lockfree.hfa"
     24#include "collections/list.hfa"
    2525
    2626#include "limits.hfa"
     
    112112static inline bool   register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
    113113static inline bool   unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
    114 static inline void   on_selected( single_acquisition_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); }
     114static inline bool   on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
     115__CFA_SELECT_GET_TYPE( single_acquisition_lock );
    115116
    116117//----------
     
    129130static inline bool   register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
    130131static inline bool   unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
    131 static inline void   on_selected( owner_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); }
     132static inline bool   on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
     133__CFA_SELECT_GET_TYPE( owner_lock );
    132134
    133135//-----------------------------------------------------------------------------
     
    138140};
    139141
    140 static inline void ?{}(mcs_node & this) { this.next = 0p; }
     142static inline void ?{}( mcs_node & this ) { this.next = 0p; }
    141143
    142144static inline mcs_node * volatile & ?`next ( mcs_node * node ) {
     
    148150};
    149151
    150 static inline void lock(mcs_lock & l, mcs_node & n) {
     152static inline void lock( mcs_lock & l, mcs_node & n ) {
    151153        if(push(l.queue, &n))
    152154                wait(n.sem);
     
    172174};
    173175
    174 static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; }
     176static inline void ?{}( mcs_spin_node & this ) { this.next = 0p; this.locked = true; }
    175177
    176178struct mcs_spin_lock {
     
    178180};
    179181
    180 static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) {
     182static inline void lock( mcs_spin_lock & l, mcs_spin_node & n ) {
    181183    n.locked = true;
    182184        mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST);
     
    271273};
    272274static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
    273 // static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted
    274 // static inline void ?=?( go_mutex & this, go_mutex this2 ) = void;
     275static inline void ?{}( go_mutex & this, go_mutex this2 ) = void;
     276static inline void ?=?( go_mutex & this, go_mutex this2 ) = void;
    275277
    276278static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
     
    619621}
    620622
    621 static inline void on_selected( simple_owner_lock & this, select_node & node ) {}
    622 
     623static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; }
     624__CFA_SELECT_GET_TYPE( simple_owner_lock );
    623625
    624626//-----------------------------------------------------------------------------
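The MCS locks touched above (whitespace-only signature changes aside) keep the lock itself one word by making each contending thread supply its own queue node, so handoff is local to the predecessor/successor pair. A usage sketch, assuming the matching unlock( mcs_lock &, mcs_node & ) declared alongside lock in this file:

    mcs_lock l;

    void critical_section() {
        mcs_node n;                    // per-acquisition node on this thread's stack
        lock( l, n );                  // append n; block on n.sem if a predecessor holds the lock
        // ... critical section ...
        unlock( l, n );                // hand the lock to n.next, if any
    }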
  • libcfa/src/concurrency/once.hfa

    r92355883 r2a301ff  
    1616#pragma once
    1717
    18 #include "containers/lockfree.hfa"
     18#include "collections/lockfree.hfa"
    1919#include "kernel/fwd.hfa"
    2020
  • libcfa/src/concurrency/select.cfa

    r92355883 r2a301ff  
    4949    return false;
    5050}
    51 void on_selected( select_timeout_node & this, select_node & node ) {}
     51bool on_selected( select_timeout_node & this, select_node & node ) { return true; }
    5252
    5353// Gateway routine to wait on duration
  • libcfa/src/concurrency/select.hfa

    r92355883 r2a301ff  
    1717#pragma once
    1818
    19 #include "containers/list.hfa"
     19#include "collections/list.hfa"
    2020#include "alarm.hfa"
    2121#include "kernel.hfa"
     
    9494
    9595    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
    96     //    passed as an arg to this routine
    97     // If on_selected returns false, the statement is not run, if it returns true it is run.
    98     void on_selected( T &, select_node & );
      96    //    passed as an arg to this routine. If true is returned, proceed as normal; if false, the statement is skipped
     97    bool on_selected( T &, select_node & );
    9998};
     99// Used inside the compiler to allow for overloading on return type for operations such as '?<<?' for channels
     100// YOU MUST USE THIS MACRO OR INCLUDE AN EQUIVALENT DECL FOR YOUR TYPE TO SUPPORT WAITUNTIL
     101#define __CFA_SELECT_GET_TYPE( typename ) typename __CFA_select_get_type( typename __CFA_t )
     102
    100103
    101104//=============================================================================================
     
    208211bool register_select( select_timeout_node & this, select_node & node );
    209212bool unregister_select( select_timeout_node & this, select_node & node );
    210 void on_selected( select_timeout_node & this, select_node & node );
     213bool on_selected( select_timeout_node & this, select_node & node );
     214select_timeout_node __CFA_select_get_type( select_timeout_node this );
    211215
    212216// Gateway routines to waituntil on duration
    213217select_timeout_node timeout( Duration duration );
    214218select_timeout_node sleep( Duration duration );
     219
  • libcfa/src/concurrency/stats.cfa

    r92355883 r2a301ff  
    1111#if !defined(__CFA_NO_STATISTICS__)
    1212        void __init_stats( struct __stats_t * stats ) {
    13                 stats->ready.push.local.attempt = 0;
    14                 stats->ready.push.local.success = 0;
    15                 stats->ready.push.share.attempt = 0;
    16                 stats->ready.push.share.success = 0;
    17                 stats->ready.push.extrn.attempt = 0;
    18                 stats->ready.push.extrn.success = 0;
    19                 stats->ready.pop.local .attempt = 0;
    20                 stats->ready.pop.local .success = 0;
    21                 stats->ready.pop.help  .attempt = 0;
    22                 stats->ready.pop.help  .success = 0;
    23                 stats->ready.pop.steal .attempt = 0;
    24                 stats->ready.pop.steal .success = 0;
    25                 stats->ready.pop.search.attempt = 0;
    26                 stats->ready.pop.search.success = 0;
    27                 stats->ready.threads.migration = 0;
    28                 stats->ready.threads.extunpark = 0;
    29                 stats->ready.threads.threads   = 0;
    30                 stats->ready.threads.cthreads  = 0;
    31                 stats->ready.threads.preempt.yield  = 0;
    32                 stats->ready.threads.preempt.rllfwd = 0;
    33                 stats->ready.sleep.halts   = 0;
    34                 stats->ready.sleep.cancels = 0;
    35                 stats->ready.sleep.early   = 0;
    36                 stats->ready.sleep.wakes   = 0;
    37                 stats->ready.sleep.seen    = 0;
    38                 stats->ready.sleep.exits   = 0;
     13                memset( &stats->ready, 0, sizeof( stats->ready ) );
    3914
    4015                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    41                         stats->io.alloc.fast        = 0;
    42                         stats->io.alloc.slow        = 0;
    43                         stats->io.alloc.fail        = 0;
    44                         stats->io.alloc.revoke      = 0;
    45                         stats->io.alloc.block       = 0;
    46                         stats->io.submit.fast       = 0;
    47                         stats->io.submit.slow       = 0;
    48                         stats->io.submit.eagr       = 0;
    49                         stats->io.submit.nblk       = 0;
    50                         stats->io.submit.extr       = 0;
    51                         stats->io.flush.external    = 0;
    52                         stats->io.flush.signal      = 0;
    53                         stats->io.flush.dirty       = 0;
    54                         stats->io.flush.full        = 0;
    55                         stats->io.flush.idle        = 0;
    56                         stats->io.flush.eager       = 0;
    57                         stats->io.calls.flush       = 0;
    58                         stats->io.calls.submitted   = 0;
    59                         stats->io.calls.drain       = 0;
    60                         stats->io.calls.completed   = 0;
    61                         stats->io.calls.locked      = 0;
    62                         stats->io.calls.helped      = 0;
    63                         stats->io.calls.errors.busy = 0;
    64                         stats->io.ops.sockread      = 0;
    65                         stats->io.ops.epllread      = 0;
    66                         stats->io.ops.sockwrite     = 0;
    67                         stats->io.ops.epllwrite     = 0;
     16                        memset( &stats->io, 0, sizeof( stats->io ) );
    6817                #endif
    6918
Note: See TracChangeset for help on using the changeset viewer.