Index: libcfa/src/concurrency/io.cfa
===================================================================
--- libcfa/src/concurrency/io.cfa	(revision 39fc03effedee9acf13ca846d4c85c1bff3790ff)
+++ libcfa/src/concurrency/io.cfa	(revision f00b26d473acc916ee96e28fa1365688d9b3c2da)
@@ -16,26 +16,24 @@
 #if defined(__CFA_DEBUG__)
 	// #define __CFA_DEBUG_PRINT_IO__
-	// #define __CFA_DEBUG_PRINT_IO_CORE__
+	#define __CFA_DEBUG_PRINT_IO_CORE__
 #endif
 
-#include "kernel.hfa"
+#include "kernel_private.hfa"
 #include "bitmanip.hfa"
 
 #if !defined(CFA_HAVE_LINUX_IO_URING_H)
-	void __kernel_io_startup( cluster &, unsigned, bool ) {
+	void __kernel_io_startup() {
 		// Nothing to do without io_uring
 	}
 
-	void __kernel_io_finish_start( cluster & ) {
+	void __kernel_io_shutdown() {
 		// Nothing to do without io_uring
 	}
 
-	void __kernel_io_prepare_stop( cluster & ) {
-		// Nothing to do without io_uring
-	}
-
-	void __kernel_io_shutdown( cluster &, bool ) {
-		// Nothing to do without io_uring
-	}
+	void ?{}(io_context & this, struct cluster & cl) {}
+	void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}
+
+	void ^?{}(io_context & this) {}
+	void ^?{}(io_context & this, bool cluster_context) {}
 
 #else
@@ -45,7 +43,8 @@
 	#include <string.h>
 	#include <unistd.h>
-	#include <sys/mman.h>
 
 	extern "C" {
+		#include <sys/epoll.h>
+		#include <sys/mman.h>
 		#include <sys/syscall.h>
 
@@ -57,6 +56,12 @@
 	#include "thread.hfa"
 
-	uint32_t entries_per_cluster() {
-		return 256;
+	void ?{}(io_context_params & this) { // default parameters for an io_context
+		this.num_entries = 256;        // entry count handed to io_uring_setup
+		this.num_ready = 256;          // ready-array size, only used with poller/eager submits (min 8)
+		this.submit_aff = -1;          // NOTE(review): presumably "no affinity" - not used in the visible code
+		this.eager_submits = false;    // selects the eager submission scheme in __submit
+		this.poller_submits = false;   // selects submission by the poller thread in __submit
+		this.poll_submit = false;      // when set, the ring is created with IORING_SETUP_SQPOLL
+		this.poll_complete = false;    // when set, the ring is created with IORING_SETUP_IOPOLL
 	}
 
@@ -90,21 +95,4 @@
 	#endif
 
-	// Fast poller user-thread
-	// Not using the "thread" keyword because we want to control
-	// more carefully when to start/stop it
-	struct __io_poller_fast {
-		struct __io_data * ring;
-		$thread thrd;
-	};
-
-	void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
-		this.ring = cltr.io;
-		(this.thrd){ "Fast I/O Poller", cltr };
-	}
-	void ^?{}( __io_poller_fast & mutex this );
-	void main( __io_poller_fast & this );
-	static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
-	void ^?{}( __io_poller_fast & mutex this ) {}
-
 	struct __submition_data {
 		// Head and tail of the ring (associated with array)
@@ -166,37 +154,212 @@
 		struct __completion_data completion_q;
 		uint32_t ring_flags;
-		int cltr_flags;
 		int fd;
-		semaphore submit;
-		volatile bool done;
-		struct {
-			struct {
-				__processor_id_t id;
-				void * stack;
-				pthread_t kthrd;
-				volatile bool blocked;
-			} slow;
-			__io_poller_fast fast;
-			__bin_sem_t sem;
-		} poller;
+		bool eager_submits:1;
+		bool poller_submits:1;
 	};
 
 //=============================================================================================
-// I/O Startup / Shutdown logic
+// I/O Startup / Shutdown logic + Master Poller
 //=============================================================================================
-	void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
-		if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
-			abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
-		}
-
-		this.io = malloc();
-
+
+// IO Master poller loop forward
+static void * iopoll_loop( __attribute__((unused)) void * args );
+
+static struct {
+	pthread_t     thrd;    // pthread handle to io poller thread
+	void *        stack;   // pthread stack for io poller thread
+	int           epollfd; // file descriptor to the epoll instance
+	volatile bool run;     // Whether or not to continue
+} iopoll;
+
+// Create the global epoll instance and start the master io poller thread (iopoll_loop).
+void __kernel_io_startup() {
+	__cfaabi_dbg_print_safe( "Kernel : Creating EPOLL instance\n" );
+	iopoll.epollfd = epoll_create1(0);
+      if (iopoll.epollfd == -1) {
+            abort( "internal error, epoll_create1\n");
+      }
+
+	__cfaabi_dbg_print_safe( "Kernel : Starting io poller thread\n" );
+	// run must be set before the thread starts since iopoll_loop tests it on entry
+	iopoll.run = true;
+	iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
+}
+
+// Stop the master io poller: clear the run flag, signal the thread with SIGUSR1,
+// join it, free the stack returned by __create_pthread and close the epoll instance.
+void __kernel_io_shutdown() {
+	// Notify the io poller thread of the shutdown
+	iopoll.run = false;
+	sigval val = { 1 };
+	pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
+
+	// Wait for the io poller thread to finish
+	pthread_join( iopoll.thrd, 0p );
+	free( iopoll.stack );
+
+	int ret = close(iopoll.epollfd);
+      if (ret == -1) {
+            abort( "internal error, close epoll\n");
+      }
+
+	// Io polling is now fully stopped
+	__cfaabi_dbg_print_safe( "Kernel : IO poller stopped\n" );
+}
+
+// Body of the master io poller kernel thread, started by __kernel_io_startup.
+// Blocks in epoll_pwait on iopoll.epollfd and, for every ready event, posts the semaphore of
+// the $io_ctx_thread stored in the event data. __kernel_io_shutdown interrupts it with SIGUSR1.
+static void * iopoll_loop( __attribute__((unused)) void * args ) {
+	__processor_id_t id;
+	id.id = doregister(&id);
+	__cfaabi_dbg_print_safe( "Kernel : IO poller thread starting\n" );
+	// Block all signals in this thread; pthread_sigmask returns an error number, never -1
+	sigset_t mask;
+	sigfillset(&mask);
+	if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) != 0 ) {
+		abort( "internal error, pthread_sigmask" );
+	}
+	// SIGUSR1 is left out of the mask handed to epoll_pwait, so it can only arrive during the wait
+	sigdelset( &mask, SIGUSR1 );
+
+	// Create sufficient events
+	struct epoll_event events[10];
+	// Main loop
+	while( iopoll.run ) {
+		// Wait for events
+		int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
+		// Check if an error occurred, an interrupted wait simply re-checks the run flag
+		if (nfds == -1) {
+			if( errno == EINTR ) continue;
+			abort( "internal error, epoll_pwait\n" );
+		}
+
+		for(i; nfds) {
+			$io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
+			/* paranoid */ verify( io_ctx );
+			__cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
+			#if !defined( __CFA_NO_STATISTICS__ )
+				kernelTLS.this_stats = io_ctx->self.curr_cluster->stats;
+			#endif
+			__post( io_ctx->sem, &id );
+		}
+	}
+	__cfaabi_dbg_print_safe( "Kernel : IO poller thread stopping\n" );
+	unregister(&id);
+	return 0p;
+}
+
+//=============================================================================================
+// I/O Context Construction/Destruction
+//=============================================================================================
+
+	void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
+	void main( $io_ctx_thread & this );
+	static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
+	void ^?{}( $io_ctx_thread & mutex this ) {}
+
+	static void __io_create ( __io_data & this, const io_context_params & params_in );
+	static void __io_destroy( __io_data & this );
+
+	void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
+		(this.thrd){ cl };
+		this.thrd.ring = malloc();
+		__cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
+		__io_create( *this.thrd.ring, params );
+
+		__cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
+		this.thrd.done = false;
+		__thrd_start( this.thrd, main );
+
+		__cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
+	}
+
+	void ?{}(io_context & this, struct cluster & cl) {
+		io_context_params params;
+		(this){ cl, params };
+	}
+
+	// Tear down an io_context: flag its poller thread as done, unpark it, destroy the thread,
+	// then destroy and free the ring. When cluster_context is true the thread is first pulled
+	// off its cluster's ready queue (if needed) and moved to the active cluster before unparking.
+	void ^?{}(io_context & this, bool cluster_context) {
+		__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);
+
+		// Notify the thread of the shutdown
+		__atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);
+
+		// If this is an io_context within a cluster, things get trickier
+		$thread & thrd = this.thrd.self;
+		if( cluster_context ) {
+			cluster & cltr = *thrd.curr_cluster;
+			/* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster );
+			/* paranoid */ verify( !ready_mutate_islocked() );
+
+			// We need to adjust the clean-up based on where the thread is
+			if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
+				ready_schedule_lock( (struct __processor_id_t *)active_processor() );
+
+					// This is the tricky case
+					// The thread was preempted and now it is on the ready queue
+					// The thread should be the last on the list
+					/* paranoid */ verify( thrd.link.next != 0p );
+
+					// Remove the thread from the ready queue of this cluster
+					__attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
+					/* paranoid */ verify( removed );
+					thrd.link.next = 0p;
+					thrd.link.prev = 0p;
+					__cfaabi_dbg_debug_do( thrd.unpark_stale = true );
+
+					// Fixup the thread state
+					thrd.state = Blocked;
+					thrd.ticket = 0;
+					thrd.preempted = __NO_PREEMPTION;
+
+				ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
+
+				// Pretend like the thread was blocked all along
+			}
+			// !!! This is not an else if !!!
+			if( thrd.state == Blocked ) {
+				// This is the "easy case"
+				// The thread is parked and can easily be moved to active cluster
+				verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
+				thrd.curr_cluster = active_cluster();
+
+				// unpark the io_context poller thread
+				unpark( &thrd __cfaabi_dbg_ctx2 );
+			}
+			else {
+				// The thread is in a weird state
+				// I don't know what to do here
+				abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
+			}
+		} else {
+			unpark( &thrd __cfaabi_dbg_ctx2 );
+		}
+
+		^(this.thrd){};
+		__cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);
+
+		__io_destroy( *this.thrd.ring );
+		__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);
+
+		free(this.thrd.ring);
+	}
+
+	void ^?{}(io_context & this) {
+		^(this){ false };
+	}
+
+	static void __io_create( __io_data & this, const io_context_params & params_in ) {
 		// Step 1 : call to setup
 		struct io_uring_params params;
 		memset(&params, 0, sizeof(params));
-		if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
-		if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
-
-		uint32_t nentries = entries_per_cluster();
+		if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
+		if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
+
+		uint32_t nentries = params_in.num_entries;
 
 		int fd = syscall(__NR_io_uring_setup, nentries, &params );
@@ -206,7 +369,7 @@
 
 		// Step 2 : mmap result
-		memset( this.io, 0, sizeof(struct __io_data) );
-		struct __submition_data  & sq = this.io->submit_q;
-		struct __completion_data & cq = this.io->completion_q;
+		memset( &this, 0, sizeof(struct __io_data) );
+		struct __submition_data  & sq = this.submit_q;
+		struct __completion_data & cq = this.completion_q;
 
 		// calculate the right ring size
@@ -275,7 +438,7 @@
 		(sq.release_lock){};
 
-		if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
-			/* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
-			sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
+		if( params_in.poller_submits || params_in.eager_submits ) {
+			/* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
+			sq.ready_cnt = max( params_in.num_ready, 8 );
 			sq.ready = alloc_align( 64, sq.ready_cnt );
 			for(i; sq.ready_cnt) {
@@ -308,109 +471,14 @@
 
 		// Update the global ring info
-		this.io->ring_flags = params.flags;
-		this.io->cltr_flags = io_flags;
-		this.io->fd         = fd;
-		this.io->done       = false;
-		(this.io->submit){ min(*sq.num, *cq.num) };
-
-		if(!main_cluster) {
-			__kernel_io_finish_start( this );
-		}
-	}
-
-	void __kernel_io_finish_start( cluster & this ) {
-		if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
-			__cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
-			(this.io->poller.fast){ this };
-			__thrd_start( this.io->poller.fast, main );
-		}
-
-		// Create the poller thread
-		__cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
-		this.io->poller.slow.blocked = false;
-		this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
-	}
-
-	void __kernel_io_prepare_stop( cluster & this ) {
-		__cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
-		// Notify the poller thread of the shutdown
-		__atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
-
-		// Stop the IO Poller
-		sigval val = { 1 };
-		pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
-		post( this.io->poller.sem );
-
-		// Wait for the poller thread to finish
-		pthread_join( this.io->poller.slow.kthrd, 0p );
-		free( this.io->poller.slow.stack );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
-
-		if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
-			with( this.io->poller.fast ) {
-				/* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
-				/* paranoid */ verify( !ready_mutate_islocked() );
-
-				// We need to adjust the clean-up based on where the thread is
-				if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
-
-					ready_schedule_lock( (struct __processor_id_t *)active_processor() );
-
-						// This is the tricky case
-						// The thread was preempted and now it is on the ready queue
-						// The thread should be the last on the list
-						/* paranoid */ verify( thrd.link.next != 0p );
-
-						// Remove the thread from the ready queue of this cluster
-						__attribute__((unused)) bool removed = remove_head( &this, &thrd );
-						/* paranoid */ verify( removed );
-						thrd.link.next = 0p;
-						thrd.link.prev = 0p;
-						__cfaabi_dbg_debug_do( thrd.unpark_stale = true );
-
-						// Fixup the thread state
-						thrd.state = Blocked;
-						thrd.ticket = 0;
-						thrd.preempted = __NO_PREEMPTION;
-
-					ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
-
-					// Pretend like the thread was blocked all along
-				}
-				// !!! This is not an else if !!!
-				if( thrd.state == Blocked ) {
-
-					// This is the "easy case"
-					// The thread is parked and can easily be moved to active cluster
-					verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
-					thrd.curr_cluster = active_cluster();
-
-					// unpark the fast io_poller
-					unpark( &thrd __cfaabi_dbg_ctx2 );
-				}
-				else {
-
-					// The thread is in a weird state
-					// I don't know what to do here
-					abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
-				}
-
-			}
-
-			^(this.io->poller.fast){};
-
-			__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
-		}
-	}
-
-	void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
-		if(!main_cluster) {
-			__kernel_io_prepare_stop( this );
-		}
-
+		this.ring_flags = params.flags;
+		this.fd         = fd;
+		this.eager_submits  = params_in.eager_submits;
+		this.poller_submits = params_in.poller_submits;
+	}
+
+	void __io_destroy( __io_data & this ) {
 		// Shutdown the io rings
-		struct __submition_data  & sq = this.io->submit_q;
-		struct __completion_data & cq = this.io->completion_q;
+		struct __submition_data  & sq = this.submit_q;
+		struct __completion_data & cq = this.completion_q;
 
 		// unmap the submit queue entries
@@ -426,16 +494,13 @@
 
 		// close the file descriptor
-		close(this.io->fd);
-
-		free( this.io->submit_q.ready ); // Maybe null, doesn't matter
-		free( this.io );
-	}
-
-	int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
+		close(this.fd);
+
+		free( this.submit_q.ready ); // Maybe null, doesn't matter
+	}
+
+	int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
 		bool need_sys_to_submit = false;
 		bool need_sys_to_complete = false;
