source: libcfa/src/concurrency/io.cfa @ 6502a2b

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 6502a2b was 6502a2b, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Tentative fix to the deadlock in cluster shutdown

  • Property mode set to 100644
File size: 29.5 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16// #define __CFA_DEBUG_PRINT_IO__
17// #define __CFA_DEBUG_PRINT_IO_CORE__
18
19#include "kernel.hfa"
20
21#if !defined(HAVE_LINUX_IO_URING_H)
22        void __kernel_io_startup( cluster & ) {
23                // Nothing to do without io_uring
24        }
25
26        void __kernel_io_start_thrd( cluster & ) {
27                // Nothing to do without io_uring
28        }
29
30        void __kernel_io_stop_thrd ( cluster & ) {
31                // Nothing to do without io_uring
32        }
33
34        void __kernel_io_shutdown( cluster & ) {
35                // Nothing to do without io_uring
36        }
37
38#else
39        extern "C" {
40                #define _GNU_SOURCE         /* See feature_test_macros(7) */
41                #include <errno.h>
42                #include <stdint.h>
43                #include <string.h>
44                #include <unistd.h>
45                #include <sys/mman.h>
46                #include <sys/syscall.h>
47
48                #include <linux/io_uring.h>
49        }
50
51        #include "bits/signal.hfa"
52        #include "kernel_private.hfa"
53        #include "thread.hfa"
54
55        uint32_t entries_per_cluster() {
56                return 256;
57        }
58
59        static void * __io_poller_slow( void * arg );
60
61        // Weirdly, some systems that do support io_uring don't actually define these
62        #ifdef __alpha__
63                /*
64                * alpha is the only exception, all other architectures
65                * have common numbers for new system calls.
66                */
67                #ifndef __NR_io_uring_setup
68                        #define __NR_io_uring_setup           535
69                #endif
70                #ifndef __NR_io_uring_enter
71                        #define __NR_io_uring_enter           536
72                #endif
73                #ifndef __NR_io_uring_register
74                        #define __NR_io_uring_register        537
75                #endif
76        #else /* !__alpha__ */
77                #ifndef __NR_io_uring_setup
78                        #define __NR_io_uring_setup           425
79                #endif
80                #ifndef __NR_io_uring_enter
81                        #define __NR_io_uring_enter           426
82                #endif
83                #ifndef __NR_io_uring_register
84                        #define __NR_io_uring_register        427
85                #endif
86        #endif
87
88        #if defined(__CFA_IO_POLLING_USER__)
89                void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
90                        this.ring = &cltr.io;
91                        this.waiting = true;
92                        (this.thrd){ "Fast I/O Poller", cltr };
93                }
94                void ^?{}( __io_poller_fast & mutex this );
95        void main( __io_poller_fast & this );
96        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
97                void ^?{}( __io_poller_fast & mutex this ) {}
98        #endif
99
100//=============================================================================================
101// I/O Startup / Shutdown logic
102//=============================================================================================
103        void __kernel_io_startup( cluster & this, bool main_cluster ) {
104                // Step 1 : call to setup
105                struct io_uring_params params;
106                memset(&params, 0, sizeof(params));
107
108                uint32_t nentries = entries_per_cluster();
109
110                int fd = syscall(__NR_io_uring_setup, nentries, &params );
111                if(fd < 0) {
112                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
113                }
114
115                // Step 2 : mmap result
116                memset(&this.io, 0, sizeof(struct io_ring));
117                struct io_uring_sq & sq = this.io.submit_q;
118                struct io_uring_cq & cq = this.io.completion_q;
119
120                // calculate the right ring size
121                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
122                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
123
124                // Requires features
125                #if defined(IORING_FEAT_SINGLE_MMAP)
126                        // adjust the size according to the parameters
127                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
128                                cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
129                        }
130                #endif
131
132                // mmap the Submit Queue into existence
133                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
134                if (sq.ring_ptr == (void*)MAP_FAILED) {
135                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
136                }
137
138                // Requires features
139                #if defined(IORING_FEAT_SINGLE_MMAP)
140                        // mmap the Completion Queue into existence (may or may not be needed)
141                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
142                                cq->ring_ptr = sq->ring_ptr;
143                        }
144                        else
145                #endif
146                {
147                        // We need multiple call to MMAP
148                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
149                        if (cq.ring_ptr == (void*)MAP_FAILED) {
150                                munmap(sq.ring_ptr, sq.ring_sz);
151                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
152                        }
153                }
154
155                // mmap the submit queue entries
156                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
157                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
158                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
159                        munmap(sq.ring_ptr, sq.ring_sz);
160                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
161                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
162                }
163
164                // Get the pointers from the kernel to fill the structure
165                // submit queue
166                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
167                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
168                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
169                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
170                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
171                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
172                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
173                sq.alloc = *sq.tail;
174                sq.ready = *sq.tail;
175
176                // completion queue
177                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
178                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
179                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
180                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
181                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
182                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
183
184                // some paranoid checks
185                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
186                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
187                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
188                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
189
190                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
191                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
192                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
193                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
194
195                // Update the global ring info
196                this.io.flags = params.flags;
197                this.io.fd    = fd;
198                this.io.done  = false;
199                (this.io.submit){ min(*sq.num, *cq.num) };
200
201                // Initialize statistics
202                #if !defined(__CFA_NO_STATISTICS__)
203                        this.io.submit_q.stats.submit_avg.val = 0;
204                        this.io.submit_q.stats.submit_avg.cnt = 0;
205                        this.io.completion_q.stats.completed_avg.val = 0;
206                        this.io.completion_q.stats.completed_avg.cnt = 0;
207                #endif
208
209                if(!main_cluster) {
210                        __kernel_io_finish_start( this );
211                }
212        }
213
214        void __kernel_io_finish_start( cluster & this ) {
215                #if defined(__CFA_IO_POLLING_USER__)
216                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
217                        (this.io.poller.fast){ this };
218                        __thrd_start( this.io.poller.fast, main );
219                #endif
220
221                // Create the poller thread
222                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
223                this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
224        }
225
226        void __kernel_io_prepare_stop( cluster & this ) {
227                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
228                // Notify the poller thread of the shutdown
229                __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
230
231                // Stop the IO Poller
232                sigval val = { 1 };
233                pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
234                #if defined(__CFA_IO_POLLING_USER__)
235                        post( this.io.poller.sem );
236                #endif
237
238                // Wait for the poller thread to finish
239                pthread_join( this.io.poller.slow.kthrd, 0p );
240                free( this.io.poller.slow.stack );
241
242                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
243
244                #if defined(__CFA_IO_POLLING_USER__)
245                        verify( this.io.poller.fast.waiting );
246                        verify( this.io.poller.fast.thrd.state == Blocked );
247
248                        this.io.poller.fast.thrd.curr_cluster = mainCluster;
249
250                        // unpark the fast io_poller
251                        unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
252
253                        ^(this.io.poller.fast){};
254
255                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
256                #endif
257        }
258
259        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
260                if(!main_cluster) {
261                        __kernel_io_prepare_stop( this );
262                }
263
264                // print statistics
265                #if !defined(__CFA_NO_STATISTICS__)
266                        if(this.print_stats) {
267                                __cfaabi_bits_print_safe( STDERR_FILENO,
268                                        "----- I/O uRing Stats -----\n"
269                                        "- total submit calls  : %llu\n"
270                                        "- avg submit          : %lf\n"
271                                        "- total wait calls    : %llu\n"
272                                        "- avg completion/wait : %lf\n",
273                                        this.io.submit_q.stats.submit_avg.cnt,
274                                        ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
275                                        this.io.completion_q.stats.completed_avg.cnt,
276                                        ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
277                                );
278                        }
279                #endif
280
281                // Shutdown the io rings
282                struct io_uring_sq & sq = this.io.submit_q;
283                struct io_uring_cq & cq = this.io.completion_q;
284
285                // unmap the submit queue entries
286                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
287
288                // unmap the Submit Queue ring
289                munmap(sq.ring_ptr, sq.ring_sz);
290
291                // unmap the Completion Queue ring, if it is different
292                if (cq.ring_ptr != sq.ring_ptr) {
293                        munmap(cq.ring_ptr, cq.ring_sz);
294                }
295
296                // close the file descriptor
297                close(this.io.fd);
298        }
299
300//=============================================================================================
301// I/O Polling
302//=============================================================================================
303        struct io_user_data {
304                int32_t result;
305                $thread * thrd;
306        };
307
308        // Process a single completion message from the io_uring
309        // This is NOT thread-safe
310        static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
311                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
312                if( ret < 0 ) {
313                        switch((int)errno) {
314                        case EAGAIN:
315                        case EINTR:
316                                return -EAGAIN;
317                        default:
318                                abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
319                        }
320                }
321
322                // Drain the queue
323                unsigned head = *ring.completion_q.head;
324                unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
325
326                // Nothing was new return 0
327                if (head == tail) {
328                        #if !defined(__CFA_NO_STATISTICS__)
329                                ring.completion_q.stats.completed_avg.cnt += 1;
330                        #endif
331                        return 0;
332                }
333
334                uint32_t count = tail - head;
335                for(i; count) {
336                        unsigned idx = (head + i) & (*ring.completion_q.mask);
337                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
338
339                        /* paranoid */ verify(&cqe);
340
341                        struct io_user_data * data = (struct io_user_data *)cqe.user_data;
342                        __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
343
344                        data->result = cqe.res;
345                        if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
346                        else         { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
347                }
348
349                // Allow new submissions to happen
350                V(ring.submit, count);
351
352                // Mark to the kernel that the cqe has been seen
353                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
354                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
355
356                // Update statistics
357                #if !defined(__CFA_NO_STATISTICS__)
358                        ring.completion_q.stats.completed_avg.val += count;
359                        ring.completion_q.stats.completed_avg.cnt += 1;
360                #endif
361
362                return count;
363        }
364
365        static void * __io_poller_slow( void * arg ) {
366                cluster * cltr = (cluster *)arg;
367                struct io_ring & ring = cltr->io;
368
369                sigset_t mask;
370                sigfillset(&mask);
371                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
372                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
373                }
374
375                sigdelset( &mask, SIGUSR1 );
376
377                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
378                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
379
380                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
381
382                while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
383                        #if defined(__CFA_IO_POLLING_USER__)
384
385                                // In the user-thread approach drain and if anything was drained,
386                                // batton pass to the user-thread
387                                int count = __drain_io( ring, &mask, 1, true );
388                                if(count > 0) {
389                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
390                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
391                                        wait( ring.poller.sem );
392                                }
393
394                        #else
395
396                                //In the naive approach, just poll the io completion queue directly
397                                __drain_io( ring, &mask, 1, true );
398
399                        #endif
400                }
401
402                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
403
404                return 0p;
405        }
406
407        #if defined(__CFA_IO_POLLING_USER__)
408                void main( __io_poller_fast & this ) {
409                        // Start parked
410                        park( __cfaabi_dbg_ctx );
411
412                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
413
414                        // Then loop until we need to start
415                        while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
416                                // Drain the io
417                                this.waiting = false;
418                                int ret = __drain_io( *this.ring, 0p, 0, false );
419                                this.waiting = true;
420                                if(0 > ret) {
421                                        // If we got something, just yield and check again
422                                        yield();
423                                }
424                                else {
425                                        // We didn't get anything baton pass to the slow poller
426                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
427                                        post( this.ring->poller.sem );
428                                        park( __cfaabi_dbg_ctx );
429                                }
430                        }
431
432                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
433                }
434        #endif
435
436//=============================================================================================
437// I/O Submissions
438//=============================================================================================
439
440// Submition steps :
441// 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
442//     entries are available. The semaphore make sure that there is no more operations in
443//     progress then the number of entries in the buffer. This probably limits concurrency
444//     more than necessary since submitted but not completed operations don't need any
445//     entries in user space. However, I don't know what happens if we overflow the buffers
446//     because too many requests completed at once. This is a safe approach in all cases.
447//     Furthermore, with hundreds of entries, this may be okay.
448//
449// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
450//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
451//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
452//     need to write an allocator that allows allocating concurrently.
453//
454// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
455//
456// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
457//     needs to arrive to two concensus at the same time:
458//     A - The order in which entries are listed in the array: no two threads must pick the
459//         same index for their entries
460//     B - When can the tail be update for the kernel. EVERY entries in the array between
461//         head and tail must be fully filled and shouldn't ever be touched again.
462//
463
464        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
465                // Wait for a spot to be available
466                P(ring.submit);
467
468                // Allocate the sqe
469                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
470
471                // Validate that we didn't overflow anything
472                // Check that nothing overflowed
473                /* paranoid */ verify( true );
474
475                // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
476                /* paranoid */ verify( true );
477
478                // Return the sqe
479                return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
480        }
481
482        static inline void __submit( struct io_ring & ring, uint32_t idx ) {
483                // get mutual exclusion
484                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
485
486                // Append to the list of ready entries
487                uint32_t * tail = ring.submit_q.tail;
488                const uint32_t mask = *ring.submit_q.mask;
489
490                ring.submit_q.array[ (*tail) & mask ] = idx & mask;
491                __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
492
493                // Submit however, many entries need to be submitted
494                int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
495                if( ret < 0 ) {
496                        switch((int)errno) {
497                        default:
498                                abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
499                        }
500                }
501
502                // update statistics
503                #if !defined(__CFA_NO_STATISTICS__)
504                        ring.submit_q.stats.submit_avg.val += 1;
505                        ring.submit_q.stats.submit_avg.cnt += 1;
506                #endif
507
508                unlock(ring.submit_q.lock);
509                // Make sure that idx was submitted
510                // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
511                __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
512        }
513
514        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
515                this.opcode = opcode;
516                #if !defined(IOSQE_ASYNC)
517                        this.flags = 0;
518                #else
519                        this.flags = IOSQE_ASYNC;
520                #endif
521                this.ioprio = 0;
522                this.fd = fd;
523                this.off = 0;
524                this.addr = 0;
525                this.len = 0;
526                this.rw_flags = 0;
527                this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
528        }
529
530        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
531                (this){ opcode, fd };
532                this.off = off;
533                this.addr = (uint64_t)addr;
534                this.len = len;
535        }
536
537
538//=============================================================================================
539// I/O Interface
540//=============================================================================================
541
542        #define __submit_prelude \
543                struct io_ring & ring = active_cluster()->io; \
544                struct io_uring_sqe * sqe; \
545                uint32_t idx; \
546                [sqe, idx] = __submit_alloc( ring );
547
548        #define __submit_wait \
549                io_user_data data = { 0, active_thread() }; \
550                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
551                sqe->user_data = (uint64_t)&data; \
552                __submit( ring, idx ); \
553                park( __cfaabi_dbg_ctx ); \
554                return data.result;
555#endif
556
557// Some forward declarations
558extern "C" {
559        #include <unistd.h>
560        #include <sys/types.h>
561        #include <sys/socket.h>
562        #include <sys/syscall.h>
563        struct iovec;
564        extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
565        extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
566
567        extern int fsync(int fd);
568        extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
569
570        struct msghdr;
571        struct sockaddr;
572        extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
573        extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
574        extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
575        extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
576        extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
577        extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
578
579        extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
580        extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
581        extern int madvise(void *addr, size_t length, int advice);
582
583        extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
584        extern int close(int fd);
585
586        struct statx;
587        extern int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
588
589        extern ssize_t read (int fd, void *buf, size_t count);
590}
591
592//-----------------------------------------------------------------------------
593// Asynchronous operations
594ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
595        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
596                return preadv2(fd, iov, iovcnt, offset, flags);
597        #else
598                __submit_prelude
599
600                (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
601
602                __submit_wait
603        #endif
604}
605
606ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
607        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
608                return pwritev2(fd, iov, iovcnt, offset, flags);
609        #else
610                __submit_prelude
611
612                (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
613
614                __submit_wait
615        #endif
616}
617
618int cfa_fsync(int fd) {
619        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
620                return fsync(fd);
621        #else
622                __submit_prelude
623
624                (*sqe){ IORING_OP_FSYNC, fd };
625
626                __submit_wait
627        #endif
628}
629
630int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
631        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
632                return sync_file_range(fd, offset, nbytes, flags);
633        #else
634                __submit_prelude
635
636                (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
637                sqe->off = offset;
638                sqe->len = nbytes;
639                sqe->sync_range_flags = flags;
640
641                __submit_wait
642        #endif
643}
644
645
646ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
647        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
648                return sendmsg(sockfd, msg, flags);
649        #else
650                __submit_prelude
651
652                (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
653                sqe->msg_flags = flags;
654
655                __submit_wait
656        #endif
657}
658
659ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
660        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
661                return recvmsg(sockfd, msg, flags);
662        #else
663                __submit_prelude
664
665                (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
666                sqe->msg_flags = flags;
667
668                __submit_wait
669        #endif
670}
671
672ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
673        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
674                return send( sockfd, buf, len, flags );
675        #else
676                __submit_prelude
677
678                (*sqe){ IORING_OP_SEND, sockfd };
679                sqe->addr = (uint64_t)buf;
680                sqe->len = len;
681                sqe->msg_flags = flags;
682
683                __submit_wait
684        #endif
685}
686
687ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
688        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
689                return recv( sockfd, buf, len, flags );
690        #else
691                __submit_prelude
692
693                (*sqe){ IORING_OP_RECV, sockfd };
694                sqe->addr = (uint64_t)buf;
695                sqe->len = len;
696                sqe->msg_flags = flags;
697
698                __submit_wait
699        #endif
700}
701
702int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
703        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
704                return accept4( sockfd, addr, addrlen, flags );
705        #else
706                __submit_prelude
707
708                (*sqe){ IORING_OP_ACCEPT, sockfd };
709                sqe->addr = addr;
710                sqe->addr2 = addrlen;
711                sqe->accept_flags = flags;
712
713                __submit_wait
714        #endif
715}
716
717int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
718        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
719                return connect( sockfd, addr, addrlen );
720        #else
721                __submit_prelude
722
723                (*sqe){ IORING_OP_CONNECT, sockfd };
724                sqe->addr = (uint64_t)addr;
725                sqe->off = addrlen;
726
727                __submit_wait
728        #endif
729}
730
731int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
732        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
733                return fallocate( fd, mode, offset, len );
734        #else
735                __submit_prelude
736
737                (*sqe){ IORING_OP_FALLOCATE, fd };
738                sqe->off = offset;
739                sqe->len = length;
740                sqe->mode = mode;
741
742                __submit_wait
743        #endif
744}
745
746int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
747        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
748                return posix_fadvise( fd, offset, len, advice );
749        #else
750                __submit_prelude
751
752                (*sqe){ IORING_OP_FADVISE, fd };
753                sqe->off = (uint64_t)offset;
754                sqe->len = length;
755                sqe->fadvise_advice = advice;
756
757                __submit_wait
758        #endif
759}
760
761int cfa_madvise(void *addr, size_t length, int advice) {
762        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
763                return madvise( addr, length, advice );
764        #else
765                __submit_prelude
766
767                (*sqe){ IORING_OP_MADVISE, 0 };
768                sqe->addr = (uint64_t)addr;
769                sqe->len = length;
770                sqe->fadvise_advice = advice;
771
772                __submit_wait
773        #endif
774}
775
776int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
777        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
778                return openat( dirfd, pathname, flags, mode );
779        #else
780                __submit_prelude
781
782                (*sqe){ IORING_OP_OPENAT, dirfd };
783                sqe->addr = (uint64_t)pathname;
784                sqe->open_flags = flags;
785                sqe->mode = mode;
786
787                __submit_wait
788        #endif
789}
790
791int cfa_close(int fd) {
792        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
793                return close( fd );
794        #else
795                __submit_prelude
796
797                (*sqe){ IORING_OP_CLOSE, fd };
798
799                __submit_wait
800        #endif
801}
802
803int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
804        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_STATX)
805                //return statx( dirfd, pathname, flags, mask, statxbuf );
806                return syscall( __NR_statx, dirfd, pathname, flags, mask, statxbuf );
807        #else
808                __submit_prelude
809
810                (*sqe){ IORING_OP_STATX, dirfd };
811                sqe->addr = (uint64_t)pathname;
812                sqe->statx_flags = flags;
813                sqe->len = mask;
814                sqe->off = (uint64_t)statxbuf;
815
816                __submit_wait
817        #endif
818}
819
820
821ssize_t cfa_read(int fd, void *buf, size_t count) {
822        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
823                return read( fd, buf, count );
824        #else
825                __submit_prelude
826
827                (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
828
829                __submit_wait
830        #endif
831}
832
833ssize_t cfa_write(int fd, void *buf, size_t count) {
834        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
835                return read( fd, buf, count );
836        #else
837                __submit_prelude
838
839                (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
840
841                __submit_wait
842        #endif
843}
844
845//-----------------------------------------------------------------------------
846// Check if a function is asynchronous
847
848// Macro magic to reduce the size of the following switch case
849#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
850#define IS_DEFINED_SECOND(first, second, ...) second
851#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
852#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
853
854bool has_user_level_blocking( fptr_t func ) {
855        #if defined(HAVE_LINUX_IO_URING_H)
856                if( /*func == (fptr_t)preadv2 || */
857                        func == (fptr_t)cfa_preadv2 )
858                        #define _CFA_IO_FEATURE_IORING_OP_READV ,
859                        return IS_DEFINED(IORING_OP_READV);
860
861                if( /*func == (fptr_t)pwritev2 || */
862                        func == (fptr_t)cfa_pwritev2 )
863                        #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
864                        return IS_DEFINED(IORING_OP_WRITEV);
865
866                if( /*func == (fptr_t)fsync || */
867                        func == (fptr_t)cfa_fsync )
868                        #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
869                        return IS_DEFINED(IORING_OP_FSYNC);
870
871                if( /*func == (fptr_t)ync_file_range || */
872                        func == (fptr_t)cfa_sync_file_range )
873                        #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
874                        return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
875
876                if( /*func == (fptr_t)sendmsg || */
877                        func == (fptr_t)cfa_sendmsg )
878                        #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
879                        return IS_DEFINED(IORING_OP_SENDMSG);
880
881                if( /*func == (fptr_t)recvmsg || */
882                        func == (fptr_t)cfa_recvmsg )
883                        #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
884                        return IS_DEFINED(IORING_OP_RECVMSG);
885
886                if( /*func == (fptr_t)send || */
887                        func == (fptr_t)cfa_send )
888                        #define _CFA_IO_FEATURE_IORING_OP_SEND ,
889                        return IS_DEFINED(IORING_OP_SEND);
890
891                if( /*func == (fptr_t)recv || */
892                        func == (fptr_t)cfa_recv )
893                        #define _CFA_IO_FEATURE_IORING_OP_RECV ,
894                        return IS_DEFINED(IORING_OP_RECV);
895
896                if( /*func == (fptr_t)accept4 || */
897                        func == (fptr_t)cfa_accept4 )
898                        #define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
899                        return IS_DEFINED(IORING_OP_ACCEPT);
900
901                if( /*func == (fptr_t)connect || */
902                        func == (fptr_t)cfa_connect )
903                        #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
904                        return IS_DEFINED(IORING_OP_CONNECT);
905
906                if( /*func == (fptr_t)fallocate || */
907                        func == (fptr_t)cfa_fallocate )
908                        #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
909                        return IS_DEFINED(IORING_OP_FALLOCATE);
910
911                if( /*func == (fptr_t)posix_fadvise || */
912                        func == (fptr_t)cfa_fadvise )
913                        #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
914                        return IS_DEFINED(IORING_OP_FADVISE);
915
916                if( /*func == (fptr_t)madvise || */
917                        func == (fptr_t)cfa_madvise )
918                        #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
919                        return IS_DEFINED(IORING_OP_MADVISE);
920
921                if( /*func == (fptr_t)openat || */
922                        func == (fptr_t)cfa_openat )
923                        #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
924                        return IS_DEFINED(IORING_OP_OPENAT);
925
926                if( /*func == (fptr_t)close || */
927                        func == (fptr_t)cfa_close )
928                        #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
929                        return IS_DEFINED(IORING_OP_CLOSE);
930
931                if( /*func == (fptr_t)statx || */
932                        func == (fptr_t)cfa_statx )
933                        #define _CFA_IO_FEATURE_IORING_OP_STATX ,
934                        return IS_DEFINED(IORING_OP_STATX);
935
936                if( /*func == (fptr_t)read || */
937                        func == (fptr_t)cfa_read )
938                        #define _CFA_IO_FEATURE_IORING_OP_READ ,
939                        return IS_DEFINED(IORING_OP_READ);
940
941                if( /*func == (fptr_t)write || */
942                        func == (fptr_t)cfa_write )
943                        #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
944                        return IS_DEFINED(IORING_OP_WRITE);
945        #endif
946
947        return false;
948}
Note: See TracBrowser for help on using the repository browser.