Index: libcfa/src/executor.cfa
===================================================================
--- libcfa/src/executor.cfa	(revision cbbd8fd770714fe40fffe90e2b2b492682101bfb)
+++ libcfa/src/executor.cfa	(revision 089ee6bd97c8a8ad1dcf8e5982f246ed6fe212b2)
@@ -6,9 +6,31 @@
 #include <containers/list.hfa>
 #include <thread.hfa>
-#include <stdio.h>
+//#include <malloc.h>										// trace
 
-struct WRequest {					// client request, no return
-    void (* action)( void );
-    DLISTED_MGD_IMPL_IN(WRequest)
+forall( dtype T | $dlistable(T, T) ) {
+	monitor Buffer {									// unbounded buffer
+		dlist( T, T ) queue;							// unbounded list of work requests
+		condition delay;
+	}; // Buffer
+
+	void insert( Buffer(T) & mutex buf, T * elem ) with(buf) {
+		dlist( T, T ) * qptr = &queue;					// workaround https://cforall.uwaterloo.ca/trac/ticket/166
+		insert_last( *qptr, *elem );					// insert element into buffer
+		signal( delay );								// restart
+	} // insert
+
+	T * remove( Buffer(T) & mutex buf ) with(buf) {
+		dlist( T, T ) * qptr = &queue;					// workaround https://cforall.uwaterloo.ca/trac/ticket/166
+		if ( (*qptr)`is_empty ) wait( delay );			// no request to process ? => wait
+		return &pop_first( *qptr );
+	} // remove
+
+//	void ?{}( Buffer(T) & ) {}
+//	void ^?{}( Buffer(T) & mutex ) {}
+} // forall
+
+struct WRequest {										// client request, no return
+	void (* action)( void );
+	DLISTED_MGD_IMPL_IN(WRequest)
 }; // WRequest
 DLISTED_MGD_IMPL_OUT(WRequest)
@@ -19,134 +41,133 @@
 void doit( WRequest & req ) { req.action(); }
 
-monitor WRBuffer {					// unbounded buffer
-    dlist( WRequest, WRequest ) queue;			// unbounded list of work requests
-    condition delay;
-}; // WRBuffer
-
-void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) {
-    insert_last( queue, *elem );			// insert element into buffer
-    signal( delay );				        // restart
-} // insert
-
-WRequest * remove( WRBuffer & mutex buf ) with(buf) {
-    if ( queue`is_empty ) wait( delay );	        // no request to process ? => wait
-    return & pop_first( queue );
-} // remove
-
-// Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
-// are distributed into buffers in a roughly round-robin order.
+// Each worker has its own set (when request buffers > workers) of work buffers to reduce contention between client
+// and server, where work requests arrive and are distributed into buffers in roughly round-robin order.
 
 thread Worker {
-    WRBuffer * requests;
-    unsigned int start, range;
+	Buffer(WRequest) * requests;
+	WRequest * request;
+	unsigned int start, range;
 }; // Worker
 
 void main( Worker & w ) with(w) {
-    for ( int i = 0;; i = (i + 1) % range ) {
-	WRequest * request = remove( requests[i + start] );
-      if ( ! request ) { yield(); continue; }
-      if ( stop( *request ) ) break;
-	doit( *request );
-	delete( request );
-    } // for
+	for ( int i = 0;; i = (i + 1) % range ) {
+		request = remove( requests[i + start] );
+		if ( ! request ) { yield(); continue; }
+		if ( stop( *request ) ) break;
+		doit( *request );
+		delete( request );
+	} // for
 } // Worker::main
 
-void ?{}( Worker & worker, cluster * wc, WRBuffer * requests, unsigned int start, unsigned int range ) {
-    (*get_thread(worker)){ *wc };			// create on given cluster
-    worker.[requests, start, range] = [requests, start, range];
+void ?{}( Worker & worker, cluster * wc, Buffer(WRequest) * requests, unsigned int start, unsigned int range ) {
+	((thread &)worker){ *wc };
+	worker.[requests, request, start, range] = [requests, 0p, start, range];
 } // ?{}
 
+WRequest * current_request( Worker & worker ) { return worker.request; }
+
 struct Executor {
-    cluster * cluster;					// if workers execute on separate cluster
-    processor ** processors;				// array of virtual processors adding parallelism for workers
-    WRBuffer * requests;			        // list of work requests
-    Worker ** workers;					// array of workers executing work requests
-    unsigned int nprocessors, nworkers, nmailboxes;	// number of mailboxes/workers/processor tasks
-    bool sepClus;					// use same or separate cluster for executor
+	cluster * cluster;									// if workers execute on separate cluster
+	processor ** processors;							// array of virtual processors adding parallelism for workers
+	Buffer(WRequest) * requests;						// list of work requests
+	Worker ** workers;									// array of workers executing work requests
+	unsigned int nprocessors, nworkers, nrqueues;		// number of processors/threads/request queues
+	bool sepClus;										// use same or separate cluster for executor
 }; // Executor
 
-static thread_local unsigned int next;			// demultiplexed across worker buffers
+static thread_local unsigned int next;					// demultiplexed across worker buffers
+
 unsigned int tickets( Executor & ex ) with(ex) {
-    //return uFetchAdd( next, 1 ) % nmailboxes;
-    return next++ % nmailboxes;				// no locking, interference randomizes
+	//return uFetchAdd( next, 1 ) % nrqueues;
+	return next++ % nrqueues;							// no locking, interference randomizes
 } // tickets
 
-void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) {
-    [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];
-    assert( nmailboxes >= nworkers );
-    cluster = sepClus ? new( "Executor" ) : active_cluster();
-    processors = (processor **)anew( nprocessors );
-    requests = (WRBuffer *)anew( nmailboxes );
-    workers = (Worker **)anew( nworkers );
+void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nr, bool sc = false ) with(ex) {
+	[nprocessors, nworkers, nrqueues, sepClus] = [np, nw, nr, sc];
+	assert( nrqueues >= nworkers );
+	cluster = sepClus ? new( "Executor" ) : active_cluster();
+	processors = aalloc( nprocessors );
+	requests = anew( nrqueues );
+	workers = aalloc( nworkers );
 
-    for ( i; nprocessors ) {
-	processors[ i ] = new( *cluster );
-    } // for
+	for ( i; nprocessors ) {
+		processors[i] = new( *cluster );
+	} // for
 
-    unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
-    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker + ( i < extras ? 1 : 0 ) ) {
-	workers[ i ] = new( cluster, requests, step, reqPerWorker + ( i < extras ? 1 : 0 ) );
-    } // for
+	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
+//	for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
+	for ( i; nworkers : start; 0u ~ @ ~ range : range; ) {
+		range = reqPerWorker + ( i < extras ? 1 : 0 );
+		workers[i] = new( cluster, requests, start, range );
+	} // for
 } // ?{}
 
 void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
-    ex{ nprocessors, nworkers, nworkers, sepClus };
+	ex{ nprocessors, nworkers, nworkers, sepClus };
 }
 void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
