#pragma once

// Actor system: actors receive messages through an executor, whose worker
// threads service a set of mailboxes (work_queues) of buffered requests.

// NOTE(review): the original #include names were lost in extraction; these four
// are inferred from usage (futex_mutex, MAX, dlist/dlink, thread/cluster) —
// confirm against the repository copy.
#include <locks.hfa>
#include <limits.hfa>
#include <list.hfa>
#include <kernel.hfa>

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define if executor is created in a separate cluster
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

// forward decls
struct actor;
struct message;

enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status

typedef Allocation (*__receive_fn)(actor &, message &);

// One unit of work: deliver msg to receiver via fn. A request with stop == true
// is a sentinel telling a worker thread to terminate.
struct request {
	actor * receiver;
	message * msg;
	__receive_fn fn;
	bool stop;
	inline dlink(request);
};
P9_EMBEDDED( request, dlink(request) )

static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
	this.receiver = receiver;
	this.msg = msg;
	this.fn = fn;
	this.stop = false;
}
static inline void ?{}( request & this, request & copy ) {
	this.receiver = copy.receiver;
	this.msg = copy.msg;
	this.fn = copy.fn;
	this.stop = copy.stop;
}

// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
struct copy_queue {
	dlist( request ) list;      // overflow: intrusive list of heap-allocated requests
	request * buffer;           // fast path: contiguous copy buffer
	size_t count, buffer_size, index; // live elements, capacity, next read position
};
static inline void ?{}( copy_queue & this ) {} // uninitialized shell; real init is the sized ctor below
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
	list{};
	buffer_size = buf_size;
	buffer = aalloc( buffer_size );
	count = 0;
	index = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }

static inline void insert( copy_queue & this, request & elem ) with(this) {
	if ( count < buffer_size ) { // fast path ( no alloc )
		buffer[count]{ elem };   // copy-construct in place
		count++;
		return;
	}
	// slow path: buffer full, heap-allocate and chain onto the intrusive list
	request * new_elem = alloc();
	(*new_elem){ elem };
	insert_last( list, *new_elem );
}

// once you start removing you need to remove all elements
// it is not supported to call insert() before the list is fully empty
// should_delete is an output param: true when the returned request was heap-allocated
// (list overflow) and must be delete'd by the caller after delivery
static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
	if ( count > 0 ) { // drain the copy buffer first, in FIFO order
		count--;
		should_delete = false;
		size_t old_idx = index;
		index = count == 0 ? 0 : index + 1; // reset read cursor once buffer is drained
		return buffer[old_idx];
	}
	// buffer empty: fall back to the overflow list (null reference when both are empty)
	should_delete = true;
	return try_pop_front( list );
}