-		unsigned min_complete = 0;
 		unsigned flags = 0;
-
 
 		TO_SUBMIT:
@@ -451,12 +516,6 @@
 		}
 
-		TO_COMPLETE:
 		if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
 			flags |= IORING_ENTER_GETEVENTS;
-			if( mask ) {
-				need_sys_to_complete = true;
-				min_complete = 1;
-				break TO_COMPLETE;
-			}
 			if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
 				need_sys_to_complete = true;
@@ -466,5 +525,5 @@
 		int ret = 0;
 		if( need_sys_to_submit || need_sys_to_complete ) {
-			ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
+			ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
 			if( ret < 0 ) {
 				switch((int)errno) {
@@ -490,25 +549,24 @@
 	static uint32_t __release_consumed_submission( struct __io_data & ring );
 
-	static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
+	static inline void process(struct io_uring_cqe & cqe ) {
 		struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
 		__cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
 
 		data->result = cqe.res;
-		if(!id) { unpark(     data->thrd __cfaabi_dbg_ctx2 ); }
-		else  { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
+		unpark( data->thrd __cfaabi_dbg_ctx2 );
 	}
 
 	// Process a single completion message from the io_uring
 	// This is NOT thread-safe
-	static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
+	static [int, bool] __drain_io( & struct __io_data ring ) {
 		/* paranoid */ verify( !kernelTLS.preemption_state.enabled );
 
 		unsigned to_submit = 0;
-		if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+		if( ring.poller_submits ) {
 			// If the poller thread also submits, then we need to aggregate the submissions which are ready
 			to_submit = __collect_submitions( ring );
 		}
 
-		int ret = __io_uring_enter(ring, to_submit, true, mask);
+		int ret = __io_uring_enter(ring, to_submit, true);
 		if( ret < 0 ) {
 			return [0, true];
@@ -547,9 +605,6 @@
 			/* paranoid */ verify(&cqe);
 
-			process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
-		}
-
-		// Allow new submissions to happen
-		// V(ring.submit, count);
+			process( cqe );
+		}
 
 		// Mark to the kernel that the cqe has been seen
@@ -561,99 +616,23 @@
 	}
 
-	static void * __io_poller_slow( void * arg ) {
-		#if !defined( __CFA_NO_STATISTICS__ )
-			__stats_t local_stats;
-			__init_stats( &local_stats );
-			kernelTLS.this_stats = &local_stats;
-		#endif
-
-		cluster * cltr = (cluster *)arg;
-		struct __io_data & ring = *cltr->io;
-
-		ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
-
-		sigset_t mask;
-		sigfillset(&mask);
-		if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
-			abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
-		}
-
-		sigdelset( &mask, SIGUSR1 );
-
-		verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
-		verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
-
-		if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
-			while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
-
-				__atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
-
-				// In the user-thread approach drain and if anything was drained,
-				// batton pass to the user-thread
-				int count;
-				bool again;
-				[count, again] = __drain_io( ring, &mask );
-
-				__atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
-
-				// Update statistics
-				__STATS__( true,
-					io.complete_q.completed_avg.val += count;
-					io.complete_q.completed_avg.slow_cnt += 1;
-				)
-
-				if(again) {
-					__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
-					__unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
-					wait( ring.poller.sem );
-				}
-			}
-		}
-		else {
-			while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
-				//In the naive approach, just poll the io completion queue directly
-				int count;
-				bool again;
-				[count, again] = __drain_io( ring, &mask );
-
-				// Update statistics
-				__STATS__( true,
-					io.complete_q.completed_avg.val += count;
-					io.complete_q.completed_avg.slow_cnt += 1;
-				)
-			}
-		}
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
-
-		unregister( &ring.poller.slow.id );
-
-		#if !defined(__CFA_NO_STATISTICS__)
-			__tally_stats(cltr->stats, &local_stats);
-		#endif
-
-		return 0p;
-	}
-
-	void main( __io_poller_fast & this ) {
-		verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
-
-		// Start parked
-		park( __cfaabi_dbg_ctx );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
+	void main( $io_ctx_thread & this ) {
+		epoll_event ev;
+		ev.events = EPOLLIN | EPOLLONESHOT;
+		ev.data.u64 = (uint64_t)&this;
+		int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, this.ring->fd, &ev);
+		if (ret < 0) {
+			abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
+		}
+
+		__cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
 
 		int reset = 0;
-
 		// Then loop until we need to start
-		while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
-
+		while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
 			// Drain the io
 			int count;
 			bool again;
 			disable_interrupts();
-				[count, again] = __drain_io( *this.ring, 0p );
+				[count, again] = __drain_io( *this.ring );
 
 				if(!again) reset++;
@@ -672,24 +651,19 @@
 			// We didn't get anything baton pass to the slow poller
 			else {
-				__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
+				__cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
 				reset = 0;
 
 				// wake up the slow poller
-				post( this.ring->poller.sem );
+				ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, this.ring->fd, &ev);
+				if (ret < 0) {
+					abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
+				}
 
 				// park this thread
-				park( __cfaabi_dbg_ctx );
+				wait( this.sem );
 			}
 		}
 
 		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
