// Mutex buffer is embedded in the nomutex executor to allow the executor to delete the workers without causing a
// deadlock.  If the executor is the monitor and the buffer is a class, the thread calling the executor's destructor
// (which is mutex) blocks when deleting the workers, preventing outstanding workers from calling remove to drain the
// buffer.

#include <containers/list.hfa>
#include <thread.hfa>

forall( T & | $dlistable(T, T) ) {
	monitor Buffer {							// unbounded buffer
		dlist( T, T ) queue;					// unbounded list of work requests
		condition delay;
	}; // Buffer

	void insert( Buffer(T) & mutex buf, T * elem ) with(buf) {
		dlist( T, T ) * qptr = &queue;			// workaround https://cforall.uwaterloo.ca/trac/ticket/166
		insert_last( *qptr, *elem );			// insert element into buffer
		signal( delay );						// restart
	} // insert

	T * remove( Buffer(T) & mutex buf ) with(buf) {
		dlist( T, T ) * qptr = &queue;			// workaround https://cforall.uwaterloo.ca/trac/ticket/166
		// if ( (*qptr)`is_empty ) wait( delay ); // no request to process ? => wait
		if ( (*qptr)`is_empty ) return 0p;		// no request to process ? => return null
		return &pop_first( *qptr );
	} // remove
} // forall

struct WRequest {								// client request, no return
	void (* action)( void );
	DLISTED_MGD_IMPL_IN(WRequest)
}; // WRequest
DLISTED_MGD_IMPL_OUT(WRequest)

void ?{}( WRequest & req ) with(req) { action = 0; }
void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; }
bool stop( WRequest & req ) { return req.action == 0; }
void doit( WRequest & req ) { req.action(); }

// Each worker has its own set (when request buffers > workers) of work buffers to reduce contention between client
// and server, where work requests arrive and are distributed into buffers in a roughly round-robin order.

thread Worker {
	Buffer(WRequest) * requests;
	WRequest * request;
	unsigned int start, range;
}; // Worker
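// E.g. (illustrative numbers): with nrqueues == 8 and nworkers == 3, the Executor constructor below computes
// reqPerWorker == 2 and extras == 2, so worker 0 scans queues [0,3), worker 1 queues [3,6), and worker 2 queues
// [6,8); each worker's main loop cycles only through its own slice of the request buffers.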
void main( Worker & w ) with(w) {
	for ( int i = 0;; i = (i + 1) % range ) {	// cycle through this worker's request buffers
		request = remove( requests[i + start] );
		if ( ! request ) { yield(); continue; }	// buffer empty ? => try next buffer
		if ( stop( *request ) ) break;			// sentinel ? => stop worker
		doit( *request );
		delete( request );
	} // for
} // Worker::main

void ?{}( Worker & worker, cluster * wc, Buffer(WRequest) * requests, unsigned int start, unsigned int range ) {
	((thread &)worker){ *wc };
	worker.[requests, request, start, range] = [requests, ((WRequest *)0), start, range];
} // ?{}

WRequest * current_request( Worker & worker ) { return worker.request; }

struct Executor {
	cluster * cluster;							// if workers execute on separate cluster
	processor ** processors;					// array of virtual processors adding parallelism for workers
	Buffer(WRequest) * requests;				// list of work requests
	Worker ** workers;							// array of workers executing work requests
	unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
	bool sepClus;								// use same or separate cluster for executor
	unsigned int next;							// demultiplexed across worker buffers
}; // Executor

unsigned int tickets( Executor & ex ) with(ex) {
	//return uFetchAdd( next, 1 ) % nrqueues;
	return next++ % nrqueues;					// no locking, interference randomizes
} // tickets

void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nr, bool sc = false ) with(ex) {
	[nprocessors, nworkers, nrqueues, sepClus] = [np, nw, nr, sc];
	assert( nrqueues >= nworkers );
	cluster = sepClus ? new() : active_cluster(); // create separate cluster or use the current one
	processors = aalloc( nprocessors );
	requests = anew( nrqueues );
	workers = aalloc( nworkers );

	for ( i; nprocessors ) {
		processors[i] = new( *cluster );
	} // for

	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
	// for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
	for ( i; nworkers : start; 0u ~ @ ~ range : range; ) {
		range = reqPerWorker + ( i < extras ? 1 : 0 );
		workers[i] = new( cluster, requests, start, range );
	} // for
} // ?{}

void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
	ex{ nprocessors, nworkers, nworkers, sepClus };
}

void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
	ex{ nprocessors, nprocessors, nprocessors, sepClus };
}

void ?{}( Executor & ex ) {						// special for current cluster, no processors added
	ex{ 0, 5, false };
}
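// Example use (a sketch; the parameter values are illustrative): construct an executor on a separate cluster with its
// own processors, submit asynchronous requests, and let the destructor drain and stop the workers via sentinels.
//   Executor ex = { 4, 4, 8, true };			// 4 processors, 4 workers, 8 request queues, separate cluster
//   send( ex, work );							// queue an action, e.g., work() below, to run on some worker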
void ^?{}( Executor & ex ) with(ex) {
	// Add one sentinel per worker to stop them. Since this is the destructor, no new external work should be queued.
	// Cannot combine the next two loops and use only a single sentinel because workers arrive in arbitrary order, so
	// worker 1 may take the single sentinel while waiting for worker 0 to end.

	WRequest sentinel[nworkers];
	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
	for ( unsigned int i = 0, start = 0; i < nworkers; i += 1 ) {
		insert( requests[start], &sentinel[i] );	// force eventual termination
		start += reqPerWorker + ( i < extras ? 1 : 0 ); // step matches the worker ranges set in the constructor
	} // for

	for ( i; nworkers ) {
		delete( workers[i] );
	} // for

	for ( i; nprocessors ) {
		delete( processors[i] );
	} // for

	free( workers );
	// adelete( nrqueues, requests );
	for ( i; nrqueues ) ^?{}( requests[i] );	// FIX ME: problem with resolver
	free( requests );
	free( processors );
	if ( sepClus ) { delete( cluster ); }
} // ^?{}

void send( Executor & ex, void (* action)( void ) ) { // asynchronous call, no return value
	WRequest * node = new( action );
	insert( ex.requests[tickets( ex )], node );
} // send

int counter = 0;

void work( void ) {
	__atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
	// fprintf( stderr, "workie\n" );
}

int main( int argc, char * argv[] ) {
	int times = 1_000_000;
	if ( argc == 2 ) times = atoi( argv[1] );
	processor p[7];								// add parallelism on the current cluster
	{
		Executor executor;
		for ( i; times ) {
			send( executor, work );
			if ( i % 100 == 0 ) yield();
		} // for
	}											// executor destructor stops the workers
	printf( "%d\n", counter );
}

// Local Variables: //
// tab-width: 4 //
// compile-command: "cfa executor.cfa" //
// End: //