static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0 && list`isEmpty; }

static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)

// One mailbox: a mutex-protected copy_queue that producers insert into and a
// worker periodically swaps out wholesale via transfer().
struct work_queue {
	futex_mutex mutex_lock;
	copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
}; // work_queue
static inline void ?{}( work_queue & this ) with(this) {
	c_queue = alloc();
	(*c_queue){ __buffer_size }; // C_TODO: support passing copy buff size as arg to executor
}
static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }

static inline void insert( work_queue & this, request & elem ) with(this) {
	lock( mutex_lock );
	insert( *c_queue, elem );
	unlock( mutex_lock );
} // insert

// Exchange the caller's (drained) copy_queue for this mailbox's (possibly full)
// one, so the worker processes requests outside the critical section.
static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
	lock( mutex_lock );
	// swap copy queue ptrs
	copy_queue * temp = *transfer_to;
	*transfer_to = c_queue;
	c_queue = temp;
	unlock( mutex_lock );
} // transfer

// Worker thread: services mailboxes request_queues[start .. start + range - 1].
thread worker {
	work_queue * request_queues; // shared master array owned by the executor
	copy_queue * current_queue;  // private queue swapped with a mailbox via transfer()
	request & req;               // current request being delivered (rebound each iteration)
	unsigned int start, range;
};

static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues,
		unsigned int start, unsigned int range ) {
	((thread &)this){ clu };
	this.request_queues = request_queues;
	this.current_queue = alloc();
	(*this.current_queue){ __buffer_size };
	this.start = start;
	this.range = range;
}
static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }

struct executor {
	cluster * cluster;            // if workers execute on separate cluster
	processor ** processors;      // array of virtual processors adding parallelism for workers
	work_queue * request_queues;  // master list of work request queues
	worker ** workers;            // array of workers executing work requests
	unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
	bool seperate_clus;           // use same or separate cluster for executor
}; // executor

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers,
		unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
	if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
	__buffer_size = buf_size;
	this.nprocessors = nprocessors;
	this.nworkers = nworkers;
	this.nrqueues = nrqueues;
	this.seperate_clus = seperate_clus;

	if ( seperate_clus ) {
		cluster = alloc();
		(*cluster){};
	} else cluster = active_cluster();

	request_queues = aalloc( nrqueues );
	for ( i; nrqueues ) request_queues[i]{};

	processors = aalloc( nprocessors );
	for ( i; nprocessors ) (*(processors[i] = alloc())){ *cluster };

	workers = alloc( nworkers );
	// distribute the nrqueues mailboxes across nworkers: the first `extras`
	// workers get one extra queue
	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
	for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
		range = reqPerWorker + ( i < extras ? 1 : 0 );
		(*(workers[i] = alloc())){ *cluster, request_queues, start, range };
	} // for
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers,
		unsigned int nrqueues, bool seperate_clus ) {
	this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size };
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) {
	this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ };
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) {
	this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ };
}
static inline void ?{}( executor & this, unsigned int nprocessors ) {
	this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ };
}
static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }

static inline void ^?{}( executor & this ) with(this) {
	request sentinels[nworkers];
	// BUG FIX: the sentinel must land on each worker's FIRST queue. The original
	// stepped by a flat reqPerWorker, but when nrqueues % nworkers != 0 the ctor
	// gives the first `extras` workers an extra queue, so worker i's first queue
	// is i * reqPerWorker + min(i, extras). Stepping by reqPerWorker then drops
	// multiple sentinels on early workers and none on later ones, and the
	// sentinel-less workers never terminate (deadlock on the thread join below).
	// Replicate the ctor's range computation instead.
	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
	for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
		range = reqPerWorker + ( i < extras ? 1 : 0 );
		insert( request_queues[step], sentinels[i] ); // force eventually termination
	} // for

	for ( i; nworkers ) delete( workers[i] ); // joins each worker thread

	for ( i; nprocessors ) {
		delete( processors[i] );
	} // for

	adelete( workers );
	adelete( request_queues );
	adelete( processors );
	if ( seperate_clus ) delete( cluster );
}

// this is a static field of executor but have to forward decl for get_next_ticket
static unsigned int __next_ticket = 0;

// Round-robin assignment of actors to mailboxes, to preserve per-actor FIFO delivery.
static inline unsigned int get_next_ticket( executor & this ) with(this) {
	return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
} // tickets

// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static unsigned long int __num_actors_;             // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish

struct actor {
	unsigned long int ticket; // executor-queue handle to provide FIFO message execution
	Allocation allocation_;   // allocation action
};

static inline void ?{}( actor & this ) {
	// Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
	// member must be called to end it
	verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
	this.allocation_ = Nodelete;
	this.ticket = get_next_ticket( *__actor_executor_ );
	__atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
static inline void ^?{}( actor & this ) {}

// Apply the allocation action the last receive requested, and wake the thread
// parked in stop_actor_system() once the final actor terminates.
static inline void check_actor( actor & this ) {
	if ( this.allocation_ != Nodelete ) {
		switch( this.allocation_ ) {
			case Delete: delete( &this ); break;
			case Destroy:
				CFA_DEBUG( this.ticket = MAX; ); // mark as terminated
				^?{}(this);
				break;
			case Finished:
				CFA_DEBUG( this.ticket = MAX; ); // mark as terminated
				break;
			default: ; // stop warning
		}

		if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) {
			// all actors have terminated
			unpark( __actor_executor_thd );
		}
	}
}

struct message {
	Allocation allocation_; // allocation action
};

static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
static inline void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
	switch ( this.allocation_ ) { // analyze message status
		case Nodelete: break;
		case Delete: delete( &this ); break;
		case Destroy: ^?{}(this); break;
		case Finished: break;
	} // switch
}

// Invoke the user receive routine and apply the resulting allocation actions.
static inline void deliver_request( request & this ) {
	Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
	this.receiver->allocation_ = actor_allocation;
	check_actor( *this.receiver );
	check_message( *this.msg );
}

void main( worker & this ) with(this) {
	bool should_delete;
	Exit:
	for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
		// C_TODO: potentially check queue count instead of immediately trying to transfer
		// (mojibake fix: original text had "¤t_queue", a garbled "&current_queue")
		transfer( request_queues[i + start], &current_queue );
		while ( ! isEmpty( *current_queue ) ) {
			&req = &remove( *current_queue, should_delete );
			if ( !&req ) continue; // both buffer and overflow list were empty
			// possibly add some work stealing/idle sleep here
			if ( req.stop ) break Exit; // sentinel: terminate this worker
			deliver_request( req );

			if ( should_delete ) delete( &req ); // heap-allocated overflow request
		} // while
	} // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
	insert( request_queues[ticket], req);
}

static inline void send( actor & this, request & req ) {
	send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
	__actor_executor_thd = active_thread();
	__actor_executor_ = alloc();
	(*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

static inline void start_actor_system( executor & this ) {
	__actor_executor_thd = active_thread();
	__actor_executor_ = &this;
	__actor_executor_passed = true;
}

static inline void stop_actor_system() {
	park( ); // will receive signal when actor system is finished

	if ( !__actor_executor_passed ) delete( __actor_executor_ );
	__actor_executor_ = 0p;
	__actor_executor_thd = 0p;
	__next_ticket = 0;
	__actor_executor_passed = false;
}