// Program to test the optimal batch size in a single-threaded process
extern "C" {
	#ifndef _GNU_SOURCE
	#define _GNU_SOURCE           /* See feature_test_macros(7) */
	#endif
	#include <errno.h>
	#include <fcntl.h>
	#include <getopt.h>
	#include <locale.h>
	#include <stdint.h>
	#include <stdio.h>
	#include <stdlib.h>
	#include <string.h>
	#include <unistd.h>
	#include <sys/mman.h>
	#include <sys/uio.h>
	#include <linux/io_uring.h>
	#include <time.h>             // timespec
	#include <sys/time.h>         // timeval
}

enum { TIMEGRAN = 1000000000LL }; // nanosecond granularity, except for timeval

#include <sys/syscall.h>

# ifndef __NR_io_uring_setup
# define __NR_io_uring_setup    425
# endif
# ifndef __NR_io_uring_enter
# define __NR_io_uring_enter    426
# endif
# ifndef __NR_io_uring_register
# define __NR_io_uring_register 427
# endif

struct io_uring_sq {
	// Head and tail of the ring (associated with array)
	volatile uint32_t * head;
	volatile uint32_t * tail;

	// The actual kernel ring which uses head/tail
	// indexes into the sqes arrays
	uint32_t * array;

	// number of entries and mask to go with it
	const uint32_t * num;
	const uint32_t * mask;

	// Submission flags (Not sure what for)
	uint32_t * flags;

	// number of sqes not submitted (whatever that means)
	uint32_t * dropped;

	// Like head/tail but not seen by the kernel
	volatile uint32_t alloc;

	// A buffer of sqes (not the actual ring)
	struct io_uring_sqe * sqes;

	// The location and size of the mmaped area
	void * ring_ptr;
	size_t ring_sz;
};

struct io_uring_cq {
	// Head and tail of the ring
	volatile uint32_t * head;
	volatile uint32_t * tail;

	// number of entries and mask to go with it
	const uint32_t * mask;
	const uint32_t * num;

	// number of cqes not submitted (whatever that means)
	uint32_t * overflow;

	// the kernel ring
	struct io_uring_cqe * cqes;

	// The location and size of the mmaped area
	void * ring_ptr;
	size_t ring_sz;
};

struct io_ring {
	struct io_uring_sq submit_q;
	struct io_uring_cq completion_q;
	uint32_t flags;
	int fd;
};

struct fred {
	io_ring io;
};

fred self;
int myfd;

long long unsigned submits   = 0;
long long unsigned completes = 0;

void submit_and_drain(struct iovec * iov, int n) {
	// Every slot of the submission ring indexes sqe 0 (see the array setup in
	// main), so each request in the batch refills that one entry.
	for(int i = 0; i < n; i++) {
		struct io_uring_sqe * sqe = &self.io.submit_q.sqes[ 0 ];

		sqe->opcode = IORING_OP_READV;
		#if !defined(IOSQE_ASYNC)
			sqe->flags = 0;
		#else
			sqe->flags = IOSQE_ASYNC;
		#endif
		sqe->ioprio = 0;
		sqe->fd = myfd;
		sqe->off = 0;
		sqe->addr = (__u64)iov;
		sqe->len = 1;
		sqe->rw_flags = 0;
		sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
	}

	// Publish the new entries by advancing the submission tail, then submit
	// them and wait for the same number of completions.
	volatile uint32_t * tail = self.io.submit_q.tail;
	__atomic_fetch_add(tail, n, __ATOMIC_SEQ_CST);

	int ret = syscall( __NR_io_uring_enter, self.io.fd, n, n, IORING_ENTER_GETEVENTS, nullptr, 0);
	if( ret < 0 ) {
		switch((int)errno) {
		case EAGAIN:
		case EINTR:
		default:
			fprintf(stderr, "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
			abort();
		}
	}

	submits += ret;

	// Drain the completion queue by advancing its head past everything the
	// kernel has produced.
	uint32_t chead = *self.io.completion_q.head;
	uint32_t ctail = *self.io.completion_q.tail;
	const uint32_t mask = *self.io.completion_q.mask;

	// Memory barrier
	__atomic_thread_fence( __ATOMIC_SEQ_CST );

	uint32_t count = ctail - chead;
	__atomic_fetch_add( self.io.completion_q.head, count, __ATOMIC_RELAXED );
	completes += count;
}

uint64_t getTimeNsec() {
	timespec curr;
	clock_gettime( CLOCK_REALTIME, &curr );
	return (int64_t)curr.tv_sec * TIMEGRAN + curr.tv_nsec;
}

uint64_t to_milliseconds( uint64_t durtn ) { return durtn / (TIMEGRAN / 1000LL); }
double   to_fseconds( uint64_t durtn )     { return durtn / (double)TIMEGRAN; }
uint64_t from_fseconds( double sec )       { return sec * TIMEGRAN; }
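// Example invocation (the binary name is illustrative and the flag values are
// arbitrary; the options are the ones parsed by getopt_long below):
//   ./batchsize --duration=5 --buflen=50 --batchsize=32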
int main(int argc, char * argv[]) {
	int buflen = 50;
	int batch = 1;
	double duration = 5;

	setlocale(LC_ALL, "");

	for(;;) {
		static struct option options[] = {
			{"duration",  required_argument, 0, 'd'},
			{"batchsize", required_argument, 0, 'b'},
			{"buflen",    required_argument, 0, 'l'},
			{0, 0, 0, 0}
		};

		int idx = 0;
		int opt = getopt_long(argc, argv, "d:l:b:", options, &idx);

		const char * arg = optarg ? optarg : "";
		char * end;
		switch(opt) {
			// Exit Case
			case -1:
				goto arg_loop;
			case 'd':
				duration = strtod(arg, &end);
				if(*end != '\0') {
					fprintf(stderr, "Duration must be a valid double, was %s\n", arg);
					goto usage;
				}
				break;
			case 'l':
				buflen = strtoul(arg, &end, 10);
				if(*end != '\0' || buflen < 10) {
					fprintf(stderr, "Buffer size must be at least 10, was %s\n", arg);
					goto usage;
				}
				break;
			case 'b':
				batch = strtoul(arg, &end, 10);
				if(*end != '\0' || batch < 1) {
					fprintf(stderr, "Batch size must be at least 1, was %s\n", arg);
					goto usage;
				}
				break;
			default: /* ? */
				fprintf(stderr, "%d\n", opt);
			usage:
				fprintf( stderr, " -d, --duration=SECONDS  Duration of the test in seconds\n" );
				fprintf( stderr, " -l, --buflen=SIZE       Number of bytes to read per request\n" );
				fprintf( stderr, " -b, --batchsize=COUNT   Number of requests to batch together\n" );
				exit(EXIT_FAILURE);
		}
	}
	arg_loop:

	// The benchmark reads from this source file, opened read-only (flags 0 == O_RDONLY).
	myfd = open(__FILE__, 0);

	// Step 1 : call to setup
	struct io_uring_params params;
	memset(&params, 0, sizeof(params));

	uint32_t nentries = 2048;

	int fd = syscall(__NR_io_uring_setup, nentries, &params );
	if(fd < 0) {
		fprintf(stderr, "KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
		abort();
	}

	// Step 2 : mmap result
	memset(&self.io, 0, sizeof(struct io_ring));
	struct io_uring_sq & sq = self.io.submit_q;
	struct io_uring_cq & cq = self.io.completion_q;

	// calculate the right ring size
	sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
	cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

	// Requires features
	// // adjust the size according to the parameters
	// if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
	// 	cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
	// }

	// mmap the Submit Queue into existence
	sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
	if (sq.ring_ptr == (void*)MAP_FAILED) {
		fprintf(stderr, "KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
		abort();
	}

	// mmap the Completion Queue into existence (may or may not be needed)
	// Requires features
	// if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
	// 	cq->ring_ptr = sq->ring_ptr;
	// }
	// else {
		// We need multiple calls to mmap
		cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
		if (cq.ring_ptr == (void*)MAP_FAILED) {
			munmap(sq.ring_ptr, sq.ring_sz);
			fprintf(stderr, "KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
			abort();
		}
	// }

	// mmap the submit queue entries
	size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
	sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
	if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
		munmap(sq.ring_ptr, sq.ring_sz);
		if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
		fprintf(stderr, "KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
		abort();
	}
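	// The kernel describes each ring field as a byte offset into the mmapped
	// region (params.sq_off / params.cq_off) rather than as a pointer; the
	// casts below turn those offsets into usable pointers. Ring positions are
	// free-running 32-bit counters, reduced to an array slot with
	// (index & mask), as done when picking an sqe further down.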
	// Get the pointers from the kernel to fill the structure
	// submit queue
	sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
	sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
	sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
	sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
	sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
	sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
	sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
	sq.alloc   = *sq.tail;

	// completion queue
	cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
	cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
	cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
	cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
	cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
	cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

	self.io.fd = fd;

	// Allocate the sqe
	uint32_t idx = 0;

	// Return the sqe
	struct io_uring_sqe * sqe = &self.io.submit_q.sqes[ idx & (*self.io.submit_q.mask) ];

	char data[buflen];
	struct iovec iov = { data, (size_t)buflen };

	sqe->opcode = IORING_OP_READV;
	#if !defined(IOSQE_ASYNC)
		sqe->flags = 0;
	#else
		sqe->flags = IOSQE_ASYNC;
	#endif
	sqe->ioprio = 0;
	sqe->fd = myfd;
	sqe->off = 0;
	sqe->addr = (__u64)&iov;
	sqe->len = 1;                 // number of iovecs in the vector; the byte count comes from iov_len
	sqe->rw_flags = 0;
	sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;

	// Append to the list of ready entries
	for(unsigned i = 0; i < *self.io.submit_q.num; i++) {
		self.io.submit_q.array[ i ] = 0;
	}

	printf("Running for %f seconds, reading %d bytes in batches of %d\n", duration, buflen, batch);
	uint64_t start = getTimeNsec();
	uint64_t end   = getTimeNsec();
	uint64_t prev  = getTimeNsec();

	for(;;) {
		submit_and_drain(&iov, batch);
		end = getTimeNsec();
		uint64_t delta = end - start;
		if( to_fseconds(end - prev) > 0.1 ) {
			printf(" %.1f\r", to_fseconds(delta));
			fflush(stdout);
			prev = end;
		}
		if( delta >= from_fseconds(duration) ) {
			break;
		}
	}

	printf("Took %'lu ms\n", to_milliseconds(end - start));
	printf("Submitted %'llu\n", submits);
	printf("Completed %'llu\n", completes);
	printf("Submitted / sec %'.f\n", submits / to_fseconds(end - start));
	printf("Completed / sec %'.f\n", completes / to_fseconds(end - start));
}
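// Build sketch (assumed command line, not part of the original source):
//   g++ -O2 -o batchsize batchsize.cpp
// Requires glibc and kernel headers that provide <linux/io_uring.h>; io_uring
// itself needs Linux 5.1 or later at run time.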