-	}
-
-	static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
-	static inline void __wake_poller( struct __io_data & ring ) {
-		if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
-
-		sigval val = { 1 };
-		pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
 	}
 
@@ -806,19 +780,20 @@
 	}
 
-	void __submit( struct __io_data & ring, uint32_t idx ) {
+	void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
+		__io_data & ring = *ctx->thrd.ring;
 		// Get now the data we definetely need
 		uint32_t * const tail = ring.submit_q.tail;
-		const uint32_t mask = *ring.submit_q.mask;
+		const uint32_t mask  = *ring.submit_q.mask;
 
 		// There are 2 submission schemes, check which one we are using
-		if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+		if( ring.poller_submits ) {
 			// If the poller thread submits, then we just need to add this to the ready array
 			__submit_to_ready_array( ring, idx, mask );
 
-			__wake_poller( ring );
+			post( ctx->thrd.sem );
 
 			__cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
 		}
-		else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
+		else if( ring.eager_submits ) {
 			uint32_t picked = __submit_to_ready_array( ring, idx, mask );
 
@@ -849,5 +824,5 @@
 			// We got the lock
 			unsigned to_submit = __collect_submitions( ring );
-			int ret = __io_uring_enter( ring, to_submit, false, 0p );
+			int ret = __io_uring_enter( ring, to_submit, false );
 			if( ret < 0 ) {
 				unlock(ring.submit_q.lock);
@@ -892,5 +867,5 @@
 
 			// Submit however, many entries need to be submitted
-			int ret = __io_uring_enter( ring, 1, false, 0p );
+			int ret = __io_uring_enter( ring, 1, false );
 			if( ret < 0 ) {
 				switch((int)errno) {
@@ -963,6 +938,6 @@
 //=============================================================================================
 
-	void register_fixed_files( cluster & cl, int * files, unsigned count ) {
-		int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
+	void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
+		int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
 		if( ret < 0 ) {
 			abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
@@ -971,3 +946,9 @@
 		__cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
 	}
+
+	void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
+		for(i; cltr.io.cnt) {
+			register_fixed_files( cltr.io.ctxs[i], files, count );
+		}
+	}
 #endif
Index: libcfa/src/concurrency/iocall.cfa
===================================================================
--- libcfa/src/concurrency/iocall.cfa	(revision 39fc03effedee9acf13ca846d4c85c1bff3790ff)
+++ libcfa/src/concurrency/iocall.cfa	(revision f00b26d473acc916ee96e28fa1365688d9b3c2da)
@@ -22,4 +22,5 @@
 #if defined(CFA_HAVE_LINUX_IO_URING_H)
 	#include <stdint.h>
+	#include <errno.h>
 	#include <linux/io_uring.h>
 
@@ -27,5 +28,5 @@
 
 	extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data );
-	extern void __submit( struct __io_data & ring, uint32_t idx );
+	extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1)));
 
 	static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
@@ -52,16 +53,61 @@
 	}
 
