#pragma once

#include
#include
#include
#include
#include

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define if executor is created in a separate cluster
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

#define __STEAL 1 // work-stealing toggle. Disjoint from toggles above

// whether to steal work or to steal a queue. Only applicable if __STEAL == 1
#define __STEAL_WORK 0

// heuristic selection (only set one to be 1)
#define __RAND_QUEUE 1
#define __RAND_WORKER 0

// show stealing stats
// #define __STEAL_STATS

// forward decls
struct actor;
struct message;

enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status

typedef Allocation (*__receive_fn)(actor &, message &);

struct request {
	actor * receiver;
	message * msg;
	__receive_fn fn;
	bool stop;
};

static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
	this.receiver = receiver;
	this.msg = msg;
	this.fn = fn;
	this.stop = false;
}
static inline void ?{}( request & this, request & copy ) {
	this.receiver = copy.receiver;
	this.msg = copy.msg;
	this.fn = copy.fn;
	this.stop = copy.stop;
}

// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
struct copy_queue {
	request * buffer;
	size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
	buffer_size = buf_size;
	buffer = aalloc( buffer_size );
	count = 0;
	utilized = 0;
	index = 0;
	last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }

static inline void insert( copy_queue & this, request & elem ) with(this) {
	if ( count >= buffer_size ) { // increase arr size
		last_size = buffer_size;
		buffer_size = 2 * buffer_size;
		buffer = realloc( buffer, sizeof( request ) * buffer_size );
		/* paranoid */ verify( buffer );
	}
	buffer[count]{ elem }; // C_TODO: change to memcpy
	// memcpy( &buffer[count], &elem, sizeof(request) );
	count++;
}

// once you start removing you need to remove all elements
// it is not supported to call insert() before the list is fully empty
static inline request & remove( copy_queue & this ) with(this) {
	if ( count > 0 ) {
		count--;
		size_t old_idx = index;
		index = count == 0 ? 0 : index + 1;
		return buffer[old_idx];
	}
	return *0p;
}

// try to reclaim some memory
static inline void reclaim( copy_queue & this ) with(this) {
	if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
	utilized = 0;
	buffer_size--;
	buffer = realloc( buffer, sizeof( request ) * buffer_size );
}

static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
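// Illustrative sketch (not part of the interface) of the fill-then-drain discipline required by
// remove(): the buffer is filled with insert(), drained completely, and only then refilled. The
// names a_actor, a_msg and a_fn are placeholders for a real actor, message and receive routine.
//
//	copy_queue q{ 10 };							// buffer initially sized for 10 requests
//	request r{ &a_actor, &a_msg, a_fn };
//	insert( q, r );								// copied into the buffer, which grows if full
//	while ( ! isEmpty( q ) ) {					// must drain fully before calling insert() again
//		request & next = remove( q );
//		// ... process next ...
//	}
//	reclaim( q );								// optionally shrink the buffer afterwards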
static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
struct work_queue {
	__spinlock_t mutex_lock;
	copy_queue owned_queue;
	copy_queue * c_queue;
	volatile bool being_processed;
}; // work_queue
static inline void ?{}( work_queue & this ) with(this) {
	owned_queue{ __buffer_size };
	c_queue = &owned_queue;
	being_processed = false;
}

static inline void insert( work_queue & this, request & elem ) with(this) {
	lock( mutex_lock __cfaabi_dbg_ctx2 );
	insert( *c_queue, elem );
	unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to, work_queue ** queue_arr, unsigned int idx ) with(this) {
	lock( mutex_lock __cfaabi_dbg_ctx2 );
	#if __STEAL

	#if __STEAL_WORK
	if ( unlikely( being_processed ) )
	#else
	// check if queue has been stolen out from under us between
	// transfer() call and lock acquire. C_TODO: maybe just use new queue!
	if ( unlikely( being_processed || queue_arr[idx] != &this ) )
	#endif // __STEAL_WORK
	{
		unlock( mutex_lock );
		return;
	}

	being_processed = c_queue->count != 0;
	#endif // __STEAL

	c_queue->utilized = c_queue->count;

	// swap copy queue ptrs
	copy_queue * temp = *transfer_to;
	*transfer_to = c_queue;
	c_queue = temp;
	unlock( mutex_lock );
} // transfer
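// Sketch of the consumer side of transfer(), assuming a hypothetical worker-local pointer
// 'local' and a hypothetical mailbox array 'mailboxes' (work_queue **): the worker's drained
// copy queue is swapped with the mailbox's filled c_queue under the lock, so the actual
// draining happens without holding the lock.
//
//	copy_queue * local = &my_local_queue;				// worker-owned, currently drained
//	transfer( *mailboxes[i], &local, mailboxes, i );	// 'local' now holds the filled queue
//	while ( ! isEmpty( *local ) ) deliver_request( remove( *local ) );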
thread worker {
	work_queue ** request_queues;
	copy_queue * current_queue;
	worker ** worker_arr; // C_TODO: change n_workers, n_queues, worker_arr to just be pulled from ptr to executor
	request & req;
	unsigned int start, range, empty_count, n_workers, n_queues, id;
	#ifdef __STEAL_STATS
	unsigned int try_steal, stolen;
	#endif
};

#ifdef __STEAL_STATS
unsigned int total_tries = 0, total_stolen = 0, total_workers;
#endif
static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start,
	unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsigned int id ) {
	((thread &)this){ clu };
	this.request_queues = request_queues;
	this.current_queue = current_queue;
	this.start = start;
	this.range = range;
	this.empty_count = 0;
	this.n_workers = n_workers;
	this.worker_arr = worker_arr;
	this.n_queues = n_queues;
	this.id = id;
	#ifdef __STEAL_STATS
	this.try_steal = 0;
	this.stolen = 0;
	total_workers = n_workers;
	#endif
}

static inline void ^?{}( worker & mutex this ) with(this) {
	// delete( current_queue );
	#ifdef __STEAL_STATS
	__atomic_add_fetch( &total_tries, try_steal, __ATOMIC_SEQ_CST );
	__atomic_add_fetch( &total_stolen, stolen, __ATOMIC_SEQ_CST );
	if ( __atomic_sub_fetch( &total_workers, 1, __ATOMIC_SEQ_CST ) == 0 )
		printf( "steal attempts: %u, steals: %u\n", total_tries, total_stolen );
	#endif
}

struct executor {
	cluster * cluster;					// if workers execute on separate cluster
	processor ** processors;			// array of virtual processors adding parallelism for workers
	work_queue * request_queues;		// master array of work request queues
	copy_queue * local_queues;			// array of all worker local queues to avoid deletion race
	work_queue ** worker_req_queues;	// secondary array of work queues to allow for swapping
	worker ** workers;					// array of workers executing work requests
	unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
	bool seperate_clus;					// use same or separate cluster for executor
}; // executor

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
	if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
	__buffer_size = buf_size;
	this.nprocessors = nprocessors;
	this.nworkers = nworkers;
	this.nrqueues = nrqueues;
	this.seperate_clus = seperate_clus;

	if ( seperate_clus ) {
		cluster = alloc();
		(*cluster){};
	} else cluster = active_cluster();

	request_queues = aalloc( nrqueues );
	worker_req_queues = aalloc( nrqueues );
	for ( i; nrqueues ) {
		request_queues[i]{};
		worker_req_queues[i] = &request_queues[i];
	}

	processors = aalloc( nprocessors );
	for ( i; nprocessors )
		(*(processors[i] = alloc())){ *cluster };

	local_queues = aalloc( nworkers );
	workers = alloc( nworkers );
	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
	for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
		local_queues[i]{ buf_size };
		range = reqPerWorker + ( i < extras ? 1 : 0 );
		(*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i };
	} // for
}

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) {
	this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size };
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) {
	this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ };
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) {
	this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ };
}
static inline void ?{}( executor & this, unsigned int nprocessors ) {
	this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ };
}
static inline void ?{}( executor & this ) {
	this{ __DEFAULT_EXECUTOR_PROCESSORS__ };
}
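// Illustrative sketch of constructing an executor explicitly and handing it to
// start_actor_system( executor & ) (defined at the end of this file). The parameter values are
// arbitrary: 4 processors, 4 worker threads, 16 mailboxes, shared cluster, local buffers of 10.
//
//	executor e{ 4, 4, 16, false, 10 };
//	start_actor_system( e );					// actors created after this point use 'e'
//	// ... create actors and send requests ...
//	stop_actor_system();						// parks until all actors have terminated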
// C_TODO: once stealing is implemented make sure shutdown still works
static inline void ^?{}( executor & this ) with(this) {
	#if __STEAL
	request sentinels[nrqueues];
	for ( unsigned int i = 0; i < nrqueues; i++ ) {
		insert( request_queues[i], sentinels[i] );		// force eventual termination
	} // for
	#else
	request sentinels[nworkers];
	unsigned int reqPerWorker = nrqueues / nworkers;
	for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
		insert( request_queues[step], sentinels[i] );	// force eventual termination
	} // for
	#endif

	for ( i; nworkers )
		delete( workers[i] );

	for ( i; nprocessors ) {
		delete( processors[i] );
	} // for

	adelete( workers );
	adelete( local_queues );
	adelete( request_queues );
	adelete( worker_req_queues );
	adelete( processors );
	if ( seperate_clus ) delete( cluster );
}

// this is a static field of executor but have to forward decl for get_next_ticket
static unsigned int __next_ticket = 0;

static inline unsigned int get_next_ticket( executor & this ) with(this) {
	return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues;
} // tickets

// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;		// was an executor passed to start_actor_system
static unsigned long int __num_actors_;				// number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;	// used to wake executor after actors finish

struct actor {
	unsigned long int ticket;		// executor-queue handle to provide FIFO message execution
	Allocation allocation_;			// allocation action
};

static inline void ?{}( actor & this ) {
	// Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
	// member must be called to end it
	verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
	this.allocation_ = Nodelete;
	this.ticket = get_next_ticket( *__actor_executor_ );
	__atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
static inline void ^?{}( actor & this ) {}

static inline void check_actor( actor & this ) {
	if ( this.allocation_ != Nodelete ) {
		switch( this.allocation_ ) {
			case Delete: delete( &this ); break;
			case Destroy:
				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
				^?{}(this);
				break;
			case Finished:
				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
				break;
			default: ;								// stop warning
		}

		if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
			unpark( __actor_executor_thd );
		}
	}
}

struct message {
	Allocation allocation_;			// allocation action
};

static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
static inline void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
	switch ( this.allocation_ ) {	// analyze message status
		case Nodelete: break;
		case Delete: delete( &this ); break;
		case Destroy: ^?{}(this); break;
		case Finished: break;
	} // switch
}

static inline void deliver_request( request & this ) {
	Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
	this.receiver->allocation_ = actor_allocation;
	check_actor( *this.receiver );
	check_message( *this.msg );
}
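// Sketch of how a receive routine plugs into deliver_request(): the routine has the
// __receive_fn signature and its return value becomes the receiving actor's allocation_,
// while check_message() separately applies the message's own allocation_. The routine
// 'hello' and the objects 'a' and 'm' are hypothetical.
//
//	Allocation hello( actor & receiver, message & msg ) {
//		// ... act on the message ...
//		return Finished;					// mark the actor terminated without freeing its storage
//	}
//	request req{ &a, &m, hello };			// queued via send( a, req ) and later run by a worker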
// Couple of ways to approach work stealing
// 1: completely worker agnostic, just find a big queue and steal it
// 2: track some heuristic of worker's load and focus on that and then pick a queue from that worker
//    worker heuristics:
//      - how many queues have work?
//      - size of largest queue
//      - total # of messages
//      - messages currently servicing
//      - pick randomly
//      - pick from closer threads/workers (this can be combined with others)

// lock free or global lock for queue stealing
#define __LOCK_SWP 0

__spinlock_t swp_lock;

// tries to atomically swap two queues and returns a bool indicating if the swap failed
static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
	#if __LOCK_SWP
	lock( swp_lock __cfaabi_dbg_ctx2 );
	work_queue * temp = request_queues[my_idx];
	request_queues[my_idx] = request_queues[victim_idx];
	request_queues[victim_idx] = temp;
	unlock( swp_lock );
	return true;

	#else // __LOCK_SWP

	work_queue * my_queue = request_queues[my_idx];
	work_queue * other_queue = request_queues[victim_idx];
	if ( other_queue == 0p || my_queue == 0p ) return false;

	// try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
	if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
		return false;

	// try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
	if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
		/* paranoid */ verify( request_queues[my_idx] == 0p );
		request_queues[my_idx] = my_queue;	// reset my queue ptr back to appropriate val
		return false;
	}

	// we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
	request_queues[my_idx] = other_queue;	// last write does not need to be atomic
	return true;

	#endif // __LOCK_SWP
}

// once a worker to steal from has been chosen, choose queue to steal from
static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) {
	#if __RAND_QUEUE
	unsigned int tries = 0;
	const unsigned int start_idx = prng( n_queues );
	work_queue * curr_steal_queue;

	for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) {
		tries++;
		curr_steal_queue = request_queues[i];
		#if __STEAL_WORK
		// avoid empty queues and queues that are being operated on
		if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
			continue;

		// in this case we just return from transfer if this doesn't work
		transfer( *curr_steal_queue, &current_queue, request_queues, i );
		if ( isEmpty( *current_queue ) ) continue;
		last_idx = i;

		#ifdef __STEAL_STATS
		stolen++;
		#endif // __STEAL_STATS

		#else // __STEAL_WORK

		// avoid empty queues and queues that are being operated on
		if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
			continue;

		#ifdef __STEAL_STATS
		bool success = try_swap_queues( this, i, last_idx );
		if ( success ) stolen++;
		#else
		try_swap_queues( this, i, last_idx );
		#endif // __STEAL_STATS

		// C_TODO: try transfer immediately
		// transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
		// if ( isEmpty( *current_queue ) ) return false;
		return false;

		#endif // __STEAL_WORK

		return true;
	} // for

	return false;

	#elif __RAND_WORKER
	// have to calculate victim start and range since victim may be deleted before us in shutdown
	const unsigned int queues_per_worker = n_queues / n_workers;
	const unsigned int extras = n_queues % n_workers;
	unsigned int vic_start, vic_range;
	if ( extras > victim_id ) {
		vic_range = queues_per_worker + 1;
		vic_start = vic_range * victim_id;
	} else {
		vic_start = extras + victim_id * queues_per_worker;
		vic_range = queues_per_worker;
	}
	unsigned int start_idx = prng( vic_range );
	unsigned int tries = 0;
	work_queue * curr_steal_queue;

	for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
		tries++;
		curr_steal_queue = request_queues[ i + vic_start ];
		// avoid empty queues and queues that are being operated on
		if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
			continue;

		#ifdef __STEAL_STATS
		bool success = try_swap_queues( this, i, last_idx );
		if ( success ) stolen++;
		#else
		try_swap_queues( this, i, last_idx );
		#endif // __STEAL_STATS

		// C_TODO: try transfer immediately
		// transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
		// if ( isEmpty( *current_queue ) ) return false;
		return false;
	}
	return false;
	#endif
}
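// Worked example of the victim start/range arithmetic above (illustrative numbers): with
// n_queues == 10 and n_workers == 4, queues_per_worker == 2 and extras == 2, so victims 0 and 1
// own 3 queues starting at indices 0 and 3, while victims 2 and 3 own 2 queues starting at
// indices 6 and 8, matching the start/range assignment performed in the executor constructor.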
// choose a worker to steal from
static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) {
	#if __RAND_WORKER
	unsigned int victim = prng( n_workers );
	if ( victim == id ) victim = ( victim + 1 ) % n_workers;
	return choose_queue( this, victim, last_idx );
	#else
	return choose_queue( this, 0, last_idx );
	#endif
}

// look for work to steal
// returns a bool: true => a queue was stolen, false => no work was stolen
static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) {
	// C_TODO: add debug tracking of how many steals occur
	// to steal queue acquire both queue's locks in address ordering (maybe can do atomic swap)
	// maybe a flag to hint which queue is being processed
	// look at count to see if queue is worth stealing (dont steal empty queues)
	// if steal and then flag is up then dont process and just continue looking at own queues
	// (best effort approach) its ok if stealing isn't fruitful
	// -> more important to not delay busy threads
	return choose_victim( this, last_idx );
}

void main( worker & this ) with(this) {
	// threshold of empty queues we see before we go stealing
	const unsigned int steal_threshold = 2 * n_queues;
	unsigned int curr_idx;
	work_queue * curr_work_queue;
	Exit:
	for ( unsigned int i = 0;; i = (i + 1) % range ) {	// cycle through set of request buffers
		// C_TODO: potentially check queue count instead of immediately trying to transfer
		curr_idx = i + start;
		curr_work_queue = request_queues[curr_idx];
		transfer( *curr_work_queue, &current_queue, request_queues, curr_idx );
		if ( isEmpty( *current_queue ) ) {
			#if __STEAL
			empty_count++;
			if ( empty_count < steal_threshold ) continue;
			empty_count = 0;	// C_TODO: look into stealing backoff schemes

			#ifdef __STEAL_STATS
			try_steal++;
			#endif // __STEAL_STATS

			if ( ! steal_work( this, curr_idx ) ) continue;

			#else // __STEAL
			continue;
			#endif // __STEAL
		}
		while ( ! isEmpty( *current_queue ) ) {
			&req = &remove( *current_queue );
			if ( !&req ) continue;	// possibly add some work stealing/idle sleep here
			if ( req.stop ) break Exit;
			deliver_request( req );
		}
		#if __STEAL
		curr_work_queue->being_processed = false;	// set done processing
		#endif
		empty_count = 0;		// we found work so reset empty counter
		reclaim( *current_queue );
	} // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
	insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
	send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
	__actor_executor_thd = active_thread();
	__actor_executor_ = alloc();
	(*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

static inline void start_actor_system( executor & this ) {
	__actor_executor_thd = active_thread();
	__actor_executor_ = &this;
	__actor_executor_passed = true;
}

static inline void stop_actor_system() {
	park( );	// will receive signal when actor system is finished

	if ( !__actor_executor_passed ) delete( __actor_executor_ );
	__actor_executor_ = 0p;
	__actor_executor_thd = 0p;
	__next_ticket = 0;
	__actor_executor_passed = false;
}
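// End-to-end sketch of driving the actor system through the low-level interface in this header;
// typed message wrappers are expected to be layered on top. The routine 'greet' and the function
// 'example' are hypothetical.
//
//	Allocation greet( actor & receiver, message & msg ) {
//		return Finished;					// terminate the actor; caller still owns its storage
//	}
//
//	void example() {
//		start_actor_system();				// one worker per processor on the current cluster
//		actor a;  message m;				// must be created after start_actor_system()
//		request req{ &a, &m, greet };
//		send( a, req );						// queued on a's mailbox, executed FIFO by a worker
//		stop_actor_system();				// parks until every actor has terminated
//	}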