#pragma once #include #include #include #include #include #ifdef __CFA_DEBUG__ #define CFA_DEBUG( stmt ) stmt #else #define CFA_DEBUG( stmt ) #endif // CFA_DEBUG #define DEBUG_ABORT( cond, string ) CFA_DEBUG( if ( cond ) abort( string ) ) // Define the default number of processors created in the executor. Must be greater than 0. #define __DEFAULT_EXECUTOR_PROCESSORS__ 2 // Define the default number of threads created in the executor. Must be greater than 0. #define __DEFAULT_EXECUTOR_WORKERS__ 2 // Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the // actor-executor threads. Must be greater than 0. #define __DEFAULT_EXECUTOR_RQUEUES__ 4 // Define if executor is created in a separate cluster #define __DEFAULT_EXECUTOR_SEPCLUS__ false #define __DEFAULT_EXECUTOR_BUFSIZE__ 10 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above // workstealing heuristic selection (only set one to be 1) // #define RAND 0 #define SEARCH 1 // show stats // #define ACTOR_STATS // used to run and only track missed queue gulps #ifdef ACTOR_STATS #define ACTOR_STATS_QUEUE_MISSED #endif // forward decls struct actor; struct message; struct executor; enum allocation { Nodelete, Delete, Destroy, Finished }; // allocation status typedef allocation (*__receive_fn)(actor &, message &, actor **, message **); struct request { actor * receiver; message * msg; __receive_fn fn; }; struct a_msg { int m; }; static inline void ?{}( request & this ) {} static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) { this.receiver = receiver; this.msg = msg; this.fn = fn; } static inline void ?{}( request & this, request & copy ) { this.receiver = copy.receiver; this.msg = copy.msg; this.fn = copy.fn; } // Vector-like data structure that supports O(1) queue operations with no bound on size // assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert) struct copy_queue { request * buffer; size_t count, buffer_size, index, utilized, last_size; }; static inline void ?{}( copy_queue & this ) {} static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) { buffer_size = buf_size; buffer = aalloc( buffer_size ); count = 0; utilized = 0; index = 0; last_size = 0; } static inline void ^?{}( copy_queue & this ) with(this) { DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" ); adelete(buffer); } static inline void insert( copy_queue & this, request & elem ) with(this) { if ( count >= buffer_size ) { // increase arr size last_size = buffer_size; buffer_size = 2 * buffer_size; buffer = realloc( buffer, sizeof( request ) * buffer_size ); /* paranoid */ verify( buffer ); } memcpy( &buffer[count], &elem, sizeof(request) ); count++; } // once you start removing you need to remove all elements // it is not supported to call insert() before the array is fully empty static inline request & remove( copy_queue & this ) with(this) { if ( count > 0 ) { count--; size_t old_idx = index; index = count == 0 ? 0 : index + 1; return buffer[old_idx]; } request * ret = 0p; return *0p; } // try to reclaim some memory if less than half of buffer is utilized static inline void reclaim( copy_queue & this ) with(this) { if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; } utilized = 0; buffer_size--; buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory } static inline bool is_empty( copy_queue & this ) with(this) { return count == 0; } struct work_queue { __spinlock_t mutex_lock; copy_queue * owned_queue; // copy queue allocated and cleaned up by this work_queue copy_queue * c_queue; // current queue volatile bool being_processed; // flag to prevent concurrent processing #ifdef ACTOR_STATS unsigned int id; #endif #ifdef ACTOR_STATS_QUEUE_MISSED size_t missed; // transfers skipped due to being_processed flag being up #endif }; // work_queue static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) { owned_queue = alloc(); // allocated separately to avoid false sharing (*owned_queue){ buf_size }; c_queue = owned_queue; being_processed = false; #ifdef ACTOR_STATS id = i; missed = 0; #endif } // clean up copy_queue owned by this work_queue static inline void ^?{}( work_queue & this ) with(this) { delete( owned_queue ); } static inline void insert( work_queue & this, request & elem ) with(this) { lock( mutex_lock __cfaabi_dbg_ctx2 ); insert( *c_queue, elem ); unlock( mutex_lock ); } // insert static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { lock( mutex_lock __cfaabi_dbg_ctx2 ); #ifdef __STEAL // check if queue is being processed elsewhere if ( unlikely( being_processed ) ) { #ifdef ACTOR_STATS missed++; #endif unlock( mutex_lock ); return; } being_processed = c_queue->count != 0; #endif // __STEAL c_queue->utilized = c_queue->count; // swap copy queue ptrs copy_queue * temp = *transfer_to; *transfer_to = c_queue; c_queue = temp; unlock( mutex_lock ); } // transfer // needed since some info needs to persist past worker lifetimes struct worker_info { volatile unsigned long long stamp; #ifdef ACTOR_STATS size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen; unsigned long long processed; size_t gulps; #endif }; static inline void ?{}( worker_info & this ) { #ifdef ACTOR_STATS this.stolen_from = 0; this.try_steal = 0; // attempts to steal this.stolen = 0; // successful steals this.processed = 0; // requests processed this.gulps = 0; // number of gulps this.failed_swaps = 0; // steal swap failures this.empty_stolen = 0; // queues empty after steal this.msgs_stolen = 0; // number of messages stolen #endif this.stamp = rdtscl(); } // #ifdef ACTOR_STATS // unsigned int * stolen_arr; // unsigned int * replaced_queue; // #endif thread worker { work_queue ** request_queues; copy_queue * current_queue; executor * executor_; unsigned int start, range; int id; }; #ifdef ACTOR_STATS // aggregate counters for statistics size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0, __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; #endif static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_, unsigned int start, unsigned int range, int id ) { ((thread &)this){ clu }; this.request_queues = request_queues; // array of all queues this.current_queue = current_queue; // currently gulped queue (start with empty queue to use in swap later) this.executor_ = executor_; // pointer to current executor this.start = start; // start of worker's subrange of request_queues this.range = range; // size of worker's subrange of request_queues this.id = id; // worker's id and index in array of workers } static bool no_steal = false; struct executor { cluster * cluster; // if workers execute on separate cluster processor ** processors; // array of virtual processors adding parallelism for workers work_queue * request_queues; // master array of work request queues copy_queue * local_queues; // array of all worker local queues to avoid deletion race work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping worker ** workers; // array of workers executing work requests worker_info * w_infos; // array of info about each worker unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues bool seperate_clus; // use same or separate cluster for executor volatile bool is_shutdown; // flag to communicate shutdown to worker threads }; // executor // #ifdef ACTOR_STATS // __spinlock_t out_lock; // #endif static inline void ^?{}( worker & mutex this ) with(this) { #ifdef ACTOR_STATS __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST); __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST); __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST); __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST); __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST); __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST); __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST); // per worker steal stats (uncomment alongside the lock above this routine to print) // lock( out_lock __cfaabi_dbg_ctx2 ); // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) ); // int count = 0; // int count2 = 0; // for ( i; range ) { // if ( replaced_queue[start + i] > 0 ){ // count++; // // printf("%d: %u, ",i, replaced_queue[i]); // } // if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0) // count2++; // } // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers ); // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers ); // unlock( out_lock ); #endif } static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) { if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" ); this.nprocessors = nprocessors; this.nworkers = nworkers; this.nrqueues = nrqueues; this.seperate_clus = seperate_clus; this.is_shutdown = false; if ( nworkers == nrqueues ) no_steal = true; #ifdef ACTOR_STATS // stolen_arr = aalloc( nrqueues ); // replaced_queue = aalloc( nrqueues ); __total_workers = nworkers; #endif if ( seperate_clus ) { this.cluster = alloc(); (*cluster){}; } else cluster = active_cluster(); request_queues = aalloc( nrqueues ); worker_req_queues = aalloc( nrqueues ); for ( i; nrqueues ) { request_queues[i]{ buf_size, i }; worker_req_queues[i] = &request_queues[i]; } processors = aalloc( nprocessors ); for ( i; nprocessors ) (*(processors[i] = alloc())){ *cluster }; local_queues = aalloc( nworkers ); workers = aalloc( nworkers ); w_infos = aalloc( nworkers ); unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; for ( i; nworkers ) { w_infos[i]{}; local_queues[i]{ buf_size }; } for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { range = reqPerWorker + ( i < extras ? 1 : 0 ); (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i }; } // for } static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; } static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; } static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; } static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; } static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; } static inline void ^?{}( executor & this ) with(this) { is_shutdown = true; for ( i; nworkers ) delete( workers[i] ); for ( i; nprocessors ) { delete( processors[i] ); } // for #ifdef ACTOR_STATS_QUEUE_MISSED size_t misses = 0; for ( i; nrqueues ) { misses += worker_req_queues[i]->missed; } // adelete( stolen_arr ); // adelete( replaced_queue ); #endif adelete( workers ); adelete( w_infos ); adelete( local_queues ); adelete( request_queues ); adelete( worker_req_queues ); adelete( processors ); if ( seperate_clus ) delete( this.cluster ); #ifdef ACTOR_STATS // print formatted stats printf(" Actor System Stats:\n"); printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed); size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps; printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses); printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n", __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen); size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen; printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal); #endif #ifndef ACTOR_STATS #ifdef ACTOR_STATS_QUEUE_MISSED printf("\t%lu", misses); #endif #endif } // this is a static field of executor but have to forward decl for get_next_ticket static size_t __next_ticket = 0; static inline size_t __get_next_ticket( executor & this ) with(this) { #ifdef __CFA_DEBUG__ size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; // reserve MAX for dead actors if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; return temp; #else return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues; #endif } // tickets // TODO: update globals in this file to be static fields once the static fields project is done static executor * __actor_executor_ = 0p; static bool __actor_executor_passed = false; // was an executor passed to actor_start static size_t __num_actors_ = 0; // number of actor objects in system static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish struct actor { size_t ticket; // executor-queue handle allocation alloc; // allocation action inline virtual_dtor; }; static inline void ?{}( actor & this ) with(this) { // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive // member must be called to end it DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling actor_start() can cause undefined behaviour.\n" ); alloc = Nodelete; ticket = __get_next_ticket( *__actor_executor_ ); __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED ); #ifdef ACTOR_STATS __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); #endif } static inline void check_actor( actor & this ) { if ( this.alloc != Nodelete ) { switch( this.alloc ) { case Delete: delete( &this ); break; case Destroy: CFA_DEBUG( this.ticket = MAX; ); // mark as terminated ^?{}(this); break; case Finished: CFA_DEBUG( this.ticket = MAX; ); // mark as terminated break; default: ; // stop warning } if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated unpark( __actor_executor_thd ); } } } struct message { allocation alloc; // allocation action inline virtual_dtor; }; static inline void ?{}( message & this ) { this.alloc = Nodelete; } static inline void ?{}( message & this, allocation alloc ) { memcpy( &this.alloc, &alloc, sizeof(allocation) ); // optimization to elide ctor CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; ); } static inline void ^?{}( message & this ) with(this) { CFA_DEBUG( if ( alloc == Nodelete ) { printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n", (long int)getpid(), &this ); } ) } static inline void check_message( message & this ) { switch ( this.alloc ) { // analyze message status case Nodelete: CFA_DEBUG( this.alloc = Finished ); break; case Delete: delete( &this ); break; case Destroy: ^?{}( this ); break; case Finished: break; } // switch } static inline allocation set_allocation( message & this, allocation state ) { CFA_DEBUG( if ( state == Nodelete ) state = Finished; ); allocation prev = this.alloc; this.alloc = state; return prev; } static inline allocation get_allocation( message & this ) { return this.alloc; } static inline void deliver_request( request & this ) { DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); actor * base_actor; message * base_msg; allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg ); memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor check_message( *base_msg ); check_actor( *base_actor ); } // tries to atomically swap two queues and returns 0p if the swap failed // returns ptr to newly owned queue if swap succeeds static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) { work_queue * my_queue = request_queues[my_idx]; work_queue * other_queue = request_queues[victim_idx]; // if either queue is 0p then they are in the process of being stolen if ( other_queue == 0p ) return 0p; // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return 0p; // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { /* paranoid */ verify( request_queues[my_idx] == 0p ); request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val return 0p; } // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically request_queues[my_idx] = other_queue; // last write does not need to be atomic return other_queue; } // once a worker to steal from has been chosen, choose queue to steal from static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) { // have to calculate victim start and range since victim may be deleted before us in shutdown const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers; const unsigned int extras = executor_->nrqueues % executor_->nworkers; unsigned int vic_start, vic_range; if ( extras > victim_id ) { vic_range = queues_per_worker + 1; vic_start = vic_range * victim_id; } else { vic_start = extras + victim_id * queues_per_worker; vic_range = queues_per_worker; } unsigned int start_idx = prng( vic_range ); unsigned int tries = 0; work_queue * curr_steal_queue; for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) { tries++; curr_steal_queue = request_queues[ i + vic_start ]; // avoid empty queues and queues that are being operated on if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) ) continue; #ifdef ACTOR_STATS curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); if ( curr_steal_queue ) { executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count; executor_->w_infos[id].stolen++; if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++; // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); // replaced_queue[swap_idx]++; // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED); } else { executor_->w_infos[id].failed_swaps++; } #else curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); #endif // ACTOR_STATS return; } return; } // choose a worker to steal from static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) { #if RAND unsigned int victim = prng( executor_->nworkers ); if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers; choose_queue( this, victim, swap_idx ); #elif SEARCH unsigned long long min = MAX; // smaller timestamp means longer since service int min_id = 0; // use ints not uints to avoid integer underflow without hacky math int n_workers = executor_->nworkers; unsigned long long curr_stamp; int scount = 1; for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) { curr_stamp = executor_->w_infos[i].stamp; if ( curr_stamp < min ) { min = curr_stamp; min_id = i; } } choose_queue( this, min_id, swap_idx ); #endif } #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit void main( worker & this ) with(this) { // #ifdef ACTOR_STATS // for ( i; executor_->nrqueues ) { // replaced_queue[i] = 0; // __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST ); // } // #endif // threshold of empty queues we see before we go stealing const unsigned int steal_threshold = 2 * range; // Store variable data here instead of worker struct to avoid any potential false sharing unsigned int empty_count = 0; request & req; work_queue * curr_work_queue; Exit: for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers curr_work_queue = request_queues[i + start]; #ifndef __STEAL CHECK_TERMINATION; #endif // check if queue is empty before trying to gulp it if ( is_empty( *curr_work_queue->c_queue ) ) { #ifdef __STEAL empty_count++; if ( empty_count < steal_threshold ) continue; #else continue; #endif } transfer( *curr_work_queue, ¤t_queue ); #ifdef ACTOR_STATS executor_->w_infos[id].gulps++; #endif // ACTOR_STATS #ifdef __STEAL if ( is_empty( *current_queue ) ) { if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } empty_count++; if ( empty_count < steal_threshold ) continue; empty_count = 0; CHECK_TERMINATION; // check for termination __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); #ifdef ACTOR_STATS executor_->w_infos[id].try_steal++; #endif // ACTOR_STATS steal_work( this, start + prng( range ) ); continue; } #endif // __STEAL while ( ! is_empty( *current_queue ) ) { #ifdef ACTOR_STATS executor_->w_infos[id].processed++; #endif &req = &remove( *current_queue ); if ( !&req ) continue; deliver_request( req ); } #ifdef __STEAL curr_work_queue->being_processed = false; // set done processing empty_count = 0; // we found work so reset empty counter #endif CHECK_TERMINATION; // potentially reclaim some of the current queue's vector space if it is unused reclaim( *current_queue ); } // for } static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) { insert( request_queues[ticket], req); } static inline void send( actor & this, request & req ) { DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); send( *__actor_executor_, req, this.ticket ); } static inline void __reset_stats() { #ifdef ACTOR_STATS __total_tries = 0; __total_stolen = 0; __all_gulps = 0; __total_failed_swaps = 0; __total_empty_stolen = 0; __all_processed = 0; __num_actors_stats = 0; __all_msgs_stolen = 0; #endif } static inline void actor_start( size_t num_thds ) { __reset_stats(); __actor_executor_thd = active_thread(); __actor_executor_ = alloc(); (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 }; } static inline void actor_start() { actor_start( get_proc_count( *active_cluster() ) ); } static inline void actor_start( executor & this ) { __reset_stats(); __actor_executor_thd = active_thread(); __actor_executor_ = &this; __actor_executor_passed = true; } static inline void actor_stop() { park(); // unparked when actor system is finished if ( !__actor_executor_passed ) delete( __actor_executor_ ); __actor_executor_ = 0p; __actor_executor_thd = 0p; __next_ticket = 0; __actor_executor_passed = false; } // Default messages to send to any actor to change status // assigned at creation to __base_msg_finished to avoid unused message warning message __base_msg_finished @= { .alloc = Finished }; struct delete_msg_t { inline message; } delete_msg = __base_msg_finished; struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished; struct finished_msg_t { inline message; } finished_msg = __base_msg_finished; allocation receive( actor & this, delete_msg_t & ) { return Delete; } allocation receive( actor & this, destroy_msg_t & ) { return Destroy; } allocation receive( actor & this, finished_msg_t & ) { return Finished; } // Default messages used all the time. struct start_msg_t { inline message; } start_msg = __base_msg_finished; // start actor struct stop_msg_t { inline message; } stop_msg = __base_msg_finished; // terminate actor