+
+
+      #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
+		#define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC)
+	#elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC)
+		#define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC)
+      #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN)
+		#define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)
+      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
+		#define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC)
+	#elif defined(CFA_HAVE_IOSQE_FIXED_FILE)
+		#define REGULAR_FLAGS (IOSQE_FIXED_FILE)
+      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN)
+		#define REGULAR_FLAGS (IOSQE_IO_DRAIN)
+      #elif defined(CFA_HAVE_IOSQE_ASYNC)
+		#define REGULAR_FLAGS (IOSQE_ASYNC)
+	#else
+		#define REGULAR_FLAGS (0)
+	#endif
+
+	#if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK)
+		#define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK)
+	#elif defined(CFA_HAVE_IOSQE_IO_LINK)
+		#define LINK_FLAGS (IOSQE_IO_LINK)
+	#elif defined(CFA_HAVE_IOSQE_IO_HARDLINK)
+		#define LINK_FLAGS (IOSQE_IO_HARDLINK)
+	#else
+		#define LINK_FLAGS (0)
+	#endif
+
+	#if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)
+		#define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED)
+	#else
+		#define SPLICE_FLAGS (0)
+	#endif
+
+
 	#define __submit_prelude \
+		if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \
+		(void)timeout; (void)cancellation; \
+		if( !context ) context = __get_io_context(); \
 		__io_user_data_t data = { 0, active_thread() }; \
