Index: benchmark/io/http/Makefile.am
===================================================================
--- benchmark/io/http/Makefile.am	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/Makefile.am	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -29,4 +29,6 @@
 EXTRA_PROGRAMS = httpforall .dummy_hack
 
+CLEANFILES = httpforall
+
 nodist_httpforall_SOURCES = \
 	filecache.cfa \
Index: benchmark/io/http/main.cfa
===================================================================
--- benchmark/io/http/main.cfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/main.cfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -46,4 +46,7 @@
 }
 
+extern void init_protocol(void);
+extern void deinit_protocol(void);
+
 //=============================================================================================
 // Main
@@ -61,5 +64,5 @@
 	//===================
 	// Open Socket
-	printf("Listening on port %d\n", options.socket.port);
+	printf("%ld : Listening on port %d\n", (long)getpid(), options.socket.port); // cast so the pid argument matches %ld
 	int server_fd = socket(AF_INET, SOCK_STREAM, 0);
 	if(server_fd < 0) {
@@ -79,5 +82,5 @@
 		ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
 		if(ret < 0) {
-			if(errno == 98) {
+			if(errno == EADDRINUSE) {
 				if(waited == 0) {
 					printf("Waiting for port\n");
@@ -109,4 +112,5 @@
 		options.clopts.instance = &cl;
 
+
 		int pipe_cnt = options.clopts.nworkers * 2;
 		int pipe_off;
@@ -124,4 +128,6 @@
 		{
 			ServerProc procs[options.clopts.nprocs];
+
+			init_protocol();
 			{
 				Worker workers[options.clopts.nworkers];
@@ -151,6 +157,26 @@
 					printf("Shutting Down\n");
 				}
+
+				for(i; options.clopts.nworkers) {
+					printf("Cancelling %p\n", (void*)workers[i].cancel.target);
+					workers[i].done = true;
+					cancel(workers[i].cancel);
+				}
+
+				printf("Shutting down socket\n");
+				int ret = shutdown( server_fd, SHUT_RD );
+				if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
+
+				//===================
+				// Close Socket
+				printf("Closing Socket\n");
+				ret = close( server_fd );
+				if(ret < 0) {
+					abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
+				}
 			}
 			printf("Workers Closed\n");
+
+			deinit_protocol();
 		}
 
@@ -162,12 +188,5 @@
 		}
 		free(fds);
-	}
 
-	//===================
-	// Close Socket
-	printf("Closing Socket\n");
-	ret = close( server_fd );
-	if(ret < 0) {
-		abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
 	}
 
Index: benchmark/io/http/options.cfa
===================================================================
--- benchmark/io/http/options.cfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/options.cfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -12,5 +12,9 @@
 #include <parseargs.hfa>
 
+#include <string.h>
+
 Options options @= {
+	false, // log
+
 	{ // file_cache
 		0,     // open_flags;
@@ -48,4 +52,5 @@
 		{'p', "port",           "Port the server will listen on", options.socket.port},
 		{'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
+		{'L', "log",            "Enable logs", options.log, parse_settrue},
 		{'t', "threads",        "Number of worker threads to use", options.clopts.nworkers},
 		{'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
Index: benchmark/io/http/options.hfa
===================================================================
--- benchmark/io/http/options.hfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/options.hfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -8,4 +8,6 @@
 
 struct Options {
+	bool log;
+
 	struct {
 		int open_flags;
Index: benchmark/io/http/protocol.cfa
===================================================================
--- benchmark/io/http/protocol.cfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/protocol.cfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -18,10 +18,12 @@
 #include "options.hfa"
 
+const char * volatile date = 0p;
+
 const char * http_msgs[] = {
-	"HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: %zu\n\n",
-	"HTTP/1.1 400 Bad Request\nContent-Type: text/plain\nContent-Length: 0\n\n",
-	"HTTP/1.1 404 Not Found\nContent-Type: text/plain\nContent-Length: 0\n\n",
-	"HTTP/1.1 413 Payload Too Large\nContent-Type: text/plain\nContent-Length: 0\n\n",
-	"HTTP/1.1 414 URI Too Long\nContent-Type: text/plain\nContent-Length: 0\n\n",
+	"HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n", // only this template takes a length argument after the date
+	"HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
+	"HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
+	"HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
+	"HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
 };
 
@@ -45,5 +47,6 @@
 	while(len > 0) {
 		// Call write
-		int ret = write(fd, it, len);
+		int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
+		// int ret = write(fd, it, len);
 		if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); }
 
@@ -63,11 +66,22 @@
 int answer_header( int fd, size_t size ) {
 	const char * fmt = http_msgs[OK200];
-	int len = 100;
+	int len = 200;
 	char buffer[len];
-	len = snprintf(buffer, len, fmt, size);
+	len = snprintf(buffer, len, fmt, date, size);
 	return answer( fd, buffer, len );
 }
 
-[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
+int answer_plain( int fd, char buffer[], size_t size ) { // 200 header announcing `size` bytes, then the body itself
+	const int hdr = answer_header(fd, size);
+	if( hdr < 0 ) { return hdr; } // negative header result is handed straight back, body is not sent
+	return answer(fd, buffer, size);
+}
+
+int answer_empty( int fd ) { // 200 header with a length of 0 and no body
+	return answer_header(fd, 0);
+}
+
+
+[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
 	char * it = buffer;
 	size_t count = len - 1;
@@ -75,8 +89,10 @@
 	READ:
 	for() {
-		int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
+		int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
+		// int ret = read(fd, (void*)it, count);
 		if(ret == 0 ) return [OK200, true, 0, 0];
 		if(ret < 0 ) {
 			if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
+			// if( errno == EINVAL ) return [E400, true, 0, 0];
 			abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
 		}
@@ -92,5 +108,5 @@
 	}
 
-	printf("%.*s\n", rlen, buffer);
+	if( options.log ) printf("%.*s\n", rlen, buffer);
 
 	it = buffer;
@@ -104,8 +120,10 @@
 
 void sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
+	unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
 	off_t offset = 0;
 	ssize_t ret;
 	SPLICE1: while(count > 0) {
-		ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
+		ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p);
+		// ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
 		if( ret < 0 ) {
 			if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
@@ -117,5 +135,6 @@
 		size_t in_pipe = ret;
 		SPLICE2: while(in_pipe > 0) {
-			ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
+			ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p);
+			// ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
 			if( ret < 0 ) {
 				if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
@@ -127,2 +146,56 @@
 	}
 }
+
+//=============================================================================================
+
+#include <clock.hfa>
+#include <time.hfa>
+#include <thread.hfa>
+
+struct date_buffer {
+	char buff[100];
+};
+
+thread DateFormater {
+	int idx;
+	date_buffer buffers[2];
+};
+
+void ?{}( DateFormater & this ) {
+	((thread&)this){ "Server Date Thread", *options.clopts.instance };
+	this.idx = 0;
+	memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
+	memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );
+}
+
+void main(DateFormater & this) {
+	LOOP: for() {
+		// Accept a pending destructor call if there is one and stop; otherwise fall through
+		waitfor( ^?{} : this) { break LOOP; }
+		or else {}
+
+		// Format the current time into the buffer selected by idx
+		char * slot = this.buffers[this.idx].buff;
+		Time stamp = getTimeNsec();
+		strftime( slot, sizeof(this.buffers[this.idx].buff), "%a, %d %b %Y %H:%M:%S %Z", stamp );
+
+		// Publish it through the global `date`, then switch to the other buffer for the next round
+		__atomic_exchange_n((char * volatile *)&date, slot, __ATOMIC_SEQ_CST);
+		this.idx = (this.idx + 1) % 2;
+
+		sleep(1`s); // one refresh per second
+	}
+}
+
+//=============================================================================================
+DateFormater * the_date_formatter; // the date thread created by init_protocol and destroyed by deinit_protocol
+
+void init_protocol(void) { // allocate the date thread and run its constructor
+	the_date_formatter = alloc();
+	(*the_date_formatter){};
+}
+
+void deinit_protocol(void) { // run the date thread's destructor, then release its storage
+	^(*the_date_formatter){};
+	free( the_date_formatter );
+}
Index: benchmark/io/http/protocol.hfa
===================================================================
--- benchmark/io/http/protocol.hfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/protocol.hfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -1,3 +1,5 @@
 #pragma once
+
+struct io_cancellation;
 
 enum HttpCode {
@@ -14,6 +16,8 @@
 int answer_error( int fd, HttpCode code );
 int answer_header( int fd, size_t size );
+int answer_plain( int fd, char buffer [], size_t size );
+int answer_empty( int fd );
 
-[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
+[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
 
 void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
Index: benchmark/io/http/worker.cfa
===================================================================
--- benchmark/io/http/worker.cfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/worker.cfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -19,4 +19,9 @@
 	this.pipe[0] = -1;
 	this.pipe[1] = -1;
+	this.done = false;
+}
+
+extern "C" {
+extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
 }
 
@@ -28,11 +33,14 @@
 	CONNECTION:
 	for() {
-		int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
+		if( options.log ) printf("=== Accepting connection ===\n");
+		int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
+		// int fd = accept4( this.[sockfd, addr, addrlen, flags] );
 		if(fd < 0) {
 			if( errno == ECONNABORTED ) break;
+			if( errno == EINVAL && this.done ) break;
 			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
 		}
 
-		printf("New connection %d, waiting for requests\n", fd);
+		if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);
 		REQUEST:
 		for() {
@@ -45,10 +53,11 @@
 			size_t len = options.socket.buflen;
 			char buffer[len];
-			printf("Reading request\n");
-			[code, closed, file, name_size] = http_read(fd, buffer, len);
+			if( options.log ) printf("=== Reading request ===\n");
+			[code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
 
 			// if we are done, break out of the loop
 			if( closed ) {
-				printf("Connection closed\n");
+				if( options.log ) printf("=== Connection closed ===\n");
+				close(fd);
 				continue CONNECTION;
 			}
@@ -56,10 +65,32 @@
 			// If this wasn't a request retrun 400
 			if( code != OK200 ) {
-				printf("Invalid Request : %d\n", code_val(code));
+				printf("=== Invalid Request : %d ===\n", code_val(code));
 				answer_error(fd, code);
 				continue REQUEST;
 			}
 
-			printf("Request for file %.*s\n", (int)name_size, file);
+			if(name_size == sizeof("plaintext") - 1 && 0 == strncmp(file, "plaintext", name_size)) { // exact name only, not any prefix
+				if( options.log ) printf("=== Request for /plaintext ===\n");
+
+				char text[] = "Hello, World!\n";
+
+				// Send the header and the body; sizeof - 1 keeps the terminating NUL out of the response
+				answer_plain(fd, text, sizeof(text) - 1);
+
+				if( options.log ) printf("=== Answer sent ===\n");
+				continue REQUEST;
+			}
+
+			if(name_size == sizeof("ping") - 1 && 0 == strncmp(file, "ping", name_size)) { // exact name only, not any prefix
+				if( options.log ) printf("=== Request for /ping ===\n");
+
+				// Send the header only, there is no body
+				answer_empty(fd);
+
+				if( options.log ) printf("=== Answer sent ===\n");
+				continue REQUEST;
+			}
+
+			if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file);
 
 			// Get the fd from the file cache
@@ -70,5 +101,5 @@
 			// If we can't find the file, return 404
 			if( ans_fd < 0 ) {
-				printf("File Not Found\n");
+				printf("=== File Not Found ===\n");
 				answer_error(fd, E404);
 				continue REQUEST;
@@ -81,5 +112,5 @@
 			sendfile( this.pipe, fd, ans_fd, count);
 
-			printf("File sent\n");
+			if( options.log ) printf("=== Answer sent ===\n");
 		}
 	}
Index: benchmark/io/http/worker.hfa
===================================================================
--- benchmark/io/http/worker.hfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/http/worker.hfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -17,4 +17,6 @@
 	socklen_t * addrlen;
 	int flags;
+	io_cancellation cancel;
+	volatile bool done;
 };
 void ?{}( Worker & this);
Index: benchmark/io/readv.cfa
===================================================================
--- benchmark/io/readv.cfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ benchmark/io/readv.cfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -96,10 +96,10 @@
 
 	char **left;
-	parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", left );
+	parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", left );
 
 	if(kpollcp || odirect) {
 		if( (buflen % 512) != 0 ) {
 			fprintf(stderr, "Buffer length must be a multiple of 512 when using O_DIRECT, was %lu\n\n", buflen);
-			print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", true);
+			print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", true);
 		}
 	}
Index: example/io/simple/server_epoll.c
===================================================================
--- example/io/simple/server_epoll.c	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ example/io/simple/server_epoll.c	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -88,5 +88,5 @@
       }
 
-      ev.events = EPOLLIN | EPOLLONESHOT;
+      ev.events = EPOLLOUT | EPOLLIN | EPOLLONESHOT;
       ev.data.u64 = (uint64_t)&ring;
       if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ring.ring_fd, &ev) == -1) {
@@ -99,5 +99,5 @@
 
 	while(1) {
-            BLOCK:
+            BLOCK:;
             int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
             if (nfds == -1) {
Index: libcfa/src/concurrency/io.cfa
===================================================================
--- libcfa/src/concurrency/io.cfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ libcfa/src/concurrency/io.cfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -31,5 +31,4 @@
 
 	extern "C" {
-		#include <sys/epoll.h>
 		#include <sys/syscall.h>
 
@@ -41,4 +40,42 @@
 	#include "kernel/fwd.hfa"
 	#include "io/types.hfa"
+
+	static const char * opcodes[] = { // printable names, indexed by sqe->opcode in the debug prints of __submit
+		"OP_NOP",
+		"OP_READV",
+		"OP_WRITEV",
+		"OP_FSYNC",
+		"OP_READ_FIXED",
+		"OP_WRITE_FIXED",
+		"OP_POLL_ADD",
+		"OP_POLL_REMOVE",
+		"OP_SYNC_FILE_RANGE",
+		"OP_SENDMSG",
+		"OP_RECVMSG",
+		"OP_TIMEOUT",
+		"OP_TIMEOUT_REMOVE",
+		"OP_ACCEPT",
+		"OP_ASYNC_CANCEL",
+		"OP_LINK_TIMEOUT",
+		"OP_CONNECT",
+		"OP_FALLOCATE",
+		"OP_OPENAT",
+		"OP_CLOSE",
+		"OP_FILES_UPDATE",
+		"OP_STATX",
+		"OP_READ",
+		"OP_WRITE",
+		"OP_FADVISE",
+		"OP_MADVISE",
+		"OP_SEND",
+		"OP_RECV",
+		"OP_OPENAT2",
+		"OP_EPOLL_CTL",
+		"OP_SPLICE",
+		"OP_PROVIDE_BUFFERS",
+		"OP_REMOVE_BUFFERS",
+		"OP_TEE",
+		"INVALID_OP" // index 34; NOTE(review): presumably meant to line up with IORING_OP_LAST, which __clean stores in debug builds; confirm against the kernel header in use
+	};
 
 	// returns true of acquired as leader or second leader
@@ -134,4 +171,5 @@
 		int ret = 0;
 		if( need_sys_to_submit || need_sys_to_complete ) {
+			__cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
 			ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
 			if( ret < 0 ) {
@@ -157,6 +195,9 @@
 	static unsigned __collect_submitions( struct __io_data & ring );
 	static __u32 __release_consumed_submission( struct __io_data & ring );
-
-	static inline void process(struct io_uring_cqe & cqe ) {
+	static inline void __clean( volatile struct io_uring_sqe * sqe );
+
+	// Process a single completion message from the io_uring
+	// This is NOT thread-safe
+	static inline void process( volatile struct io_uring_cqe & cqe ) {
 		struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
 		__cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
@@ -165,6 +206,4 @@
 	}
 
-	// Process a single completion message from the io_uring
-	// This is NOT thread-safe
 	static [int, bool] __drain_io( & struct __io_data ring ) {
 		/* paranoid */ verify( ! __preemption_enabled() );
@@ -192,4 +231,6 @@
 		}
 
+		__atomic_thread_fence( __ATOMIC_SEQ_CST );
+
 		// Release the consumed SQEs
 		__release_consumed_submission( ring );
@@ -209,5 +250,5 @@
 		for(i; count) {
 			unsigned idx = (head + i) & mask;
-			struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+			volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
 
 			/* paranoid */ verify(&cqe);
@@ -218,6 +259,5 @@
 		// Mark to the kernel that the cqe has been seen
 		// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-		__atomic_thread_fence( __ATOMIC_SEQ_CST );
-		__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
+		__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
 
 		return [count, count > 0 || to_submit > 0];
@@ -225,11 +265,12 @@
 
 	void main( $io_ctx_thread & this ) {
-		epoll_event ev;
-		__ioctx_register( this, ev );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
-
-		int reset = 0;
+		__ioctx_register( this );
+
+		__cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
+
+		const int reset_cnt = 5;
+		int reset = reset_cnt;
 		// Then loop until we need to start
+		LOOP:
 		while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
 			// Drain the io
@@ -239,5 +280,5 @@
 				[count, again] = __drain_io( *this.ring );
 
-				if(!again) reset++;
+				if(!again) reset--;
 
 				// Update statistics
@@ -249,22 +290,32 @@
 
 			// If we got something, just yield and check again
-			if(reset < 5) {
+			if(reset > 1) {
 				yield();
-			}
-			// We didn't get anything baton pass to the slow poller
-			else {
+				continue LOOP;
+			}
+
+			// We already failed to find completed entries a few times.
+			if(reset == 1) {
+				// Rearm the context so it can block
+				// but don't block right away
+				// we need to retry one last time in case
+				// something completed *just now*
+				__ioctx_prepare_block( this );
+				continue LOOP;
+			}
+
 				__STATS__( false,
 					io.complete_q.blocks += 1;
 				)
-				__cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
-				reset = 0;
+				__cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
 
 				// block this thread
-				__ioctx_prepare_block( this, ev );
 				wait( this.sem );
-			}
-		}
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+
+			// restore counter
+			reset = reset_cnt;
+		}
+
+		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
 	}
 
@@ -289,5 +340,10 @@
 //
 
-	[* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
+	// Allocate a submit queue entry.
+	// The kernel cannot see these entries until they are submitted, but other threads must be
+	// able to see which entries can be used and which are already in use by another thread.
+	// For convenience, return both the index and the pointer to the sqe:
+	// sqe == &sqes[idx]
+	[* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
 		/* paranoid */ verify( data != 0 );
 
@@ -304,9 +360,10 @@
 			// Look through the list starting at some offset
 			for(i; cnt) {
-				__u64 expected = 0;
-				__u32 idx = (i + off) & mask;
-				struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
+				__u64 expected = 3;
+				__u32 idx = (i + off) & mask; // Get an index from a random
+				volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
 				volatile __u64 * udata = &sqe->user_data;
 
+				// Allocate the entry by CASing the user_data field from the free marker (`expected`, i.e. 3) to the future address
 				if( *udata == expected &&
 					__atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
@@ -319,4 +376,6 @@
 					)
 
+					// debug log
+					__cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
 
 					// Success return the data
@@ -325,8 +384,12 @@
 				verify(expected != data);
 
+				// This one was used
 				len ++;
 			}
 
 			block++;
+
+			abort( "Kernel I/O : all submit queue entries used, yielding\n" );
+
 			yield();
 		}
@@ -377,4 +440,41 @@
 	void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
 		__io_data & ring = *ctx->thrd.ring;
+
+		{
+			__attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
+			__cfadbg_print_safe( io,
+				"Kernel I/O : submitting %u (%p) for %p\n"
+				"    data: %p\n"
+				"    opcode: %s\n"
+				"    fd: %d\n"
+				"    flags: %d\n"
+				"    prio: %d\n"
+				"    off: %p\n"
+				"    addr: %p\n"
+				"    len: %d\n"
+				"    other flags: %d\n"
+				"    splice fd: %d\n"
+				"    pad[0]: %llu\n"
+				"    pad[1]: %llu\n"
+				"    pad[2]: %llu\n",
+				idx, sqe,
+				active_thread(),
+				(void*)sqe->user_data,
+				opcodes[sqe->opcode],
+				sqe->fd,
+				sqe->flags,
+				sqe->ioprio,
+				sqe->off,
+				sqe->addr,
+				sqe->len,
+				sqe->accept_flags,
+				sqe->splice_fd_in,
+				sqe->__pad2[0],
+				sqe->__pad2[1],
+				sqe->__pad2[2]
+			);
+		}
+
+
 		// Get now the data we definetely need
 		volatile __u32 * const tail = ring.submit_q.tail;
@@ -443,5 +543,7 @@
 				unlock(ring.submit_q.submit_lock);
 			#endif
-			if( ret < 0 ) return;
+			if( ret < 0 ) {
+				return;
+			}
 
 			// Release the consumed SQEs
@@ -454,6 +556,9 @@
 				io.submit_q.submit_avg.cnt += 1;
 			)
-		}
-		else {
+
+			__cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
+		}
+		else
+		{
 			// get mutual exclusion
 			#if defined(LEADER_LOCK)
@@ -463,5 +568,5 @@
 			#endif
 
-			/* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
+			/* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
 			/* paranoid */ 	"index %u already reclaimed\n"
 			/* paranoid */ 	"head %u, prev %u, tail %u\n"
@@ -490,4 +595,6 @@
 			}
 
+			/* paranoid */ verify(ret == 1);
+
 			// update statistics
 			__STATS__( false,
@@ -496,6 +603,45 @@
 			)
 
+			{
+				__attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
+				__attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
+				__attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
+
+				__cfadbg_print_safe( io,
+					"Kernel I/O : last submitted is %u (%p)\n"
+					"    data: %p\n"
+					"    opcode: %s\n"
+					"    fd: %d\n"
+					"    flags: %d\n"
+					"    prio: %d\n"
+					"    off: %p\n"
+					"    addr: %p\n"
+					"    len: %d\n"
+					"    other flags: %d\n"
+					"    splice fd: %d\n"
+					"    pad[0]: %llu\n"
+					"    pad[1]: %llu\n"
+					"    pad[2]: %llu\n",
+					last_idx, sqe,
+					(void*)sqe->user_data,
+					opcodes[sqe->opcode],
+					sqe->fd,
+					sqe->flags,
+					sqe->ioprio,
+					sqe->off,
+					sqe->addr,
+					sqe->len,
+					sqe->accept_flags,
+					sqe->splice_fd_in,
+					sqe->__pad2[0],
+					sqe->__pad2[1],
+					sqe->__pad2[2]
+				);
+			}
+
+			__atomic_thread_fence( __ATOMIC_SEQ_CST );
 			// Release the consumed SQEs
 			__release_consumed_submission( ring );
+			// ring.submit_q.sqes[idx].user_data = 3ul64;
 
 			#if defined(LEADER_LOCK)
@@ -505,9 +651,12 @@
 			#endif
 
-			__cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
+			__cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
 		}
 	}
 
 	// #define PARTIAL_SUBMIT 32
+
+	// go through the list of submissions in the ready array and move them into
+	// the ring's submit queue
 	static unsigned __collect_submitions( struct __io_data & ring ) {
 		/* paranoid */ verify( ring.submit_q.ready != 0p );
@@ -550,19 +699,59 @@
 	}
 
+	// Go through the ring's submit queue and release everything that has already been consumed
+	// by io_uring
 	static __u32 __release_consumed_submission( struct __io_data & ring ) {
 		const __u32 smask = *ring.submit_q.mask;
 
+		// We need to get the lock to copy the old head and new head
 		if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
-		__u32 chead = *ring.submit_q.head;
-		__u32 phead = ring.submit_q.prev_head;
-		ring.submit_q.prev_head = chead;
+		__attribute__((unused))
+		__u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
+		__u32 chead = *ring.submit_q.head;		// get the current head of the queue
+		__u32 phead = ring.submit_q.prev_head;	// get the head the last time we were here
+		ring.submit_q.prev_head = chead;		// note up to were we processed
 		unlock(ring.submit_q.release_lock);
 
+		// the 3 fields are organized like this diagram
+		// except it's a ring
+		// ---+--------+--------+----
+		// ---+--------+--------+----
+		//    ^        ^        ^
+		// phead    chead    ctail
+
+		// make sure ctail doesn't wrap around and reach phead
+		/* paranoid */ verify(
+			   (ctail >= chead && chead >= phead)
+			|| (chead >= phead && phead >= ctail)
+			|| (phead >= ctail && ctail >= chead)
+		);
+
+		// find the range we need to clear
 		__u32 count = chead - phead;
+
+		// We acquired an previous-head/current-head range
+		// go through the range and release the sqes
 		for( i; count ) {
 			__u32 idx = ring.submit_q.array[ (phead + i) & smask ];
-			ring.submit_q.sqes[ idx ].user_data = 0;
+
+			/* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
+			__clean( &ring.submit_q.sqes[ idx ] );
 		}
 		return count;
 	}
+
+	void __sqe_clean( volatile struct io_uring_sqe * sqe ) { // non-static entry point that simply forwards to __clean
+		__clean( sqe );
+	}
+
+	static inline void __clean( volatile struct io_uring_sqe * sqe ) {
+		// If we are in debug mode, thrash the fields to make sure we catch reclamation errors
+		__cfaabi_dbg_debug_do(
+			memset(sqe, 0xde, sizeof(*sqe));
+			sqe->opcode = IORING_OP_LAST;
+		);
+
+		// Mark the entry as unused: 3 is the user_data value __submit_alloc expects before it claims an entry
+		__atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
+	}
 #endif
Index: libcfa/src/concurrency/io/call.cfa.in
===================================================================
--- libcfa/src/concurrency/io/call.cfa.in	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ libcfa/src/concurrency/io/call.cfa.in	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -74,5 +74,5 @@
 	;
 
-	extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
+	extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
 	extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
 
@@ -222,9 +222,19 @@
 		__u32 idx;
 		struct io_uring_sqe * sqe;
-		[sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
-
-		sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
+		[(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
+
 		sqe->opcode = IORING_OP_{op};
-		sqe->flags = sflags;{body}
+		sqe->flags = sflags;
+		sqe->ioprio = 0;
+		sqe->fd = 0;
+		sqe->off = 0;
+		sqe->addr = 0;
+		sqe->len = 0;
+		sqe->accept_flags = 0;
+		sqe->__pad2[0] = 0;
+		sqe->__pad2[1] = 0;
+		sqe->__pad2[2] = 0;{body}
+
+		asm volatile("": : :"memory");
 
 		verify( sqe->user_data == (__u64)(uintptr_t)&future );
@@ -312,8 +322,8 @@
 	}),
 	# CFA_HAVE_IORING_OP_ACCEPT
-	Call('ACCEPT4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
+	Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
 		'fd': 'sockfd',
-		'addr': 'addr',
-		'addr2': 'addrlen',
+		'addr': '(__u64)addr',
+		'addr2': '(__u64)addrlen',
 		'accept_flags': 'flags'
 	}),
@@ -464,4 +474,37 @@
 
 print("""
+//-----------------------------------------------------------------------------
+// Submit an IORING_OP_ASYNC_CANCEL aimed at this.target and wait for its completion; true means the target was found
+bool cancel(io_cancellation & this) {
+	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
+		return false;
+	#else
+		io_future_t future;
+		io_context * context = __get_io_context();
+		struct __io_data & ring = *context->thrd.ring;
+
+		__u32 idx;
+		volatile struct io_uring_sqe * sqe;
+		[sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
+
+		// Entries are recycled (and thrashed in debug builds), so set every field like the generated calls do
+		sqe->opcode = IORING_OP_ASYNC_CANCEL;
+		sqe->flags = 0;
+		sqe->ioprio = 0;
+		sqe->fd = 0;
+		sqe->off = 0;
+		sqe->addr = this.target;
+		sqe->len = 0;
+		sqe->accept_flags = 0;
+		sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
+
+		verify( sqe->user_data == (__u64)(uintptr_t)&future );
+		__submit( context, idx );
+		wait(future);
+		// 0: entry found; -EALREADY: found but already in progress; anything else (e.g. -ENOENT): not found
+		return future.result == 0 || future.result == -EALREADY;
+	#endif
+}
+
 //-----------------------------------------------------------------------------
 // Check if a function is has asynchronous
Index: libcfa/src/concurrency/io/setup.cfa
===================================================================
--- libcfa/src/concurrency/io/setup.cfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ libcfa/src/concurrency/io/setup.cfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -52,4 +52,5 @@
 		#include <pthread.h>
 		#include <sys/epoll.h>
+		#include <sys/eventfd.h>
 		#include <sys/mman.h>
 		#include <sys/syscall.h>
@@ -169,6 +170,10 @@
 		// Main loop
 		while( iopoll.run ) {
+			__cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
+
 			// Wait for events
 			int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
+
+			__cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
 
 			// Check if an error occured
@@ -181,8 +186,12 @@
 				$io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
 				/* paranoid */ verify( io_ctx );
-				__cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
+				__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
 				#if !defined( __CFA_NO_STATISTICS__ )
 					__cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
 				#endif
+
+				eventfd_t v;
+				eventfd_read(io_ctx->ring->efd, &v);
+
 				post( io_ctx->sem );
 			}
@@ -233,4 +242,9 @@
 		$thread & thrd = this.thrd.self;
 		if( cluster_context ) {
+			// We are about to do weird things with the threads
+			// we don't need interrupts to complicate everything
+			disable_interrupts();
+
+			// Get cluster info
 			cluster & cltr = *thrd.curr_cluster;
 			/* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
@@ -239,13 +253,15 @@
 			// We need to adjust the clean-up based on where the thread is
 			if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
+				// This is the tricky case
+				// The thread was preempted or ready to run and now it is on the ready queue
+				// but the cluster is shutting down, so there aren't any processors to run the ready queue
+				// the solution is to steal the thread from the ready-queue and pretend it was blocked all along
 
 				ready_schedule_lock();
-
-					// This is the tricky case
-					// The thread was preempted and now it is on the ready queue
+					// The thread should be on the list
+					/* paranoid */ verify( thrd.link.next != 0p );
+
+					// Remove the thread from the ready queue of this cluster
 					// The thread should be the last on the list
-					/* paranoid */ verify( thrd.link.next != 0p );
-
-					// Remove the thread from the ready queue of this cluster
 					__attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
 					/* paranoid */ verify( removed );
@@ -263,6 +279,6 @@
 			}
 			// !!! This is not an else if !!!
+			// Ok, now the thread is blocked (whether we cheated to get here or not)
 			if( thrd.state == Blocked ) {
-
 				// This is the "easy case"
 				// The thread is parked and can easily be moved to active cluster
@@ -274,9 +290,11 @@
 			}
 			else {
-
 				// The thread is in a weird state
 				// I don't know what to do here
 				abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
 			}
+
+			// The weird thread kidnapping stuff is over, restore interrupts.
+			enable_interrupts( __cfaabi_dbg_ctx );
 		} else {
 			post( this.thrd.sem );
@@ -365,4 +383,5 @@
 		}
 
+		// Step 3 : Initialize the data structure
 		// Get the pointers from the kernel to fill the structure
 		// submit queue
@@ -379,5 +398,6 @@
 			const __u32 num = *sq.num;
 			for( i; num ) {
-				sq.sqes[i].user_data = 0ul64;
+				sq.sqes[i].opcode = IORING_OP_LAST;
+				sq.sqes[i].user_data = 3ul64;
 			}
 		}
@@ -409,4 +429,25 @@
 		cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
 
+		// Step 4 : eventfd
+		int efd;
+		for() {
+			efd = eventfd(0, 0);
+			if (efd < 0) {
+				if (errno == EINTR) continue;
+				abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
+			}
+			break;
+		}
+
+		int ret;
+		for() {
+			ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
+			if (ret < 0) {
+				if (errno == EINTR) continue;
+				abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
+			}
+			break;
+		}
+
 		// some paranoid checks
 		/* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
@@ -423,4 +464,5 @@
 		this.ring_flags = params.flags;
 		this.fd         = fd;
+		this.efd        = efd;
 		this.eager_submits  = params_in.eager_submits;
 		this.poller_submits = params_in.poller_submits;
@@ -445,4 +487,5 @@
 		// close the file descriptor
 		close(this.fd);
+		close(this.efd);
 
 		free( this.submit_q.ready ); // Maybe null, doesn't matter
@@ -452,19 +495,23 @@
 // I/O Context Sleep
 //=============================================================================================
-
-	void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
-		ev.events = EPOLLIN | EPOLLONESHOT;
+	#define IOEVENTS (EPOLLIN | EPOLLONESHOT) // epoll event mask used for every io context; parenthesized so it expands as a single operand
+
+	static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) { // Apply epoll operation op to ctx's eventfd; error names the operation in the abort message
+		struct epoll_event ev;
+		ev.events = IOEVENTS; // same event mask (EPOLLIN | EPOLLONESHOT) whatever the operation
 		ev.data.u64 = (__u64)&ctx;
-		int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
+		int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev); // epoll now watches the ring's eventfd (efd) rather than the ring fd
 		if (ret < 0) {
-			abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
-		}
-	}
-
-	void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
-		int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
-		if (ret < 0) {
-			abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
-		}
+			abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); // any epoll_ctl failure is fatal
+		}
+	}
+
+	void __ioctx_register($io_ctx_thread & ctx) { // Add ctx's eventfd to the epoll instance; aborts on failure
+		__ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
+	}
+
+	void __ioctx_prepare_block($io_ctx_thread & ctx) { // Re-arm ctx's one-shot epoll registration; presumably called before the poller blocks (confirm with callers)
+		__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
+		__ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); // EPOLL_CTL_MOD re-applies the EPOLLIN | EPOLLONESHOT mask
 	}
 
Index: libcfa/src/concurrency/io/types.hfa
===================================================================
--- libcfa/src/concurrency/io/types.hfa	(revision bace538f1b51717cd22831d1441e2537af67659f)
+++ libcfa/src/concurrency/io/types.hfa	(revision 402658b1b3d2a6667152f3fefbf639a05dd3234a)
@@ -65,5 +65,5 @@
 
 		// A buffer of sqes (not the actual ring)
-		struct io_uring_sqe * sqes;
+		volatile struct io_uring_sqe * sqes;
 
 		// The location and size of the mmaped area
@@ -85,5 +85,5 @@
 
 		// the kernel ring
-		struct io_uring_cqe * cqes;
+		volatile struct io_uring_cqe * cqes;
 
 		// The location and size of the mmaped area
@@ -97,4 +97,5 @@
 		__u32 ring_flags;
 		int fd;
+		int efd;
 		bool eager_submits:1;
 		bool poller_submits:1;
@@ -130,8 +131,8 @@
 	#endif
 
-	struct epoll_event;
 	struct $io_ctx_thread;
-	void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev);
-	void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev);
+	void __ioctx_register($io_ctx_thread & ctx); // add ctx to the epoll instance (EPOLL_CTL_ADD); defined in io/setup.cfa
+	void __ioctx_prepare_block($io_ctx_thread & ctx); // re-arm ctx's epoll registration (EPOLL_CTL_MOD); defined in io/setup.cfa
+	void __sqe_clean( volatile struct io_uring_sqe * sqe );
 #endif
 
