// // Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo // // The contents of this file are covered under the licence agreement in the // file "LICENCE" distributed with Cforall. // // io.cfa -- // // Author : Thierry Delisle // Created On : Thu Apr 23 17:31:00 2020 // Last Modified By : // Last Modified On : // Update Count : // #define __cforall_thread__ #if defined(__CFA_DEBUG__) // #define __CFA_DEBUG_PRINT_IO__ // #define __CFA_DEBUG_PRINT_IO_CORE__ #endif #if defined(CFA_HAVE_LINUX_IO_URING_H) #define _GNU_SOURCE /* See feature_test_macros(7) */ #include #include #include #include #include extern "C" { #include #include #include #include } #include "stats.hfa" #include "kernel.hfa" #include "kernel/fwd.hfa" #include "io/types.hfa" static const char * opcodes[] = { "OP_NOP", "OP_READV", "OP_WRITEV", "OP_FSYNC", "OP_READ_FIXED", "OP_WRITE_FIXED", "OP_POLL_ADD", "OP_POLL_REMOVE", "OP_SYNC_FILE_RANGE", "OP_SENDMSG", "OP_RECVMSG", "OP_TIMEOUT", "OP_TIMEOUT_REMOVE", "OP_ACCEPT", "OP_ASYNC_CANCEL", "OP_LINK_TIMEOUT", "OP_CONNECT", "OP_FALLOCATE", "OP_OPENAT", "OP_CLOSE", "OP_FILES_UPDATE", "OP_STATX", "OP_READ", "OP_WRITE", "OP_FADVISE", "OP_MADVISE", "OP_SEND", "OP_RECV", "OP_OPENAT2", "OP_EPOLL_CTL", "OP_SPLICE", "OP_PROVIDE_BUFFERS", "OP_REMOVE_BUFFERS", "OP_TEE", "INVALID_OP" }; // returns true of acquired as leader or second leader static inline bool try_lock( __leaderlock_t & this ) { const uintptr_t thrd = 1z | (uintptr_t)active_thread(); bool block; disable_interrupts(); for() { struct $thread * expected = this.value; if( 1p != expected && 0p != expected ) { /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader enable_interrupts( __cfaabi_dbg_ctx ); return false; } struct $thread * desired; if( 0p == expected ) { // If the lock isn't locked acquire it, no need to block desired = 1p; block = false; } else { // If the lock is already locked try becomming the next leader desired = (struct $thread *)thrd; block = true; } if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; } if( block ) { enable_interrupts( __cfaabi_dbg_ctx ); park(); disable_interrupts(); } return true; } static inline bool next( __leaderlock_t & this ) { /* paranoid */ verify( ! __preemption_enabled() ); struct $thread * nextt; for() { struct $thread * expected = this.value; /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked struct $thread * desired; if( 1p == expected ) { // No next leader, just unlock desired = 0p; nextt = 0p; } else { // There is a next leader, remove but keep locked desired = 1p; nextt = (struct $thread *)(~1z & (uintptr_t)expected); } if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; } if(nextt) { unpark( nextt ); enable_interrupts( __cfaabi_dbg_ctx ); return true; } enable_interrupts( __cfaabi_dbg_ctx ); return false; } //============================================================================================= // I/O Syscall //============================================================================================= static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { bool need_sys_to_submit = false; bool need_sys_to_complete = false; unsigned flags = 0; TO_SUBMIT: if( to_submit > 0 ) { if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { need_sys_to_submit = true; break TO_SUBMIT; } if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { need_sys_to_submit = true; flags |= IORING_ENTER_SQ_WAKEUP; } } if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { flags |= IORING_ENTER_GETEVENTS; if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { need_sys_to_complete = true; } } int ret = 0; if( need_sys_to_submit || need_sys_to_complete ) { __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); if( ret < 0 ) { switch((int)errno) { case EAGAIN: case EINTR: ret = -1; break; default: abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); } } } // Memory barrier __atomic_thread_fence( __ATOMIC_SEQ_CST ); return ret; } //============================================================================================= // I/O Polling //============================================================================================= static unsigned __collect_submitions( struct __io_data & ring ); static __u32 __release_consumed_submission( struct __io_data & ring ); // Process a single completion message from the io_uring // This is NOT thread-safe static inline void process( volatile struct io_uring_cqe & cqe ) { struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); fulfil( *future, cqe.res ); } static [int, bool] __drain_io( & struct __io_data ring ) { /* paranoid */ verify( ! __preemption_enabled() ); unsigned to_submit = 0; if( ring.poller_submits ) { // If the poller thread also submits, then we need to aggregate the submissions which are ready to_submit = __collect_submitions( ring ); } int ret = __io_uring_enter(ring, to_submit, true); if( ret < 0 ) { return [0, true]; } // update statistics if (to_submit > 0) { __STATS__( true, if( to_submit > 0 ) { io.submit_q.submit_avg.rdy += to_submit; io.submit_q.submit_avg.csm += ret; io.submit_q.submit_avg.cnt += 1; } ) } __atomic_thread_fence( __ATOMIC_SEQ_CST ); // Release the consumed SQEs __release_consumed_submission( ring ); // Drain the queue unsigned head = *ring.completion_q.head; unsigned tail = *ring.completion_q.tail; const __u32 mask = *ring.completion_q.mask; // Nothing was new return 0 if (head == tail) { return [0, to_submit > 0]; } __u32 count = tail - head; /* paranoid */ verify( count != 0 ); for(i; count) { unsigned idx = (head + i) & mask; volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; /* paranoid */ verify(&cqe); process( cqe ); } // Mark to the kernel that the cqe has been seen // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST ); return [count, count > 0 || to_submit > 0]; } void main( $io_ctx_thread & this ) { epoll_event ev; __ioctx_register( this, ev ); __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this); const int reset_cnt = 5; int reset = reset_cnt; // Then loop until we need to start LOOP: while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { // Drain the io int count; bool again; disable_interrupts(); [count, again] = __drain_io( *this.ring ); if(!again) reset--; // Update statistics __STATS__( true, io.complete_q.completed_avg.val += count; io.complete_q.completed_avg.cnt += 1; ) enable_interrupts( __cfaabi_dbg_ctx ); // If we got something, just yield and check again if(reset > 1) { yield(); continue LOOP; } // We alread failed to find completed entries a few time. if(reset == 1) { // Rearm the context so it can block // but don't block right away // we need to retry one last time in case // something completed *just now* __ioctx_prepare_block( this, ev ); continue LOOP; } __STATS__( false, io.complete_q.blocks += 1; ) __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this); // block this thread wait( this.sem ); eventfd_t v; eventfd_read(this.ring->efd, &v); // restore counter reset = reset_cnt; } __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this); } //============================================================================================= // I/O Submissions //============================================================================================= // Submition steps : // 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones // listed in sq.array are visible by the kernel. For those not listed, the kernel does not // offer any assurance that an entry is not being filled by multiple flags. Therefore, we // need to write an allocator that allows allocating concurrently. // // 2 - Actually fill the submit entry, this is the only simple and straightforward step. // // 3 - Append the entry index to the array and adjust the tail accordingly. This operation // needs to arrive to two concensus at the same time: // A - The order in which entries are listed in the array: no two threads must pick the // same index for their entries // B - When can the tail be update for the kernel. EVERY entries in the array between // head and tail must be fully filled and shouldn't ever be touched again. // // Allocate an submit queue entry. // The kernel cannot see these entries until they are submitted, but other threads must be // able to see which entries can be used and which are already un used by an other thread // for convenience, return both the index and the pointer to the sqe // sqe == &sqes[idx] [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { /* paranoid */ verify( data != 0 ); // Prepare the data we need __attribute((unused)) int len = 0; __attribute((unused)) int block = 0; __u32 cnt = *ring.submit_q.num; __u32 mask = *ring.submit_q.mask; __u32 off = thread_rand(); // Loop around looking for an available spot for() { // Look through the list starting at some offset for(i; cnt) { __u64 expected = 3; __u32 idx = (i + off) & mask; // Get an index from a random volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; volatile __u64 * udata = &sqe->user_data; // Allocate the entry by CASing the user_data field from 0 to the future address if( *udata == expected && __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { // update statistics __STATS__( false, io.submit_q.alloc_avg.val += len; io.submit_q.alloc_avg.block += block; io.submit_q.alloc_avg.cnt += 1; ) // debug log __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); // Success return the data sqe->opcode = 0; sqe->flags = 0; sqe->ioprio = 0; sqe->fd = 0; sqe->off = 0; sqe->addr = 0; sqe->len = 0; sqe->accept_flags = 0; sqe->__pad2[0] = 0; sqe->__pad2[1] = 0; sqe->__pad2[2] = 0; return [sqe, idx]; } verify(expected != data); // This one was used len ++; } block++; abort( "Kernel I/O : all submit queue entries used, yielding\n" ); yield(); } } static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) { /* paranoid */ verify( idx <= mask ); /* paranoid */ verify( idx != -1ul32 ); // We need to find a spot in the ready array __attribute((unused)) int len = 0; __attribute((unused)) int block = 0; __u32 ready_mask = ring.submit_q.ready_cnt - 1; __u32 off = thread_rand(); __u32 picked; LOOKING: for() { for(i; ring.submit_q.ready_cnt) { picked = (i + off) & ready_mask; __u32 expected = -1ul32; if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { break LOOKING; } verify(expected != idx); len ++; } block++; __u32 released = __release_consumed_submission( ring ); if( released == 0 ) { yield(); } } // update statistics __STATS__( false, io.submit_q.look_avg.val += len; io.submit_q.look_avg.block += block; io.submit_q.look_avg.cnt += 1; ) return picked; } void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { __io_data & ring = *ctx->thrd.ring; { __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; __cfadbg_print_safe( io, "Kernel I/O : submitting %u (%p) for %p\n" " data: %p\n" " opcode: %s\n" " fd: %d\n" " flags: %d\n" " prio: %d\n" " off: %p\n" " addr: %p\n" " len: %d\n" " other flags: %d\n" " splice fd: %d\n" " pad[0]: %llu\n" " pad[1]: %llu\n" " pad[2]: %llu\n", idx, sqe, active_thread(), (void*)sqe->user_data, opcodes[sqe->opcode], sqe->fd, sqe->flags, sqe->ioprio, sqe->off, sqe->addr, sqe->len, sqe->accept_flags, sqe->splice_fd_in, sqe->__pad2[0], sqe->__pad2[1], sqe->__pad2[2] ); } // Get now the data we definetely need volatile __u32 * const tail = ring.submit_q.tail; const __u32 mask = *ring.submit_q.mask; // There are 2 submission schemes, check which one we are using if( ring.poller_submits ) { // If the poller thread submits, then we just need to add this to the ready array __submit_to_ready_array( ring, idx, mask ); post( ctx->thrd.sem ); __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); } else if( ring.eager_submits ) { __u32 picked = __submit_to_ready_array( ring, idx, mask ); #if defined(LEADER_LOCK) if( !try_lock(ring.submit_q.submit_lock) ) { __STATS__( false, io.submit_q.helped += 1; ) return; } /* paranoid */ verify( ! __preemption_enabled() ); __STATS__( true, io.submit_q.leader += 1; ) #else for() { yield(); if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) { __STATS__( false, io.submit_q.leader += 1; ) break; } // If some one else collected our index, we are done #warning ABA problem if( ring.submit_q.ready[picked] != idx ) { __STATS__( false, io.submit_q.helped += 1; ) return; } __STATS__( false, io.submit_q.busy += 1; ) } #endif // We got the lock // Collect the submissions unsigned to_submit = __collect_submitions( ring ); // Actually submit int ret = __io_uring_enter( ring, to_submit, false ); #if defined(LEADER_LOCK) /* paranoid */ verify( ! __preemption_enabled() ); next(ring.submit_q.submit_lock); #else unlock(ring.submit_q.submit_lock); #endif if( ret < 0 ) { return; } // Release the consumed SQEs __release_consumed_submission( ring ); // update statistics __STATS__( false, io.submit_q.submit_avg.rdy += to_submit; io.submit_q.submit_avg.csm += ret; io.submit_q.submit_avg.cnt += 1; ) __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); } else { // get mutual exclusion #if defined(LEADER_LOCK) while(!try_lock(ring.submit_q.submit_lock)); #else lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2); #endif /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64, /* paranoid */ "index %u already reclaimed\n" /* paranoid */ "head %u, prev %u, tail %u\n" /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", /* paranoid */ idx, /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] /* paranoid */ ); // Append to the list of ready entries /* paranoid */ verify( idx <= mask ); ring.submit_q.array[ (*tail) & mask ] = idx; __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); // Submit however, many entries need to be submitted int ret = __io_uring_enter( ring, 1, false ); if( ret < 0 ) { switch((int)errno) { default: abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); } } /* paranoid */ verify(ret == 1); // update statistics __STATS__( false, io.submit_q.submit_avg.csm += 1; io.submit_q.submit_avg.cnt += 1; ) { __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head; __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ]; __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx]; __cfadbg_print_safe( io, "Kernel I/O : last submitted is %u (%p)\n" " data: %p\n" " opcode: %s\n" " fd: %d\n" " flags: %d\n" " prio: %d\n" " off: %p\n" " addr: %p\n" " len: %d\n" " other flags: %d\n" " splice fd: %d\n" " pad[0]: %llu\n" " pad[1]: %llu\n" " pad[2]: %llu\n", last_idx, sqe, (void*)sqe->user_data, opcodes[sqe->opcode], sqe->fd, sqe->flags, sqe->ioprio, sqe->off, sqe->addr, sqe->len, sqe->accept_flags, sqe->splice_fd_in, sqe->__pad2[0], sqe->__pad2[1], sqe->__pad2[2] ); } __atomic_thread_fence( __ATOMIC_SEQ_CST ); // Release the consumed SQEs __release_consumed_submission( ring ); // ring.submit_q.sqes[idx].user_data = 3ul64; #if defined(LEADER_LOCK) next(ring.submit_q.submit_lock); #else unlock(ring.submit_q.submit_lock); #endif __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() ); } } // #define PARTIAL_SUBMIT 32 // go through the list of submissions in the ready array and moved them into // the ring's submit queue static unsigned __collect_submitions( struct __io_data & ring ) { /* paranoid */ verify( ring.submit_q.ready != 0p ); /* paranoid */ verify( ring.submit_q.ready_cnt > 0 ); unsigned to_submit = 0; __u32 tail = *ring.submit_q.tail; const __u32 mask = *ring.submit_q.mask; #if defined(PARTIAL_SUBMIT) #if defined(LEADER_LOCK) #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist #endif const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt; const __u32 offset = ring.submit_q.prev_ready; ring.submit_q.prev_ready += cnt; #else const __u32 cnt = ring.submit_q.ready_cnt; const __u32 offset = 0; #endif // Go through the list of ready submissions for( c; cnt ) { __u32 i = (offset + c) % ring.submit_q.ready_cnt; // replace any submission with the sentinel, to consume it. __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); // If it was already the sentinel, then we are done if( idx == -1ul32 ) continue; // If we got a real submission, append it to the list ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; to_submit++; } // Increment the tail based on how many we are ready to submit __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); return to_submit; } // Go through the ring's submit queue and release everything that has already been consumed // by io_uring static __u32 __release_consumed_submission( struct __io_data & ring ) { const __u32 smask = *ring.submit_q.mask; // We need to get the lock to copy the old head and new head if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; __attribute__((unused)) __u32 ctail = *ring.submit_q.tail; // get the current tail of the queue __u32 chead = *ring.submit_q.head; // get the current head of the queue __u32 phead = ring.submit_q.prev_head; // get the head the last time we were here ring.submit_q.prev_head = chead; // note up to were we processed unlock(ring.submit_q.release_lock); // the 3 fields are organized like this diagram // except it's are ring // ---+--------+--------+---- // ---+--------+--------+---- // ^ ^ ^ // phead chead ctail // make sure ctail doesn't wrap around and reach phead /* paranoid */ verify( (ctail >= chead && chead >= phead) || (chead >= phead && phead >= ctail) || (phead >= ctail && ctail >= chead) ); // find the range we need to clear __u32 count = chead - phead; // We acquired an previous-head/current-head range // go through the range and release the sqes for( i; count ) { __u32 idx = ring.submit_q.array[ (phead + i) & smask ]; /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data ); ring.submit_q.sqes[ idx ].user_data = 3ul64; } return count; } #endif