-		struct __io_data & ring = *data.thrd->curr_cluster->io; \
+		struct __io_data & ring = *context->thrd.ring; \
 		struct io_uring_sqe * sqe; \
 		uint32_t idx; \
-		[sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data );
+		[sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \
+		sqe->flags = REGULAR_FLAGS & submit_flags;
 
 	#define __submit_wait \
 		/*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
 		verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \
-		__submit( ring, idx ); \
+		__submit( context, idx ); \
 		park( __cfaabi_dbg_ctx ); \
+		if( data.result < 0 ) { \
+			errno = -data.result; \
+			return -1; \
+		} \
 		return data.result;
 #endif
@@ -70,4 +116,5 @@
 // I/O Forwards
 //=============================================================================================
+#include <time.hfa>
 
 // Some forward declarations
@@ -121,5 +168,5 @@
 // Asynchronous operations
 #if defined(HAVE_PREADV2)
-	ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
+	ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 		#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
 			return preadv2(fd, iov, iovcnt, offset, flags);
@@ -132,13 +179,14 @@
 		#endif
 	}
-
-	ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
-		#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
-			return preadv2(fd, iov, iovcnt, offset, flags);
+#endif
+
+#if defined(HAVE_PWRITEV2)
+	ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
+		#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
+			return pwritev2(fd, iov, iovcnt, offset, flags);
 		#else
 			__submit_prelude
 
-			(*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
-			sqe->flags |= IOSQE_FIXED_FILE;
+			(*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
 
 			__submit_wait
@@ -147,19 +195,5 @@
 #endif
 
-#if defined(HAVE_PWRITEV2)
-	ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
-		#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
-			return pwritev2(fd, iov, iovcnt, offset, flags);
-		#else
-			__submit_prelude
-
-			(*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
-
-			__submit_wait
-		#endif
-	}
-#endif
-
-int cfa_fsync(int fd) {
+int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC)
 		return fsync(fd);
@@ -173,5 +207,5 @@
 }
 
-int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
+int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE)
 		return sync_file_range(fd, offset, nbytes, flags);
@@ -189,5 +223,5 @@
 
 
-ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
+ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG)
 		return sendmsg(sockfd, msg, flags);
@@ -202,5 +236,5 @@
 }
 
-ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
+ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG)
 		return recvmsg(sockfd, msg, flags);
@@ -215,5 +249,5 @@
 }
 
-ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
+ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND)
 		return send( sockfd, buf, len, flags );
@@ -230,5 +264,5 @@
 }
 
-ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
+ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV)
 		return recv( sockfd, buf, len, flags );
@@ -245,5 +279,5 @@
 }
 
-int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
+int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT)
 		return accept4( sockfd, addr, addrlen, flags );
@@ -260,5 +294,5 @@
 }
 
-int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
+int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT)
 		return connect( sockfd, addr, addrlen );
@@ -274,5 +308,5 @@
 }
 
-int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
+int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE)
 		return fallocate( fd, mode, offset, len );
@@ -289,5 +323,5 @@
 }
 
-int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
+int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE)
 		return posix_fadvise( fd, offset, len, advice );
@@ -304,5 +338,5 @@
 }
 
-int cfa_madvise(void *addr, size_t length, int advice) {
+int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE)
 		return madvise( addr, length, advice );
@@ -319,5 +353,5 @@
 }
 
-int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
+int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT)
 		return openat( dirfd, pathname, flags, mode );
@@ -334,5 +368,5 @@
 }
 
-int cfa_close(int fd) {
+int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE)
 		return close( fd );
@@ -348,5 +382,5 @@
 // Forward declare in case it is not supported
 struct statx;
-int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
+int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX)
 		#if defined(__NR_statx)
@@ -360,11 +394,11 @@
 
 		(*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf };
-		sqe->flags = flags;
-
-		__submit_wait
-	#endif
-}
-
-ssize_t cfa_read(int fd, void *buf, size_t count) {
+		sqe->statx_flags = flags;
+
+		__submit_wait
+	#endif
+}
+
+ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ)
 		return read( fd, buf, count );
@@ -378,5 +412,5 @@
 }
 
-ssize_t cfa_write(int fd, void *buf, size_t count) {
+ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE)
-		return read( fd, buf, count );
+		return write( fd, buf, count ); // fallback without IORING_OP_WRITE must be a plain write(), not read()
@@ -390,5 +424,5 @@
 }
 
-ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
+ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
 		return splice( fd_in, off_in, fd_out, off_out, len, flags );
@@ -399,27 +433,11 @@
 		sqe->splice_fd_in  = fd_in;
 		sqe->splice_off_in = off_in;
-		sqe->splice_flags  = flags;
-
-		__submit_wait
-	#endif
-}
-
-ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) {
-	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
-		return splice( fd_in, off_in, fd_out, off_out, len, flags );
-	#else
-		__submit_prelude
-
-		(*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out };
-		sqe->splice_fd_in  = fd_in;
-		sqe->splice_off_in = off_in;
-		sqe->splice_flags  = flags | out_flags;
-		sqe->flags = in_flags;
-
-		__submit_wait
-	#endif
-}
-
-ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) {
+		sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
+
+		__submit_wait
+	#endif
+}
+
+ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE)
 		return tee( fd_in, fd_out, len, flags );
@@ -429,5 +447,5 @@
 		(*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 };
 		sqe->splice_fd_in = fd_in;
-		sqe->splice_flags = flags;
+		sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
 
 		__submit_wait
@@ -536,6 +554,5 @@
 
 		if( /*func == (fptr_t)splice || */
-			func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice,
-			func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice )
+			func == (fptr_t)cfa_splice )
 			#define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE ,
 			return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE);
