source: libcfa/src/concurrency/io.cfa @ af7acb9

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since af7acb9 was 87e0b015, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 33.6 KB
RevLine 
[ecf6b46]1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
[4069faad]16// #define __CFA_DEBUG_PRINT_IO__
[0a805f2]17// #define __CFA_DEBUG_PRINT_IO_CORE__
[4069faad]18
[92976d9]19#include "kernel.hfa"
20
21#if !defined(HAVE_LINUX_IO_URING_H)
[b6f2b21]22        void __kernel_io_startup( cluster &, int, bool ) {
[92976d9]23                // Nothing to do without io_uring
24        }
25
[3f7d0b4]26        void __kernel_io_finish_start( cluster & ) {
[f6660520]27                // Nothing to do without io_uring
28        }
29
[3f7d0b4]30        void __kernel_io_prepare_stop( cluster & ) {
[f6660520]31                // Nothing to do without io_uring
32        }
33
[3f7d0b4]34        void __kernel_io_shutdown( cluster &, bool ) {
[92976d9]35                // Nothing to do without io_uring
36        }
37
38#else
39        extern "C" {
40                #define _GNU_SOURCE         /* See feature_test_macros(7) */
41                #include <errno.h>
42                #include <stdint.h>
43                #include <string.h>
44                #include <unistd.h>
45                #include <sys/mman.h>
46                #include <sys/syscall.h>
47
48                #include <linux/io_uring.h>
49        }
50
51        #include "bits/signal.hfa"
52        #include "kernel_private.hfa"
53        #include "thread.hfa"
54
55        uint32_t entries_per_cluster() {
56                return 256;
57        }
58
[f6660520]59        static void * __io_poller_slow( void * arg );
60
61        // Weirdly, some systems that do support io_uring don't actually define these
62        #ifdef __alpha__
63                /*
64                * alpha is the only exception, all other architectures
65                * have common numbers for new system calls.
66                */
67                #ifndef __NR_io_uring_setup
68                        #define __NR_io_uring_setup           535
69                #endif
70                #ifndef __NR_io_uring_enter
71                        #define __NR_io_uring_enter           536
72                #endif
73                #ifndef __NR_io_uring_register
74                        #define __NR_io_uring_register        537
75                #endif
76        #else /* !__alpha__ */
77                #ifndef __NR_io_uring_setup
78                        #define __NR_io_uring_setup           425
79                #endif
80                #ifndef __NR_io_uring_enter
81                        #define __NR_io_uring_enter           426
82                #endif
83                #ifndef __NR_io_uring_register
84                        #define __NR_io_uring_register        427
85                #endif
86        #endif
87
[61dd73d]88        // Fast poller user-thread
89        // Not using the "thread" keyword because we want to control
90        // more carefully when to start/stop it
91        struct __io_poller_fast {
92                struct __io_data * ring;
93                bool waiting;
94                $thread thrd;
95        };
96
97        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
98                this.ring = cltr.io;
99                this.waiting = true;
100                (this.thrd){ "Fast I/O Poller", cltr };
101        }
102        void ^?{}( __io_poller_fast & mutex this );
103        void main( __io_poller_fast & this );
104        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
105        void ^?{}( __io_poller_fast & mutex this ) {}
106
107        struct __submition_data {
108                // Head and tail of the ring (associated with array)
109                volatile uint32_t * head;
110                volatile uint32_t * tail;
111
112                // The actual kernel ring which uses head/tail
113                // indexes into the sqes arrays
114                uint32_t * array;
115
116                // number of entries and mask to go with it
117                const uint32_t * num;
118                const uint32_t * mask;
119
120                // Submission flags (Not sure what for)
121                uint32_t * flags;
122
123                // number of sqes not submitted (whatever that means)
124                uint32_t * dropped;
125
126                // Like head/tail but not seen by the kernel
127                volatile uint32_t alloc;
128                volatile uint32_t ready;
129
130                __spinlock_t lock;
131
132                // A buffer of sqes (not the actual ring)
133                struct io_uring_sqe * sqes;
134
135                // The location and size of the mmaped area
136                void * ring_ptr;
137                size_t ring_sz;
138
139                // Statistics
140                #if !defined(__CFA_NO_STATISTICS__)
141                        struct {
142                                struct {
[05cfa4d]143                                        volatile unsigned long long int val;
144                                        volatile unsigned long long int cnt;
145                                        volatile unsigned long long int block;
[61dd73d]146                                } submit_avg;
147                        } stats;
148                #endif
149        };
150
151        struct __completion_data {
152                // Head and tail of the ring
153                volatile uint32_t * head;
154                volatile uint32_t * tail;
155
156                // number of entries and mask to go with it
157                const uint32_t * mask;
158                const uint32_t * num;
159
160                // number of cqes not submitted (whatever that means)
161                uint32_t * overflow;
162
163                // the kernel ring
164                struct io_uring_cqe * cqes;
165
166                // The location and size of the mmaped area
167                void * ring_ptr;
168                size_t ring_sz;
169
170                // Statistics
171                #if !defined(__CFA_NO_STATISTICS__)
172                        struct {
173                                struct {
174                                        unsigned long long int val;
175                                        unsigned long long int slow_cnt;
176                                        unsigned long long int fast_cnt;
177                                } completed_avg;
178                        } stats;
179                #endif
180        };
181
182        struct __io_data {
183                struct __submition_data submit_q;
184                struct __completion_data completion_q;
[b6f2b21]185                uint32_t ring_flags;
186                int cltr_flags;
[61dd73d]187                int fd;
188                semaphore submit;
189                volatile bool done;
190                struct {
191                        struct {
192                                void * stack;
193                                pthread_t kthrd;
194                        } slow;
195                        __io_poller_fast fast;
196                        __bin_sem_t sem;
197                } poller;
198        };
[185efe6]199
[92976d9]200//=============================================================================================
201// I/O Startup / Shutdown logic
202//=============================================================================================
[b6f2b21]203        void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
[61dd73d]204                this.io = malloc();
205
[92976d9]206                // Step 1 : call to setup
207                struct io_uring_params params;
208                memset(&params, 0, sizeof(params));
209
[2d8f7b0]210                uint32_t nentries = entries_per_cluster();
211
212                int fd = syscall(__NR_io_uring_setup, nentries, &params );
[92976d9]213                if(fd < 0) {
214                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
215                }
216
217                // Step 2 : mmap result
[61dd73d]218                memset( this.io, 0, sizeof(struct __io_data) );
219                struct __submition_data  & sq = this.io->submit_q;
220                struct __completion_data & cq = this.io->completion_q;
[92976d9]221
222                // calculate the right ring size
[2d8f7b0]223                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
224                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
[92976d9]225
226                // Requires features
[d384787]227                #if defined(IORING_FEAT_SINGLE_MMAP)
228                        // adjust the size according to the parameters
229                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
230                                cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
231                        }
232                #endif
[92976d9]233
234                // mmap the Submit Queue into existence
[2d8f7b0]235                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
236                if (sq.ring_ptr == (void*)MAP_FAILED) {
[92976d9]237                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
238                }
239
240                // Requires features
[d384787]241                #if defined(IORING_FEAT_SINGLE_MMAP)
242                        // mmap the Completion Queue into existence (may or may not be needed)
243                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
244                                cq->ring_ptr = sq->ring_ptr;
245                        }
246                        else
247                #endif
248                {
[92976d9]249                        // We need multiple call to MMAP
[2d8f7b0]250                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
251                        if (cq.ring_ptr == (void*)MAP_FAILED) {
252                                munmap(sq.ring_ptr, sq.ring_sz);
[92976d9]253                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
254                        }
[d384787]255                }
[92976d9]256
257                // mmap the submit queue entries
258                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
[2d8f7b0]259                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
260                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
261                        munmap(sq.ring_ptr, sq.ring_sz);
262                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
[92976d9]263                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
264                }
265
266                // Get the pointers from the kernel to fill the structure
267                // submit queue
[2d8f7b0]268                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
269                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
270                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
271                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
272                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
273                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
274                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
275                sq.alloc = *sq.tail;
[c59a346]276                sq.ready = *sq.tail;
[92976d9]277
278                // completion queue
[2d8f7b0]279                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
280                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
281                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
282                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
283                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
284                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
285
286                // some paranoid checks
287                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
288                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
289                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
290                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
291
292                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
293                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
294                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
295                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
[92976d9]296
297                // Update the global ring info
[b6f2b21]298                this.io->ring_flags = params.flags;
299                this.io->cltr_flags = io_flags;
300                this.io->fd         = fd;
301                this.io->done       = false;
[61dd73d]302                (this.io->submit){ min(*sq.num, *cq.num) };
[92976d9]303
[d384787]304                // Initialize statistics
[038be32]305                #if !defined(__CFA_NO_STATISTICS__)
[05cfa4d]306                        this.io->submit_q.stats.submit_avg.val   = 0;
307                        this.io->submit_q.stats.submit_avg.cnt   = 0;
308                        this.io->submit_q.stats.submit_avg.block = 0;
[61dd73d]309                        this.io->completion_q.stats.completed_avg.val = 0;
310                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
311                        this.io->completion_q.stats.completed_avg.fast_cnt = 0;
[038be32]312                #endif
[d384787]313
[f6660520]314                if(!main_cluster) {
315                        __kernel_io_finish_start( this );
316                }
317        }
318
319        void __kernel_io_finish_start( cluster & this ) {
[b6f2b21]320                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
321                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
322                        (this.io->poller.fast){ this };
323                        __thrd_start( this.io->poller.fast, main );
324                }
[f6660520]325
[92976d9]326                // Create the poller thread
[0a805f2]327                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
[61dd73d]328                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
[92976d9]329        }
330
[f6660520]331        void __kernel_io_prepare_stop( cluster & this ) {
[0a805f2]332                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
[92976d9]333                // Notify the poller thread of the shutdown
[61dd73d]334                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
[f6660520]335
336                // Stop the IO Poller
[92976d9]337                sigval val = { 1 };
[61dd73d]338                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
339                post( this.io->poller.sem );
[92976d9]340
341                // Wait for the poller thread to finish
[61dd73d]342                pthread_join( this.io->poller.slow.kthrd, 0p );
343                free( this.io->poller.slow.stack );
[f6660520]344
[0a805f2]345                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
[4069faad]346
[b6f2b21]347                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
[05cfa4d]348                        with( this.io->poller.fast ) {
349                                /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
350                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
351                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
352
353                                // We need to adjust the clean-up based on where the thread is
354                                if( thrd.preempted != __NO_PREEMPTION ) {
355
356                                        // This is the tricky case
357                                        // The thread was preempted and now it is on the ready queue
358                                        /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
359                                        /* paranoid */ verify( thrd.next == 1p );                // The thread should be the last on the list
360                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
361
362                                        // Remove the thread from the ready queue of this cluster
363                                        this.ready_queue.head = 1p;
364                                        thrd.next = 0p;
365
366                                        // Fixup the thread state
367                                        thrd.state = Blocked;
368                                        thrd.preempted = __NO_PREEMPTION;
369
370                                        // Pretend like the thread was blocked all along
371                                }
372                                // !!! This is not an else if !!!
373                                if( thrd.state == Blocked ) {
[6502a2b]374
[05cfa4d]375                                        // This is the "easy case"
376                                        // The thread is parked and can easily be moved to active cluster
377                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
378                                        thrd.curr_cluster = active_cluster();
[6502a2b]379
[f6660520]380                        // unpark the fast io_poller
[05cfa4d]381                                        unpark( &thrd __cfaabi_dbg_ctx2 );
382                                }
383                                else {
384
385                                        // The thread is in a weird state
386                                        // I don't know what to do here
387                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
388                                }
389
390                        }
[f6660520]391
[61dd73d]392                        ^(this.io->poller.fast){};
[4069faad]393
[0a805f2]394                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
[b6f2b21]395                }
[f6660520]396        }
397
398        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
399                if(!main_cluster) {
400                        __kernel_io_prepare_stop( this );
401                }
[92976d9]402
[d384787]403                // print statistics
[038be32]404                #if !defined(__CFA_NO_STATISTICS__)
405                        if(this.print_stats) {
[61dd73d]406                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
407                                        __cfaabi_bits_print_safe( STDERR_FILENO,
408                                                "----- I/O uRing Stats -----\n"
[cb870e0]409                                                "- total submit calls  : %'15llu\n"
410                                                "- avg submit          : %'18.2lf\n"
411                                                "- pre-submit block %%  : %'18.2lf\n"
412                                                "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
413                                                "- avg completion/wait : %'18.2lf\n",
[61dd73d]414                                                submit_avg.cnt,
415                                                ((double)submit_avg.val) / submit_avg.cnt,
[05cfa4d]416                                                (100.0 * submit_avg.block) / submit_avg.cnt,
[61dd73d]417                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
418                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
419                                                ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
420                                        );
421                                }
[038be32]422                        }
423                #endif
[d384787]424
[92976d9]425                // Shutdown the io rings
[61dd73d]426                struct __submition_data  & sq = this.io->submit_q;
427                struct __completion_data & cq = this.io->completion_q;
[92976d9]428
429                // unmap the submit queue entries
[2d8f7b0]430                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
[92976d9]431
432                // unmap the Submit Queue ring
433                munmap(sq.ring_ptr, sq.ring_sz);
434
435                // unmap the Completion Queue ring, if it is different
436                if (cq.ring_ptr != sq.ring_ptr) {
437                        munmap(cq.ring_ptr, cq.ring_sz);
438                }
439
440                // close the file descriptor
[61dd73d]441                close(this.io->fd);
442
443                free( this.io );
[92976d9]444        }
445
446//=============================================================================================
447// I/O Polling
448//=============================================================================================
449        struct io_user_data {
450                int32_t result;
451                $thread * thrd;
452        };
453
454        // Process a single completion message from the io_uring
455        // This is NOT thread-safe
[61dd73d]456        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
[f6660520]457                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
[d384787]458                if( ret < 0 ) {
459                        switch((int)errno) {
460                        case EAGAIN:
461                        case EINTR:
462                                return -EAGAIN;
463                        default:
464                                abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
465                        }
466                }
467
468                // Drain the queue
[92976d9]469                unsigned head = *ring.completion_q.head;
470                unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
471
[d384787]472                // Nothing was new return 0
473                if (head == tail) {
474                        return 0;
475                }
[92976d9]476
[d384787]477                uint32_t count = tail - head;
478                for(i; count) {
479                        unsigned idx = (head + i) & (*ring.completion_q.mask);
480                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
[92976d9]481
[d384787]482                        /* paranoid */ verify(&cqe);
[92976d9]483
[d384787]484                        struct io_user_data * data = (struct io_user_data *)cqe.user_data;
[4069faad]485                        __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
[2d8f7b0]486
[d384787]487                        data->result = cqe.res;
[f6660520]488                        if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
489                        else         { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
[d384787]490                }
[2d8f7b0]491
492                // Allow new submissions to happen
[d384787]493                V(ring.submit, count);
[92976d9]494
495                // Mark to the kernel that the cqe has been seen
496                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
[d384787]497                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
[92976d9]498
[d384787]499                return count;
[92976d9]500        }
501
[f6660520]502        static void * __io_poller_slow( void * arg ) {
[92976d9]503                cluster * cltr = (cluster *)arg;
[61dd73d]504                struct __io_data & ring = *cltr->io;
[92976d9]505
506                sigset_t mask;
507                sigfillset(&mask);
508                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
509                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
510                }
511
512                sigdelset( &mask, SIGUSR1 );
513
514                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
515                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
516
[1539bbd]517                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
518
[b6f2b21]519                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
520                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
[f6660520]521                                // In the user-thread approach drain and if anything was drained,
522                                // batton pass to the user-thread
523                                int count = __drain_io( ring, &mask, 1, true );
[3c039b0]524
525                                // Update statistics
526                                #if !defined(__CFA_NO_STATISTICS__)
527                                        ring.completion_q.stats.completed_avg.val += count;
528                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
529                                #endif
530
[f6660520]531                                if(count > 0) {
[0a805f2]532                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
[f6660520]533                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
534                                        wait( ring.poller.sem );
535                                }
[b6f2b21]536                        }
537                }
538                else {
539                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
[f6660520]540                                //In the naive approach, just poll the io completion queue directly
[3c039b0]541                                int count = __drain_io( ring, &mask, 1, true );
542
543                                // Update statistics
544                                #if !defined(__CFA_NO_STATISTICS__)
545                                        ring.completion_q.stats.completed_avg.val += count;
546                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
547                                #endif
[b6f2b21]548                        }
[92976d9]549                }
550
[1539bbd]551                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
552
[92976d9]553                return 0p;
554        }
555
[61dd73d]556        void main( __io_poller_fast & this ) {
[b6f2b21]557                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
558
[61dd73d]559                // Start parked
560                park( __cfaabi_dbg_ctx );
[f6660520]561
[61dd73d]562                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
[1539bbd]563
[4e74466]564                int reset = 0;
565
[61dd73d]566                // Then loop until we need to start
567                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
568                        // Drain the io
569                        this.waiting = false;
570                        int count = __drain_io( *this.ring, 0p, 0, false );
[4e74466]571                        reset += count > 0 ? 1 : 0;
[3c039b0]572
[61dd73d]573                        // Update statistics
574                        #if !defined(__CFA_NO_STATISTICS__)
575                                this.ring->completion_q.stats.completed_avg.val += count;
576                                this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
577                        #endif
[3c039b0]578
[61dd73d]579                        this.waiting = true;
[4e74466]580                        if(reset < 5) {
[61dd73d]581                                // If we got something, just yield and check again
582                                yield();
583                        }
584                        else {
585                                // We didn't get anything baton pass to the slow poller
586                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
587                                post( this.ring->poller.sem );
588                                park( __cfaabi_dbg_ctx );
[4e74466]589                                reset = 0;
[f6660520]590                        }
591                }
[61dd73d]592
593                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
594        }
[f6660520]595
[92976d9]596//=============================================================================================
597// I/O Submissions
598//=============================================================================================
599
[2d8f7b0]600// Submition steps :
601// 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
602//     entries are available. The semaphore make sure that there is no more operations in
603//     progress then the number of entries in the buffer. This probably limits concurrency
604//     more than necessary since submitted but not completed operations don't need any
605//     entries in user space. However, I don't know what happens if we overflow the buffers
606//     because too many requests completed at once. This is a safe approach in all cases.
607//     Furthermore, with hundreds of entries, this may be okay.
608//
609// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
610//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
611//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
612//     need to write an allocator that allows allocating concurrently.
613//
614// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
615//
616// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
617//     needs to arrive to two concensus at the same time:
618//     A - The order in which entries are listed in the array: no two threads must pick the
619//         same index for their entries
620//     B - When can the tail be update for the kernel. EVERY entries in the array between
621//         head and tail must be fully filled and shouldn't ever be touched again.
622//
623
[61dd73d]624        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
[2489d31]625                // Wait for a spot to be available
[05cfa4d]626                __attribute__((unused)) bool blocked = P(ring.submit);
627                #if !defined(__CFA_NO_STATISTICS__)
628                        __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 1ul64 : 0ul64, __ATOMIC_RELAXED );
629                #endif
[2489d31]630
631                // Allocate the sqe
632                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
633
634                // Validate that we didn't overflow anything
635                // Check that nothing overflowed
636                /* paranoid */ verify( true );
637
638                // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
639                /* paranoid */ verify( true );
640
641                // Return the sqe
642                return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
643        }
644
[61dd73d]645        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
[2489d31]646                // get mutual exclusion
647                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
648
649                // Append to the list of ready entries
650                uint32_t * tail = ring.submit_q.tail;
651                const uint32_t mask = *ring.submit_q.mask;
652
653                ring.submit_q.array[ (*tail) & mask ] = idx & mask;
654                __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
655
656                // Submit however, many entries need to be submitted
657                int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
658                if( ret < 0 ) {
659                        switch((int)errno) {
660                        default:
661                                abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
662                        }
[2d8f7b0]663                }
[2489d31]664
[038be32]665                // update statistics
666                #if !defined(__CFA_NO_STATISTICS__)
667                        ring.submit_q.stats.submit_avg.val += 1;
668                        ring.submit_q.stats.submit_avg.cnt += 1;
669                #endif
[d384787]670
[2489d31]671                unlock(ring.submit_q.lock);
672                // Make sure that idx was submitted
673                // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
[4069faad]674                __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
[2489d31]675        }
676
677        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
678                this.opcode = opcode;
679                #if !defined(IOSQE_ASYNC)
680                        this.flags = 0;
681                #else
682                        this.flags = IOSQE_ASYNC;
683                #endif
684                this.ioprio = 0;
685                this.fd = fd;
686                this.off = 0;
687                this.addr = 0;
688                this.len = 0;
689                this.rw_flags = 0;
690                this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
[2d8f7b0]691        }
692
[2489d31]693        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
694                (this){ opcode, fd };
695                this.off = off;
696                this.addr = (uint64_t)addr;
697                this.len = len;
698        }
[f6660520]699
[92976d9]700
701//=============================================================================================
702// I/O Interface
703//=============================================================================================
[f6660520]704
[2d8f7b0]705        #define __submit_prelude \
[61dd73d]706                struct __io_data & ring = *active_cluster()->io; \
[2d8f7b0]707                struct io_uring_sqe * sqe; \
708                uint32_t idx; \
709                [sqe, idx] = __submit_alloc( ring );
710
711        #define __submit_wait \
712                io_user_data data = { 0, active_thread() }; \
[185efe6]713                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
[2d8f7b0]714                sqe->user_data = (uint64_t)&data; \
715                __submit( ring, idx ); \
716                park( __cfaabi_dbg_ctx ); \
717                return data.result;
[ecf6b46]718#endif
[2d8f7b0]719
[0ea6c5a]720// Some forward declarations
721extern "C" {
[1268ad8]722        #include <unistd.h>
[0ea6c5a]723        #include <sys/types.h>
[93f7c001]724        #include <sys/socket.h>
[6136ecc]725        #include <sys/syscall.h>
[08a994e]726
727#if defined(HAVE_PREADV2)
[0ea6c5a]728        struct iovec;
729        extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
[08a994e]730#endif
731#if defined(HAVE_PWRITEV2)
732        struct iovec;
[0ea6c5a]733        extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
[08a994e]734#endif
[0ea6c5a]735
736        extern int fsync(int fd);
737        extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
738
739        struct msghdr;
740        struct sockaddr;
741        extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
742        extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
743        extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
744        extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
745        extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
746        extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
747
748        extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
749        extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
750        extern int madvise(void *addr, size_t length, int advice);
751
752        extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
753        extern int close(int fd);
754
755        extern ssize_t read (int fd, void *buf, size_t count);
756}
757
[2d8f7b0]758//-----------------------------------------------------------------------------
759// Asynchronous operations
[08a994e]760#if defined(HAVE_PREADV2)
761        ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
762                #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
763                        return preadv2(fd, iov, iovcnt, offset, flags);
764                #else
765                        __submit_prelude
[ecf6b46]766
[08a994e]767                        (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
[ecf6b46]768
[08a994e]769                        __submit_wait
770                #endif
771        }
772#endif
[ecf6b46]773
[08a994e]774#if defined(HAVE_PWRITEV2)
775        ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
776                #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
777                        return pwritev2(fd, iov, iovcnt, offset, flags);
778                #else
779                        __submit_prelude
[ecf6b46]780
[08a994e]781                        (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
[ecf6b46]782
[08a994e]783                        __submit_wait
784                #endif
785        }
786#endif
[ecf6b46]787
788int cfa_fsync(int fd) {
789        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
790                return fsync(fd);
791        #else
792                __submit_prelude
793
794                (*sqe){ IORING_OP_FSYNC, fd };
795
796                __submit_wait
797        #endif
798}
799
800int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
801        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
802                return sync_file_range(fd, offset, nbytes, flags);
803        #else
804                __submit_prelude
805
806                (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
807                sqe->off = offset;
808                sqe->len = nbytes;
809                sqe->sync_range_flags = flags;
810
811                __submit_wait
812        #endif
813}
814
815
816ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
817        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
[2292067]818                return sendmsg(sockfd, msg, flags);
[ecf6b46]819        #else
820                __submit_prelude
821
822                (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
823                sqe->msg_flags = flags;
824
825                __submit_wait
826        #endif
827}
828
829ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
830        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
[2292067]831                return recvmsg(sockfd, msg, flags);
[ecf6b46]832        #else
833                __submit_prelude
834
835                (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
836                sqe->msg_flags = flags;
837
838                __submit_wait
839        #endif
840}
841
842ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
843        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
844                return send( sockfd, buf, len, flags );
845        #else
846                __submit_prelude
847
848                (*sqe){ IORING_OP_SEND, sockfd };
849                sqe->addr = (uint64_t)buf;
850                sqe->len = len;
851                sqe->msg_flags = flags;
852
853                __submit_wait
854        #endif
855}
856
857ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
858        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
859                return recv( sockfd, buf, len, flags );
860        #else
861                __submit_prelude
862
863                (*sqe){ IORING_OP_RECV, sockfd };
864                sqe->addr = (uint64_t)buf;
865                sqe->len = len;
866                sqe->msg_flags = flags;
867
868                __submit_wait
869        #endif
870}
871
872int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
873        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
[0ea6c5a]874                return accept4( sockfd, addr, addrlen, flags );
[ecf6b46]875        #else
876                __submit_prelude
877
878                (*sqe){ IORING_OP_ACCEPT, sockfd };
879                sqe->addr = addr;
880                sqe->addr2 = addrlen;
881                sqe->accept_flags = flags;
882
883                __submit_wait
884        #endif
885}
886
887int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
888        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
[0ea6c5a]889                return connect( sockfd, addr, addrlen );
[ecf6b46]890        #else
891                __submit_prelude
892
893                (*sqe){ IORING_OP_CONNECT, sockfd };
894                sqe->addr = (uint64_t)addr;
895                sqe->off = addrlen;
896
897                __submit_wait
898        #endif
899}
900
901int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
902        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
903                return fallocate( fd, mode, offset, len );
904        #else
905                __submit_prelude
906
907                (*sqe){ IORING_OP_FALLOCATE, fd };
908                sqe->off = offset;
909                sqe->len = length;
910                sqe->mode = mode;
911
912                __submit_wait
913        #endif
914}
915
916int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
917        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
918                return posix_fadvise( fd, offset, len, advice );
919        #else
920                __submit_prelude
921
922                (*sqe){ IORING_OP_FADVISE, fd };
923                sqe->off = (uint64_t)offset;
924                sqe->len = length;
925                sqe->fadvise_advice = advice;
926
927                __submit_wait
928        #endif
929}
930
931int cfa_madvise(void *addr, size_t length, int advice) {
932        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
933                return madvise( addr, length, advice );
934        #else
935                __submit_prelude
936
937                (*sqe){ IORING_OP_MADVISE, 0 };
938                sqe->addr = (uint64_t)addr;
939                sqe->len = length;
940                sqe->fadvise_advice = advice;
941
942                __submit_wait
943        #endif
944}
945
946int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
947        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
948                return openat( dirfd, pathname, flags, mode );
949        #else
950                __submit_prelude
951
952                (*sqe){ IORING_OP_OPENAT, dirfd };
953                sqe->addr = (uint64_t)pathname;
954                sqe->open_flags = flags;
955                sqe->mode = mode;
956
957                __submit_wait
958        #endif
959}
960
961int cfa_close(int fd) {
962        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
963                return close( fd );
964        #else
965                __submit_prelude
966
967                (*sqe){ IORING_OP_CLOSE, fd };
968
969                __submit_wait
970        #endif
971}
972
973
974ssize_t cfa_read(int fd, void *buf, size_t count) {
975        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
976                return read( fd, buf, count );
977        #else
978                __submit_prelude
979
980                (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
981
982                __submit_wait
983        #endif
984}
985
986ssize_t cfa_write(int fd, void *buf, size_t count) {
987        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
988                return read( fd, buf, count );
989        #else
990                __submit_prelude
991
992                (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
993
994                __submit_wait
995        #endif
996}
[2d8f7b0]997
998//-----------------------------------------------------------------------------
999// Check if a function is asynchronous
1000
1001// Macro magic to reduce the size of the following switch case
[ecf6b46]1002#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
1003#define IS_DEFINED_SECOND(first, second, ...) second
1004#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
1005#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
[2d8f7b0]1006
[ecf6b46]1007bool has_user_level_blocking( fptr_t func ) {
1008        #if defined(HAVE_LINUX_IO_URING_H)
[08a994e]1009                #if defined(HAVE_PREADV2)
[171ca0d]1010                        if( /*func == (fptr_t)preadv2 || */
1011                                func == (fptr_t)cfa_preadv2 )
1012                                #define _CFA_IO_FEATURE_IORING_OP_READV ,
1013                                return IS_DEFINED(IORING_OP_READV);
1014                #endif
1015
1016                #if defined(HAVE_PWRITEV2)
[08a994e]1017                        if( /*func == (fptr_t)pwritev2 || */
1018                                func == (fptr_t)cfa_pwritev2 )
1019                                #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
1020                                return IS_DEFINED(IORING_OP_WRITEV);
1021                #endif
[2d8f7b0]1022
[171ca0d]1023                if( /*func == (fptr_t)fsync || */
1024                        func == (fptr_t)cfa_fsync )
1025                        #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
1026                        return IS_DEFINED(IORING_OP_FSYNC);
[2d8f7b0]1027
1028                if( /*func == (fptr_t)ync_file_range || */
[ecf6b46]1029                        func == (fptr_t)cfa_sync_file_range )
[2d8f7b0]1030                        #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
1031                        return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
1032
1033                if( /*func == (fptr_t)sendmsg || */
[ecf6b46]1034                        func == (fptr_t)cfa_sendmsg )
[2d8f7b0]1035                        #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
1036                        return IS_DEFINED(IORING_OP_SENDMSG);
1037
1038                if( /*func == (fptr_t)recvmsg || */
[ecf6b46]1039                        func == (fptr_t)cfa_recvmsg )
[2d8f7b0]1040                        #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
1041                        return IS_DEFINED(IORING_OP_RECVMSG);
1042
1043                if( /*func == (fptr_t)send || */
[2489d31]1044                        func == (fptr_t)cfa_send )
[2d8f7b0]1045                        #define _CFA_IO_FEATURE_IORING_OP_SEND ,
1046                        return IS_DEFINED(IORING_OP_SEND);
1047
1048                if( /*func == (fptr_t)recv || */
[2489d31]1049                        func == (fptr_t)cfa_recv )
[2d8f7b0]1050                        #define _CFA_IO_FEATURE_IORING_OP_RECV ,
1051                        return IS_DEFINED(IORING_OP_RECV);
1052
1053                if( /*func == (fptr_t)accept4 || */
[2489d31]1054                        func == (fptr_t)cfa_accept4 )
[2d8f7b0]1055                        #define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
1056                        return IS_DEFINED(IORING_OP_ACCEPT);
1057
1058                if( /*func == (fptr_t)connect || */
[2489d31]1059                        func == (fptr_t)cfa_connect )
[2d8f7b0]1060                        #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
1061                        return IS_DEFINED(IORING_OP_CONNECT);
1062
1063                if( /*func == (fptr_t)fallocate || */
[2489d31]1064                        func == (fptr_t)cfa_fallocate )
[2d8f7b0]1065                        #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
1066                        return IS_DEFINED(IORING_OP_FALLOCATE);
1067
[0ea6c5a]1068                if( /*func == (fptr_t)posix_fadvise || */
[2489d31]1069                        func == (fptr_t)cfa_fadvise )
[2d8f7b0]1070                        #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
1071                        return IS_DEFINED(IORING_OP_FADVISE);
1072
1073                if( /*func == (fptr_t)madvise || */
[2489d31]1074                        func == (fptr_t)cfa_madvise )
[2d8f7b0]1075                        #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
1076                        return IS_DEFINED(IORING_OP_MADVISE);
1077
1078                if( /*func == (fptr_t)openat || */
[2489d31]1079                        func == (fptr_t)cfa_openat )
[2d8f7b0]1080                        #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
1081                        return IS_DEFINED(IORING_OP_OPENAT);
1082
1083                if( /*func == (fptr_t)close || */
[2489d31]1084                        func == (fptr_t)cfa_close )
[2d8f7b0]1085                        #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
1086                        return IS_DEFINED(IORING_OP_CLOSE);
1087
1088                if( /*func == (fptr_t)read || */
[ecf6b46]1089                        func == (fptr_t)cfa_read )
[2d8f7b0]1090                        #define _CFA_IO_FEATURE_IORING_OP_READ ,
1091                        return IS_DEFINED(IORING_OP_READ);
1092
1093                if( /*func == (fptr_t)write || */
[ecf6b46]1094                        func == (fptr_t)cfa_write )
[2d8f7b0]1095                        #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
1096                        return IS_DEFINED(IORING_OP_WRITE);
[ecf6b46]1097        #endif
[2d8f7b0]1098
[ecf6b46]1099        return false;
1100}
Note: See TracBrowser for help on using the repository browser.