//
// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// io.cfa --
//
// Author           : Thierry Delisle
// Created On       : Thu Apr 23 17:31:00 2020
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#if defined(__CFA_DEBUG__)
	// #define __CFA_DEBUG_PRINT_IO__
	// #define __CFA_DEBUG_PRINT_IO_CORE__
#endif

#if defined(CFA_HAVE_LINUX_IO_URING_H)
	#define _GNU_SOURCE         /* See feature_test_macros(7) */
	#include <errno.h>
	#include <signal.h>
	#include <stdint.h>
	#include <string.h>
	#include <unistd.h>

	extern "C" {
		#include <sys/epoll.h>
		#include <sys/syscall.h>

		#include <linux/io_uring.h>
	}

	#include "stats.hfa"
	#include "kernel.hfa"
	#include "kernel/fwd.hfa"
	#include "io/types.hfa"

	// returns true if acquired as leader or second leader
	static inline bool try_lock( __leaderlock_t & this ) {
		const uintptr_t thrd = 1z | (uintptr_t)active_thread();
		bool block;
		disable_interrupts();
		for() {
			struct $thread * expected = this.value;
			if( 1p != expected && 0p != expected ) {
				/* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
				enable_interrupts( __cfaabi_dbg_ctx );
				return false;
			}
			struct $thread * desired;
			if( 0p == expected ) {
				// If the lock isn't locked, acquire it; no need to block
				desired = 1p;
				block = false;
			}
			else {
				// If the lock is already locked, try becoming the next leader
				desired = (struct $thread *)thrd;
				block = true;
			}
			if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
		}
		if( block ) {
			enable_interrupts( __cfaabi_dbg_ctx );
			park();
			disable_interrupts();
		}
		return true;
	}

	static inline bool next( __leaderlock_t & this ) {
		/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
		struct $thread * nextt;
		for() {
			struct $thread * expected = this.value;
			/* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
			struct $thread * desired;
			if( 1p == expected ) {
				// No next leader, just unlock
				desired = 0p;
				nextt   = 0p;
			}
			else {
				// There is a next leader, remove it but keep the lock held
				desired = 1p;
				nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
			}
			if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
		}

		if(nextt) {
			unpark( nextt );
			enable_interrupts( __cfaabi_dbg_ctx );
			return true;
		}
		enable_interrupts( __cfaabi_dbg_ctx );
		return false;
	}
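
	// Illustrative sketch (comment only, not compiled): how try_lock()/next() above are meant
	// to be combined. A thread that fails try_lock() knows a current or upcoming leader will
	// do its work on its behalf; a thread that returns true from try_lock() is (or becomes)
	// the leader, does the batched work, then calls next() to hand the lock to the parked
	// second leader or release it. `lock` is a placeholder for any __leaderlock_t; this
	// mirrors the LEADER_LOCK path of __submit() further below.
	//
	//     if( !try_lock( lock ) ) return;      // someone else is (or will be) the leader
	//     /* ... do the batched work, e.g. collect and submit the ready entries ... */
	//     next( lock );                        // wake the next leader or unlock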
//=============================================================================================
// I/O Syscall
//=============================================================================================
	static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
		bool need_sys_to_submit = false;
		bool need_sys_to_complete = false;
		unsigned flags = 0;

		TO_SUBMIT:
		if( to_submit > 0 ) {
			if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
				need_sys_to_submit = true;
				break TO_SUBMIT;
			}
			if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
				need_sys_to_submit = true;
				flags |= IORING_ENTER_SQ_WAKEUP;
			}
		}

		if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
			flags |= IORING_ENTER_GETEVENTS;
			if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
				need_sys_to_complete = true;
			}
		}

		int ret = 0;
		if( need_sys_to_submit || need_sys_to_complete ) {
			ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
			if( ret < 0 ) {
				switch((int)errno) {
				case EAGAIN:
				case EINTR:
					ret = -1;
					break;
				default:
					abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
				}
			}
		}

		// Memory barrier
		__atomic_thread_fence( __ATOMIC_SEQ_CST );
		return ret;
	}

//=============================================================================================
// I/O Polling
//=============================================================================================
	static unsigned __collect_submitions( struct __io_data & ring );
	static __u32 __release_consumed_submission( struct __io_data & ring );

	static inline void process(struct io_uring_cqe & cqe ) {
		struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
		__cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );

		fulfil( *future, cqe.res );
	}

	// Process a single completion message from the io_uring
	// This is NOT thread-safe
	static [int, bool] __drain_io( & struct __io_data ring ) {
		/* paranoid */ verify( !kernelTLS.preemption_state.enabled );

		unsigned to_submit = 0;
		if( ring.poller_submits ) {
			// If the poller thread also submits, then we need to aggregate the submissions which are ready
			to_submit = __collect_submitions( ring );
		}

		int ret = __io_uring_enter(ring, to_submit, true);
		if( ret < 0 ) {
			return [0, true];
		}

		// update statistics
		if (to_submit > 0) {
			__STATS__( true,
				if( to_submit > 0 ) {
					io.submit_q.submit_avg.rdy += to_submit;
					io.submit_q.submit_avg.csm += ret;
					io.submit_q.submit_avg.cnt += 1;
				}
			)
		}

		// Release the consumed SQEs
		__release_consumed_submission( ring );

		// Drain the queue
		unsigned head = *ring.completion_q.head;
		unsigned tail = *ring.completion_q.tail;
		const __u32 mask = *ring.completion_q.mask;

		// Nothing new, return 0
		if (head == tail) {
			return [0, to_submit > 0];
		}

		__u32 count = tail - head;
		/* paranoid */ verify( count != 0 );
		for(i; count) {
			unsigned idx = (head + i) & mask;
			struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

			/* paranoid */ verify(&cqe);

			process( cqe );
		}

		// Mark to the kernel that the cqes have been seen
		// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
		__atomic_thread_fence( __ATOMIC_SEQ_CST );
		__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );

		return [count, count > 0 || to_submit > 0];
	}

	void main( $io_ctx_thread & this ) {
		epoll_event ev;
		__ioctx_register( this, ev );

		__cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);

		int reset = 0;

		// Then loop until we need to stop
		while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
			// Drain the io
			int count;
			bool again;
			disable_interrupts();
				[count, again] = __drain_io( *this.ring );

				if(!again) reset++;

				// Update statistics
				__STATS__( true,
					io.complete_q.completed_avg.val += count;
					io.complete_q.completed_avg.cnt += 1;
				)
			enable_interrupts( __cfaabi_dbg_ctx );

			// If we got something, just yield and check again
			if(reset < 5) {
				yield();
			}
			// We didn't get anything, baton pass to the slow poller
			else {
				__STATS__( false,
					io.complete_q.blocks += 1;
				)
				__cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
				reset = 0;

				// block this thread
				__ioctx_prepare_block( this, ev );
				wait( this.sem );
			}
		}

		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
	}
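
	// Illustrative sketch (comment only, not compiled) of the completion path handled by
	// process() and __drain_io() above. A submitter stores a pointer to an io_future_t in the
	// sqe's user_data field; when the matching cqe is drained, process() casts user_data back
	// and fulfils the future with the syscall result, unblocking whoever waits on it. The
	// names `future` and `sqe` below are placeholders; the wait/retrieval side lives in the
	// future implementation, not in this file.
	//
	//     io_future_t future;
	//     sqe->user_data = (__u64)(uintptr_t)&future;     // set when filling the sqe
	//     /* ... submit, then eventually the poller runs __drain_io() ... */
	//     // process( cqe ) effectively does: fulfil( future, cqe.res );
	//     // so the submitter simply waits on the future and reads the fulfilled result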
//=============================================================================================
// I/O Submissions
//=============================================================================================

	// Submission steps :
	// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
	//     listed in sq.array are visible to the kernel. For those not listed, the kernel does not
	//     offer any assurance that an entry is not being filled by multiple threads. Therefore, we
	//     need to write an allocator that allows allocating concurrently.
	//
	// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
	//
	// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
	//     needs to reach consensus on two things at the same time:
	//     A - The order in which entries are listed in the array: no two threads may pick the
	//         same index for their entries.
	//     B - When the tail can be updated for the kernel: every entry in the array between
	//         head and tail must be fully filled and shouldn't ever be touched again.
	//

	[* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
		/* paranoid */ verify( data != 0 );

		// Prepare the data we need
		__attribute((unused)) int len   = 0;
		__attribute((unused)) int block = 0;
		__u32 cnt = *ring.submit_q.num;
		__u32 mask = *ring.submit_q.mask;

		disable_interrupts();
			__u32 off = __tls_rand();
		enable_interrupts( __cfaabi_dbg_ctx );

		// Loop around looking for an available spot
		for() {
			// Look through the list starting at some offset
			for(i; cnt) {
				__u64 expected = 0;
				__u32 idx = (i + off) & mask;
				struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
				volatile __u64 * udata = &sqe->user_data;

				if( *udata == expected &&
					__atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
				{
					// update statistics
					__STATS__( false,
						io.submit_q.alloc_avg.val   += len;
						io.submit_q.alloc_avg.block += block;
						io.submit_q.alloc_avg.cnt   += 1;
					)

					// Success, return the data
					return [sqe, idx];
				}
				verify(expected != data);

				len ++;
			}

			block++;
			yield();
		}
	}

	static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
		/* paranoid */ verify( idx <= mask   );
		/* paranoid */ verify( idx != -1ul32 );

		// We need to find a spot in the ready array
		__attribute((unused)) int len   = 0;
		__attribute((unused)) int block = 0;
		__u32 ready_mask = ring.submit_q.ready_cnt - 1;

		disable_interrupts();
			__u32 off = __tls_rand();
		enable_interrupts( __cfaabi_dbg_ctx );

		__u32 picked;
		LOOKING: for() {
			for(i; ring.submit_q.ready_cnt) {
				picked = (i + off) & ready_mask;
				__u32 expected = -1ul32;
				if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
					break LOOKING;
				}
				verify(expected != idx);

				len ++;
			}

			block++;

			__u32 released = __release_consumed_submission( ring );
			if( released == 0 ) {
				yield();
			}
		}

		// update statistics
		__STATS__( false,
			io.submit_q.look_avg.val   += len;
			io.submit_q.look_avg.block += block;
			io.submit_q.look_avg.cnt   += 1;
		)

		return picked;
	}
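
	// Illustrative sketch (comment only, not compiled) of the three submission steps described
	// above, as an I/O wrapper would typically chain them. `ring`, `ctx` and `future` stand in
	// for context the caller already holds; the fields filled in step 2 depend on the actual
	// operation being issued.
	//
	//     struct io_uring_sqe * sqe;
	//     __u32 idx;
	//     [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );   // step 1
	//     /* step 2: fill *sqe (opcode, fd, address, length, ...) */
	//     __submit( ctx, idx );                                             // step 3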
	void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
		__io_data & ring = *ctx->thrd.ring;
		// Get now the data we definitely need
		volatile __u32 * const tail = ring.submit_q.tail;
		const __u32 mask  = *ring.submit_q.mask;

		// There are 2 submission schemes, check which one we are using
		if( ring.poller_submits ) {
			// If the poller thread submits, then we just need to add this to the ready array
			__submit_to_ready_array( ring, idx, mask );

			post( ctx->thrd.sem );

			__cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
		}
		else if( ring.eager_submits ) {
			__u32 picked = __submit_to_ready_array( ring, idx, mask );

			#if defined(LEADER_LOCK)
				if( !try_lock(ring.submit_q.submit_lock) ) {
					__STATS__( false,
						io.submit_q.helped += 1;
					)
					return;
				}
				/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
				__STATS__( true,
					io.submit_q.leader += 1;
				)
			#else
				for() {
					yield();

					if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
						__STATS__( false,
							io.submit_q.leader += 1;
						)
						break;
					}

					// If someone else collected our index, we are done
					#warning ABA problem
					if( ring.submit_q.ready[picked] != idx ) {
						__STATS__( false,
							io.submit_q.helped += 1;
						)
						return;
					}

					__STATS__( false,
						io.submit_q.busy += 1;
					)
				}
			#endif

			// We got the lock
			// Collect the submissions
			unsigned to_submit = __collect_submitions( ring );

			// Actually submit
			int ret = __io_uring_enter( ring, to_submit, false );

			#if defined(LEADER_LOCK)
				/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
				next(ring.submit_q.submit_lock);
			#else
				unlock(ring.submit_q.submit_lock);
			#endif
			if( ret < 0 ) return;

			// Release the consumed SQEs
			__release_consumed_submission( ring );

			// update statistics
			__STATS__( false,
				io.submit_q.submit_avg.rdy += to_submit;
				io.submit_q.submit_avg.csm += ret;
				io.submit_q.submit_avg.cnt += 1;
			)
		}
		else {
			// get mutual exclusion
			#if defined(LEADER_LOCK)
				while(!try_lock(ring.submit_q.submit_lock));
			#else
				lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
			#endif

			/* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
			/* paranoid */ 	"index %u already reclaimed\n"
			/* paranoid */ 	"head %u, prev %u, tail %u\n"
			/* paranoid */ 	"[-0: %u,-1: %u,-2: %u,-3: %u]\n",
			/* paranoid */ 	idx,
			/* paranoid */ 	*ring.submit_q.head, ring.submit_q.prev_head, *tail
			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
			/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
			/* paranoid */ );

			// Append to the list of ready entries

			/* paranoid */ verify( idx <= mask );
			ring.submit_q.array[ (*tail) & mask ] = idx;
			__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);

			// Submit however many entries need to be submitted
			int ret = __io_uring_enter( ring, 1, false );
			if( ret < 0 ) {
				switch((int)errno) {
				default:
					abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
				}
			}

			// update statistics
			__STATS__( false,
				io.submit_q.submit_avg.csm += 1;
				io.submit_q.submit_avg.cnt += 1;
			)

			// Release the consumed SQEs
			__release_consumed_submission( ring );

			#if defined(LEADER_LOCK)
				next(ring.submit_q.submit_lock);
			#else
				unlock(ring.submit_q.submit_lock);
			#endif

			__cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
		}
	}
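
	// Illustrative worked example (comment only, not compiled) of what __collect_submitions()
	// below does. Assume ready_cnt == 4, the sq mask is 7, the sq tail is 10 and the ready
	// array currently holds { 5, -1, 2, -1 } (where -1 is the "empty" sentinel):
	//
	//     c = 0 : ready[0] == 5  -> array[(10 + 0) & 7] = 5; to_submit = 1
	//     c = 1 : ready[1] == -1 -> skipped
	//     c = 2 : ready[2] == 2  -> array[(10 + 1) & 7] = 2; to_submit = 2
	//     c = 3 : ready[3] == -1 -> skipped
	//
	// then the sq tail is atomically advanced by to_submit (2), making both entries visible
	// to the kernel at once.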
	// #define PARTIAL_SUBMIT 32
	static unsigned __collect_submitions( struct __io_data & ring ) {
		/* paranoid */ verify( ring.submit_q.ready != 0p );
		/* paranoid */ verify( ring.submit_q.ready_cnt > 0 );

		unsigned to_submit = 0;
		__u32 tail = *ring.submit_q.tail;
		const __u32 mask = *ring.submit_q.mask;
		#if defined(PARTIAL_SUBMIT)
			#if defined(LEADER_LOCK)
				#error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
			#endif
			const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
			const __u32 offset = ring.submit_q.prev_ready;
			ring.submit_q.prev_ready += cnt;
		#else
			const __u32 cnt = ring.submit_q.ready_cnt;
			const __u32 offset = 0;
		#endif

		// Go through the list of ready submissions
		for( c; cnt ) {
			__u32 i = (offset + c) % ring.submit_q.ready_cnt;

			// replace any submission with the sentinel, to consume it.
			__u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);

			// If it was already the sentinel, then we are done
			if( idx == -1ul32 ) continue;

			// If we got a real submission, append it to the list
			ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
			to_submit++;
		}

		// Increment the tail based on how many we are ready to submit
		__atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);

		return to_submit;
	}

	static __u32 __release_consumed_submission( struct __io_data & ring ) {
		const __u32 smask = *ring.submit_q.mask;

		if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
		__u32 chead = *ring.submit_q.head;
		__u32 phead = ring.submit_q.prev_head;
		ring.submit_q.prev_head = chead;
		unlock(ring.submit_q.release_lock);

		__u32 count = chead - phead;
		for( i; count ) {
			__u32 idx = ring.submit_q.array[ (phead + i) & smask ];
			ring.submit_q.sqes[ idx ].user_data = 0;
		}
		return count;
	}
#endif