source: libcfa/src/concurrency/io.cfa @ f6660520

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since f6660520 was f6660520, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added new implementation of io_uring that uses user-thread

  • Property mode set to 100644
File size: 28.1 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#include "kernel.hfa"
17
18#if !defined(HAVE_LINUX_IO_URING_H)
19        void __kernel_io_startup( cluster & ) {
20                // Nothing to do without io_uring
21        }
22
23        void __kernel_io_start_thrd( cluster & ) {
24                // Nothing to do without io_uring
25        }
26
27        void __kernel_io_stop_thrd ( cluster & ) {
28                // Nothing to do without io_uring
29        }
30
31        void __kernel_io_shutdown( cluster & ) {
32                // Nothing to do without io_uring
33        }
34
35#else
36        extern "C" {
37                #define _GNU_SOURCE         /* See feature_test_macros(7) */
38                #include <errno.h>
39                #include <stdint.h>
40                #include <string.h>
41                #include <unistd.h>
42                #include <sys/mman.h>
43                #include <sys/syscall.h>
44
45                #include <linux/io_uring.h>
46        }
47
48        #include "bits/signal.hfa"
49        #include "kernel_private.hfa"
50        #include "thread.hfa"
51
52        uint32_t entries_per_cluster() {
53                return 256;
54        }
55
56        static void * __io_poller_slow( void * arg );
57
58        // Weirdly, some systems that do support io_uring don't actually define these
59        #ifdef __alpha__
60                /*
61                * alpha is the only exception, all other architectures
62                * have common numbers for new system calls.
63                */
64                #ifndef __NR_io_uring_setup
65                        #define __NR_io_uring_setup           535
66                #endif
67                #ifndef __NR_io_uring_enter
68                        #define __NR_io_uring_enter           536
69                #endif
70                #ifndef __NR_io_uring_register
71                        #define __NR_io_uring_register        537
72                #endif
73        #else /* !__alpha__ */
74                #ifndef __NR_io_uring_setup
75                        #define __NR_io_uring_setup           425
76                #endif
77                #ifndef __NR_io_uring_enter
78                        #define __NR_io_uring_enter           426
79                #endif
80                #ifndef __NR_io_uring_register
81                        #define __NR_io_uring_register        427
82                #endif
83        #endif
84
85        #if defined(__CFA_IO_POLLING_USER__)
86                void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
87                        this.ring = &cltr.io;
88                        (this.thrd){ "I/O Poller", cltr };
89                }
90                void ^?{}( __io_poller_fast & mutex this );
91        void main( __io_poller_fast & this );
92        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
93                void ^?{}( __io_poller_fast & mutex this ) {}
94        #endif
95
96//=============================================================================================
97// I/O Startup / Shutdown logic
98//=============================================================================================
99        void __kernel_io_startup( cluster & this, bool main_cluster ) {
100                // Step 1 : call to setup
101                struct io_uring_params params;
102                memset(&params, 0, sizeof(params));
103
104                uint32_t nentries = entries_per_cluster();
105
106                int fd = syscall(__NR_io_uring_setup, nentries, &params );
107                if(fd < 0) {
108                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
109                }
110
111                // Step 2 : mmap result
112                memset(&this.io, 0, sizeof(struct io_ring));
113                struct io_uring_sq & sq = this.io.submit_q;
114                struct io_uring_cq & cq = this.io.completion_q;
115
116                // calculate the right ring size
117                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
118                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
119
120                // Requires features
121                #if defined(IORING_FEAT_SINGLE_MMAP)
122                        // adjust the size according to the parameters
123                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
124                                cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
125                        }
126                #endif
127
128                // mmap the Submit Queue into existence
129                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
130                if (sq.ring_ptr == (void*)MAP_FAILED) {
131                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
132                }
133
134                // Requires features
135                #if defined(IORING_FEAT_SINGLE_MMAP)
136                        // mmap the Completion Queue into existence (may or may not be needed)
137                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
138                                cq->ring_ptr = sq->ring_ptr;
139                        }
140                        else
141                #endif
142                {
143                        // We need multiple call to MMAP
144                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
145                        if (cq.ring_ptr == (void*)MAP_FAILED) {
146                                munmap(sq.ring_ptr, sq.ring_sz);
147                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
148                        }
149                }
150
151                // mmap the submit queue entries
152                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
153                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
154                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
155                        munmap(sq.ring_ptr, sq.ring_sz);
156                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
157                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
158                }
159
160                // Get the pointers from the kernel to fill the structure
161                // submit queue
162                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
163                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
164                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
165                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
166                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
167                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
168                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
169                sq.alloc = *sq.tail;
170                sq.ready = *sq.tail;
171
172                // completion queue
173                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
174                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
175                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
176                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
177                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
178                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
179
180                // some paranoid checks
181                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
182                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
183                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
184                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
185
186                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
187                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
188                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
189                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
190
191                // Update the global ring info
192                this.io.flags = params.flags;
193                this.io.fd    = fd;
194                this.io.done  = false;
195                (this.io.submit){ min(*sq.num, *cq.num) };
196
197                // Initialize statistics
198                #if !defined(__CFA_NO_STATISTICS__)
199                        this.io.submit_q.stats.submit_avg.val = 0;
200                        this.io.submit_q.stats.submit_avg.cnt = 0;
201                        this.io.completion_q.stats.completed_avg.val = 0;
202                        this.io.completion_q.stats.completed_avg.cnt = 0;
203                #endif
204
205                if(!main_cluster) {
206                        __kernel_io_finish_start( this );
207                }
208        }
209
210        void __kernel_io_finish_start( cluster & this ) {
211                #if defined(__CFA_IO_POLLING_USER__)
212                        (this.io.poller.fast){ this };
213                        __thrd_start( this.io.poller.fast, main );
214                #endif
215
216                // Create the poller thread
217                this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
218        }
219
220        void __kernel_io_prepare_stop( cluster & this ) {
221                // Notify the poller thread of the shutdown
222                __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
223
224                // Stop the IO Poller
225                sigval val = { 1 };
226                pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
227                #if defined(__CFA_IO_POLLING_USER__)
228                        post( this.io.poller.sem );
229                #endif
230
231                // Wait for the poller thread to finish
232                pthread_join( this.io.poller.slow.kthrd, 0p );
233                free( this.io.poller.slow.stack );
234
235                #if defined(__CFA_IO_POLLING_USER__)
236                        // unpark the fast io_poller
237                        unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
238
239                        ^(this.io.poller.fast){};
240                #endif
241        }
242
243        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
244                if(!main_cluster) {
245                        __kernel_io_prepare_stop( this );
246                }
247
248                // print statistics
249                #if !defined(__CFA_NO_STATISTICS__)
250                        if(this.print_stats) {
251                                __cfaabi_bits_print_safe( STDERR_FILENO,
252                                        "----- I/O uRing Stats -----\n"
253                                        "- total submit calls  : %llu\n"
254                                        "- avg submit          : %lf\n"
255                                        "- total wait calls    : %llu\n"
256                                        "- avg completion/wait : %lf\n",
257                                        this.io.submit_q.stats.submit_avg.cnt,
258                                        ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
259                                        this.io.completion_q.stats.completed_avg.cnt,
260                                        ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
261                                );
262                        }
263                #endif
264
265                // Shutdown the io rings
266                struct io_uring_sq & sq = this.io.submit_q;
267                struct io_uring_cq & cq = this.io.completion_q;
268
269                // unmap the submit queue entries
270                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
271
272                // unmap the Submit Queue ring
273                munmap(sq.ring_ptr, sq.ring_sz);
274
275                // unmap the Completion Queue ring, if it is different
276                if (cq.ring_ptr != sq.ring_ptr) {
277                        munmap(cq.ring_ptr, cq.ring_sz);
278                }
279
280                // close the file descriptor
281                close(this.io.fd);
282        }
283
284//=============================================================================================
285// I/O Polling
286//=============================================================================================
287        struct io_user_data {
288                int32_t result;
289                $thread * thrd;
290        };
291
292        // Process a single completion message from the io_uring
293        // This is NOT thread-safe
294        static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
295                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
296                if( ret < 0 ) {
297                        switch((int)errno) {
298                        case EAGAIN:
299                        case EINTR:
300                                return -EAGAIN;
301                        default:
302                                abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
303                        }
304                }
305
306                // Drain the queue
307                unsigned head = *ring.completion_q.head;
308                unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
309
310                // Nothing was new return 0
311                if (head == tail) {
312                        #if !defined(__CFA_NO_STATISTICS__)
313                                ring.completion_q.stats.completed_avg.cnt += 1;
314                        #endif
315                        return 0;
316                }
317
318                uint32_t count = tail - head;
319                for(i; count) {
320                        unsigned idx = (head + i) & (*ring.completion_q.mask);
321                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
322
323                        /* paranoid */ verify(&cqe);
324
325                        struct io_user_data * data = (struct io_user_data *)cqe.user_data;
326                        // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
327
328                        data->result = cqe.res;
329                        if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
330                        else         { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
331                }
332
333                // Allow new submissions to happen
334                V(ring.submit, count);
335
336                // Mark to the kernel that the cqe has been seen
337                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
338                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
339
340                // Update statistics
341                #if !defined(__CFA_NO_STATISTICS__)
342                        ring.completion_q.stats.completed_avg.val += count;
343                        ring.completion_q.stats.completed_avg.cnt += 1;
344                #endif
345
346                return count;
347        }
348
349        static void * __io_poller_slow( void * arg ) {
350                cluster * cltr = (cluster *)arg;
351                struct io_ring & ring = cltr->io;
352
353                sigset_t mask;
354                sigfillset(&mask);
355                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
356                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
357                }
358
359                sigdelset( &mask, SIGUSR1 );
360
361                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
362                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
363
364                while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
365                        #if defined(__CFA_IO_POLLING_USER__)
366
367                                // In the user-thread approach drain and if anything was drained,
368                                // batton pass to the user-thread
369                                int count = __drain_io( ring, &mask, 1, true );
370                                if(count > 0) {
371                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
372                                        wait( ring.poller.sem );
373                                }
374
375                        #else
376
377                                //In the naive approach, just poll the io completion queue directly
378                                __drain_io( ring, &mask, 1, true );
379
380                        #endif
381                }
382
383                return 0p;
384        }
385
386        #if defined(__CFA_IO_POLLING_USER__)
387                void main( __io_poller_fast & this ) {
388                        // Start parked
389                        park( __cfaabi_dbg_ctx );
390
391                        // Then loop until we need to start
392                        while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
393                                // Drain the io
394                                if(0 > __drain_io( *this.ring, 0p, 0, false )) {
395                                        // If we got something, just yield and check again
396                                        yield();
397                                }
398                                else {
399                                        // We didn't get anything baton pass to the slow poller
400                                        post( this.ring->poller.sem );
401                                        park( __cfaabi_dbg_ctx );
402                                }
403                        }
404                }
405        #endif
406
407//=============================================================================================
408// I/O Submissions
409//=============================================================================================
410
411// Submition steps :
412// 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
413//     entries are available. The semaphore make sure that there is no more operations in
414//     progress then the number of entries in the buffer. This probably limits concurrency
415//     more than necessary since submitted but not completed operations don't need any
416//     entries in user space. However, I don't know what happens if we overflow the buffers
417//     because too many requests completed at once. This is a safe approach in all cases.
418//     Furthermore, with hundreds of entries, this may be okay.
419//
420// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
421//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
422//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
423//     need to write an allocator that allows allocating concurrently.
424//
425// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
426//
427// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
428//     needs to arrive to two concensus at the same time:
429//     A - The order in which entries are listed in the array: no two threads must pick the
430//         same index for their entries
431//     B - When can the tail be update for the kernel. EVERY entries in the array between
432//         head and tail must be fully filled and shouldn't ever be touched again.
433//
434
435        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
436                // Wait for a spot to be available
437                P(ring.submit);
438
439                // Allocate the sqe
440                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
441
442                // Validate that we didn't overflow anything
443                // Check that nothing overflowed
444                /* paranoid */ verify( true );
445
446                // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
447                /* paranoid */ verify( true );
448
449                // Return the sqe
450                return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
451        }
452
453        static inline void __submit( struct io_ring & ring, uint32_t idx ) {
454                // get mutual exclusion
455                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
456
457                // Append to the list of ready entries
458                uint32_t * tail = ring.submit_q.tail;
459                const uint32_t mask = *ring.submit_q.mask;
460
461                ring.submit_q.array[ (*tail) & mask ] = idx & mask;
462                __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
463
464                // Submit however, many entries need to be submitted
465                int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
466                // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed io_submit, returned %d\n", ret );
467                if( ret < 0 ) {
468                        switch((int)errno) {
469                        default:
470                                abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
471                        }
472                }
473
474                // update statistics
475                #if !defined(__CFA_NO_STATISTICS__)
476                        ring.submit_q.stats.submit_avg.val += 1;
477                        ring.submit_q.stats.submit_avg.cnt += 1;
478                #endif
479
480                unlock(ring.submit_q.lock);
481                // Make sure that idx was submitted
482                // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
483        }
484
485        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
486                this.opcode = opcode;
487                #if !defined(IOSQE_ASYNC)
488                        this.flags = 0;
489                #else
490                        this.flags = IOSQE_ASYNC;
491                #endif
492                this.ioprio = 0;
493                this.fd = fd;
494                this.off = 0;
495                this.addr = 0;
496                this.len = 0;
497                this.rw_flags = 0;
498                this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
499        }
500
501        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
502                (this){ opcode, fd };
503                this.off = off;
504                this.addr = (uint64_t)addr;
505                this.len = len;
506        }
507
508
509//=============================================================================================
510// I/O Interface
511//=============================================================================================
512
513        #define __submit_prelude \
514                struct io_ring & ring = active_cluster()->io; \
515                struct io_uring_sqe * sqe; \
516                uint32_t idx; \
517                [sqe, idx] = __submit_alloc( ring );
518
519        #define __submit_wait \
520                io_user_data data = { 0, active_thread() }; \
521                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
522                sqe->user_data = (uint64_t)&data; \
523                __submit( ring, idx ); \
524                park( __cfaabi_dbg_ctx ); \
525                return data.result;
526#endif
527
528// Some forward declarations
529extern "C" {
530        #include <sys/types.h>
531        struct iovec;
532        extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
533        extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
534
535        extern int fsync(int fd);
536        extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
537
538        struct msghdr;
539        struct sockaddr;
540        extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
541        extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
542        extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
543        extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
544        extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
545        extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
546
547        extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
548        extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
549        extern int madvise(void *addr, size_t length, int advice);
550
551        extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
552        extern int close(int fd);
553
554        struct statx;
555        extern int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
556
557        extern ssize_t read (int fd, void *buf, size_t count);
558}
559
560//-----------------------------------------------------------------------------
561// Asynchronous operations
562ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
563        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
564                return preadv2(fd, iov, iovcnt, offset, flags);
565        #else
566                __submit_prelude
567
568                (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
569
570                __submit_wait
571        #endif
572}
573
574ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
575        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
576                return pwritev2(fd, iov, iovcnt, offset, flags);
577        #else
578                __submit_prelude
579
580                (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
581
582                __submit_wait
583        #endif
584}
585
586int cfa_fsync(int fd) {
587        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
588                return fsync(fd);
589        #else
590                __submit_prelude
591
592                (*sqe){ IORING_OP_FSYNC, fd };
593
594                __submit_wait
595        #endif
596}
597
598int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
599        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
600                return sync_file_range(fd, offset, nbytes, flags);
601        #else
602                __submit_prelude
603
604                (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
605                sqe->off = offset;
606                sqe->len = nbytes;
607                sqe->sync_range_flags = flags;
608
609                __submit_wait
610        #endif
611}
612
613
614ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
615        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
616                return recv(sockfd, msg, flags);
617        #else
618                __submit_prelude
619
620                (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
621                sqe->msg_flags = flags;
622
623                __submit_wait
624        #endif
625}
626
627ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
628        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
629                return recv(sockfd, msg, flags);
630        #else
631                __submit_prelude
632
633                (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
634                sqe->msg_flags = flags;
635
636                __submit_wait
637        #endif
638}
639
640ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
641        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
642                return send( sockfd, buf, len, flags );
643        #else
644                __submit_prelude
645
646                (*sqe){ IORING_OP_SEND, sockfd };
647                sqe->addr = (uint64_t)buf;
648                sqe->len = len;
649                sqe->msg_flags = flags;
650
651                __submit_wait
652        #endif
653}
654
655ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
656        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
657                return recv( sockfd, buf, len, flags );
658        #else
659                __submit_prelude
660
661                (*sqe){ IORING_OP_RECV, sockfd };
662                sqe->addr = (uint64_t)buf;
663                sqe->len = len;
664                sqe->msg_flags = flags;
665
666                __submit_wait
667        #endif
668}
669
670int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
671        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
672                return accept4( sockfd, addr, addrlen, flags );
673        #else
674                __submit_prelude
675
676                (*sqe){ IORING_OP_ACCEPT, sockfd };
677                sqe->addr = addr;
678                sqe->addr2 = addrlen;
679                sqe->accept_flags = flags;
680
681                __submit_wait
682        #endif
683}
684
685int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
686        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
687                return connect( sockfd, addr, addrlen );
688        #else
689                __submit_prelude
690
691                (*sqe){ IORING_OP_CONNECT, sockfd };
692                sqe->addr = (uint64_t)addr;
693                sqe->off = addrlen;
694
695                __submit_wait
696        #endif
697}
698
699int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
700        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
701                return fallocate( fd, mode, offset, len );
702        #else
703                __submit_prelude
704
705                (*sqe){ IORING_OP_FALLOCATE, fd };
706                sqe->off = offset;
707                sqe->len = length;
708                sqe->mode = mode;
709
710                __submit_wait
711        #endif
712}
713
714int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
715        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
716                return posix_fadvise( fd, offset, len, advice );
717        #else
718                __submit_prelude
719
720                (*sqe){ IORING_OP_FADVISE, fd };
721                sqe->off = (uint64_t)offset;
722                sqe->len = length;
723                sqe->fadvise_advice = advice;
724
725                __submit_wait
726        #endif
727}
728
729int cfa_madvise(void *addr, size_t length, int advice) {
730        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
731                return madvise( addr, length, advice );
732        #else
733                __submit_prelude
734
735                (*sqe){ IORING_OP_MADVISE, 0 };
736                sqe->addr = (uint64_t)addr;
737                sqe->len = length;
738                sqe->fadvise_advice = advice;
739
740                __submit_wait
741        #endif
742}
743
744int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
745        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
746                return openat( dirfd, pathname, flags, mode );
747        #else
748                __submit_prelude
749
750                (*sqe){ IORING_OP_OPENAT, dirfd };
751                sqe->addr = (uint64_t)pathname;
752                sqe->open_flags = flags;
753                sqe->mode = mode;
754
755                __submit_wait
756        #endif
757}
758
759int cfa_close(int fd) {
760        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
761                return close( fd );
762        #else
763                __submit_prelude
764
765                (*sqe){ IORING_OP_CLOSE, fd };
766
767                __submit_wait
768        #endif
769}
770
771int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
772        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_STATX)
773                //return statx( dirfd, pathname, flags, mask, statxbuf );
774                return syscall( __NR_io_uring_setup, dirfd, pathname, flags, mask, statxbuf );
775        #else
776                __submit_prelude
777
778                (*sqe){ IORING_OP_STATX, dirfd };
779                sqe->addr = (uint64_t)pathname;
780                sqe->statx_flags = flags;
781                sqe->len = mask;
782                sqe->off = (uint64_t)statxbuf;
783
784                __submit_wait
785        #endif
786}
787
788
789ssize_t cfa_read(int fd, void *buf, size_t count) {
790        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
791                return read( fd, buf, count );
792        #else
793                __submit_prelude
794
795                (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
796
797                __submit_wait
798        #endif
799}
800
801ssize_t cfa_write(int fd, void *buf, size_t count) {
802        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
803                return read( fd, buf, count );
804        #else
805                __submit_prelude
806
807                (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
808
809                __submit_wait
810        #endif
811}
812
813//-----------------------------------------------------------------------------
814// Check if a function is asynchronous
815
816// Macro magic to reduce the size of the following switch case
817#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
818#define IS_DEFINED_SECOND(first, second, ...) second
819#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
820#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
821
822bool has_user_level_blocking( fptr_t func ) {
823        #if defined(HAVE_LINUX_IO_URING_H)
824                if( /*func == (fptr_t)preadv2 || */
825                        func == (fptr_t)cfa_preadv2 )
826                        #define _CFA_IO_FEATURE_IORING_OP_READV ,
827                        return IS_DEFINED(IORING_OP_READV);
828
829                if( /*func == (fptr_t)pwritev2 || */
830                        func == (fptr_t)cfa_pwritev2 )
831                        #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
832                        return IS_DEFINED(IORING_OP_WRITEV);
833
834                if( /*func == (fptr_t)fsync || */
835                        func == (fptr_t)cfa_fsync )
836                        #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
837                        return IS_DEFINED(IORING_OP_FSYNC);
838
839                if( /*func == (fptr_t)ync_file_range || */
840                        func == (fptr_t)cfa_sync_file_range )
841                        #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
842                        return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
843
844                if( /*func == (fptr_t)sendmsg || */
845                        func == (fptr_t)cfa_sendmsg )
846                        #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
847                        return IS_DEFINED(IORING_OP_SENDMSG);
848
849                if( /*func == (fptr_t)recvmsg || */
850                        func == (fptr_t)cfa_recvmsg )
851                        #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
852                        return IS_DEFINED(IORING_OP_RECVMSG);
853
854                if( /*func == (fptr_t)send || */
855                        func == (fptr_t)cfa_send )
856                        #define _CFA_IO_FEATURE_IORING_OP_SEND ,
857                        return IS_DEFINED(IORING_OP_SEND);
858
859                if( /*func == (fptr_t)recv || */
860                        func == (fptr_t)cfa_recv )
861                        #define _CFA_IO_FEATURE_IORING_OP_RECV ,
862                        return IS_DEFINED(IORING_OP_RECV);
863
864                if( /*func == (fptr_t)accept4 || */
865                        func == (fptr_t)cfa_accept4 )
866                        #define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
867                        return IS_DEFINED(IORING_OP_ACCEPT);
868
869                if( /*func == (fptr_t)connect || */
870                        func == (fptr_t)cfa_connect )
871                        #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
872                        return IS_DEFINED(IORING_OP_CONNECT);
873
874                if( /*func == (fptr_t)fallocate || */
875                        func == (fptr_t)cfa_fallocate )
876                        #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
877                        return IS_DEFINED(IORING_OP_FALLOCATE);
878
879                if( /*func == (fptr_t)posix_fadvise || */
880                        func == (fptr_t)cfa_fadvise )
881                        #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
882                        return IS_DEFINED(IORING_OP_FADVISE);
883
884                if( /*func == (fptr_t)madvise || */
885                        func == (fptr_t)cfa_madvise )
886                        #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
887                        return IS_DEFINED(IORING_OP_MADVISE);
888
889                if( /*func == (fptr_t)openat || */
890                        func == (fptr_t)cfa_openat )
891                        #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
892                        return IS_DEFINED(IORING_OP_OPENAT);
893
894                if( /*func == (fptr_t)close || */
895                        func == (fptr_t)cfa_close )
896                        #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
897                        return IS_DEFINED(IORING_OP_CLOSE);
898
899                if( /*func == (fptr_t)statx || */
900                        func == (fptr_t)cfa_statx )
901                        #define _CFA_IO_FEATURE_IORING_OP_STATX ,
902                        return IS_DEFINED(IORING_OP_STATX);
903
904                if( /*func == (fptr_t)read || */
905                        func == (fptr_t)cfa_read )
906                        #define _CFA_IO_FEATURE_IORING_OP_READ ,
907                        return IS_DEFINED(IORING_OP_READ);
908
909                if( /*func == (fptr_t)write || */
910                        func == (fptr_t)cfa_write )
911                        #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
912                        return IS_DEFINED(IORING_OP_WRITE);
913        #endif
914
915        return false;
916}
Note: See TracBrowser for help on using the repository browser.