Index: libcfa/src/executor.cfa
===================================================================
--- libcfa/src/executor.cfa	(revision d30fdbcf79ad6405a42681f6b90a353a48ce87e7)
+++ libcfa/src/executor.cfa	(revision d30fdbcf79ad6405a42681f6b90a353a48ce87e7)
@@ -0,0 +1,151 @@
+// Mutex buffer is embedded in the nomutex executor to allow the executor to delete the workers without causing a
+// deadlock.  If the executor is the monitor and the buffer is class, the thread calling the executor's destructor
+// (which is mutex) blocks when deleting the workers, preventing outstanding workers from calling remove to drain the
+// buffer.
+
+#include <bits/containers.hfa>
+#include <thread.hfa>
+#include <stdio.h>
+
+forall( otype T | is_node(T) | is_monitor(T) ) {
+    monitor Buffer {					// unbounded buffer
+	__queue_t( T ) queue;				// unbounded list of work requests
+	condition delay;
+    }; // Buffer
+
+    void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) {
+	append( queue, elem );				// insert element into buffer
+	signal( delay );				// restart
+    } // insert
+
+    T * remove( Buffer( T ) & mutex buf ) with(buf) {
+	if ( ! queue ) wait( delay );			// no request to process ? => wait
+	return pop_head( queue );
+    } // remove
+} // distribution
+
+struct WRequest {					// client request, no return
+    void (* action)( void );
+    WRequest * next;					// intrusive queue field
+}; // WRequest
+
+WRequest *& get_next( WRequest & this ) { return this.next; }
+void ?{}( WRequest & req ) with(req) { action = 0; next = 0; }
+void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; }
+bool stop( WRequest & req ) { return req.action == 0; }
+void doit( WRequest & req ) { req.action(); }
+
+// Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
+// are distributed into buffers in a roughly round-robin order.
+
+thread Worker {
+    Buffer( WRequest ) * requests;
+    unsigned int start, range;
+}; // Worker
+
+void main( Worker & w ) with(w) {
+    for ( int i = 0;; i = (i + 1) % range ) {
+	WRequest * request = remove( requests[i + start] );
+      if ( ! request ) { yield(); continue; }
+      if ( stop( *request ) ) break;
+	doit( *request );
+	delete( request );
+    } // for
+} // Worker::main
+
+void ?{}( Worker & worker, cluster * wc, Buffer( WRequest ) * requests, unsigned int start, unsigned int range ) {
+    (*get_thread(worker)){ *wc };			// create on given cluster
+    worker.[requests, start, range] = [requests, start, range];
+} // ?{}
+
+struct Executor {
+    cluster * cluster;					// if workers execute on separate cluster
+    processor ** processors;				// array of virtual processors adding parallelism for workers
+    Buffer( WRequest ) * requests;			// list of work requests
+    Worker ** workers;					// array of workers executing work requests
+    unsigned int nprocessors, nworkers, nmailboxes;	// number of mailboxes/workers/processor tasks
+    bool sepClus;					// use same or separate cluster for executor
+}; // Executor
+
+static thread_local unsigned int next;			// demultiplexed across worker buffers
+unsigned int tickets( Executor & ex ) with(ex) {
+    //return uFetchAdd( next, 1 ) % nmailboxes;
+    return next++ % nmailboxes;				// no locking, interference randomizes
+} // tickets
+
+void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) {
+    [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];
+    assert( nmailboxes >= nworkers );
+    cluster = sepClus ? new( "Executor" ) : active_cluster();
+    processors = (processor **)anew( nprocessors );
+    requests = anew( nmailboxes );
+    workers = (Worker **)anew( nworkers );
+
+    for ( i; nprocessors ) {
+	processors[ i ] = new( *cluster );
+    } // for
+
+    unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
+    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker + ( i < extras ? 1 : 0 ) ) {
+	workers[ i ] = new( cluster, requests, step, reqPerWorker + ( i < extras ? 1 : 0 ) );
+    } // for
+} // ?{}
+
+void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
+    ex{ nprocessors, nworkers, nworkers, sepClus };
+}
+void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
+    ex{ nprocessors, nprocessors, nprocessors, sepClus };
+}
+void ?{}( Executor & ex ) {				// special for current cluster
+    ex{ 0, active_cluster()->nprocessors, false };
+}
+void ^?{}( Executor & ex ) with(ex) {
+    // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued.  Cannot combine next
+    // two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may take the
+    // single sentinel while waiting for worker 0 to end.
+
+    WRequest sentinel[nworkers];
+    unsigned int reqPerWorker = nmailboxes / nworkers;
+    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
+	insert( requests[step], &sentinel[i] );		// force eventually termination
+    } // for
+    for ( i; nworkers ) {
+    	delete( workers[ i ] );
+    } // for
+    for ( i; nprocessors ) {
+	delete( processors[ i ] );
+    } // for
+
+    delete( workers );
+    delete( requests );
+    delete( processors );
+    if ( sepClus ) { delete( cluster ); }
+} // ^?{}
+
+void send( Executor & ex, void (* action)( void ) ) {	// asynchronous call, no return value
+    WRequest * node = new( action );
+    insert( ex.requests[tickets( ex )], node );
+} // send
+
+int counter = 0;
+
+void workie( void ) {
+    __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
+//    fprintf( stderr, "workie\n" );
+}
+
+int main() {
+    {
+	Executor exector;
+	for ( i; 3000 ) {
+	    send( exector, workie );
+	    if ( i % 100 ) yield();
+	} // for
+    }
+    printf( "%d\n", counter );
+}
+
+// Local Variables: //
+// compile-command: "cfa executor.cfa" //
+// End: //