Index: libcfa/src/concurrency/iofwd.hfa
===================================================================
--- libcfa/src/concurrency/iofwd.hfa	(revision 39fc03effedee9acf13ca846d4c85c1bff3790ff)
+++ libcfa/src/concurrency/iofwd.hfa	(revision f00b26d473acc916ee96e28fa1365688d9b3c2da)
@@ -19,6 +19,27 @@
 extern "C" {
 	#include <sys/types.h>
+	#if CFA_HAVE_LINUX_IO_URING_H
+		#include <linux/io_uring.h>
+	#endif
 }
 #include "bits/defs.hfa"
+#include "time.hfa"
+
+#if defined(CFA_HAVE_IOSQE_FIXED_FILE)
+	#define CFA_IO_FIXED_FD1 IOSQE_FIXED_FILE
+#endif
+#if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)
+	#define CFA_IO_FIXED_FD2 SPLICE_F_FD_IN_FIXED
+#endif
+#if defined(CFA_HAVE_IOSQE_IO_DRAIN)
+	#define CFA_IO_DRAIN IOSQE_IO_DRAIN
+#endif
+#if defined(CFA_HAVE_IOSQE_ASYNC)
+	#define CFA_IO_ASYNC IOSQE_ASYNC
+#endif
+
+struct cluster;
+struct io_context;
+struct io_cancellation;
 
 struct iovec;
@@ -27,26 +48,30 @@
 struct statx;
 
-extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
-extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
-extern int cfa_fsync(int fd);
-extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
-extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags);
-extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags);
-extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags);
-extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags);
-extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
-extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len);
-extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
-extern int cfa_madvise(void *addr, size_t length, int advice);
-extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode);
-extern int cfa_close(int fd);
-extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
-extern ssize_t cfa_read(int fd, void *buf, size_t count);
-extern ssize_t cfa_write(int fd, void *buf, size_t count);
-extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
-extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags);
+extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_fsync(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_close(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
+extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
 
 //-----------------------------------------------------------------------------
-// Check if a function is blocks a only the user thread
+// Check whether a function blocks only the user thread
 bool has_user_level_blocking( fptr_t func );
+
+//-----------------------------------------------------------------------------
+void register_fixed_files( io_context & ctx , int * files, unsigned count );
+void register_fixed_files( cluster    & cltr, int * files, unsigned count );
Index: libcfa/src/concurrency/kernel.cfa
===================================================================
--- libcfa/src/concurrency/kernel.cfa	(revision 39fc03effedee9acf13ca846d4c85c1bff3790ff)
+++ libcfa/src/concurrency/kernel.cfa	(revision f00b26d473acc916ee96e28fa1365688d9b3c2da)
@@ -130,4 +130,5 @@
 KERNEL_STORAGE($thread,	             mainThread);
 KERNEL_STORAGE(__stack_t,            mainThreadCtx);
+KERNEL_STORAGE(io_context,           mainPollerThread);
 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
 #if !defined(__CFA_NO_STATISTICS__)
@@ -310,5 +311,5 @@
 }
 
-void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {
+void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params) with( this ) {
 	this.name = name;
 	this.preemption_rate = preemption_rate;
@@ -335,10 +336,16 @@
 	ready_mutate_unlock( last_size );
 
-
-	__kernel_io_startup( this, io_flags, &this == mainCluster );
+	this.io.cnt  = num_io;
+	this.io.ctxs = aalloc(num_io);
+	for(i; this.io.cnt) {
+		(this.io.ctxs[i]){ this, io_params };
+	}
 }
 
 void ^?{}(cluster & this) {
-	__kernel_io_shutdown( this, &this == mainCluster );
+	for(i; this.io.cnt) {
+		^(this.io.ctxs[i]){ true };
+	}
+	free(this.io.ctxs);
 
 	// Lock the RWlock so no-one pushes/pops while we are changing the queue
@@ -853,5 +860,5 @@
 	// Initialize the main cluster
 	mainCluster = (cluster *)&storage_mainCluster;
-	(*mainCluster){"Main Cluster"};
+	(*mainCluster){"Main Cluster", 0};
 
 	__cfadbg_print_safe(runtime_core, "Kernel : Main cluster ready\n");
@@ -901,4 +908,7 @@
 	#endif
 
+	// Start IO
+	__kernel_io_startup();
+
 	// Enable preemption
 	kernel_start_preemption();
@@ -918,6 +928,7 @@
 
 	// Now that the system is up, finish creating systems that need threading
-	__kernel_io_finish_start( *mainCluster );
-
+	mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread;
+	mainCluster->io.cnt  = 1;
+	(*mainCluster->io.ctxs){ *mainCluster };
 
 	__cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
@@ -930,5 +941,7 @@
 static void __kernel_shutdown(void) {
 	//Before we start shutting things down, wait for systems that need threading to shutdown
-	__kernel_io_prepare_stop( *mainCluster );
+	^(*mainCluster->io.ctxs){};
+	mainCluster->io.cnt  = 0;
+	mainCluster->io.ctxs = 0p;
 
 	/* paranoid */ verify( TL_GET( preemption_state.enabled ) );
@@ -949,4 +962,7 @@
 	// Disable preemption
 	kernel_stop_preemption();
+
+	// Stop IO
+	__kernel_io_shutdown();
 
 	// Destroy the main processor and its context in reverse order of construction
Index: libcfa/src/concurrency/kernel.hfa
===================================================================
--- libcfa/src/concurrency/kernel.hfa	(revision 39fc03effedee9acf13ca846d4c85c1bff3790ff)
+++ libcfa/src/concurrency/kernel.hfa	(revision f00b26d473acc916ee96e28fa1365688d9b3c2da)
@@ -129,11 +129,42 @@
 struct __io_data;
 
-#define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) // 0x01
-#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02
-#define CFA_CLUSTER_IO_EAGER_SUBMITS         (1 << 2) // 0x04
-#define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   (1 << 3) // 0x08
-#define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10
-#define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
-
+// IO poller user-thread
+// Not using the "thread" keyword because we want to control
+// more carefully when to start/stop it
+struct $io_ctx_thread { // state of one IO poller user-thread; a plain struct so start/stop can be controlled by hand
+	struct __io_data * ring; // io data this poller is attached to (opaque in this header)
+	single_sem sem; // NOTE(review): presumably used to park/wake the poller - confirm in io.cfa
+	volatile bool done; // NOTE(review): looks like the stop flag polled by the poller loop - confirm in io.cfa
+	$thread self; // thread descriptor of the poller itself
+};
+
+
+struct io_context { // an IO context; at present it contains nothing but its poller thread state
+	$io_ctx_thread thrd; // the poller user-thread belonging to this context
+};
+
+struct io_context_params { // options passed to the io_context constructor; defaults come from ?{}(io_context_params &) in io.cfa
+	int num_entries; // default 256; NOTE(review): presumably the ring size - confirm in io.cfa
+	int num_ready; // default 256; NOTE(review): presumably the size of the ready/submit array - confirm in io.cfa
+	int submit_aff; // default -1; NOTE(review): meaning of the affinity value not visible here
+	bool eager_submits:1; // 1-bit flag, default false; appears to replace CFA_CLUSTER_IO_EAGER_SUBMITS
+	bool poller_submits:1; // 1-bit flag, default false; appears to replace CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS
+	bool poll_submit:1; // 1-bit flag, default false; appears to replace CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS
+	bool poll_complete:1; // 1-bit flag, default false; appears to replace CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES
+};
+
+void  ?{}(io_context_params & this);
+
+void  ?{}(io_context & this, struct cluster & cl);
+void  ?{}(io_context & this, struct cluster & cl, const io_context_params & params);
+void ^?{}(io_context & this);
+
+struct io_cancellation { // handle accepted by the cfa_* calls (io_cancellation * parameter) and by cancel()
+	uint32_t target; // starts as -1u; NOTE(review): presumably identifies the in-flight request - confirm in io.cfa
+};
+
+static inline void  ?{}(io_cancellation & this) { this.target = -1u; } // default construction: target = -1u
+static inline void ^?{}(io_cancellation & this) {} // nothing to release
+bool cancel(io_cancellation & this); // defined elsewhere; meaning of the bool result is not visible in this header
 
 //-----------------------------------------------------------------------------
@@ -206,5 +237,8 @@
 	} node;
 
-	struct __io_data * io;
+	struct {
+		io_context * ctxs;
+		unsigned cnt;
+	} io;
 
 	#if !defined(__CFA_NO_STATISTICS__)
@@ -215,13 +249,19 @@
 extern Duration default_preemption();
 
-void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);
+void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params);
 void ^?{}(cluster & this);
 
