Index: libcfa/src/concurrency/io.cfa
===================================================================
--- libcfa/src/concurrency/io.cfa	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/io.cfa	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -17,6 +17,6 @@
 
 #if defined(__CFA_DEBUG__)
-	// #define __CFA_DEBUG_PRINT_IO__
-	// #define __CFA_DEBUG_PRINT_IO_CORE__
+	#define __CFA_DEBUG_PRINT_IO__
+	#define __CFA_DEBUG_PRINT_IO_CORE__
 #endif
 
@@ -79,71 +79,8 @@
 	};
 
-	// returns true of acquired as leader or second leader
-	static inline bool try_lock( __leaderlock_t & this ) {
-		const uintptr_t thrd = 1z | (uintptr_t)active_thread();
-		bool block;
-		disable_interrupts();
-		for() {
-			struct $thread * expected = this.value;
-			if( 1p != expected && 0p != expected ) {
-				/* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
-				enable_interrupts( __cfaabi_dbg_ctx );
-				return false;
-			}
-			struct $thread * desired;
-			if( 0p == expected ) {
-				// If the lock isn't locked acquire it, no need to block
-				desired = 1p;
-				block = false;
-			}
-			else {
-				// If the lock is already locked try becomming the next leader
-				desired = (struct $thread *)thrd;
-				block = true;
-			}
-			if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
-		}
-		if( block ) {
-			enable_interrupts( __cfaabi_dbg_ctx );
-			park();
-			disable_interrupts();
-		}
-		return true;
-	}
-
-	static inline bool next( __leaderlock_t & this ) {
-		/* paranoid */ verify( ! __preemption_enabled() );
-		struct $thread * nextt;
-		for() {
-			struct $thread * expected = this.value;
-			/* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
-
-			struct $thread * desired;
-			if( 1p == expected ) {
-				// No next leader, just unlock
-				desired = 0p;
-				nextt   = 0p;
-			}
-			else {
-				// There is a next leader, remove but keep locked
-				desired = 1p;
-				nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
-			}
-			if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
-		}
-
-		if(nextt) {
-			unpark( nextt );
-			enable_interrupts( __cfaabi_dbg_ctx );
-			return true;
-		}
-		enable_interrupts( __cfaabi_dbg_ctx );
-		return false;
-	}
-
 //=============================================================================================
 // I/O Syscall
 //=============================================================================================
-	static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
+	static int __io_uring_enter( struct $io_context & ctx, unsigned to_submit, bool get ) {
 		bool need_sys_to_submit = false;
 		bool need_sys_to_complete = false;
@@ -152,9 +89,9 @@
 		TO_SUBMIT:
 		if( to_submit > 0 ) {
-			if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
+			if( !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
 				need_sys_to_submit = true;
 				break TO_SUBMIT;
 			}
-			if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
+			if( (*ctx.sq.flags) & IORING_SQ_NEED_WAKEUP ) {
 				need_sys_to_submit = true;
 				flags |= IORING_ENTER_SQ_WAKEUP;
@@ -162,7 +99,7 @@
 		}
 
-		if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
+		if( get && !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
 			flags |= IORING_ENTER_GETEVENTS;
-			if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
+			if( (ctx.ring_flags & IORING_SETUP_IOPOLL) ) {
 				need_sys_to_complete = true;
 			}
@@ -171,19 +108,7 @@
 		int ret = 0;
 		if( need_sys_to_submit || need_sys_to_complete ) {
-			__cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
-			ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
-			__cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret);
-
-			if( ret < 0 ) {
-				switch((int)errno) {
-				case EAGAIN:
-				case EINTR:
-				case EBUSY:
-					ret = -1;
-					break;
-				default:
-					abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
-				}
-			}
+			__cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ctx.fd, to_submit, flags);
+			ret = syscall( __NR_io_uring_enter, ctx.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
+			__cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ctx.fd, ret);
 		}
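+		// Note: with IORING_SETUP_SQPOLL the kernel polls the submission queue itself,
+		// so the syscall above is only needed to wake the kernel thread (IORING_SQ_NEED_WAKEUP)
+		// or, with IORING_SETUP_IOPOLL, to actively reap completions.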
 
@@ -196,34 +121,25 @@
 // I/O Polling
 //=============================================================================================
-	static unsigned __collect_submitions( struct __io_data & ring );
-	static __u32 __release_consumed_submission( struct __io_data & ring );
-	static inline void __clean( volatile struct io_uring_sqe * sqe );
-
-	// Process a single completion message from the io_uring
-	// This is NOT thread-safe
-	static inline void process( volatile struct io_uring_cqe & cqe ) {
-		struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
-		__cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
-
-		fulfil( *future, cqe.res );
-	}
-
-	static [int, bool] __drain_io( & struct __io_data ring ) {
-		/* paranoid */ verify( ! __preemption_enabled() );
-
-		unsigned to_submit = 0;
-		if( ring.poller_submits ) {
-			// If the poller thread also submits, then we need to aggregate the submissions which are ready
-			to_submit = __collect_submitions( ring );
-		}
-
-		int ret = __io_uring_enter(ring, to_submit, true);
+	static inline unsigned __flush( struct $io_context & );
+	static inline __u32 __release_sqes( struct $io_context & );
+
+	static [int, bool] __drain_io( & struct $io_context ctx ) {
+		unsigned to_submit = __flush( ctx );
+		int ret = __io_uring_enter( ctx, to_submit, true );
 		if( ret < 0 ) {
-			return [0, true];
+			switch((int)errno) {
+			case EAGAIN:
+			case EINTR:
+			case EBUSY:
+				return [0, true];
+				break;
+			default:
+				abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
+			}
 		}
 
 		// update statistics
 		if (to_submit > 0) {
-			__STATS__( true,
+			__STATS__( false,
 				if( to_submit > 0 ) {
 					io.submit_q.submit_avg.rdy += to_submit;
@@ -232,15 +148,23 @@
 				}
 			)
-		}
-
-		__atomic_thread_fence( __ATOMIC_SEQ_CST );
+			/* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+
+			/* paranoid */ verify( ctx.sq.to_submit >= ret );
+			ctx.sq.to_submit -= ret;
+
+			/* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+
+			if(ret) {
+				__cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring\n", ret);
+			}
+		}
 
 		// Release the consumed SQEs
-		__release_consumed_submission( ring );
+		__release_sqes( ctx );
 
 		// Drain the queue
-		unsigned head = *ring.completion_q.head;
-		unsigned tail = *ring.completion_q.tail;
-		const __u32 mask = *ring.completion_q.mask;
+		unsigned head = *ctx.cq.head;
+		unsigned tail = *ctx.cq.tail;
+		const __u32 mask = *ctx.cq.mask;
 
 		// Nothing was new return 0
@@ -253,22 +177,27 @@
 		for(i; count) {
 			unsigned idx = (head + i) & mask;
-			volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+			volatile struct io_uring_cqe & cqe = ctx.cq.cqes[idx];
 
 			/* paranoid */ verify(&cqe);
 
-			process( cqe );
+			struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
+			__cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
+
+			fulfil( *future, cqe.res );
+		}
+
+		if(count) {
+			__cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
 		}
 
 		// Mark to the kernel that the cqe has been seen
 		// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-		__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
+		__atomic_store_n( ctx.cq.head, head + count, __ATOMIC_SEQ_CST );
 
 		return [count, count > 0 || to_submit > 0];
 	}
 
-	void main( $io_ctx_thread & this ) {
-		__ioctx_register( this );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
+	void main( $io_context & this ) {
+		__cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.fd, &this);
 
 		const int reset_cnt = 5;
@@ -276,19 +205,22 @@
 		// Then loop until we need to start
 		LOOP:
-		while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
+		while() {
+			waitfor( ^?{} : this) {
+				break LOOP;
+			}
+			or else {}
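+			// non-blocking check: accept a pending destructor call and exit the loop,
+			// otherwise fall through and poll the ring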
+
 			// Drain the io
 			int count;
 			bool again;
-			disable_interrupts();
-				[count, again] = __drain_io( *this.ring );
-
-				if(!again) reset--;
-
-				// Update statistics
-				__STATS__( true,
-					io.complete_q.completed_avg.val += count;
-					io.complete_q.completed_avg.cnt += 1;
-				)
-			enable_interrupts( __cfaabi_dbg_ctx );
+			[count, again] = __drain_io( this );
+
+			if(!again) reset--;
+
+			// Update statistics
+			__STATS__( false,
+				io.complete_q.completed_avg.val += count;
+				io.complete_q.completed_avg.cnt += 1;
+			)
 
 			// If we got something, just yield and check again
@@ -308,11 +240,11 @@
 			}
 
-				__STATS__( false,
-					io.complete_q.blocks += 1;
-				)
-				__cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
-
-				// block this thread
-				wait( this.sem );
+			__STATS__( false,
+				io.complete_q.blocks += 1;
+			)
+			__cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.fd, &this);
+
+			// block this thread
+			wait( this.sem );
 
 			// restore counter
@@ -320,7 +252,5 @@
 		}
 
-		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
-
-		__ioctx_unregister( this );
+		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.fd, &this);
 	}
 
@@ -345,4 +275,42 @@
 //
 
+	static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
+	static void __ioarbiter_submit  ( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have );
+	static void __ioarbiter_flush   ( $io_arbiter & mutex this, $io_context * );
+	static inline void __ioarbiter_notify( $io_context & ctx );
+
+	//=============================================================================================
+	// Allocation
+	// For the user's convenience, fill the sqe pointers from the allocated indexes
+	static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx)  {
+		struct io_uring_sqe * sqes = ctx->sq.sqes;
+		for(i; want) {
+			out_sqes[i] = &sqes[idxs[i]];
+		}
+	}
+
+	// Try to directly allocate from a given context
+	// Not thread-safe
+	static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {
+		__sub_ring_t & sq = ctx->sq;
+		const __u32 mask  = *sq.mask;
+		__u32 fhead = sq.free_ring.head;    // get the current head of the queue
+		__u32 ftail = sq.free_ring.tail;    // get the current tail of the queue
+
+		// If we don't have enough sqes, fail
+		if((ftail - fhead) < want) { return false; }
+
+		// copy all the indexes we want from the available list
+		for(i; want) {
+			idxs[i] = sq.free_ring.array[(fhead + i) & mask];
+		}
+
+		// Advance the head to mark the indexes as consumed
+		__atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);
+
+		// return success
+		return true;
+	}
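+
+	// Worked example (illustrative, not part of the patch): with *sq.num == 8 and
+	// mask == 7, fhead == 6 and ftail == 9 mean three sqe indexes are free, stored at
+	// free_ring.array[6], [7] and [8 & 7] == [0]; __alloc(ctx, idxs, 2) copies the
+	// first two into idxs and advances head to 8, leaving one free entry.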
+
 	// Allocate a submit queue entry.
 	// The kernel cannot see these entries until they are submitted, but other threads must be
@@ -350,370 +318,148 @@
 	// for convenience, return both the index and the pointer to the sqe
 	// sqe == &sqes[idx]
-	[* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
-		/* paranoid */ verify( data != 0 );
-
-		// Prepare the data we need
-		__attribute((unused)) int len   = 0;
-		__attribute((unused)) int block = 0;
-		__u32 cnt = *ring.submit_q.num;
-		__u32 mask = *ring.submit_q.mask;
-
-		__u32 off = thread_rand();
-
-		// Loop around looking for an available spot
-		for() {
-			// Look through the list starting at some offset
-			for(i; cnt) {
-				__u64 expected = 3;
-				__u32 idx = (i + off) & mask; // Get an index from a random
-				volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
-				volatile __u64 * udata = &sqe->user_data;
-
-				// Allocate the entry by CASing the user_data field from 0 to the future address
-				if( *udata == expected &&
-					__atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
-				{
-					// update statistics
-					__STATS__( false,
-						io.submit_q.alloc_avg.val   += len;
-						io.submit_q.alloc_avg.block += block;
-						io.submit_q.alloc_avg.cnt   += 1;
-					)
-
-					// debug log
-					__cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
-
-					// Success return the data
-					return [sqe, idx];
-				}
-				verify(expected != data);
-
-				// This one was used
-				len ++;
-			}
-
-			block++;
-
-			yield();
-		}
-	}
-
-	static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
-		/* paranoid */ verify( idx <= mask   );
-		/* paranoid */ verify( idx != -1ul32 );
-
-		// We need to find a spot in the ready array
-		__attribute((unused)) int len   = 0;
-		__attribute((unused)) int block = 0;
-		__u32 ready_mask = ring.submit_q.ready_cnt - 1;
-
-		__u32 off = thread_rand();
-
-		__u32 picked;
-		LOOKING: for() {
-			for(i; ring.submit_q.ready_cnt) {
-				picked = (i + off) & ready_mask;
-				__u32 expected = -1ul32;
-				if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
-					break LOOKING;
-				}
-				verify(expected != idx);
-
-				len ++;
-			}
-
-			block++;
-
-			__u32 released = __release_consumed_submission( ring );
-			if( released == 0 ) {
-				yield();
-			}
-		}
-
-		// update statistics
-		__STATS__( false,
-			io.submit_q.look_avg.val   += len;
-			io.submit_q.look_avg.block += block;
-			io.submit_q.look_avg.cnt   += 1;
-		)
-
-		return picked;
-	}
-
-	void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
-		__io_data & ring = *ctx->thrd.ring;
-
+	struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
+		__cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
+
+		disable_interrupts();
+		processor * proc = __cfaabi_tls.this_processor;
+		/* paranoid */ verify( __cfaabi_tls.this_processor );
+		/* paranoid */ verify( proc->io.lock == false );
+
+		__atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
+		$io_context * ctx = proc->io.ctx;
+		$io_arbiter * ioarb = proc->cltr->io.arbiter;
+		/* paranoid */ verify( ioarb );
+
+		// Can we proceed to the fast path?
+		if(  ctx				// We already have an instance?
+		&&  !ctx->revoked )		// Our instance is still valid?
 		{
-			__attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
-			__cfadbg_print_safe( io,
-				"Kernel I/O : submitting %u (%p) for %p\n"
-				"    data: %p\n"
-				"    opcode: %s\n"
-				"    fd: %d\n"
-				"    flags: %d\n"
-				"    prio: %d\n"
-				"    off: %p\n"
-				"    addr: %p\n"
-				"    len: %d\n"
-				"    other flags: %d\n"
-				"    splice fd: %d\n"
-				"    pad[0]: %llu\n"
-				"    pad[1]: %llu\n"
-				"    pad[2]: %llu\n",
-				idx, sqe,
-				active_thread(),
-				(void*)sqe->user_data,
-				opcodes[sqe->opcode],
-				sqe->fd,
-				sqe->flags,
-				sqe->ioprio,
-				(void*)sqe->off,
-				(void*)sqe->addr,
-				sqe->len,
-				sqe->accept_flags,
-				sqe->splice_fd_in,
-				sqe->__pad2[0],
-				sqe->__pad2[1],
-				sqe->__pad2[2]
-			);
-		}
-
-
-		// Get now the data we definetely need
-		volatile __u32 * const tail = ring.submit_q.tail;
-		const __u32 mask  = *ring.submit_q.mask;
-
-		// There are 2 submission schemes, check which one we are using
-		if( ring.poller_submits ) {
-			// If the poller thread submits, then we just need to add this to the ready array
-			__submit_to_ready_array( ring, idx, mask );
-
-			post( ctx->thrd.sem );
-
-			__cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
-		}
-		else if( ring.eager_submits ) {
-			__attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask );
-
-			#if defined(LEADER_LOCK)
-				if( !try_lock(ring.submit_q.submit_lock) ) {
-					__STATS__( false,
-						io.submit_q.helped += 1;
-					)
-					return;
-				}
-				/* paranoid */ verify( ! __preemption_enabled() );
-				__STATS__( true,
-					io.submit_q.leader += 1;
-				)
-			#else
-				for() {
-					yield();
-
-					if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
-						__STATS__( false,
-							io.submit_q.leader += 1;
-						)
-						break;
-					}
-
-					// If some one else collected our index, we are done
-					#warning ABA problem
-					if( ring.submit_q.ready[picked] != idx ) {
-						__STATS__( false,
-							io.submit_q.helped += 1;
-						)
-						return;
-					}
-
-					__STATS__( false,
-						io.submit_q.busy += 1;
-					)
-				}
-			#endif
-
-			// We got the lock
-			// Collect the submissions
-			unsigned to_submit = __collect_submitions( ring );
-
-			// Actually submit
-			int ret = __io_uring_enter( ring, to_submit, false );
-
-			#if defined(LEADER_LOCK)
-				/* paranoid */ verify( ! __preemption_enabled() );
-				next(ring.submit_q.submit_lock);
-			#else
-				unlock(ring.submit_q.submit_lock);
-			#endif
-			if( ret < 0 ) {
-				return;
-			}
-
-			// Release the consumed SQEs
-			__release_consumed_submission( ring );
-
-			// update statistics
-			__STATS__( false,
-				io.submit_q.submit_avg.rdy += to_submit;
-				io.submit_q.submit_avg.csm += ret;
-				io.submit_q.submit_avg.cnt += 1;
-			)
-
-			__cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
-		}
-		else
+			__cfadbg_print_safe(io, "Kernel I/O : attempting fast allocation\n");
+
+			// We can proceed to the fast path
+			if( __alloc(ctx, idxs, want) ) {
+				// Allocation was successful
+				// Mark the instance as no longer in-use and re-enable interrupts
+				__atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
+				enable_interrupts( __cfaabi_dbg_ctx );
+
+				__cfadbg_print_safe(io, "Kernel I/O : fast allocation successful\n");
+
+				__fill( sqes, want, idxs, ctx );
+				return ctx;
+			}
+			// The fast path failed, fallback
+		}
+
+		// Fast path failed, fallback on arbitration
+		__atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
+		enable_interrupts( __cfaabi_dbg_ctx );
+
+		__cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
+
+		struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
+
+		__cfadbg_print_safe(io, "Kernel I/O : slow allocation completed\n");
+
+		__fill( sqes, want, idxs, ret );
+		return ret;
+	}
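+
+	// Usage sketch (illustrative; mirrors the generated async_* wrappers in io/call.cfa.in):
+	//   io_future_t future;
+	//   __u32 idx; struct io_uring_sqe * sqe;
+	//   struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );
+	//   sqe->opcode = IORING_OP_READ;	// fill in the rest of the sqe
+	//   sqe->user_data = (__u64)(uintptr_t)&future;
+	//   cfa_io_submit( ctx, &idx, 1 );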
+
+
+	//=============================================================================================
+	// submission
+	static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have) {
+		// We can proceed to the fast path
+		// Get the right objects
+		__sub_ring_t & sq = ctx->sq;
+		const __u32 mask  = *sq.mask;
+		__u32 tail = sq.kring.ready;
+
+		// Add the sqes to the array
+		for( i; have ) {
+			sq.kring.array[ (tail + i) & mask ] = idxs[i];
+		}
+
+		// Make the sqes visible to the submitter
+		__atomic_store_n(&sq.kring.ready, tail + have, __ATOMIC_RELEASE);
+
+		// Make sure the poller is awake
+		__cfadbg_print_safe(io, "Kernel I/O : waking the poller\n");
+		post( ctx->sem );
+	}
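+
+	// Note: the release store on kring.ready orders the idxs writes into kring.array
+	// before the updated ready index becomes visible, so __flush never publishes an
+	// sqe index to the kernel before it has been written.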
+
+	void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have ) __attribute__((nonnull (1))) {
+		__cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u\n", have);
+
+		disable_interrupts();
+		processor * proc = __cfaabi_tls.this_processor;
+		/* paranoid */ verify( __cfaabi_tls.this_processor );
+		/* paranoid */ verify( proc->io.lock == false );
+
+		__atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
+		$io_context * ctx = proc->io.ctx;
+
+		// Can we proceed to the fast path?
+		if(  ctx				// We already have an instance?
+		&&  !ctx->revoked		// Our instance is still valid?
+		&&   ctx == inctx )		// We have the right instance?
 		{
-			// get mutual exclusion
-			#if defined(LEADER_LOCK)
-				while(!try_lock(ring.submit_q.submit_lock));
-			#else
-				lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
-			#endif
-
-			/* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
-			/* paranoid */ 	"index %u already reclaimed\n"
-			/* paranoid */ 	"head %u, prev %u, tail %u\n"
-			/* paranoid */ 	"[-0: %u,-1: %u,-2: %u,-3: %u]\n",
-			/* paranoid */ 	idx,
-			/* paranoid */ 	*ring.submit_q.head, ring.submit_q.prev_head, *tail
-			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
-			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
-			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
-			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
-			/* paranoid */ );
-
-			// Append to the list of ready entries
-
-			/* paranoid */ verify( idx <= mask );
-			ring.submit_q.array[ (*tail) & mask ] = idx;
-			__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
-
-			// Submit however, many entries need to be submitted
-			int ret = __io_uring_enter( ring, 1, false );
-			if( ret < 0 ) {
-				switch((int)errno) {
-				default:
-					abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
-				}
-			}
-
-			/* paranoid */ verify(ret == 1);
-
-			// update statistics
-			__STATS__( false,
-				io.submit_q.submit_avg.csm += 1;
-				io.submit_q.submit_avg.cnt += 1;
-			)
-
-			{
-				__attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
-				__attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
-				__attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
-
-				__cfadbg_print_safe( io,
-					"Kernel I/O : last submitted is %u (%p)\n"
-					"    data: %p\n"
-					"    opcode: %s\n"
-					"    fd: %d\n"
-					"    flags: %d\n"
-					"    prio: %d\n"
-					"    off: %p\n"
-					"    addr: %p\n"
-					"    len: %d\n"
-					"    other flags: %d\n"
-					"    splice fd: %d\n"
-					"    pad[0]: %llu\n"
-					"    pad[1]: %llu\n"
-					"    pad[2]: %llu\n",
-					last_idx, sqe,
-					(void*)sqe->user_data,
-					opcodes[sqe->opcode],
-					sqe->fd,
-					sqe->flags,
-					sqe->ioprio,
-					(void*)sqe->off,
-					(void*)sqe->addr,
-					sqe->len,
-					sqe->accept_flags,
-					sqe->splice_fd_in,
-					sqe->__pad2[0],
-					sqe->__pad2[1],
-					sqe->__pad2[2]
-				);
-			}
-
-			__atomic_thread_fence( __ATOMIC_SEQ_CST );
-			// Release the consumed SQEs
-
-			__release_consumed_submission( ring );
-			// ring.submit_q.sqes[idx].user_data = 3ul64;
-
-			#if defined(LEADER_LOCK)
-				next(ring.submit_q.submit_lock);
-			#else
-				unlock(ring.submit_q.submit_lock);
-			#endif
-
-			__cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
-		}
-	}
-
-	// #define PARTIAL_SUBMIT 32
-
-	// go through the list of submissions in the ready array and moved them into
-	// the ring's submit queue
-	static unsigned __collect_submitions( struct __io_data & ring ) {
-		/* paranoid */ verify( ring.submit_q.ready != 0p );
-		/* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
-
-		unsigned to_submit = 0;
-		__u32 tail = *ring.submit_q.tail;
-		const __u32 mask = *ring.submit_q.mask;
-		#if defined(PARTIAL_SUBMIT)
-			#if defined(LEADER_LOCK)
-				#error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
-			#endif
-			const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
-			const __u32 offset = ring.submit_q.prev_ready;
-			ring.submit_q.prev_ready += cnt;
-		#else
-			const __u32 cnt = ring.submit_q.ready_cnt;
-			const __u32 offset = 0;
-		#endif
-
-		// Go through the list of ready submissions
-		for( c; cnt ) {
-			__u32 i = (offset + c) % ring.submit_q.ready_cnt;
-
-			// replace any submission with the sentinel, to consume it.
-			__u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
-
-			// If it was already the sentinel, then we are done
-			if( idx == -1ul32 ) continue;
-
-			// If we got a real submission, append it to the list
-			ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
-			to_submit++;
-		}
-
-		// Increment the tail based on how many we are ready to submit
-		__atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
-
-		return to_submit;
-	}
+			__submit(ctx, idxs, have);
+
+			// Mark the instance as no longer in-use, re-enable interrupts and return
+			__atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
+			enable_interrupts( __cfaabi_dbg_ctx );
+
+			__cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
+			return;
+		}
+
+		// Fast path failed, fallback on arbitration
+		__atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
+		enable_interrupts( __cfaabi_dbg_ctx );
+
+		__cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
+
+		__ioarbiter_submit(*inctx->arbiter, inctx, idxs, have);
+	}
+
+	//=============================================================================================
+	// Flushing
+	static unsigned __flush( struct $io_context & ctx ) {
+		// First check for external
+		if( !__atomic_load_n(&ctx.ext_sq.empty, __ATOMIC_SEQ_CST) ) {
+			// We have external submissions, delegate to the arbiter
+			__ioarbiter_flush( *ctx.arbiter, &ctx );
+		}
+
+		__u32 tail  = *ctx.sq.kring.tail;
+		__u32 ready = ctx.sq.kring.ready;
+
+		/* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+		ctx.sq.to_submit += (ready - tail);
+		/* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+
+		if(ctx.sq.to_submit) {
+			__cfadbg_print_safe(io, "Kernel I/O : %u ready to submit\n", ctx.sq.to_submit);
+		}
+
+		__atomic_store_n(ctx.sq.kring.tail, ready, __ATOMIC_RELEASE);
+
+		return ctx.sq.to_submit;
+	}
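+
+	// Note: to_submit accumulates entries made ready by __submit since the last syscall;
+	// publishing kring.tail with a release store hands those indexes to the kernel.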
+
 
 	// Go through the ring's submit queue and release everything that has already been consumed
 	// by io_uring
-	static __u32 __release_consumed_submission( struct __io_data & ring ) {
-		const __u32 smask = *ring.submit_q.mask;
-
-		// We need to get the lock to copy the old head and new head
-		if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
+	// This cannot be done by multiple threads
+	static __u32 __release_sqes( struct $io_context & ctx ) {
+		const __u32 mask = *ctx.sq.mask;
+
 		__attribute__((unused))
-		__u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
-		__u32 chead = *ring.submit_q.head;		// get the current head of the queue
-		__u32 phead = ring.submit_q.prev_head;	// get the head the last time we were here
-		ring.submit_q.prev_head = chead;		// note up to were we processed
-		unlock(ring.submit_q.release_lock);
+		__u32 ctail = *ctx.sq.kring.tail;    // get the current tail of the queue
+		__u32 chead = *ctx.sq.kring.head;	 // get the current head of the queue
+		__u32 phead = ctx.sq.kring.released; // get the head the last time we were here
+
+		__u32 ftail = ctx.sq.free_ring.tail;  // get the current tail of the queue
 
 		// the 3 fields are organized like this diagram
@@ -734,28 +480,184 @@
 		__u32 count = chead - phead;
 
+		if(count == 0) {
+			return 0;
+		}
+
 		// We acquired a previous-head/current-head range
 		// go through the range and release the sqes
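+		// Illustrative example: if phead == 4 and chead == 7, the kernel consumed
+		// kring.array[4..6]; those three sqe indexes are recycled onto the free ring
+		// at positions ftail, ftail+1 and ftail+2.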
 		for( i; count ) {
-			__u32 idx = ring.submit_q.array[ (phead + i) & smask ];
-
-			/* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
-			__clean( &ring.submit_q.sqes[ idx ] );
-		}
+			__u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
+			ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
+		}
+
+		ctx.sq.kring.released = chead;		// note up to where we processed
+		__atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
+
+		__ioarbiter_notify(ctx);
+
 		return count;
 	}
 
-	void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
-		__clean( sqe );
-	}
-
-	static inline void __clean( volatile struct io_uring_sqe * sqe ) {
-		// If we are in debug mode, thrash the fields to make sure we catch reclamation errors
-		__cfaabi_dbg_debug_do(
-			memset(sqe, 0xde, sizeof(*sqe));
-			sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
-		);
-
-		// Mark the entry as unused
-		__atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
+//=============================================================================================
+// I/O Arbiter
+//=============================================================================================
+	static inline void __revoke( $io_arbiter & this, $io_context * ctx ) {
+		if(ctx->revoked) return;
+
+		remove( this.assigned, *ctx );
+
+		// Mark as revoked
+		__atomic_store_n(&ctx->revoked, true, __ATOMIC_SEQ_CST);
+
+		// Wait for the processor to no longer use it
+		while(ctx->proc->io.lock) Pause();
+
+		// Remove the coupling with the processor
+		ctx->proc->io.ctx = 0p;
+		ctx->proc = 0p;
+
+		// add to available contexts
+		addHead( this.available, *ctx );
+	}
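+
+	// Note: __revoke pairs with the proc->io.lock fast-path protocol: it first publishes
+	// `revoked`, then spins until the owning processor releases io.lock, guaranteeing no
+	// fast-path allocation or submission is still using the context when it is reassigned.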
+
+	static inline void __assign( $io_arbiter & this, $io_context * ctx, processor * proc ) {
+		remove( this.available, *ctx );
+
+		ctx->revoked = false;
+		ctx->proc = proc;
+		__atomic_store_n(&proc->io.ctx, ctx, __ATOMIC_SEQ_CST);
+
+		// add to assigned contexts
+		addTail( this.assigned, *ctx );
+	}
+
+	static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
+		__cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
+
+		SeqIter($io_context) iter;
+		$io_context & ci;
+		// Do we already have something available?
+		for( over( iter, this.available ); iter | ci; ) {
+			__cfadbg_print_safe(io, "Kernel I/O : attempting available context\n");
+
+			$io_context * c = &ci;
+			if(__alloc(c, idxs, want)) {
+				__assign( this, c, proc);
+				return c;
+			}
+		}
+
+		// Otherwise, we have no choice but to revoke everyone to check if other instances have available sqes
+		for( over( iter, this.assigned ); iter | ci; ) {
+			__cfadbg_print_safe(io, "Kernel I/O : revoking context for allocation\n");
+
+			$io_context * c = &ci;
+			__revoke( this, c );
+
+			if(__alloc(c, idxs, want)) {
+				__assign( this, c, proc);
+				return c;
+			}
+		}
+
+		__cfadbg_print_safe(io, "Kernel I/O : waiting for available resources\n");
+
+		// No one has any resources left, wait for something to finish
+		// Mark as pending
+		__atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST );
+
+		// Wait for our turn to submit
+		wait( this.pending.blocked, want );
+
+		__attribute__((unused)) bool ret = __alloc( this.pending.ctx, idxs, want );
+		/* paranoid */ verify( ret );
+
+		__assign( this, this.pending.ctx, proc);
+		return this.pending.ctx;
+	}
+
+	static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) {
+		/* paranoid */ verify( !is_empty(this.pending.blocked) );
+		this.pending.ctx = ctx;
+
+		while( !is_empty(this.pending.blocked) ) {
+			__u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
+			__u32 want = front( this.pending.blocked );
+
+			if( have < want ) return;	// not enough free sqes to satisfy the oldest waiter yet
+
+			signal_block( this.pending.blocked );
+		}
+
+		this.pending.flag = false;
+	}
+
+	static void __ioarbiter_notify( $io_context & ctx ) {
+		if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) {
+			__ioarbiter_notify( *ctx.arbiter, &ctx );
+		}
+	}
+
+	// Simply append to the pending
+	static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have ) {
+		__cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
+
+		/* paranoid */ verify( &this == ctx->arbiter );
+
+		// Mark as pending
+		__atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
+
+		// Wake-up the poller
+		// Wake up the poller
+
+		__cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
+
+		// Wait for our turn to submit
+		wait( ctx->ext_sq.blocked );
+
+		// Submit our indexes
+		__submit(ctx, idxs, have);
+
+		__cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
+	}
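+
+	// Note: external submissions park here until the poller runs __flush, which calls
+	// __ioarbiter_flush to revoke the context and signal each blocked submitter in turn.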
+
+	static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) {
+		/* paranoid */ verify( &this == ctx->arbiter );
+
+		__revoke( this, ctx );
+
+		__cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
+
+		condition & blocked = ctx->ext_sq.blocked;
+		/* paranoid */ verify( ctx->ext_sq.empty == is_empty( blocked ) );
+		while(!is_empty( blocked )) {
+			signal_block( blocked );
+		}
+
+		ctx->ext_sq.empty = true;
+	}
+
+	void __ioarbiter_register( $io_arbiter & mutex this, $io_context & ctx ) {
+		__cfadbg_print_safe(io, "Kernel I/O : registering new context\n");
+
+		ctx.arbiter = &this;
+
+		// add to available contexts
+		addHead( this.available, ctx );
+
+		// Check if this solves pending allocations
+		if(this.pending.flag) {
+			__ioarbiter_notify( ctx );
+		}
+	}
+
+	void __ioarbiter_unregister( $io_arbiter & mutex this, $io_context & ctx ) {
+		/* paranoid */ verify( &this == ctx.arbiter );
+
+		__revoke( this, &ctx );
+
+		remove( this.available, ctx );
 	}
 #endif
Index: libcfa/src/concurrency/io/call.cfa.in
===================================================================
--- libcfa/src/concurrency/io/call.cfa.in	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/io/call.cfa.in	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -54,15 +54,15 @@
 			| IOSQE_IO_DRAIN
 		#endif
+		#if defined(CFA_HAVE_IOSQE_IO_LINK)
+			| IOSQE_IO_LINK
+		#endif
+		#if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
+			| IOSQE_IO_HARDLINK
+		#endif
 		#if defined(CFA_HAVE_IOSQE_ASYNC)
 			| IOSQE_ASYNC
 		#endif
-	;
-
-	static const __u32 LINK_FLAGS = 0
-		#if defined(CFA_HAVE_IOSQE_IO_LINK)
-			| IOSQE_IO_LINK
-		#endif
-		#if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
-			| IOSQE_IO_HARDLINK
+		#if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED)
+			| IOSQE_BUFFER_SELECTED
 		#endif
 	;
@@ -74,16 +74,6 @@
 	;
 
-	extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
-	extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
-
-	static inline io_context * __get_io_context( void ) {
-		cluster * cltr = active_cluster();
-
-		/* paranoid */ verifyf( cltr, "No active cluster for io operation\\n");
-		assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr );
-
-		/* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr);
-		return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ];
-	}
+	extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2)));
+	extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));
 #endif
 
@@ -195,5 +185,5 @@
 		return ', '.join(args_a)
 
-AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{
+AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags) {{
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
 		ssize_t res = {name}({args});
@@ -205,24 +195,11 @@
 		}}
 	#else
-		// we don't support LINK yet
-		if( 0 != (submit_flags & LINK_FLAGS) ) {{
-			errno = ENOTSUP; return -1;
-		}}
-
-		if( !context ) {{
-			context = __get_io_context();
-		}}
-		if(cancellation) {{
-			cancellation->target = (__u64)(uintptr_t)&future;
-		}}
-
 		__u8 sflags = REGULAR_FLAGS & submit_flags;
-		struct __io_data & ring = *context->thrd.ring;
-
 		__u32 idx;
 		struct io_uring_sqe * sqe;
-		[(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
+		struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );
 
 		sqe->opcode = IORING_OP_{op};
+		sqe->user_data = (__u64)(uintptr_t)&future;
 		sqe->flags = sflags;
 		sqe->ioprio = 0;
@@ -239,16 +216,12 @@
 
 		verify( sqe->user_data == (__u64)(uintptr_t)&future );
-		__submit( context, idx );
+		cfa_io_submit( ctx, &idx, 1 );
 	#endif
 }}"""
 
-SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{
-	if( timeout >= 0 ) {{
-		errno = ENOTSUP;
-		return -1;
-	}}
+SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags) {{
 	io_future_t future;
 
-	async_{name}( future, {args}, submit_flags, cancellation, context );
+	async_{name}( future, {args}, submit_flags );
 
 	wait( future );
@@ -415,8 +388,8 @@
 	if c.define:
 		print("""#if defined({define})
-	{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	{ret} cfa_{name}({params}, int submit_flags);
 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
 	else:
-		print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);"
+		print("{ret} cfa_{name}({params}, int submit_flags);"
 		.format(ret=c.ret, name=c.name, params=c.params))
 
@@ -426,8 +399,8 @@
 	if c.define:
 		print("""#if defined({define})
-	void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);
+	void async_{name}(io_future_t & future, {params}, int submit_flags);
 #endif""".format(define=c.define,name=c.name, params=c.params))
 	else:
-		print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);"
+		print("void async_{name}(io_future_t & future, {params}, int submit_flags);"
 		.format(name=c.name, params=c.params))
 print("\n")
@@ -474,37 +447,4 @@
 
 print("""
-//-----------------------------------------------------------------------------
-bool cancel(io_cancellation & this) {
-	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
-		return false;
-	#else
-		io_future_t future;
-
-		io_context * context = __get_io_context();
-
-		__u8 sflags = 0;
-		struct __io_data & ring = *context->thrd.ring;
-
-		__u32 idx;
-		volatile struct io_uring_sqe * sqe;
-		[sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
-
-		sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
-		sqe->opcode = IORING_OP_ASYNC_CANCEL;
-		sqe->flags = sflags;
-		sqe->addr = this.target;
-
-		verify( sqe->user_data == (__u64)(uintptr_t)&future );
-		__submit( context, idx );
-
-		wait(future);
-
-		if( future.result == 0 ) return true; // Entry found
-		if( future.result == -EALREADY) return true; // Entry found but in progress
-		if( future.result == -ENOENT ) return false; // Entry not found
-		return false;
-	#endif
-}
-
 //-----------------------------------------------------------------------------
 // Check if a function has asynchronous
Index: libcfa/src/concurrency/io/setup.cfa
===================================================================
--- libcfa/src/concurrency/io/setup.cfa	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/io/setup.cfa	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -36,12 +36,9 @@
 	void ?{}(io_context_params & this) {}
 
-	void ?{}(io_context & this, struct cluster & cl) {}
-	void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}
-
-	void ^?{}(io_context & this) {}
-	void ^?{}(io_context & this, bool cluster_context) {}
-
-	void register_fixed_files( io_context &, int *, unsigned ) {}
-	void register_fixed_files( cluster    &, int *, unsigned ) {}
+	void  ?{}($io_context & this, struct cluster & cl) {}
+	void ^?{}($io_context & this) {}
+
+	$io_arbiter * create(void) { return 0p; }
+	void destroy($io_arbiter *) {}
 
 #else
@@ -68,10 +65,4 @@
 	void ?{}(io_context_params & this) {
 		this.num_entries = 256;
-		this.num_ready = 256;
-		this.submit_aff = -1;
-		this.eager_submits = false;
-		this.poller_submits = false;
-		this.poll_submit = false;
-		this.poll_complete = false;
 	}
 
@@ -194,7 +185,7 @@
 
 			for(i; nfds) {
-				$io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
+				$io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
 				/* paranoid */ verify( io_ctx );
-				__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
+				__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
 				#if !defined( __CFA_NO_STATISTICS__ )
 					__cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
@@ -202,5 +193,5 @@
 
 				eventfd_t v;
-				eventfd_read(io_ctx->ring->efd, &v);
+				eventfd_read(io_ctx->efd, &v);
 
 				post( io_ctx->sem );
@@ -219,109 +210,51 @@
 //=============================================================================================
 
-	void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
-	void main( $io_ctx_thread & this );
-	static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
-	void ^?{}( $io_ctx_thread & mutex this ) {}
-
-	static void __io_create ( __io_data & this, const io_context_params & params_in );
-	static void __io_destroy( __io_data & this );
-
-	void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
-		(this.thrd){ cl };
-		this.thrd.ring = malloc();
-		__cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
-		__io_create( *this.thrd.ring, params );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
-		this.thrd.done = false;
-		__thrd_start( this.thrd, main );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
+	static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
+	static void __io_uring_teardown( $io_context & this );
+	static void __epoll_register($io_context & ctx);
+	static void __epoll_unregister($io_context & ctx);
+	void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
+	void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
+
+	void ?{}($io_context & this, struct cluster & cl) {
+		(this.self){ "IO Poller", cl };
+		this.ext_sq.empty = true;
+		__io_uring_setup( this, cl.io.params );
+		__cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
+
+		__epoll_register(this);
+
+		__ioarbiter_register(*cl.io.arbiter, this);
+
+		__thrd_start( this, main );
+		__cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
+	}
+
+	void ^?{}($io_context & mutex this) {
+		__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
+
+		^(this.self){};
+		__cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
+
+		__ioarbiter_unregister(*this.arbiter, this);
+
+		__epoll_unregister(this);
+
+		__io_uring_teardown( this );
+		__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
 	}
 
 	void ?{}(io_context & this, struct cluster & cl) {
-		io_context_params params;
-		(this){ cl, params };
-	}
-
-	void ^?{}(io_context & this, bool cluster_context) {
-		__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);
-
-		// Notify the thread of the shutdown
-		__atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);
-
-		// If this is an io_context within a cluster, things get trickier
-		$thread & thrd = this.thrd.self;
-		if( cluster_context ) {
-			// We are about to do weird things with the threads
-			// we don't need interrupts to complicate everything
-			disable_interrupts();
-
-			// Get cluster info
-			cluster & cltr = *thrd.curr_cluster;
-			/* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
-			/* paranoid */ verify( !ready_mutate_islocked() );
-
-			// We need to adjust the clean-up based on where the thread is
-			if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
-				// This is the tricky case
-				// The thread was preempted or ready to run and now it is on the ready queue
-				// but the cluster is shutting down, so there aren't any processors to run the ready queue
-				// the solution is to steal the thread from the ready-queue and pretend it was blocked all along
-
-				ready_schedule_lock();
-					// The thread should on the list
-					/* paranoid */ verify( thrd.link.next != 0p );
-
-					// Remove the thread from the ready queue of this cluster
-					// The thread should be the last on the list
-					__attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
-					/* paranoid */ verify( removed );
-					thrd.link.next = 0p;
-					thrd.link.prev = 0p;
-
-					// Fixup the thread state
-					thrd.state = Blocked;
-					thrd.ticket = TICKET_BLOCKED;
-					thrd.preempted = __NO_PREEMPTION;
-
-				ready_schedule_unlock();
-
-				// Pretend like the thread was blocked all along
-			}
-			// !!! This is not an else if !!!
-			// Ok, now the thread is blocked (whether we cheated to get here or not)
-			if( thrd.state == Blocked ) {
-				// This is the "easy case"
-				// The thread is parked and can easily be moved to active cluster
-				verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
-				thrd.curr_cluster = active_cluster();
-
-				// unpark the fast io_poller
-				unpark( &thrd );
-			}
-			else {
-				// The thread is in a weird state
-				// I don't know what to do here
-				abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
-			}
-
-			// The weird thread kidnapping stuff is over, restore interrupts.
-			enable_interrupts( __cfaabi_dbg_ctx );
-		} else {
-			post( this.thrd.sem );
-		}
-
-		^(this.thrd){};
-		__cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);
-
-		__io_destroy( *this.thrd.ring );
-		__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);
-
-		free(this.thrd.ring);
+		// this.ctx = new(cl);
+		this.ctx = alloc();
+		(*this.ctx){ cl };
+
+		__cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
 	}
 
 	void ^?{}(io_context & this) {
-		^(this){ false };
+		post( this.ctx->sem );
+
+		delete(this.ctx);
 	}
 
@@ -329,10 +262,10 @@
 	extern void __enable_interrupts_hard();
 
-	static void __io_create( __io_data & this, const io_context_params & params_in ) {
+	static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
 		// Step 1 : call to setup
 		struct io_uring_params params;
 		memset(&params, 0, sizeof(params));
-		if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
-		if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
+		// if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
+		// if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
 
 		__u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
@@ -340,7 +273,4 @@
 			abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
 		}
-		if( params_in.poller_submits && params_in.eager_submits ) {
-			abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n");
-		}
 
 		int fd = syscall(__NR_io_uring_setup, nentries, &params );
@@ -350,7 +280,6 @@
 
 		// Step 2 : mmap result
-		memset( &this, 0, sizeof(struct __io_data) );
-		struct __submition_data  & sq = this.submit_q;
-		struct __completion_data & cq = this.completion_q;
+		struct __sub_ring_t & sq = this.sq;
+		struct __cmp_ring_t & cq = this.cq;
 
 		// calculate the right ring size
@@ -401,37 +330,23 @@
 		// Get the pointers from the kernel to fill the structure
 		// submit queue
-		sq.head    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
-		sq.tail    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
-		sq.mask    = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
-		sq.num     = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
-		sq.flags   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
-		sq.dropped = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
-		sq.array   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
-		sq.prev_head = *sq.head;
-
-		{
-			const __u32 num = *sq.num;
-			for( i; num ) {
-				__sqe_clean( &sq.sqes[i] );
-			}
-		}
-
-		(sq.submit_lock){};
-		(sq.release_lock){};
-
-		if( params_in.poller_submits || params_in.eager_submits ) {
-			/* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
-			sq.ready_cnt = max( params_in.num_ready, 8 );
-			sq.ready = alloc( sq.ready_cnt, 64`align );
-			for(i; sq.ready_cnt) {
-				sq.ready[i] = -1ul32;
-			}
-			sq.prev_ready = 0;
-		}
-		else {
-			sq.ready_cnt = 0;
-			sq.ready = 0p;
-			sq.prev_ready = 0;
-		}
+		sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
+		sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
+		sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
+		sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
+		sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
+		sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
+		sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
+
+		sq.kring.ready = 0;
+		sq.kring.released = 0;
+
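+		// The free ring starts full: head == 0, tail == *sq.num and array[i] == i,
+		// so every sqe index is initially available for allocation.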
+		sq.free_ring.head = 0;
+		sq.free_ring.tail = *sq.num;
+		sq.free_ring.array = alloc( *sq.num, 128`align );
+		for(i; (__u32)*sq.num) {
+			sq.free_ring.array[i] = i;
+		}
+
+		sq.to_submit = 0;
 
 		// completion queue
@@ -468,19 +383,17 @@
 		/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
 		/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
-		/* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
-		/* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
+		/* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
+		/* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );
 
 		// Update the global ring info
-		this.ring_flags = params.flags;
+		this.ring_flags = 0;
 		this.fd         = fd;
 		this.efd        = efd;
-		this.eager_submits  = params_in.eager_submits;
-		this.poller_submits = params_in.poller_submits;
-	}
-
-	static void __io_destroy( __io_data & this ) {
+	}
+
+	static void __io_uring_teardown( $io_context & this ) {
 		// Shutdown the io rings
-		struct __submition_data  & sq = this.submit_q;
-		struct __completion_data & cq = this.completion_q;
+		struct __sub_ring_t & sq = this.sq;
+		struct __cmp_ring_t & cq = this.cq;
 
 		// unmap the submit queue entries
@@ -499,5 +412,5 @@
 		close(this.efd);
 
-		free( this.submit_q.ready ); // Maybe null, doesn't matter
+		free( this.sq.free_ring.array ); // always allocated in __io_uring_setup
 	}
 
@@ -505,9 +418,9 @@
 // I/O Context Sleep
 //=============================================================================================
-	static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
+	static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
 		struct epoll_event ev;
 		ev.events = EPOLLIN | EPOLLONESHOT;
 		ev.data.u64 = (__u64)&ctx;
-		int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
+		int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
 		if (ret < 0) {
 			abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
@@ -515,19 +428,14 @@
 	}
 
-	void __ioctx_register($io_ctx_thread & ctx) {
-		__ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
-	}
-
-	void __ioctx_prepare_block($io_ctx_thread & ctx) {
-		__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
-		__ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
-	}
-
-	void __ioctx_unregister($io_ctx_thread & ctx) {
+	static void __epoll_register($io_context & ctx) {
+		__epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
+	}
+
+	static void __epoll_unregister($io_context & ctx) {
 		// Read the current epoch so we know when to stop
 		size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
 
 		// Remove the fd from the iopoller
-		__ioctx_epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
+		__epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
 
 		// Notify the io poller thread of the shutdown
@@ -543,20 +451,33 @@
 	}
 
+	void __ioctx_prepare_block($io_context & ctx) {
+		__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
+		__epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
+	}
+
+
 //=============================================================================================
 // I/O Context Misc Setup
 //=============================================================================================
-	void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
-		int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
-		if( ret < 0 ) {
-			abort( "KERNEL ERROR: IO_URING REGISTER - (%d) %s\n", (int)errno, strerror(errno) );
-		}
-
-		__cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
-	}
-
-	void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
-		for(i; cltr.io.cnt) {
-			register_fixed_files( cltr.io.ctxs[i], files, count );
-		}
-	}
+	void ?{}( $io_arbiter & this ) {
+		this.pending.flag = false;
+	}
+
+	void ^?{}( $io_arbiter & mutex this ) {
+		/* paranoid */ verify( empty(this.assigned) );
+		/* paranoid */ verify( empty(this.available) );
+		/* paranoid */ verify( is_empty(this.pending.blocked) );
+	}
+
+	$io_arbiter * create(void) {
+		return new();
+	}
+	void destroy($io_arbiter * arbiter) {
+		delete(arbiter);
+	}
+
 #endif
Index: libcfa/src/concurrency/io/types.hfa
===================================================================
--- libcfa/src/concurrency/io/types.hfa	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/io/types.hfa	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -25,22 +25,35 @@
 
 #if defined(CFA_HAVE_LINUX_IO_URING_H)
-	#define LEADER_LOCK
-	struct __leaderlock_t {
-		struct $thread * volatile value;	// ($thread) next_leader | (bool:1) is_locked
-	};
+	#include "bits/sequence.hfa"
+	#include "monitor.hfa"
 
-	static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; }
+	struct processor;
+	monitor $io_arbiter;
 
 	//-----------------------------------------------------------------------
 	// Ring Data structure
-      struct __submition_data {
-		// Head and tail of the ring (associated with array)
-		volatile __u32 * head;
-		volatile __u32 * tail;
-		volatile __u32 prev_head;
+	struct __sub_ring_t {
+		struct {
+			// Head and tail of the ring (associated with array)
+			volatile __u32 * head;	 // one past the last index consumed by the kernel
+			volatile __u32 * tail;   // one past the last index visible to the kernel
+			volatile __u32 ready;    // one past the last index added to the array
+			volatile __u32 released; // one past the last index released back to the free list
 
-		// The actual kernel ring which uses head/tail
-		// indexes into the sqes arrays
-		__u32 * array;
+			// The actual kernel ring which uses head/tail
+			// indexes into the sqes arrays
+			__u32 * array;
+		} kring;
+
+		struct {
+			volatile __u32 head;
+			volatile __u32 tail;
+			// The ring which contains free allocations
+			// indexes into the sqes arrays
+			__u32 * array;
+		} free_ring;
+
+		// number of sqes to submit on next system call.
+		__u32 to_submit;
 
 		// number of entries and mask to go with it
@@ -48,24 +61,13 @@
 		const __u32 * mask;
 
-		// Submission flags (Not sure what for)
+		// Submission flags, only relevant when the ring uses IORING_SETUP_SQPOLL (e.g. IORING_SQ_NEED_WAKEUP)
 		__u32 * flags;
 
-		// number of sqes not submitted (whatever that means)
+		// number of invalid sqes dropped by the kernel
+		// From the documentation: [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer.
 		__u32 * dropped;
 
-		// Like head/tail but not seen by the kernel
-		volatile __u32 * ready;
-		__u32 ready_cnt;
-		__u32 prev_ready;
-
-		#if defined(LEADER_LOCK)
-			__leaderlock_t submit_lock;
-		#else
-			__spinlock_t submit_lock;
-		#endif
-		__spinlock_t  release_lock;
-
 		// A buffer of sqes (not the actual ring)
-		volatile struct io_uring_sqe * sqes;
+		struct io_uring_sqe * sqes;
 
 		// The location and size of the mmaped area
@@ -74,5 +76,5 @@
 	};
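To make the two-ring layout concrete: free indices live in free_ring, and submission moves an index from there into kring.array. A hedged sketch of the pop side, assuming a single consumer and the power-of-two mask declared in this struct; the function name is mine, not part of this patch:

    // Hypothetical: pop one free sqe index; returns false if the free ring is empty.
    static bool try_alloc_idx( struct __sub_ring_t * sq, __u32 * idx ) {
        __u32 fhead = sq->free_ring.head;                                     // consumer-private
        __u32 ftail = __atomic_load_n( &sq->free_ring.tail, __ATOMIC_ACQUIRE );
        if( ftail == fhead ) return false;                                    // nothing free
        *idx = sq->free_ring.array[ fhead & *sq->mask ];                      // rings are power-of-two sized
        __atomic_store_n( &sq->free_ring.head, fhead + 1, __ATOMIC_RELEASE );
        return true;
    }

Submission later writes the index into kring.array at kring.tail and advances it; released indices flow back through free_ring in the opposite direction.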
 
-	struct __completion_data {
+	struct __cmp_ring_t {
 		// Head and tail of the ring
 		volatile __u32 * head;
@@ -83,5 +85,5 @@
 		const __u32 * num;
 
-		// number of cqes not submitted (whatever that means)
+		// incremented by the kernel when a completion is dropped because the CQ ring is full
 		__u32 * overflow;
 
@@ -94,12 +96,44 @@
 	};
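Draining a __cmp_ring_t follows the standard io_uring pattern: consume every entry between *head and *tail, then publish the new head so the kernel can reuse the slots. A hedged sketch; the cqes and mask members sit in the elided part of this hunk:

    // Hypothetical: standard cqe drain loop over a kernel-mapped completion ring.
    static unsigned drain_cq( volatile __u32 * head, volatile __u32 * tail,
                              const __u32 mask, volatile struct io_uring_cqe * cqes ) {
        __u32 h = *head;
        __u32 t = __atomic_load_n( tail, __ATOMIC_ACQUIRE );  // kernel publishes with release
        for( __u32 i = h; i != t; i++ ) {
            volatile struct io_uring_cqe * cqe = &cqes[ i & mask ];
            // ... fulfil the future stashed in cqe->user_data ...
        }
        __atomic_store_n( head, t, __ATOMIC_RELEASE );        // hand the slots back to the kernel
        return t - h;
    }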
 
-	struct __io_data {
-		struct __submition_data submit_q;
-		struct __completion_data completion_q;
+	struct __attribute__((aligned(128))) $io_context {
+		inline Seqable;
+
+		volatile bool revoked;
+		processor * proc;
+
+		$io_arbiter * arbiter;
+
+		struct {
+			volatile bool empty;
+			condition blocked;
+		} ext_sq;
+
+		struct __sub_ring_t sq;
+		struct __cmp_ring_t cq;
 		__u32 ring_flags;
 		int fd;
 		int efd;
-		bool eager_submits:1;
-		bool poller_submits:1;
+
+		single_sem sem;
+		$thread self;
+	};
+
+	void main( $io_context & this );
+	static inline $thread  * get_thread ( $io_context & this ) __attribute__((const)) { return &this.self; }
+	static inline $monitor * get_monitor( $io_context & this ) __attribute__((const)) { return &this.self.self_mon; }
+	static inline $io_context *& Back( $io_context * n ) { return ($io_context *)Back( (Seqable *)n ); }
+	static inline $io_context *& Next( $io_context * n ) { return ($io_context *)Next( (Colable *)n ); }
+	void ^?{}( $io_context & mutex this );
+
+	monitor __attribute__((aligned(128))) $io_arbiter {
+		struct {
+			condition blocked;
+			$io_context * ctx;
+			volatile bool flag;
+		} pending;
+
+		Sequence($io_context) assigned;
+
+		Sequence($io_context) available;
 	};
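The arbiter tracks contexts on two intrusive Sequences, which is why $io_context inherits Seqable and defines the Back/Next helpers above. A hypothetical helper, heavily hedged since the exact bits/sequence.hfa operations are not shown in this patch:

    // Hypothetical (not in this patch): hand an idle context to a requester.
    // Assumes empty/head/remove/add behave as in bits/sequence.hfa.
    static $io_context * assign_one( $io_arbiter & mutex this ) {
        if( empty( this.available ) ) return 0p;
        $io_context * ctx = &head( this.available );  // oldest idle context
        remove( this.available, *ctx );
        add( this.assigned, *ctx );
        return ctx;
    }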
 
@@ -133,9 +167,5 @@
 	#endif
 
-	struct $io_ctx_thread;
-	void __ioctx_register($io_ctx_thread & ctx);
-	void __ioctx_unregister($io_ctx_thread & ctx);
-	void __ioctx_prepare_block($io_ctx_thread & ctx);
-	void __sqe_clean( volatile struct io_uring_sqe * sqe );
+	void __ioctx_prepare_block($io_context & ctx);
 #endif
 
Index: libcfa/src/concurrency/iofwd.hfa
===================================================================
--- libcfa/src/concurrency/iofwd.hfa	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/iofwd.hfa	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -48,6 +48,5 @@
 struct cluster;
 struct io_future_t;
-struct io_context;
-struct io_cancellation;
+struct $io_context;
 
 struct iovec;
@@ -55,70 +54,76 @@
 struct sockaddr;
 struct statx;
+struct epoll_event;
+
+//----------
+// underlying calls
+extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
+extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));
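These two entry points are the building blocks for every wrapper below: allocate sqes, fill them, submit, then wait on a future that the completion drain fulfils. A hedged sketch of the pattern; the io_future_t fields and the exact sqe setup are assumptions, not part of this header:

    // Hypothetical sketch of the allocate/fill/submit/wait pattern behind cfa_read.
    static ssize_t sketch_read( int fd, void * buf, size_t count ) {
        io_future_t future;
        reset( future );                                // assumed future API

        __u32 idx;
        struct io_uring_sqe * sqe;
        struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );

        sqe->opcode    = IORING_OP_READ;
        sqe->fd        = fd;
        sqe->addr      = (__u64)buf;
        sqe->len       = count;
        sqe->user_data = (__u64)&future;                // completion fulfils this future

        cfa_io_submit( ctx, &idx, 1 );
        wait( future );                                 // parks only this user thread
        return future.result;                           // assumed result field
    }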
 
 //----------
 // synchronous calls
 #if defined(CFA_HAVE_PREADV2)
-	extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
 #if defined(CFA_HAVE_PWRITEV2)
-	extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
-extern int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+extern int cfa_fsync(int fd, int submit_flags);
+extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags);
+extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags);
+extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags);
+extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags);
+extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags);
+extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags);
+extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags);
+extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags);
+extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags);
+extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags);
+extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags);
+extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags);
 #if defined(CFA_HAVE_OPENAT2)
-	extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags);
 #endif
-extern int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+extern int cfa_close(int fd, int submit_flags);
 #if defined(CFA_HAVE_STATX)
-	extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags);
 #endif
-extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags);
+extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags);
+extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags);
+extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags);
 
 //----------
 // asynchronous calls
 #if defined(CFA_HAVE_PREADV2)
-	extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
 #if defined(CFA_HAVE_PWRITEV2)
-	extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
-extern void async_fsync(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, io_cancellation * cancellation, io_context * context);
+extern void async_fsync(io_future_t & future, int fd, int submit_flags);
+extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags);
+extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags);
+extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags);
+extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags);
+extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags);
+extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags);
+extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags);
+extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags);
+extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags);
+extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags);
+extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags);
+extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags);
 #if defined(CFA_HAVE_OPENAT2)
-	extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags);
 #endif
-extern void async_close(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
+extern void async_close(io_future_t & future, int fd, int submit_flags);
 #if defined(CFA_HAVE_STATX)
-	extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags);
 #endif
-void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
+extern void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags);
+extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags);
+extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags);
+extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags);
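With the per-call Duration/io_cancellation/io_context parameters gone, an asynchronous call now takes just a future, the operation's own arguments, and submit_flags. A hedged usage sketch (do_other_work is a placeholder):

    io_future_t f;
    reset( f );                           // assumed future API
    async_read( f, fd, buf, 4096, 0 );    // submits and returns immediately
    do_other_work();                      // overlap computation with the I/O
    wait( f );                            // park this user thread until the cqe arrives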
 
 
@@ -126,6 +131,2 @@
 // Check if a function blocks only the user thread
 bool has_user_level_blocking( fptr_t func );
-
-//-----------------------------------------------------------------------------
-void register_fixed_files( io_context & ctx , int * files, unsigned count );
-void register_fixed_files( cluster    & cltr, int * files, unsigned count );
Index: libcfa/src/concurrency/kernel.hfa
===================================================================
--- libcfa/src/concurrency/kernel.hfa	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/kernel.hfa	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -41,4 +41,23 @@
 
 //-----------------------------------------------------------------------------
+// I/O
+struct cluster;
+struct $io_context;
+struct $io_arbiter;
+
+struct io_context_params {
+	int num_entries;
+};
+
+void  ?{}(io_context_params & this);
+
+struct io_context {
+	$io_context * ctx;
+	cluster * cltr;
+};
+void  ?{}(io_context & this, struct cluster & cl);
+void ^?{}(io_context & this);
+
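io_context is now a thin handle pairing a cluster with its $io_context, so per-cluster setup reduces to one construction, mirroring what startup.cfa does for the main cluster further down. A minimal sketch:

    cluster cl;
    io_context io = { cl };   // runs ?{}( io_context &, cluster & )
    // ... user threads on cl can now perform I/O ...
    // ^?{}( io ) runs implicitly when io leaves scope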
+//-----------------------------------------------------------------------------
 // Processor
 extern struct cluster * mainCluster;
@@ -78,4 +97,9 @@
 	pthread_t kernel_thread;
 
+	struct {
+		$io_context * volatile ctx;
+		volatile bool lock;
+	} io;
+
 	// Preemption data
 	// Node added to the discrete event simulation
@@ -116,47 +140,4 @@
 
 DLISTED_MGD_IMPL_OUT(processor)
-
-//-----------------------------------------------------------------------------
-// I/O
-struct __io_data;
-
-// IO poller user-thread
-// Not using the "thread" keyword because we want to control
-// more carefully when to start/stop it
-struct $io_ctx_thread {
-	struct __io_data * ring;
-	single_sem sem;
-	volatile bool done;
-	$thread self;
-};
-
-
-struct io_context {
-	$io_ctx_thread thrd;
-};
-
-struct io_context_params {
-	int num_entries;
-	int num_ready;
-	int submit_aff;
-	bool eager_submits:1;
-	bool poller_submits:1;
-	bool poll_submit:1;
-	bool poll_complete:1;
-};
-
-void  ?{}(io_context_params & this);
-
-void  ?{}(io_context & this, struct cluster & cl);
-void  ?{}(io_context & this, struct cluster & cl, const io_context_params & params);
-void ^?{}(io_context & this);
-
-struct io_cancellation {
-	__u64 target;
-};
-
-static inline void  ?{}(io_cancellation & this) { this.target = -1u; }
-static inline void ^?{}(io_cancellation &) {}
-bool cancel(io_cancellation & this);
 
 //-----------------------------------------------------------------------------
@@ -244,6 +225,6 @@
 
 	struct {
-		io_context * ctxs;
-		unsigned cnt;
+		$io_arbiter * arbiter;
+		io_context_params params;
 	} io;
 
Index: libcfa/src/concurrency/kernel/startup.cfa
===================================================================
--- libcfa/src/concurrency/kernel/startup.cfa	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/kernel/startup.cfa	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -104,5 +104,5 @@
 KERNEL_STORAGE($thread,	             mainThread);
 KERNEL_STORAGE(__stack_t,            mainThreadCtx);
-KERNEL_STORAGE(io_context,           mainPollerThread);
+KERNEL_STORAGE(io_context,           mainIoContext);
 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
 #if !defined(__CFA_NO_STATISTICS__)
@@ -231,4 +231,7 @@
 	__kernel_io_startup();
 
+	io_context * mainio = (io_context *)&storage_mainIoContext;
+	(*mainio){ *mainCluster };
+
 	// Add the main thread to the ready queue
 	// once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread
@@ -243,14 +246,4 @@
 	// THE SYSTEM IS NOW COMPLETELY RUNNING
 
-
-	// SKULLDUGGERY: The constructor for the mainCluster will call alloc with a dimension of 0
-	// malloc *can* return a non-null value, we should free it if that is the case
-	free( mainCluster->io.ctxs );
-
-	// Now that the system is up, finish creating systems that need threading
-	mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread;
-	mainCluster->io.cnt  = 1;
-	(*mainCluster->io.ctxs){ *mainCluster };
-
 	__cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
 
@@ -263,7 +256,6 @@
 static void __kernel_shutdown(void) {
 	//Before we start shutting things down, wait for systems that need threading to shutdown
-	^(*mainCluster->io.ctxs){};
-	mainCluster->io.cnt  = 0;
-	mainCluster->io.ctxs = 0p;
+	io_context * mainio = (io_context *)&storage_mainIoContext;
+	^(*mainio){};
 
 	/* paranoid */ verify( __preemption_enabled() );
@@ -486,4 +478,7 @@
 	pending_preemption = false;
 
+	this.io.ctx = 0p;
+	this.io.lock = false;
+
 	#if !defined(__CFA_NO_STATISTICS__)
 		print_stats = 0;
@@ -584,4 +579,7 @@
 	threads{ __get };
 
+	io.arbiter = create();
+	io.params = io_params;
+
 	doregister(this);
 
@@ -596,18 +594,8 @@
 	ready_mutate_unlock( last_size );
 	enable_interrupts_noPoll(); // Don't poll, could be in main cluster
-
-
-	this.io.cnt  = num_io;
-	this.io.ctxs = aalloc(num_io);
-	for(i; this.io.cnt) {
-		(this.io.ctxs[i]){ this, io_params };
-	}
 }
 
 void ^?{}(cluster & this) {
-	for(i; this.io.cnt) {
-		^(this.io.ctxs[i]){ true };
-	}
-	free(this.io.ctxs);
+	destroy(this.io.arbiter);
 
 	// Lock the RWlock so no-one pushes/pops while we are changing the queue
Index: libcfa/src/concurrency/kernel_private.hfa
===================================================================
--- libcfa/src/concurrency/kernel_private.hfa	(revision b44959f37442409070ab8f43baa6432ba2c8f812)
+++ libcfa/src/concurrency/kernel_private.hfa	(revision 78da4abe22e69798ee3738623dc9a2d99e34d207)
@@ -77,5 +77,6 @@
 //-----------------------------------------------------------------------------
 // I/O
-void ^?{}(io_context & this, bool );
+$io_arbiter * create(void);
+void destroy($io_arbiter *);
 
 //=======================================================================
