Index: benchmark/io/http/Makefile.am
===================================================================
--- benchmark/io/http/Makefile.am	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/Makefile.am	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -37,4 +37,6 @@
 	options.cfa \
 	options.hfa \
+	printer.cfa \
+	printer.hfa \
 	protocol.cfa \
 	protocol.hfa \
Index: benchmark/io/http/main.cfa
===================================================================
--- benchmark/io/http/main.cfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/main.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -25,4 +25,5 @@
 #include "options.hfa"
 #include "socket.hfa"
+#include "printer.hfa"
 #include "worker.hfa"
 
@@ -31,74 +32,4 @@
 Duration default_preemption() {
 	return 0;
-}
-
-//=============================================================================================
-// Stats Printer
-//============================================================================================='
-
-thread StatsPrinter {
-	Worker * workers;
-	int worker_cnt;
-	condition_variable(fast_block_lock) var;
-};
-
-void ?{}( StatsPrinter & this, cluster & cl ) {
-	((thread&)this){ "Stats Printer Thread", cl };
-	this.worker_cnt = 0;
-}
-
-void ^?{}( StatsPrinter & mutex this ) {}
-
-#define eng3(X) (ws(3, 3, unit(eng( X ))))
-
-void main(StatsPrinter & this) {
-	LOOP: for() {
-		waitfor( ^?{} : this) {
-			break LOOP;
-		}
-		or else {}
-
-		wait(this.var, 10`s);
-
-		print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
-		if(this.worker_cnt != 0) {
-			uint64_t tries = 0;
-			uint64_t calls = 0;
-			uint64_t header = 0;
-			uint64_t splcin = 0;
-			uint64_t splcot = 0;
-			struct {
-				volatile uint64_t calls;
-				volatile uint64_t bytes;
-			} avgrd[zipf_cnts];
-			memset(avgrd, 0, sizeof(avgrd));
-
-			for(i; this.worker_cnt) {
-				tries += this.workers[i].stats.sendfile.tries;
-				calls += this.workers[i].stats.sendfile.calls;
-				header += this.workers[i].stats.sendfile.header;
-				splcin += this.workers[i].stats.sendfile.splcin;
-				splcot += this.workers[i].stats.sendfile.splcot;
-				for(j; zipf_cnts) {
-					avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
-					avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
-				}
-			}
-
-			double ratio = ((double)tries) / calls;
-
-			sout | "----- Worker Stats -----";
-			sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
-			sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
-			sout | " - zipf sizes:";
-			for(i; zipf_cnts) {
-				double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
-				sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
-			}
-		}
-		else {
-			sout | "No Workers!";
-		}
-	}
 }
 
@@ -109,6 +40,4 @@
 	cluster self;
 	processor    * procs;
-	// io_context   * ctxs;
-	StatsPrinter * prnt;
 
 };
@@ -152,22 +81,12 @@
 	}
 
-	if(options.stats) {
-		this.prnt = alloc();
-		(*this.prnt){ this.self };
-	} else {
-		this.prnt = 0p;
-	}
-
 	#if !defined(__CFA_NO_STATISTICS__)
 		print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
 	#endif
 
-	options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
-	options.clopts.cltr_cnt++;
+	options.clopts.instance = &this.self;
 }
 
 void ^?{}( ServerCluster & this ) {
-	delete(this.prnt);
-
 	for(i; options.clopts.nprocs) {
 		^(this.procs[i]){};
@@ -180,4 +99,13 @@
 extern void init_protocol(void);
 extern void deinit_protocol(void);
+
+//=============================================================================================
+// REUSEPORT
+//=============================================================================================
+
+size_t sockarr_size;
+struct __attribute__((aligned(128))) Q {
+	mpsc_queue(PendingRead) q;
+};
 
 //=============================================================================================
@@ -235,7 +163,4 @@
 
 	int server_fd;
-	if(!options.socket.manyreuse) {
-		server_fd = listener(address, addrlen);
-	}
 
 	//===================
@@ -257,32 +182,70 @@
 		{
 			// Stats printer makes a copy so this needs to persist longer than normal
-			Worker * workers;
-			ServerCluster cl[options.clopts.nclusters];
+			connection ** conns;
+			AcceptWorker  * aworkers = 0p;
+			ChannelWorker * cworkers = 0p;
+			Acceptor * acceptors = 0p;
+			Q * queues = 0p;
+			ServerCluster cl;
+
+			if(options.stats) {
+				stats_thrd = alloc();
+				(*stats_thrd){ cl.self };
+			} else {
+				stats_thrd = 0p;
+			}
 
 			init_protocol();
 			{
-				workers = anew(options.clopts.nworkers);
-				cl[0].prnt->workers = workers;
-				cl[0].prnt->worker_cnt = options.clopts.nworkers;
-				for(i; options.clopts.nworkers) {
-					// if( options.file_cache.fixed_fds ) {
-					// 	workers[i].pipe[0] = pipe_off + (i * 2) + 0;
-					// 	workers[i].pipe[1] = pipe_off + (i * 2) + 1;
-					// }
-					// else
-					{
-						workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
-						workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
-						workers[i].sockfd  = options.socket.manyreuse ?  listener(address, addrlen) : server_fd;
-						workers[i].addr    = (struct sockaddr *)&address;
-						workers[i].addrlen = (socklen_t*)&addrlen;
-						workers[i].flags   = 0;
-					}
-					unpark( workers[i] );
-				}
-				sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
-				for(i; options.clopts.nclusters) {
-					sout | options.clopts.thrd_cnt[i] | nonl;
-				}
+				conns = alloc(options.clopts.nworkers);
+				if(options.socket.reuseport) {
+					queues = alloc(options.clopts.nprocs);
+					acceptors = anew(options.clopts.nprocs);
+					for(i; options.clopts.nprocs) {
+						(queues[i]){};
+						{
+							acceptors[i].sockfd  = listener(address, addrlen);
+							acceptors[i].addr    = (struct sockaddr *)&address;
+							acceptors[i].addrlen = (socklen_t*)&addrlen;
+							acceptors[i].flags   = 0;
+							acceptors[i].queue   = &queues[i].q;
+						}
+						unpark( acceptors[i] );
+					}
+
+					cworkers = anew(options.clopts.nworkers);
+					for(i; options.clopts.nworkers) {
+						{
+							cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
+							cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
+							cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
+							conns[i] = &cworkers[i].conn;
+						}
+						unpark( cworkers[i] );
+					}
+				}
+				else {
+					server_fd = listener(address, addrlen);
+					aworkers = anew(options.clopts.nworkers);
+					for(i; options.clopts.nworkers) {
+						// if( options.file_cache.fixed_fds ) {
+						// 	workers[i].pipe[0] = pipe_off + (i * 2) + 0;
+						// 	workers[i].pipe[1] = pipe_off + (i * 2) + 1;
+						// }
+						// else
+						{
+							aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
+							aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
+							aworkers[i].sockfd = server_fd;
+							aworkers[i].addr    = (struct sockaddr *)&address;
+							aworkers[i].addrlen = (socklen_t*)&addrlen;
+							aworkers[i].flags   = 0;
+							conns[i] = &aworkers[i].conn;
+						}
+						unpark( aworkers[i] );
+					}
+				}
+
+				sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
 				sout | nl;
 				{
@@ -307,49 +270,87 @@
 				}
 
-				sout | "Notifying connections..." | nonl; flush( sout );
-				for(i; options.clopts.nworkers) {
-					workers[i].done = true;
-				}
-				sout | "done";
-
-				sout | "Shutting down socket..." | nonl; flush( sout );
-				if(options.socket.manyreuse) {
-					for(i; options.clopts.nworkers) {
-						ret = shutdown( workers[i].sockfd, SHUT_RD );
-						if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
+				//===================
+				// Close Socket and join
+				if(options.socket.reuseport) {
+					sout | "Notifying connections..." | nonl; flush( sout );
+					for(i; options.clopts.nprocs) {
+						acceptors[i].done = true;
+					}
+					for(i; options.clopts.nworkers) {
+						cworkers[i].done = true;
+					}
+					sout | "done";
+
+					sout | "Shutting down Socket..." | nonl; flush( sout );
+					for(i; options.clopts.nprocs) {
+						ret = shutdown( acceptors[i].sockfd, SHUT_RD );
+						if( ret < 0 ) {
+							abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
+						}
+					}
+					sout | "done";
+
+					sout | "Closing Socket..." | nonl; flush( sout );
+					for(i; options.clopts.nprocs) {
+						ret = close( acceptors[i].sockfd );
+						if( ret < 0) {
+							abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
+						}
+					}
+					sout | "done";
+
+					sout | "Stopping accept threads..." | nonl; flush( sout );
+					for(i; options.clopts.nprocs) {
+						join(acceptors[i]);
+					}
+					sout | "done";
+
+					sout | "Draining worker queues..." | nonl; flush( sout );
+					for(i; options.clopts.nprocs) {
+						PendingRead * p = 0p;
+						while(p = pop(queues[i].q)) {
+							fulfil(p->f, -ECONNRESET);
+						}
+					}
+					sout | "done";
+
+					sout | "Stopping worker threads..." | nonl; flush( sout );
+					for(i; options.clopts.nworkers) {
+						for(j; 2) {
+							ret = close(cworkers[i].conn.pipe[j]);
+							if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
+						}
+						join(cworkers[i]);
 					}
 				}
 				else {
+					sout | "Notifying connections..." | nonl; flush( sout );
+					for(i; options.clopts.nworkers) {
+						aworkers[i].done = true;
+					}
+					sout | "done";
+
+					sout | "Shutting down Socket..." | nonl; flush( sout );
 					ret = shutdown( server_fd, SHUT_RD );
 					if( ret < 0 ) {
-						abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
-					}
-				}
-				sout | "done";
-
-				//===================
-				// Close Socket
-				sout | "Closing Socket..." | nonl; flush( sout );
-				if(options.socket.manyreuse) {
-					for(i; options.clopts.nworkers) {
-						ret = close(workers[i].sockfd);
-						if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
-					}
-				}
-				else {
+						abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
+					}
+					sout | "done";
+
+					sout | "Closing Socket..." | nonl; flush( sout );
 					ret = close( server_fd );
 					if(ret < 0) {
 						abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
 					}
-				}
-				sout | "done";
-
-				sout | "Stopping connection threads..." | nonl; flush( sout );
-				for(i; options.clopts.nworkers) {
-					for(j; 2) {
-						ret = close(workers[i].pipe[j]);
-						if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
-					}
-					join(workers[i]);
+					sout | "done";
+
+					sout | "Stopping connection threads..." | nonl; flush( sout );
+					for(i; options.clopts.nworkers) {
+						for(j; 2) {
+							ret = close(aworkers[i].conn.pipe[j]);
+							if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
+						}
+						join(aworkers[i]);
+					}
 				}
 			}
@@ -361,15 +362,16 @@
 
 			sout | "Stopping printer threads..." | nonl; flush( sout );
-			for(i; options.clopts.nclusters) {
-				StatsPrinter * p = cl[i].prnt;
-				if(p) {
-					notify_one(p->var);
-					join(*p);
-				}
-			}
+			if(stats_thrd) {
+				notify_one(stats_thrd->var);
+			}
+			delete(stats_thrd);
 			sout | "done";
 
 			// Now that the stats printer is stopped, we can reclaim this
-			adelete(workers);
+			adelete(aworkers);
+			adelete(cworkers);
+			adelete(acceptors);
+			adelete(queues);
+			free(conns);
 
 			sout | "Stopping processors/clusters..." | nonl; flush( sout );
@@ -377,13 +379,5 @@
 		sout | "done";
 
-		// sout | "Closing splice fds..." | nonl; flush( sout );
-		// for(i; pipe_cnt) {
-		// 	ret = close( fds[pipe_off + i] );
-		// 	if(ret < 0) {
-		// 		abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
-		// 	}
-		// }
 		free(fds);
-		sout | "done";
 
 		sout | "Stopping processors..." | nonl; flush( sout );
Index: benchmark/io/http/options.cfa
===================================================================
--- benchmark/io/http/options.cfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/options.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -38,10 +38,8 @@
 		10,    // backlog
 		1024,  // buflen
-		false, // onereuse
-		false  // manyreuse
+		false  // reuseport
 	},
 
 	{ // cluster
-		1,     // nclusters;
 		1,     // nprocs;
 		1,     // nworkers;
@@ -54,16 +52,9 @@
 
 void parse_options( int argc, char * argv[] ) {
-	// bool fixedfd = false;
-	// bool sqkpoll = false;
-	// bool iokpoll = false;
 	unsigned nentries = 0;
-	bool isolate = false;
-
-
 	static cfa_option opt[] = {
 		{ 'p', "port",           "Port the server will listen on", options.socket.port},
 		{ 'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
 		{ 't', "threads",        "Number of worker threads to use", options.clopts.nworkers},
-		{'\0', "isolate",        "Create one cluster per processor", isolate, parse_settrue},
 		{'\0', "log",            "Enable logs", options.log, parse_settrue},
 		{'\0', "sout",           "Redirect standard out to file", options.reopen_stdout},
@@ -72,6 +63,5 @@
 		{'\0', "shell",          "Disable interactive mode", options.interactive, parse_setfalse},
 		{'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
-		{'\0', "reuseport-one",  "Create a single listen socket with SO_REUSEPORT", options.socket.onereuse, parse_settrue},
-		{'\0', "reuseport",      "Use many listen sockets with SO_REUSEPORT", options.socket.manyreuse, parse_settrue},
+		{'\0', "reuseport",      "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},
 		{'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
 		{'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
@@ -101,15 +91,7 @@
 		nentries = v;
 	}
-	if(isolate) {
-		options.clopts.nclusters = options.clopts.nprocs;
-		options.clopts.nprocs = 1;
-	}
 	options.clopts.params.num_entries = nentries;
-	options.clopts.instance = alloc(options.clopts.nclusters);
-	options.clopts.thrd_cnt = alloc(options.clopts.nclusters);
-	options.clopts.cltr_cnt = 0;
-	for(i; options.clopts.nclusters) {
-		options.clopts.thrd_cnt[i] = 0;
-	}
+	options.clopts.instance = 0p;
+	options.clopts.thrd_cnt = 0;
 
 
Index: benchmark/io/http/options.hfa
===================================================================
--- benchmark/io/http/options.hfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/options.hfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -27,10 +27,8 @@
 		int backlog;
 		int buflen;
-		bool onereuse;
-		bool manyreuse;
+		bool reuseport;
 	} socket;
 
 	struct {
-		int nclusters;
 		int nprocs;
 		int nworkers;
@@ -38,7 +36,6 @@
 		bool procstats;
 		bool viewhalts;
-		cluster ** instance;
-		size_t   * thrd_cnt;
-		size_t     cltr_cnt;
+		cluster * instance;
+		size_t    thrd_cnt;
 	} clopts;
 };
Index: benchmark/io/http/printer.cfa
===================================================================
--- benchmark/io/http/printer.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
+++ benchmark/io/http/printer.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -0,0 +1,98 @@
+#include "printer.hfa"
+
+#include <fstream.hfa>
+#include <stats.hfa>
+
+void ?{}( sendfile_stats_t & this ) {
+	this.calls = 0;
+	this.tries = 0;
+	this.header = 0;
+	this.splcin = 0;
+	this.splcot = 0;
+	for(i; zipf_cnts) {
+		this.avgrd[i].calls = 0;
+		this.avgrd[i].bytes = 0;
+	};
+}
+
+void push(sendfile_stats_t & from, sendfile_stats_t & to) {
+	__atomic_fetch_add(&to.calls, from.calls, __ATOMIC_RELAXED); from.calls = 0;
+	__atomic_fetch_add(&to.tries, from.tries, __ATOMIC_RELAXED); from.tries = 0;
+	__atomic_fetch_add(&to.header, from.header, __ATOMIC_RELAXED); from.header = 0;
+	__atomic_fetch_add(&to.splcin, from.splcin, __ATOMIC_RELAXED); from.splcin = 0;
+	__atomic_fetch_add(&to.splcot, from.splcot, __ATOMIC_RELAXED); from.splcot = 0;
+	for(i; zipf_cnts) {
+		__atomic_fetch_add(&to.avgrd[i].calls, from.avgrd[i].calls, __ATOMIC_RELAXED); from.avgrd[i].calls = 0;
+		__atomic_fetch_add(&to.avgrd[i].bytes, from.avgrd[i].bytes, __ATOMIC_RELAXED); from.avgrd[i].bytes = 0;
+	};
+}
+
+void ?{}( acceptor_stats_t & this ) {
+	this.creates = 0;
+	this.accepts = 0;
+	this.eagains = 0;
+}
+
+void push(acceptor_stats_t & from, acceptor_stats_t & to) {
+	__atomic_fetch_add(&to.creates, from.creates, __ATOMIC_RELAXED); from.creates = 0;
+	__atomic_fetch_add(&to.accepts, from.accepts, __ATOMIC_RELAXED); from.accepts = 0;
+	__atomic_fetch_add(&to.eagains, from.eagains, __ATOMIC_RELAXED); from.eagains = 0;
+}
+
+void ?{}( StatsPrinter & this, cluster & cl ) {
+	((thread&)this){ "Stats Printer Thread", cl };
+
+	memset(&this.stats, 0, sizeof(this.stats));;
+}
+
+void ^?{}( StatsPrinter & mutex this ) {}
+
+#define eng3(X) (ws(3, 3, unit(eng( X ))))
+
+void main(StatsPrinter & this) {
+	LOOP: for() {
+		waitfor( ^?{} : this) {
+			break LOOP;
+		}
+		or else {}
+
+		wait(this.var, 10`s);
+
+		print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
+		{
+			struct {
+				volatile uint64_t calls;
+				volatile uint64_t bytes;
+			} avgrd[zipf_cnts];
+			memset(avgrd, 0, sizeof(avgrd));
+
+			uint64_t tries = this.stats.send.tries;
+			uint64_t calls  = this.stats.send.calls;
+			uint64_t header = this.stats.send.header;
+			uint64_t splcin = this.stats.send.splcin;
+			uint64_t splcot = this.stats.send.splcot;
+			for(j; zipf_cnts) {
+				avgrd[j].calls += this.stats.send.avgrd[j].calls;
+				avgrd[j].bytes += this.stats.send.avgrd[j].bytes;
+			}
+
+			double ratio = ((double)tries) / calls;
+
+			if(this.stats.accpt.accepts > 0) {
+				sout | "----- Acceptor Stats -----";
+				sout | "accept  : " | this.stats.accpt.accepts | "calls," | this.stats.accpt.eagains | "eagains," | this.stats.accpt.creates | " thrds";
+			}
+
+			sout | "----- Connection Stats -----";
+			sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
+			sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
+			sout | " - zipf sizes:";
+			for(i; zipf_cnts) {
+				double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
+				sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
+			}
+		}
+	}
+}
+
+StatsPrinter * stats_thrd;
Index: benchmark/io/http/printer.hfa
===================================================================
--- benchmark/io/http/printer.hfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
+++ benchmark/io/http/printer.hfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -0,0 +1,49 @@
+#pragma once
+
+#include <stdint.h>
+
+#include <locks.hfa>
+#include <thread.hfa>
+
+extern const size_t zipf_sizes[];
+enum { zipf_cnts = 36, };
+
+struct sendfile_stats_t {
+	volatile uint64_t calls;
+	volatile uint64_t tries;
+	volatile uint64_t header;
+	volatile uint64_t splcin;
+	volatile uint64_t splcot;
+	struct {
+		volatile uint64_t calls;
+		volatile uint64_t bytes;
+	} avgrd[zipf_cnts];
+};
+
+void ?{}( sendfile_stats_t & this );
+
+void push(sendfile_stats_t & from, sendfile_stats_t & to);
+
+struct acceptor_stats_t {
+	volatile uint64_t creates;
+	volatile uint64_t accepts;
+	volatile uint64_t eagains;
+};
+
+void ?{}( acceptor_stats_t & this );
+
+void push(acceptor_stats_t & from, acceptor_stats_t & to);
+
+thread StatsPrinter {
+	struct {
+		__spinlock_t lock;
+		sendfile_stats_t send;
+		acceptor_stats_t accpt;
+	} stats;
+	condition_variable(fast_block_lock) var;
+};
+
+void ?{}( StatsPrinter & this, cluster & cl );
+void ^?{}( StatsPrinter & mutex this );
+
+extern StatsPrinter * stats_thrd;
Index: benchmark/io/http/protocol.cfa
===================================================================
--- benchmark/io/http/protocol.cfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/protocol.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -30,4 +30,13 @@
 #define PLAINTEXT_NOCOPY
 #define LINKED_IO