-    ex{ nprocessors, nprocessors, nprocessors, sepClus };
+	ex{ nprocessors, nprocessors, nprocessors, sepClus };
 }
-void ?{}( Executor & ex ) {				// special for current cluster
-    ex{ 0, active_cluster()->nprocessors, false };
+void ?{}( Executor & ex ) {								// special for current cluster
+	ex{ 0, active_cluster()->nprocessors, false };
 }
 void ^?{}( Executor & ex ) with(ex) {
-    // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued.  Cannot combine next
-    // two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may take the
-    // single sentinel while waiting for worker 0 to end.
+	// Add one sentinel per worker to stop them. Since in destructor, no new external work should be queued.  Cannot
+	// combine next two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may
+	// take the single sentinel while waiting for worker 0 to end.
 
-    WRequest sentinel[nworkers];
-    unsigned int reqPerWorker = nmailboxes / nworkers;
-    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
-	insert( requests[step], &sentinel[i] );		// force eventually termination
-    } // for
-    for ( i; nworkers ) {
-    	delete( workers[ i ] );
-    } // for
-    for ( i; nprocessors ) {
-	delete( processors[ i ] );
-    } // for
+	WRequest sentinel[nworkers];
+	unsigned int reqPerWorker = nrqueues / nworkers;
+	for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
+		insert( requests[step], &sentinel[i] );			// force eventual termination
+	} // for
+	for ( i; nworkers ) {
+		delete( workers[i] );
+	} // for
+	for ( i; nprocessors ) {
+		delete( processors[i] );
+	} // for
 
-    delete( workers );
-    delete( requests );
-    delete( processors );
-    if ( sepClus ) { delete( cluster ); }
+	free( workers );
+//	adelete( nrqueues, requests );
+	for ( i; nrqueues ) ^?{}( requests[i] );
+	free( requests );
+	free( processors );
+	if ( sepClus ) { delete( cluster ); }
 } // ^?{}
 
 void send( Executor & ex, void (* action)( void ) ) {	// asynchronous call, no return value
-    WRequest * node = new( action );
-    insert( ex.requests[tickets( ex )], node );
+	WRequest * node = new( action );
+	insert( ex.requests[tickets( ex )], node );
 } // send
+
 
 int counter = 0;
 
 void workie( void ) {
-    __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
-//    fprintf( stderr, "workie\n" );
+	__atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
+//	fprintf( stderr, "workie\n" );
 }
 
-int main() {
-    {
-	Executor exector;
-	for ( i; 3000 ) {
-	    send( exector, workie );
-	    if ( i % 100 == 0 ) {
-//              fprintf( stderr, "%d\n", i );
-                yield();
-            }
-	} // for
-    }
-    printf( "%d\n", counter );
+int main( int argc, char * argv[] ) {
+	int times = 1_000_000;
+	if ( argc == 2 ) times = atoi( argv[1] );
+	processor p[7];
+	{
+		printf( "%d\n", active_cluster()->nprocessors );
+		Executor exector;
+		for ( i; times ) {
+			send( exector, workie );
+			if ( i % 100 == 0 ) {
+//			  fprintf( stderr, "%d\n", i );
+				yield();
+			}
+		} // for
+	}
+	printf( "%d\n", counter );
 }
 
 // Local Variables: //
+// tab-width: 4 //
 // compile-command: "cfa executor.cfa" //
 // End: //
