source: example/io/batch-readv.c @ cb921d4

ADTast-experimentalenumforall-pointer-decaypthread-emulationqualifiedEnum
Last change on this file since cb921d4 was 4aa495f, checked in by Peter A. Buhr <pabuhr@…>, 4 years ago

More fixes after another change to getTimeNsec()

  • Property mode set to 100644
File size: 4.9 KB
Line 
1// Program to test the optimial batchsize in a single threaded process
2extern "C" {
3        #include <getopt.h>
4        #include <locale.h>
5        #include <time.h>                                                                               // timespec
6        #include <sys/time.h>                                                                   // timeval
7}
8
9enum { TIMEGRAN = 1000000000LL };                                       // nanosecond granularity, except for timeval
10
11#include <omp.h>
12
13#include "io_uring.h"
14
15
16int myfd;
17
18long long unsigned submits   = 0;
19long long unsigned completes = 0;
20
21void submit_and_drain(struct iovec * iov, int n) {
22        for(int i = 0; i < n; i++) {
23                struct io_uring_sqe * sqe =  &self.io.submit_q.sqes[ 0 ];
24
25                sqe->opcode = IORING_OP_READV;
26                #if !defined(IOSQE_ASYNC)
27                        sqe->flags = 0;
28                #else
29                        sqe->flags = IOSQE_ASYNC;
30                #endif
31                sqe->ioprio = 0;
32                sqe->fd = myfd;
33                sqe->off = 0;
34                sqe->addr = (__u64)iov;
35                sqe->len = 1;
36                sqe->rw_flags = 0;
37                sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
38        }
39
40        volatile uint32_t * tail = self.io.submit_q.tail;
41        __atomic_fetch_add(tail, n, __ATOMIC_SEQ_CST);
42
43        int ret = syscall( __NR_io_uring_enter, self.io.fd, n, n, IORING_ENTER_GETEVENTS, nullptr, 0);
44        if( ret < 0 ) {
45                switch((int)errno) {
46                case EAGAIN:
47                case EINTR:
48                default:
49                        fprintf(stderr, "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
50                        abort();
51                }
52        }
53
54        submits += ret;
55
56        uint32_t chead = *self.io.completion_q.head;
57        uint32_t ctail = *self.io.completion_q.tail;
58        const uint32_t mask = *self.io.completion_q.mask;
59
60        // Memory barrier
61        __atomic_thread_fence( __ATOMIC_SEQ_CST );
62
63        uint32_t count = ctail - chead;
64        __atomic_fetch_add( self.io.completion_q.head, count, __ATOMIC_RELAXED );
65        completes += count;
66}
67
68uint64_t timeHiRes() {
69        timespec curr;
70        clock_gettime( CLOCK_REALTIME, &curr );
71        return (int64_t)curr.tv_sec * TIMEGRAN + curr.tv_nsec;
72}
73
74uint64_t to_miliseconds( uint64_t durtn ) { return durtn / (TIMEGRAN / 1000LL); }
75double to_fseconds(uint64_t durtn ) { return durtn / (double)TIMEGRAN; }
76uint64_t from_fseconds(double sec) { return sec * TIMEGRAN; }
77
78int main(int argc, char * argv[]) {
79        int buflen = 50;
80        int batch  = 1;
81        double duration = 5;
82
83        setlocale(LC_ALL, "");
84
85        for(;;) {
86                static struct option options[] = {
87                        {"duration",     required_argument, 0, 'd'},
88                        {"batchsize",   required_argument, 0, 'b'},
89                        {"buflen",      required_argument, 0, 'l'},
90                        {0, 0, 0, 0}
91                };
92
93                int idx = 0;
94                int opt = getopt_long(argc, argv, "d:l:b:", options, &idx);
95
96                const char * arg = optarg ? optarg : "";
97                char * end;
98                switch(opt) {
99                        // Exit Case
100                        case -1:
101                                goto arg_loop;
102                        case 'd': \
103                                duration = strtod(arg, &end); \
104                                if(*end != '\0') { \
105                                        fprintf(stderr, "Duration must be a valid double, was %s\n", arg); \
106                                        goto usage; \
107                                } \
108                                break;
109                        case 'l':
110                                buflen = strtoul(arg, &end, 10);
111                                if(*end != '\0' && buflen < 10) {
112                                        fprintf(stderr, "Buffer size must be at least 10, was %s\n", arg);
113                                        goto usage;
114                                }
115                        case 'b':
116                                batch = strtoul(arg, &end, 10);
117                                if(*end != '\0' && batch < 0) {
118                                        fprintf(stderr, "Batch size must be at least 1, was %s\n", arg);
119                                        goto usage;
120                                }
121                                break;
122                        default: /* ? */
123                                fprintf(stderr, "%d\n", opt);
124                        usage:
125                                fprintf( stderr, "  -l, --buflen=SIZE        Number of bytes to read per request\n" );
126                                fprintf( stderr, "  -b, --batchsize=COUNT    Number of request to batch together\n" );
127                                exit(EXIT_FAILURE);
128                }
129        }
130        arg_loop:
131
132        myfd = open(__FILE__, 0);
133
134        init_uring(2048);
135
136        // Allocate the sqe
137        uint32_t idx = 0;
138
139        // Return the sqe
140        struct io_uring_sqe * sqe =  &self.io.submit_q.sqes[ idx & (*self.io.submit_q.mask)];
141
142        char data[buflen];
143        struct iovec iov = { data, (size_t)buflen };
144
145        sqe->opcode = IORING_OP_READV;
146        #if !defined(IOSQE_ASYNC)
147                sqe->flags = 0;
148        #else
149                sqe->flags = IOSQE_ASYNC;
150        #endif
151        sqe->ioprio = 0;
152        sqe->fd = myfd;
153        sqe->off = 0;
154        sqe->addr = (__u64)&iov;
155        sqe->len = 1;
156        sqe->rw_flags = 0;
157        sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
158
159        // Append to the list of ready entries
160        for(unsigned i = 0; i < *self.io.submit_q.num; i++) {
161                self.io.submit_q.array[ i ] = 0;
162        }
163
164        printf("Running for %f second, reading %d bytes in batches of %d\n", duration, buflen, batch);
165        uint64_t start = timeHiRes();
166        uint64_t end   = timeHiRes();
167        uint64_t prev  = timeHiRes();
168        for(;;) {
169                submit_and_drain(&iov, batch);
170                end = timeHiRes();
171                uint64_t delta = end - start;
172                if( to_fseconds(end - prev) > 0.1 ) {
173                        printf(" %.1f\r", to_fseconds(delta));
174                        fflush(stdout);
175                        prev = end;
176                }
177                if( delta >= from_fseconds(duration) ) {
178                        break;
179                }
180        }
181
182        printf("Took %'ld ms\n", to_miliseconds(end - start));
183        printf("Submitted        %'llu\n", submits);
184        printf("Completed        %'llu\n", completes);
185        printf("Submitted / sec  %'.f\n", submits   / to_fseconds(end - start));
186        printf("Completed / sec  %'.f\n", completes / to_fseconds(end - start));
187        printf("ns per Submitted %'.f\n", 1000000000.0 * to_fseconds(end - start) / (submits   / batch) );
188        printf("ns per Completed %'.f\n", 1000000000.0 * to_fseconds(end - start) / (completes / batch) );
189}
Note: See TracBrowser for help on using the repository browser.