#include <locks.hfa>                                                // futex_mutex
#include <limits.hfa>                                               // MAX
#include <list.hfa>                                                 // dlist, dlink, P9_EMBEDDED

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define if the executor is created in a separate cluster.
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

// forward declarations
struct actor;
struct message;

enum Allocation { Nodelete, Delete, Destroy, Finished };            // allocation status

typedef Allocation (*__receive_fn)(actor &, message &);

struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
    bool stop;
    inline dlink(request);
};
P9_EMBEDDED( request, dlink(request) )

void ?{}( request & this ) { this.stop = true; }                    // default ctor makes a stop sentinel
void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
    this.stop = false;
}

struct work_queue {
    futex_mutex mutex_lock;
    dlist( request ) input;                                         // unbounded list of work requests
}; // work_queue

void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; }

void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock );
    insert_last( input, elem );
    unlock( mutex_lock );
} // insert

void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {
    lock( mutex_lock );
    // C_TODO: replace with a bulk dlist transfer once it is implemented:
    //   transferTo->transfer( input );
    // Popping one node at a time is inefficient, but works in the interim.
    request * r;
    while ( ! input`isEmpty ) {
        r = &try_pop_front( input );
        if ( r ) insert_last( transferTo, *r );
    }
    unlock( mutex_lock );
} // transfer
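// A minimal sketch of a request's life cycle through a work_queue (illustrative
// only, not part of this interface; `echo_receive`, `a`, and `m` are hypothetical
// names). The request is heap allocated because the worker thread below deletes
// each request after delivering it.
//   Allocation echo_receive( actor & a, message & m ) { return Finished; }
//   work_queue wq;
//   request * r = alloc();
//   (*r){ &a, &m, echo_receive };                 // non-sentinel request: stop == false
//   insert( wq, *r );                             // producer side, normally done by send
//   dlist( request ) local;
//   transfer( wq, local );                        // consumer side, normally done by a worker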
thread worker {
    work_queue * request_queues;
    dlist( request ) current_queue;
    request & req;
    unsigned int start, range;
};

static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;
    this.current_queue{};
    this.start = start;
    this.range = range;
}

struct executor {
    cluster * cluster;                                              // if workers execute on separate cluster
    processor ** processors;                                        // array of virtual processors adding parallelism for workers
    work_queue * request_queues;                                    // master list of work request queues
    worker ** workers;                                              // array of workers executing work requests
    unsigned int nprocessors, nworkers, nrqueues;                   // number of processors/threads/request queues
    bool separate_clus;                                             // use same or separate cluster for executor
}; // executor

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool separate_clus ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.separate_clus = separate_clus;

    if ( separate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    for ( i; nrqueues ) request_queues[i]{};

    processors = aalloc( nprocessors );
    for ( i; nprocessors ) (*(processors[i] = alloc())){ *cluster };

    workers = alloc( nworkers );
    // Divide the mailboxes among the workers; the first (nrqueues % nworkers)
    // workers each service one extra queue.
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
    } // for
}
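// Worked example of the partitioning above (not from the source): with
// nrqueues == 10 and nworkers == 4, reqPerWorker == 2 and extras == 2, so the
// four workers service the contiguous queue ranges [0,3), [3,6), [6,8), [8,10).
//   executor ex{ 2, 4, 10, false };              // 2 processors, 4 workers, 10 mailboxes, shared cluster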
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) {
    this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ };
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) {
    this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ };
}
static inline void ?{}( executor & this, unsigned int nprocessors ) {
    this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ };
}
static inline void ?{}( executor & this ) {
    this{ __DEFAULT_EXECUTOR_PROCESSORS__ };
}

static inline void ^?{}( executor & this ) with(this) {
    // Insert one stop sentinel at the first queue of each worker's range,
    // mirroring the constructor's partitioning, to force eventual termination.
    request sentinels[nworkers];
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        insert( request_queues[step], sentinels[i] );
    } // for

    for ( i; nworkers ) delete( workers[i] );
    for ( i; nprocessors ) delete( processors[i] );

    delete( workers );
    delete( request_queues );
    delete( processors );
    if ( separate_clus ) delete( cluster );
}

// Logically a static field of executor, but it must be at file scope for get_next_ticket.
static unsigned int __next_ticket = 0;

static inline unsigned int get_next_ticket( executor & this ) with(this) {
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues;
} // tickets

// C_TODO: convert the following globals to static fields once the project is done.
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;                        // was an executor passed to start_actor_system
static unsigned long int __num_actors_;                             // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;                  // used to wake executor after actors finish

struct actor {
    unsigned long int ticket;                                       // executor-queue handle providing FIFO message execution
    Allocation allocation_;                                         // allocation action
};

void ?{}( actor & this ) {
    // Once an actor is allocated, it must be sent a message or the actor system cannot stop,
    // because its receive routine must eventually run to end it.
    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
    this.allocation_ = Nodelete;
    this.ticket = get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
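// Usage sketch (hypothetical derived type; this file defines only the base
// `actor`): actors may be created only after start_actor_system(), and each new
// actor is bound round-robin to one mailbox via get_next_ticket, which keeps
// its messages in FIFO order.
//   struct greeter { inline actor; };            // Plan-9 embedding of the base actor
//   start_actor_system();
//   greeter g;                                   // ok: ticket = __next_ticket++ % nrqueues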
void ^?{}( actor & this ) {}

static inline void check_actor( actor & this ) {
    if ( this.allocation_ != Nodelete ) {
        switch ( this.allocation_ ) {
          case Delete:
            delete( &this );
            break;
          case Destroy:
            CFA_DEBUG( this.ticket = MAX; );                        // mark as terminated
            ^?{}(this);
            break;
          case Finished:
            CFA_DEBUG( this.ticket = MAX; );                        // mark as terminated
            break;
          default: ;                                                // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}

struct message {
    Allocation allocation_;                                         // allocation action
};

void ?{}( message & this ) { this.allocation_ = Nodelete; }
void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
    switch ( this.allocation_ ) {                                   // analyze message status
      case Nodelete: break;
      case Delete: delete( &this ); break;
      case Destroy: ^?{}(this); break;
      case Finished: break;
    } // switch
}

void deliver_request( request & this ) {
    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    this.receiver->allocation_ = actor_allocation;
    check_actor( *this.receiver );
    check_message( *this.msg );
}

void main( worker & this ) with(this) {
  Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) {              // cycle through this worker's set of request buffers
        transfer( request_queues[i + start], current_queue );
        while ( ! current_queue`isEmpty ) {
            &req = &try_pop_front( current_queue );
            if ( !&req ) continue;
            // possibly add some work stealing/idle sleep here
            if ( req.stop ) break Exit;
            deliver_request( req );
            delete( &req );                                         // requests must be heap allocated
        } // while
    } // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
    send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds * 16 };
}

static inline void start_actor_system() {
    start_actor_system( active_cluster()->procs.total );
}

static inline void start_actor_system( executor & this ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park();                                                         // will receive signal when actor system is finished

    if ( ! __actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}
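// End-to-end usage sketch of the interface above (hypothetical types and names,
// not part of this file; the request is heap allocated because the worker
// deletes it after delivery):
//   struct hello_actor { inline actor; };
//   struct hello_msg { inline message; };
//   Allocation hello_receive( hello_actor & receiver, hello_msg & msg ) {
//       return Finished;                         // terminate actor without freeing it
//   }
//   int main() {
//       start_actor_system();                    // executor sized to the current cluster
//       hello_actor a;                           // allocation_ == Nodelete, ticket assigned
//       hello_msg m;                             // default allocation_ == Nodelete
//       request * req = alloc();
//       (*req){ (actor *)&a, (message *)&m, (__receive_fn)hello_receive };
//       send( a, *req );                         // enqueue on a's mailbox
//       stop_actor_system();                     // parks until all actors have finished
//   }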