-static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
-static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
-static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
-static inline void ?{} (cluster & this, unsigned flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
-static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
-static inline void ?{} (cluster & this, const char name[], unsigned flags)        { this{name, default_preemption(), flags}; }
+static inline void ?{} (cluster & this)                                            { io_context_params default_params;    this{"Anonymous Cluster", default_preemption(), 1, default_params}; } // everything defaulted: 1 io context, default io params
+static inline void ?{} (cluster & this, Duration preemption_rate)                  { io_context_params default_params;    this{"Anonymous Cluster", preemption_rate, 1, default_params}; }
+static inline void ?{} (cluster & this, const char name[])                         { io_context_params default_params;    this{name, default_preemption(), 1, default_params}; }
+static inline void ?{} (cluster & this, unsigned num_io)                           { io_context_params default_params;    this{"Anonymous Cluster", default_preemption(), num_io, default_params}; } // next three: caller picks the number of io contexts
+static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io) { io_context_params default_params;    this{"Anonymous Cluster", preemption_rate, num_io, default_params}; }
+static inline void ?{} (cluster & this, const char name[], unsigned num_io)        { io_context_params default_params;    this{name, default_preemption(), num_io, default_params}; }
+static inline void ?{} (cluster & this, const io_context_params & io_params)                                            { this{"Anonymous Cluster", default_preemption(), 1, io_params}; } // remaining overloads: same combinations with caller-supplied io params
+static inline void ?{} (cluster & this, Duration preemption_rate, const io_context_params & io_params)                  { this{"Anonymous Cluster", preemption_rate, 1, io_params}; }
+static inline void ?{} (cluster & this, const char name[], const io_context_params & io_params)                         { this{name, default_preemption(), 1, io_params}; }
+static inline void ?{} (cluster & this, unsigned num_io, const io_context_params & io_params)                           { this{"Anonymous Cluster", default_preemption(), num_io, io_params}; }
+static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io, const io_context_params & io_params) { this{"Anonymous Cluster", preemption_rate, num_io, io_params}; }
+static inline void ?{} (cluster & this, const char name[], unsigned num_io, const io_context_params & io_params)        { this{name, default_preemption(), num_io, io_params}; }
 
 static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
Index: libcfa/src/concurrency/kernel_private.hfa
===================================================================
--- libcfa/src/concurrency/kernel_private.hfa	(revision 39fc03effedee9acf13ca846d4c85c1bff3790ff)
+++ libcfa/src/concurrency/kernel_private.hfa	(revision f00b26d473acc916ee96e28fa1365688d9b3c2da)
@@ -84,10 +84,21 @@
 void __unpark( struct __processor_id_t *, $thread * thrd __cfaabi_dbg_ctx_param2 );
 
-//-----------------------------------------------------------------------------
-// I/O
-void __kernel_io_startup     ( cluster &, unsigned, bool );
-void __kernel_io_finish_start( cluster & );
-void __kernel_io_prepare_stop( cluster & );
-void __kernel_io_shutdown    ( cluster &, bool );
+static inline bool __post(single_sem & this, struct __processor_id_t * id) { // returns true only when a stored thread was unparked
+	for() { // retry until a compare-and-swap succeeds or the 1p sentinel is observed
+		struct $thread * expected = this.ptr;
+		if(expected == 1p) return false; // 1p sentinel already present; NOTE(review): appears to mean "already posted"
+		if(expected == 0p) { // no thread stored: try to install the 1p sentinel
+			if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+				return false; // sentinel installed, nobody was woken
+			}
+		}
+		else { // ptr holds a thread: try to reset ptr to 0p and wake that thread
+			if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+				__unpark( id, expected __cfaabi_dbg_ctx2 );
+				return true;
+			}
+		}
+	}
+}
 
 //-----------------------------------------------------------------------------
@@ -109,4 +120,19 @@
 void doregister( struct cluster * cltr, struct $thread & thrd );
 void unregister( struct cluster * cltr, struct $thread & thrd );
+
+//-----------------------------------------------------------------------------
+// I/O
+void __kernel_io_startup     ();
+void __kernel_io_shutdown    ();
+
+static inline io_context * __get_io_context( void ) { // returns one of the active cluster's io contexts
+	cluster * cltr = active_cluster();
+	/* paranoid */ verifyf( cltr, "No active cluster for io operation\n");
+	assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\n", cltr ); // cnt is also the modulo divisor below
+	/* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\n", cltr);
+	return &cltr->io.ctxs[ __tls_rand() % cltr->io.cnt ]; // index picked with __tls_rand() reduced modulo cnt
+}
+
+void ^?{}(io_context & this, bool );
 
 //=======================================================================