+
+static inline __s32 wait_res( io_future_t & this ) {
+	wait( this );
+	if( this.result < 0 ) {{
+		errno = -this.result;
+		return -1;
+	}}
+	return this.result;
+}
 
 struct https_msg_str {
@@ -470,4 +479,5 @@
 
 			if(is_error(splice_in.res)) {
+				if(splice_in.res.error == -EPIPE) return -ECONNRESET;
 				mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
 				close(fd);
@@ -503,5 +513,5 @@
 }
 
-[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
+[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
 	char * it = buffer;
 	size_t count = len - 1;
@@ -509,5 +519,12 @@
 	READ:
 	for() {
-		int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
+		int ret;
+		if( f ) {
+			ret = wait_res(*f);
+			reset(*f);
+			f = 0p;
+		} else {
+			ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
+		}
 		// int ret = read(fd, (void*)it, count);
 		if(ret == 0 ) return [OK200, true, 0, 0];
@@ -570,5 +587,5 @@
 
 void ?{}( DateFormater & this ) {
-	((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
+	((thread&)this){ "Server Date Thread", *options.clopts.instance };
 	this.idx = 0;
 	memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
Index: benchmark/io/http/protocol.hfa
===================================================================
--- benchmark/io/http/protocol.hfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/protocol.hfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -1,4 +1,5 @@
 #pragma once
 
+struct io_future_t;
 struct sendfile_stats_t;
 
@@ -22,3 +23,3 @@
 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & );
 
-[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
+[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f);
Index: benchmark/io/http/socket.cfa
===================================================================
--- benchmark/io/http/socket.cfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/socket.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -26,10 +26,12 @@
 
 int listener(struct sockaddr_in & address, int addrlen) {
-	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+	int type = SOCK_STREAM;
+	if(options.socket.reuseport) type |= SOCK_NONBLOCK;
+	int sockfd = socket(AF_INET, type, 0);
 	if(sockfd < 0) {
 		abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
 	}
 
-	if(options.socket.onereuse || options.socket.manyreuse) {
+	if(options.socket.reuseport) {
 		int value = 1;
 		// if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&on, sizeof(on)))
Index: benchmark/io/http/worker.cfa
===================================================================
--- benchmark/io/http/worker.cfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/worker.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -8,4 +8,5 @@
 #include <fstream.hfa>
 #include <iofwd.hfa>
+#include <mutex_stmt.hfa>
 
 #include "options.hfa"
@@ -14,40 +15,118 @@
 
 //=============================================================================================
-// Worker Thread
-//=============================================================================================
-void ?{}( Worker & this ) {
-	size_t cli = rand() % options.clopts.cltr_cnt;
-	((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
-	options.clopts.thrd_cnt[cli]++;
-	this.pipe[0] = -1;
-	this.pipe[1] = -1;
+// Generic connection handling
+//=============================================================================================
+static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
+	REQUEST:
+	for() {
+		bool closed;
+		HttpCode code;
+		const char * file;
+		size_t name_size;
+
+		// Read the http request
+		if( options.log ) sout | "=== Reading request ===";
+		[code, closed, file, name_size] = http_read(fd, buffer, len, f);
+		f = 0p;
+
+		// if we are done, break out of the loop
+		if( closed ) break REQUEST;
+
+		// If this wasn't a request retrun 400
+		if( code != OK200 ) {
+			sout | "=== Invalid Request :" | code_val(code) | "===";
+			answer_error(fd, code);
+			continue REQUEST;
+		}
+
+		if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
+			if( options.log ) sout | "=== Request for /plaintext ===";
+
+			int ret = answer_plaintext(fd);
+			if( ret == -ECONNRESET ) break REQUEST;
+
+			if( options.log ) sout | "=== Answer sent ===";
+			continue REQUEST;
+		}
+
+		if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
+			if( options.log ) sout | "=== Request for /ping ===";
+
+			// Send the header
+			int ret = answer_empty(fd);
+			if( ret == -ECONNRESET ) break REQUEST;
+
+			if( options.log ) sout | "=== Answer sent ===";
+			continue REQUEST;
+		}
+
+		if( options.log ) {
+			sout | "=== Request for file " | nonl;
+			write(sout, file, name_size);
+			sout | " ===";
+		}
+
+		if( !options.file_cache.path ) {
+			if( options.log ) {
+				sout | "=== File Not Found (" | nonl;
+				write(sout, file, name_size);
+				sout | ") ===";
+			}
+			answer_error(fd, E405);
+			continue REQUEST;
+		}
+
+		// Get the fd from the file cache
+		int ans_fd;
+		size_t count;
+		[ans_fd, count] = get_file( file, name_size );
+
+		// If we can't find the file, return 404
+		if( ans_fd < 0 ) {
+			if( options.log ) {
+				sout | "=== File Not Found (" | nonl;
+				write(sout, file, name_size);
+				sout | ") ===";
+			}
+			answer_error(fd, E404);
+			continue REQUEST;
+		}
+
+		// Send the desired file
+		int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
+		if( ret == -ECONNRESET ) break REQUEST;
+
+		if( options.log ) sout | "=== Answer sent ===";
+	}
+
+	if (stats_thrd) {
+		unsigned long long next = rdtscl();
+		if(next > (last + 500000000)) {
+			if(try_lock(stats_thrd->stats.lock)) {
+				push(this.stats.sendfile, stats_thrd->stats.send);
+				unlock(stats_thrd->stats.lock);
+				last = next;
+			}
+		}
+	}
+}
+
+//=============================================================================================
+// Self Accepting Worker Thread
+//=============================================================================================
+void ?{}( AcceptWorker & this ) {
+	((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
+	options.clopts.thrd_cnt++;
 	this.done = false;
-
-	this.stats.sendfile.calls = 0;
-	this.stats.sendfile.tries = 0;
-	this.stats.sendfile.header = 0;
-	this.stats.sendfile.splcin = 0;
-	this.stats.sendfile.splcot = 0;
-	for(i; zipf_cnts) {
-		this.stats.sendfile.avgrd[i].calls = 0;
-		this.stats.sendfile.avgrd[i].bytes = 0;
-	}
-}
-
-extern "C" {
-extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-}
-
-void main( Worker & this ) {
+}
+
+void main( AcceptWorker & this ) {
 	park();
-	/* paranoid */ assert( this.pipe[0] != -1 );
-	/* paranoid */ assert( this.pipe[1] != -1 );
-
-	const bool reuse = options.socket.manyreuse;
-
-	CONNECTION:
+	unsigned long long last = rdtscl();
+	/* paranoid */ assert( this.conn.pipe[0] != -1 );
+	/* paranoid */ assert( this.conn.pipe[1] != -1 );
 	for() {
 		if( options.log ) sout | "=== Accepting connection ===";
-		int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
+		int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
 		if(fd < 0) {
 			if( errno == ECONNABORTED ) break;
@@ -58,89 +137,101 @@
 
 		if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
-		REQUEST:
-		for() {
-			bool closed;
-			HttpCode code;
-			const char * file;
-			size_t name_size;
-
-			// Read the http request
-			size_t len = options.socket.buflen;
-			char buffer[len];
-			if( options.log ) sout | "=== Reading request ===";
-			[code, closed, file, name_size] = http_read(fd, buffer, len);
-
-			// if we are done, break out of the loop
-			if( closed ) break REQUEST;
-
-			// If this wasn't a request retrun 400
-			if( code != OK200 ) {
-				sout | "=== Invalid Request :" | code_val(code) | "===";
-				answer_error(fd, code);
-				continue REQUEST;
-			}
-
-			if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
-				if( options.log ) sout | "=== Request for /plaintext ===";
-
-				int ret = answer_plaintext(fd);
-				if( ret == -ECONNRESET ) break REQUEST;
-
-				if( options.log ) sout | "=== Answer sent ===";
-				continue REQUEST;
-			}
-
-			if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
-				if( options.log ) sout | "=== Request for /ping ===";
-
-				// Send the header
-				int ret = answer_empty(fd);
-				if( ret == -ECONNRESET ) break REQUEST;
-
-				if( options.log ) sout | "=== Answer sent ===";
-				continue REQUEST;
-			}
-
-			if( options.log ) {
-				sout | "=== Request for file " | nonl;
-				write(sout, file, name_size);
-				sout | " ===";
-			}
-
-			if( !options.file_cache.path ) {
-				if( options.log ) {
-					sout | "=== File Not Found (" | nonl;
-					write(sout, file, name_size);
-					sout | ") ===";
+		size_t len = options.socket.buflen;
+		char buffer[len];
+		handle_connection( this.conn, fd, buffer, len, 0p, last );
+
+		if( options.log ) sout | "=== Connection closed ===";
+	}
+}
+
+
+//=============================================================================================
+// Channel Worker Thread
+//=============================================================================================
+void ?{}( ChannelWorker & this ) {
+	((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
+	options.clopts.thrd_cnt++;
+	this.done = false;
+}
+
+void main( ChannelWorker & this ) {
+	park();
+	unsigned long long last = rdtscl();
+	/* paranoid */ assert( this.conn.pipe[0] != -1 );
+	/* paranoid */ assert( this.conn.pipe[1] != -1 );
+	for() {
+		size_t len = options.socket.buflen;
+		char buffer[len];
+		PendingRead p;
+		p.in.buf = (void*)buffer;
+		p.in.len = len;
+		push(*this.queue, &p);
+
+		if( options.log ) sout | "=== Waiting new connection ===";
+		handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
+
+		if( options.log ) sout | "=== Connection closed ===";
+		if(this.done) break;
+	}
+}
+
+extern "C" {
+extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+}
+
+void ?{}( Acceptor & this ) {
+	((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
+	options.clopts.thrd_cnt++;
+	this.done = false;
+}
+
+void main( Acceptor & this ) {
+	park();
+	unsigned long long last = rdtscl();
+	if( options.log ) sout | "=== Accepting connection ===";
+	for() {
+		int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
+		if(fd < 0) {
+			if( errno == EWOULDBLOCK) {
+				this.stats.eagains++;
+				yield();
+				continue;
+			}
+			if( errno == ECONNABORTED ) break;
+			if( this.done && (errno == EINVAL || errno == EBADF) ) break;
+			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
+		}
+		this.stats.accepts++;
+
+		if(this.done) return;
+
+		if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
+
+		if(fd) {
+			PendingRead * p = 0p;
+			for() {
+				if(this.done) return;
+				p = pop(*this.queue);
+				if(p) break;
+				yield();
+				this.stats.creates++;
+			};
+
+			p->out.fd = fd;
+			async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
+		}
+
+		if (stats_thrd) {
+			unsigned long long next = rdtscl();
+			if(next > (last + 500000000)) {
+				if(try_lock(stats_thrd->stats.lock)) {
+					push(this.stats, stats_thrd->stats.accpt);
+					unlock(stats_thrd->stats.lock);
+					last = next;
 				}
-				answer_error(fd, E405);
-				continue REQUEST;
-			}
-
-			// Get the fd from the file cache
-			int ans_fd;
-			size_t count;
-			[ans_fd, count] = get_file( file, name_size );
-
-			// If we can't find the file, return 404
-			if( ans_fd < 0 ) {
-				if( options.log ) {
-					sout | "=== File Not Found (" | nonl;
-					write(sout, file, name_size);
-					sout | ") ===";
-				}
-				answer_error(fd, E404);
-				continue REQUEST;
-			}
-
-			// Send the desired file
-			int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
-			if( ret == -ECONNRESET ) break REQUEST;
-
-			if( options.log ) sout | "=== Answer sent ===";
-		}
-
-		if( options.log ) sout | "=== Connection closed ===";
-		continue CONNECTION;
-	}
-}
+			}
+		}
+
+		if( options.log ) sout | "=== Accepting connection ===";
+	}
+}
Index: benchmark/io/http/worker.hfa
===================================================================
--- benchmark/io/http/worker.hfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ benchmark/io/http/worker.hfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -1,4 +1,6 @@
 #pragma once
 
+#include <iofwd.hfa>
+#include <queueLockFree.hfa>
 #include <thread.hfa>
 
@@ -7,25 +9,24 @@
 }
 
+#include "printer.hfa"
+
 //=============================================================================================
 // Worker Thread
 //=============================================================================================
 
-extern const size_t zipf_sizes[];
-enum { zipf_cnts = 36, };
-
-struct sendfile_stats_t {
-	volatile uint64_t calls;
-	volatile uint64_t tries;
-	volatile uint64_t header;
-	volatile uint64_t splcin;
-	volatile uint64_t splcot;
+struct connection {
+	int pipe[2];
 	struct {
-		volatile uint64_t calls;
-		volatile uint64_t bytes;
-	} avgrd[zipf_cnts];
+		sendfile_stats_t sendfile;
+	} stats;
 };
 
-thread Worker {
-	int pipe[2];
+static inline void ?{}( connection & this ) {
+	this.pipe[0] = -1;
+	this.pipe[1] = -1;
+}
+
+thread AcceptWorker {
+	connection conn;
 	int sockfd;
 	struct sockaddr * addr;
@@ -33,8 +34,42 @@
 	int flags;
 	volatile bool done;
+};
+void ?{}( AcceptWorker & this);
+void main( AcceptWorker & );
+
+
+struct PendingRead {
+	PendingRead * volatile next;
+	io_future_t f;
 	struct {
-		sendfile_stats_t sendfile;
-	} stats;
+		void * buf;
+		size_t len;
+	} in;
+	struct {
+		volatile int fd;
+	} out;
 };
-void ?{}( Worker & this);
-void main( Worker & );
+
+static inline PendingRead * volatile & ?`next ( PendingRead * node ) {
+	return node->next;
+}
+
+thread ChannelWorker {
+	connection conn;
+	volatile bool done;
+	mpsc_queue(PendingRead) * queue;
+};
+void ?{}( ChannelWorker & );
+void main( ChannelWorker & );
+
+thread Acceptor {
+	mpsc_queue(PendingRead) * queue;
+	int sockfd;
+	struct sockaddr * addr;
+	socklen_t * addrlen;
+	int flags;
+	volatile bool done;
+	acceptor_stats_t stats;
+};
+void ?{}( Acceptor & );
+void main( Acceptor & );
Index: benchmark/io/sendfile/producer.cfa
===================================================================
--- benchmark/io/sendfile/producer.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
+++ benchmark/io/sendfile/producer.cfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -0,0 +1,509 @@
+// programs that sends a file many times as fast as it can
+// compares sendfile to splice
+
+#define _GNU_SOURCE
+
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <errno.h>
+#include <locale.h>
+#include <time.h>
+#include <unistd.h>
+
+#include <sys/ioctl.h>
+#include <sys/sendfile.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <fcntl.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include <liburing.h>
+
+enum {
+	USAGE_ERROR = 1,
+	HOST_ERROR,
+	PIPE_ERROR,
+	FSTAT_ERROR,
+	SOCKET_ERROR,
+	CONNECT_ERROR,
+	SENDFILE_ERROR,
+	SPLICEIN_ERROR,
+	SPLICEOUT_ERROR,
+	URINGWAIT_ERROR
+};
+
+enum { buffer_len = 10240 };
+char buffer[buffer_len];
+
+enum { TIMEGRAN = 1000000000LL, TIMES = 100000 };
+
+int pipefd[2];
+struct io_uring ring;
+
+char * buf;
+
+struct stats {
+	size_t calls;
+	size_t bytes;
+	struct {
+		struct {
+			size_t cnt;
+			size_t bytes;
+		} r, w;
+	} shorts;
+};
+static void my_sendfile(int out, int in, size_t size, struct stats *);
+static void my_splice  (int out, int in, size_t size, struct stats *);
+static void my_iouring (int out, int in, size_t size, struct stats *);
+static void my_ringlink(int out, int in, size_t size, struct stats *);
+static void my_readwrit(int out, int in, size_t size, struct stats *);
+typedef void (*sender_t)(int out, int in, size_t size, struct stats *);
+
+static void run(sender_t sender, struct addrinfo * addr, int infd, size_t size);
+
+int main(int argc, char * argv[]) {
+	setlocale(LC_ALL, "");
+	const char * file_path;
+	struct addrinfo * addr;
+	int file_fd;
+	int ret;
+	switch(argc) {
+	case 3:
+		{
+			// Open the file
+			const char * const path = argv[2];
+			ret = open(path, 0, O_RDONLY);
+			if(ret < 0) {
+				fprintf( stderr, "cannot open file '%s': %s\n\n", path, strerror(errno) );
+				goto USAGE;
+			}
+
+			file_path = path;
+			file_fd = ret;
+
+
+			// connect to the address
+			char * state = 0;
+			char * str = argv[1];
+			const char * const host = strtok_r(str, ":", &state);
+			if(NULL == host) {
+				fprintf( stderr, "Invalid host:port specification, no host.\n\n" );
+				goto USAGE;
+			}
+
+			const char * const port = strtok_r(NULL, ":", &state);
+			if(NULL == port) {
+				fprintf( stderr, "Invalid host:port specification, no port.\n\n" );
+				goto USAGE;
+			}
+
+			printf("looking up '%s:%s'\n", host, port);
+
+			struct addrinfo hints = {};
+			struct addrinfo * pResultList = NULL;
+
+			hints.ai_family = AF_INET;
+			hints.ai_socktype = SOCK_STREAM;
+			hints.ai_flags = AI_NUMERICSERV;
+
+			ret = getaddrinfo(host, port, &hints, &pResultList);
+
+			switch(ret) {
+			case 0:
+				addr = pResultList;
+				goto DONE;
+
+			case EAI_ADDRFAMILY:
+				fprintf( stderr, "The specified network host does not have any network addresses in the requested address family.\n\n" );
+				break;
+
+			case EAI_AGAIN:
+				fprintf( stderr, "The name server returned a temporary failure indication. Try again later.\n\n" );
+				exit( HOST_ERROR );
+
+			case EAI_BADFLAGS:
+				fprintf( stderr, "hints.ai_flags  contains invalid flags; or, hints.ai_flags included AI_CANONNAME and name was NULL.\n\n" );
+				exit( HOST_ERROR );
+
+			case EAI_FAIL:
+				fprintf( stderr, "The name server returned a permanent failure indication.\n\n" );
+				break;
+
+			case EAI_FAMILY:
+				fprintf( stderr, "The requested address family is not supported.\n\n" );
+				exit( HOST_ERROR );
+
+			case EAI_MEMORY:
+				fprintf( stderr, "Out of memory.\n\n" );
+				exit( HOST_ERROR );
+
+			case EAI_NODATA:
+				fprintf( stderr, "The specified network host exists, but does not have any network addresses defined.\n\n" );
+				break;
+
+			case EAI_NONAME:
+				fprintf( stderr, "The unkonwn host or invalid port.\n\n" );
+				break;
+
+			case EAI_SERVICE:
+				fprintf( stderr, "The requested service is not available for the requested socket type.\n\n" );
+				break;
+
+			case EAI_SOCKTYPE:
+				fprintf( stderr, "The requested  socket  type  is  not  supported.\n\n" );
+				exit( HOST_ERROR );
+
+			case EAI_SYSTEM:
+				// Other system error, check errno for details.
+			default:
+				fprintf( stderr, "Unnown hostname error: (%d) %s\n\n", (int)errno, strerror(errno) );
+				exit( HOST_ERROR );
+			}
+			if(pResultList) freeaddrinfo(pResultList);
+			goto USAGE;
+		}
+	USAGE:
+	default:
+		fprintf( stderr, "USAGE: %s host:port file\n", argv[0] );
+		exit( USAGE_ERROR );
+	}
+
+	DONE:
+
+	io_uring_queue_init(16, &ring, 0);
+
+	size_t file_size = 0;
+	{
+		struct stat buf;
+   		ret = fstat(file_fd, &buf);
+		if(0 != ret) {
+			fprintf( stderr, "fstat error: (%d) %s\n\n", (int)errno, strerror(errno) );
+			exit( FSTAT_ERROR );
+		}
+		file_size = buf.st_size;
+	}
+
+	{
+		char addr_str[INET_ADDRSTRLEN];
+		struct sockaddr_in * address = (struct sockaddr_in *) addr->ai_addr;
+		inet_ntop( AF_INET, &address->sin_addr, addr_str, INET_ADDRSTRLEN );
+		printf("sending '%s' (%zu bytes) to '%s:%i'\n", file_path, file_size, addr_str, ntohs(address->sin_port));
+	}
+
+	ret = pipe(pipefd);
+	if( ret < 0 ) {
+		fprintf( stderr, "pipe error: (%d) %s\n\n", (int)errno, strerror(errno) );
+		exit( PIPE_ERROR );
+	}
+
+	buf = malloc(file_size);
+
+	printf("--- read + write ---\n");
+	run(my_readwrit, addr, file_fd, file_size);
+	printf("--- splice ---\n");
+	run(my_splice  , addr, file_fd, file_size);
+	printf("--- sendfile ---\n");
+	run(my_sendfile, addr, file_fd, file_size);
+	printf("--- io_uring ---\n");
+	run(my_iouring, addr, file_fd, file_size);
+	printf("--- io_uring + link ---\n");
+	run(my_ringlink, addr, file_fd, file_size);
+
+	close(pipefd[0]);
+	close(pipefd[1]);
+	close(file_fd);
+	return 0;
+}
+
+static void run(sender_t sender, struct addrinfo * addr, int infd, size_t size) {
+
+	int sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+      if(sock < 0) {
+		fprintf( stderr, "socket error: (%d) %s\n\n", (int)errno, strerror(errno) );
+		exit( SOCKET_ERROR );
+      }
+
+      int ret = connect(sock, addr->ai_addr, addr->ai_addrlen);
+      if(ret < 0) {
+            fprintf( stderr, "connect error: (%d) %s\n\n", (int)errno, strerror(errno) );
+		exit( CONNECT_ERROR );
+      }
+
+	struct stats st;
+	st.calls = 0;
+	st.bytes = 0;
+	st.shorts.r.cnt = 0;
+	st.shorts.r.bytes = 0;
+	st.shorts.w.cnt = 0;
+	st.shorts.w.bytes = 0;
+
+	struct timespec after, before;
+
+	clock_gettime(CLOCK_MONOTONIC, &before);
+
+	for(long long int i = 0; i < TIMES; i++) {
+		sender( sock, infd, size, &st );
+	}
+
+	clock_gettime(CLOCK_MONOTONIC, &after);
+
+	close(sock);
+
+	uint64_t tb = ((int64_t)before.tv_sec * TIMEGRAN) + before.tv_nsec;
+	uint64_t ta = ((int64_t)after.tv_sec * TIMEGRAN) + after.tv_nsec;
+	double secs = ((double)ta - tb) / TIMEGRAN;
+
+	printf("Sent %'zu bytes in %'zu files, %f seconds\n", st.bytes, st.calls, secs);
+	printf(" - %'3.3f bytes per second\n", (((double)st.bytes) / secs));
+	printf(" - %'f seconds per file\n", secs / st.calls);
+	printf(" - %'3.3f bytes per calls\n", (((double)st.bytes) / st.calls));
+	if(st.shorts.r.cnt ){
+		printf(" - %'zu short reads\n", st.shorts.r.cnt);
+		printf(" - %'3.3f bytes per short read\n", (((double)st.shorts.r.bytes) / st.shorts.r.cnt));
+	} else printf("No short reads\n");
+	if(st.shorts.w.cnt ){
+		printf(" - %'zu short reads\n", st.shorts.w.cnt);
+		printf(" - %'3.3f bytes per short read\n", (((double)st.shorts.w.bytes) / st.shorts.w.cnt));
+	} else printf("No short writes\n");
+}
+
+static void my_sendfile(int out, int in, size_t size, struct stats * st) {
+	off_t off = 0;
+	for(;;) {
+
+		ssize_t ret = sendfile(out, in, &off, size);
+		if(ret < 0) {
+			fprintf( stderr, "connect error: (%d) %s\n\n", (int)errno, strerror(errno) );
+			exit( SENDFILE_ERROR );
+		}
+
+		st->calls++;
+		st->bytes += ret;
+		off += ret;
+		size -= ret;
+		if( size == 0 ) return;
+		st->shorts.r.cnt++;
+		st->shorts.r.bytes += ret;
+	}
+}
+
+static void my_splice  (int out, int in, size_t size, struct stats * st) {
+	unsigned flags = 0; //SPLICE_F_MOVE; // | SPLICE_F_MORE;
+	off_t offset = 0;
+	size_t writes = 0;
+	for(;;) {
+		ssize_t reti = 0;
+		reti = splice(in, &offset, pipefd[1], NULL, size, flags);
+		if( reti < 0 ) {
+			fprintf( stderr, "splice in error: (%d) %s\n\n", (int)errno, strerror(errno) );
+			exit( SPLICEIN_ERROR );
+		}
+
+		size -= reti;
+		size_t in_pipe = reti;
+		for(;;) {
+			ssize_t reto = 0;
+			reto = splice(pipefd[0], NULL, out, NULL, in_pipe, flags);
+			if( reto < 0 ) {
+				fprintf( stderr, "splice out error: (%d) %s\n\n", (int)errno, strerror(errno) );
+				exit( SPLICEOUT_ERROR );
+			}
+			in_pipe -= reto;
+			writes += reto;
+			if(0 == in_pipe) break;
+			st->shorts.w.cnt++;
+			st->shorts.w.bytes += reto;
+		}
+		if(0 == size) break;
+		st->shorts.r.cnt++;
+		st->shorts.r.bytes += reti;
+	}
+	st->calls++;
+	st->bytes += writes;
+}
+
+static ssize_t naive_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
+	struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
+
+	io_uring_prep_splice(sqe, fd_in, NULL != off_in ? *off_in: -1, fd_out, NULL != off_out ? *off_out: -1, len, flags);
+
+	io_uring_submit(&ring);
+
+	struct io_uring_cqe * cqe = NULL;
+	/* wait for the sqe to complete */
+	int ret = io_uring_wait_cqe_nr(&ring, &cqe, 1);
+
+	/* read and process cqe event */
+	switch(ret) {
+	case 0:
+		{
+			ssize_t val = cqe->res;
+			if( cqe->res < 0 ) {
+				printf("Completion Error : %s\n", strerror( -cqe->res ));
+				return EXIT_FAILURE;
+			}
+			io_uring_cqe_seen(&ring, cqe);
+			return val;
+		}
+	default:
+		fprintf( stderr, "io_uring_wait error: (%d) %s\n\n", (int)-ret, strerror(-ret) );
+		exit( URINGWAIT_ERROR );
+	}
+}
+
+static void my_iouring (int out, int in, size_t size, struct stats * st) {
+	unsigned flags = 0; //SPLICE_F_MOVE; // | SPLICE_F_MORE;
+	off_t offset = 0;
+	size_t writes = 0;
+	for(;;) {
+		ssize_t reti = 0;
+		reti = naive_splice(in, &offset, pipefd[1], NULL, size, flags);
+		if( reti < 0 ) {
+			fprintf( stderr, "splice in error: (%d) %s\n\n", (int)errno, strerror(errno) );
+			exit( SPLICEIN_ERROR );
+		}
+
+		size -= reti;
+		size_t in_pipe = reti;
+		for(;;) {
+			ssize_t reto = 0;
+			reto = naive_splice(pipefd[0], NULL, out, NULL, in_pipe, flags);
+			if( reto < 0 ) {
+				fprintf( stderr, "splice out error: (%d) %s\n\n", (int)errno, strerror(errno) );
+				exit( SPLICEOUT_ERROR );
+			}
+			in_pipe -= reto;
+			writes += reto;
+			if(0 == in_pipe) break;
+			st->shorts.w.cnt++;
+			st->shorts.w.bytes += reto;
+		}
+		if(0 == size) break;
+		st->shorts.r.cnt++;
+		st->shorts.r.bytes += reti;
+	}
+	st->calls++;
+	st->bytes += writes;
+}
+
+static void my_ringlink(int out, int in, size_t size, struct stats * st) {
+	enum { SPLICE_IN, SPLICE_OUT };
+
+	size_t in_pipe = size;
+	off_t offset = 0;
+	bool has_in = false;
+	bool has_out = false;
+	while(true) {
+		if(!has_in && size > 0) {
+			struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
+			io_uring_prep_splice(sqe, in, offset, pipefd[1], -1, size, 0);
+			sqe->user_data = SPLICE_IN;
+			sqe->flags = IOSQE_IO_LINK;
+			has_in = true;
+		}
+		if(!has_out) {
+			struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
+			io_uring_prep_splice(sqe, pipefd[0], -1, out, -1, in_pipe, 0);
+			sqe->user_data = SPLICE_OUT;
+			has_out = true;
+		}
+
+		int ret = io_uring_submit_and_wait(&ring, 1);
+		if(ret < 0) {
+			fprintf( stderr, "io_uring_submit error: (%d) %s\n\n", (int)-ret, strerror(-ret) );
+			exit( URINGWAIT_ERROR );
+		}
+
+		/* poll the cq and count how much polling we did */
+		while(true) {
+			struct io_uring_cqe * cqe = NULL;
+			/* wait for the sqe to complete */
+			int ret = io_uring_wait_cqe_nr(&ring, &cqe, 0);
+
+			/* read and process cqe event */
+			switch(ret) {
+			case 0:
+				if( cqe->res < 0 ) {
+					printf("Completion Error : %s\n", strerror( -cqe->res ));
+					exit( URINGWAIT_ERROR );
+				}
+
+				ssize_t write = cqe->res;
+				int which = cqe->user_data;
+				io_uring_cqe_seen(&ring, cqe);
+				switch( which ) {
+				case SPLICE_IN:
+					has_in = false;
+					size -= write;
+					offset += write;
+					if(0 == size) break;
+					st->shorts.r.cnt++;
+					st->shorts.r.bytes += write;
+					break;
+				case SPLICE_OUT:
+					has_out = false;
+					in_pipe -= write;
+					st->bytes += write;
+					if(0 == in_pipe) break;
+					st->shorts.w.cnt++;
+					st->shorts.w.bytes += write;
+					break;
+				default:
+					printf("Completion Error : unknown user data\n");
+					exit( URINGWAIT_ERROR );
+				}
+				continue;
+			case -EAGAIN:
+				goto OUTER;
+			default:
+				fprintf( stderr, "io_uring_get_cqe error: (%d) %s\n\n", (int)-ret, strerror(-ret) );
+				exit( URINGWAIT_ERROR );
+			}
+		}
+		OUTER:
+		if(0 == in_pipe) break;
+	}
+	st->calls++;
+}
+
+static void my_readwrit(int out, int in, size_t size, struct stats * st) {
+	off_t offset = 0;
+	size_t writes = 0;
+	for(;;) {
+		ssize_t reti = pread(in, buf, size, offset);
+		if( reti < 0 ) {
+			printf("Read in Error : (%d) %s\n\n", (int)errno, strerror(errno) );
+			exit( 1 );
+		}
+
+		offset += reti;
+		size -= reti;
+
+		size_t in_buf = reti;
+		for(;;) {
+			ssize_t reto = write(out, buf, in_buf);
+			if( reto < 0 ) {
+					printf("Write out Error : (%d) %s\n\n", (int)errno, strerror(errno) );
+					exit( 1 );
+				}
+
+			in_buf -= reto;
+			writes += reto;
+			if(0 == in_buf) break;
+			st->shorts.w.cnt++;
+			st->shorts.w.bytes += reto;
+		}
+		if(0 == size) break;
+		st->shorts.r.cnt++;
+		st->shorts.r.bytes += reti;
+	}
+	st->calls++;
+	st->bytes += writes;
+}
Index: libcfa/src/containers/queueLockFree.hfa
===================================================================
--- libcfa/src/containers/queueLockFree.hfa	(revision db7a3ad393c27727cb72177db76e03d07067dbe1)
+++ libcfa/src/containers/queueLockFree.hfa	(revision aeb20a4d256d7e9fe5da6b3be21994f201fa9cbe)
@@ -2,4 +2,6 @@
 
 #include <assert.h>
+
+#include <bits/defs.hfa>
 
 forall( T &) {
