// Mutex buffer is embedded in the nomutex executor to allow the executor to delete the workers without causing a
// deadlock. If the executor is the monitor and the buffer is a class, the thread calling the executor's destructor
// (which is mutex) blocks when deleting the workers, preventing outstanding workers from calling remove to drain the
// buffer.

#include <stdlib.hfa>									// new, anew, delete
#include <thread.hfa>									// thread, monitor, condition, yield
#include <stdio.h>									// printf

forall( dtype T ) monitor Buffer {						// unbounded buffer
	__queue_t( T ) queue;								// unbounded list of work requests
	condition delay;
}; // Buffer

forall( dtype T | is_node(T) ) {
	void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) {
		append( queue, elem );							// insert element into buffer
		signal( delay );								// restart
	} // insert

	T * remove( Buffer( T ) & mutex buf ) with(buf) {
		if ( queue.head == 0 ) wait( delay );			// no request to process ? => wait
		return pop_head( queue );
	} // remove
} // distribution

struct WRequest {									// client request, no return
	void (* action)( void );
	WRequest * next;								// intrusive queue field
}; // WRequest

WRequest *& get_next( WRequest & this ) { return this.next; }
void ?{}( WRequest & req ) with(req) { action = 0; next = 0; }
void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; }
bool stop( WRequest & req ) { return req.action == 0; }	// sentinel (zero action) ?
void doit( WRequest & req ) { req.action(); }

// Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
// are distributed into buffers in a roughly round-robin order.

thread Worker {
	Buffer( WRequest ) * requests;
	unsigned int start, range;
}; // Worker

void main( Worker & w ) with(w) {
	for ( int i = 0;; i = (i + 1) % range ) {
		WRequest * request = remove( requests[i + start] );
		if ( ! request ) { yield(); continue; }
		if ( stop( *request ) ) break;
		doit( *request );
		delete( request );
	} // for
} // Worker::main

void ?{}( Worker & worker, cluster * wc, Buffer( WRequest ) * requests, unsigned int start, unsigned int range ) {
	(*get_thread(worker)){ *wc };						// create on given cluster
	worker.[requests, start, range] = [requests, start, range];
} // ?{}

struct Executor {
	cluster * cluster;								// if workers execute on separate cluster
	processor ** processors;						// array of virtual processors adding parallelism for workers
	Buffer( WRequest ) * requests;					// array of mailboxes (buffers) of work requests
	Worker ** workers;								// array of workers executing work requests
	unsigned int nprocessors, nworkers, nmailboxes;	// number of processors/workers/mailboxes
	bool sepClus;									// use same or separate cluster for executor
}; // Executor

static thread_local unsigned int next;				// demultiplexed across worker buffers
unsigned int tickets( Executor & ex ) with(ex) {
	//return uFetchAdd( next, 1 ) % nmailboxes;
	return next++ % nmailboxes;						// no locking, interference randomizes
} // tickets

void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) {
	[nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];
	assert( nmailboxes >= nworkers );
	cluster = sepClus ? new( "Executor" ) : active_cluster();
	processors = (processor **)anew( nprocessors );
	requests = anew( nmailboxes );
	workers = (Worker **)anew( nworkers );
	for ( i; nprocessors ) {
		processors[ i ] = new( *cluster );
	} // for
	unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
	for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
		range = reqPerWorker + ( i < extras ? 1 : 0 );	// compute range once, so step advances by this worker's share
		workers[ i ] = new( cluster, requests, step, range );
	} // for
} // ?{}
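// Worked example (illustrative only, not part of the original interface): constructing Executor ex{ 1, 2, 5 } gives
// reqPerWorker == 2 and extras == 1, so worker 0 is created with mailboxes [0, 3) and worker 1 with mailboxes [3, 5);
// the extra mailboxes always go to the lowest-numbered workers. The destructor's sentinel loop below mirrors this
// partitioning so that each worker finds exactly one sentinel in its own first mailbox.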
void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
	ex{ nprocessors, nworkers, nworkers, sepClus };
}
void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
	ex{ nprocessors, nprocessors, nprocessors, sepClus };
}
void ?{}( Executor & ex ) {							// special for current cluster
	ex{ 0, active_cluster()->nprocessors, false };
}

void ^?{}( Executor & ex ) with(ex) {
	// Add one sentinel per worker to stop them. Since this is the destructor, no new work should be queued. Cannot
	// combine the next two loops and only have a single sentinel because workers arrive in arbitrary order, so
	// worker 1 may take the single sentinel while waiting for worker 0 to end.
	WRequest sentinel[nworkers];
	unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
	for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
		range = reqPerWorker + ( i < extras ? 1 : 0 );	// mirror mailbox partitioning in constructor
		insert( requests[step], &sentinel[i] );		// force eventual termination
	} // for
	for ( i; nworkers ) {
		delete( workers[ i ] );
	} // for
	for ( i; nprocessors ) {
		delete( processors[ i ] );
	} // for
	delete( workers );
	delete( requests );
	delete( processors );
	if ( sepClus ) { delete( cluster ); }
} // ^?{}

void send( Executor & ex, void (* action)( void ) ) {	// asynchronous call, no return value
	WRequest * node = new( action );
	insert( ex.requests[tickets( ex )], node );
} // send

int counter = 0;

void workie( void ) {
	__atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
	// fprintf( stderr, "workie\n" );
}

int main() {
	{
		Executor executor;
		for ( i; 3000 ) {
			send( executor, workie );
			if ( i % 100 ) yield();
		} // for
	} // executor destructor waits for all workers to terminate
	printf( "%d\n", counter );						// all requests completed => 3000
}

// Local Variables: //
// compile-command: "cfa executor.cfa" //
// End: //