#include "kernel.hfa" #if !defined(HAVE_LINUX_IO_URING_H) void __kernel_io_startup( cluster & this ) { // Nothing to do without io_uring } void __kernel_io_shutdown( cluster & this ) { // Nothing to do without io_uring } bool is_async( void (*)() ) { return false; } #else extern "C" { #define _GNU_SOURCE /* See feature_test_macros(7) */ #include #include #include #include #include #include #include } #include "bits/signal.hfa" #include "kernel_private.hfa" #include "thread.hfa" uint32_t entries_per_cluster() { return 256; } static void * __io_poller( void * arg ); //============================================================================================= // I/O Startup / Shutdown logic //============================================================================================= void __kernel_io_startup( cluster & this ) { // Step 1 : call to setup struct io_uring_params params; memset(¶ms, 0, sizeof(params)); int fd = syscall(__NR_io_uring_setup, entries_per_cluster(), ¶ms ); if(fd < 0) { abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno)); } // Step 2 : mmap result memset(&this.io, 0, sizeof(struct io_ring)); struct io_uring_sq * sq = &this.io.submit_q; struct io_uring_cq * cq = &this.io.completion_q; // calculate the right ring size sq->ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) ); cq->ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe)); // Requires features // // adjust the size according to the parameters // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { // cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz); // } // mmap the Submit Queue into existence sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); if (sq->ring_ptr == (void*)MAP_FAILED) { abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno)); } // mmap the Completion Queue into existence (may or may not be needed) // Requires features // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { // cq->ring_ptr = sq->ring_ptr; // } // else { // We need multiple call to MMAP cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); if (cq->ring_ptr == (void*)MAP_FAILED) { munmap(sq->ring_ptr, sq->ring_sz); abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno)); } // } // mmap the submit queue entries size_t size = params.sq_entries * sizeof(struct io_uring_sqe); sq->sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES); if (sq->sqes == (struct io_uring_sqe *)MAP_FAILED) { munmap(sq->ring_ptr, sq->ring_sz); if (cq->ring_ptr != sq->ring_ptr) munmap(cq->ring_ptr, cq->ring_sz); abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno)); } // Get the pointers from the kernel to fill the structure // submit queue sq->head = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.head; sq->tail = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.tail; sq->mask = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_mask; sq->entries = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_entries; sq->flags = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.flags; sq->dropped = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.dropped; sq->array = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.array; // completion queue cq->head = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.head; cq->tail = (uint32_t *)((intptr_t)cq->ring_ptr) + 
	void __kernel_io_shutdown( cluster & this ) {
		// Stop the IO Poller

		// Notify the poller thread of the shutdown
		__atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
		sigval val = { 1 };
		pthread_sigqueue( this.io.poller, SIGUSR1, val );

		// Wait for the poller thread to finish
		pthread_join( this.io.poller, 0p );
		free( this.io.stack );

		// Shutdown the io rings
		struct io_uring_sq & sq = this.io.submit_q;
		struct io_uring_cq & cq = this.io.completion_q;

		// unmap the submit queue entries
		munmap(sq.sqes, (*sq.entries) * sizeof(struct io_uring_sqe));

		// unmap the Submit Queue ring
		munmap(sq.ring_ptr, sq.ring_sz);

		// unmap the Completion Queue ring, if it is different
		if (cq.ring_ptr != sq.ring_ptr) {
			munmap(cq.ring_ptr, cq.ring_sz);
		}

		// close the file descriptor
		close(this.io.fd);
	}

//=============================================================================================
// I/O Polling
//=============================================================================================
	struct io_user_data {
		int32_t result;
		$thread * thrd;
	};

	// Process a single completion message from the io_uring
	// This is NOT thread-safe
	static bool __io_process(struct io_ring & ring) {
		unsigned head = *ring.completion_q.head;
		unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);

		if (head == tail) return false;

		unsigned idx = head & (*ring.completion_q.mask);
		struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

		/* paranoid */ verify(&cqe);

		struct io_user_data * data = (struct io_user_data *)cqe.user_data;
		data->result = cqe.res;
		unpark( data->thrd __cfaabi_dbg_ctx2 );

		// Mark to the kernel that the cqe has been seen
		// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
		__atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELEASE );

		return true;
	}
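	// How an I/O operation is expected to tie into __io_process above, as a hedged sketch:
	// the submitting thread places an io_user_data on its own stack, stores its address in
	// the sqe's user_data field (the kernel echoes it back verbatim in the matching cqe),
	// submits, and parks; the poller thread then fills data.result from cqe.res and unparks
	// it. The helper name __io_wait is illustrative, and 'this_thread' stands in for however
	// the runtime exposes the currently running $thread.
	//
	//	static int32_t __io_wait( struct io_uring_sqe & sqe, $thread * this_thread ) {
	//		io_user_data data = { 0, this_thread };
	//		sqe.user_data = (uint64_t)(uintptr_t)&data;  // returned untouched in the matching cqe
	//		// ... publish the sqe and call io_uring_enter to submit it ...
	//		park( __cfaabi_dbg_ctx );                    // sleep until the poller calls unpark
	//		return data.result;                          // cqe.res, copied in by __io_process
	//	}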
	static void * __io_poller( void * arg ) {
		cluster * cltr = (cluster *)arg;
		struct io_ring & ring = cltr->io;

		sigset_t mask;
		sigfillset(&mask);
		if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
			abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
		}

		sigdelset( &mask, SIGUSR1 );

		verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
		verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );

		LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
			// wait for at least one completion; the last two arguments are the signal mask
			// to apply while waiting and its size in bytes
			int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
			if( ret < 0 ) {
				switch((int)errno) {
				case EAGAIN:
				case EINTR:
					continue LOOP;
				default:
					abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
				}
			}

			// Drain the queue
			while(__io_process(ring)) {}
		}

		return 0p;
	}

//=============================================================================================
// I/O Submissions
//=============================================================================================
	// Not implemented yet: always returns a null sqe and index 0
	[* struct io_uring_sqe, uint32_t] io_submit_prelude( struct io_ring & ring ) {
		return [0p, 0];
	}

//=============================================================================================
// I/O Interface
//=============================================================================================
	/*
	extern "C" {
		#define __USE_GNU
		#include <sys/uio.h>
	}

	ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
		#if !defined(IORING_OP_READV)
			return preadv2(fd, iov, iovcnt, offset, flags);
		#else
			sqe->opcode = IORING_OP_READV;
			sqe->flags = 0;
			sqe->ioprio = 0;
			sqe->fd = fd;
			sqe->off = offset;
			sqe->addr = (unsigned long) iov;
			sqe->len = iovcnt;
			sqe->rw_flags = 0;
			sqe->user_data = 0;
			sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
		#endif
	}

	ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
		#if !defined(IORING_OP_WRITEV)
			return pwritev2(fd, iov, iovcnt, offset, flags);
		#else
			#warning not implemented
		#endif
	}

	bool is_async( void (*func)() ) {
		switch((uintptr_t)func) {
		case (uintptr_t)preadv2:
		case (uintptr_t)async_preadv2:
			#if defined(IORING_OP_READV)
				return true;
			#else
				return false;
			#endif
		default:
			return false;
		}
	}
	*/
#endif