Ignore:
Timestamp:
Aug 11, 2020, 4:40:15 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
0d070ca
Parents:
07d867b (diff), 129674b (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' into new-ast

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r07d867b r22f94a4  
    1414//
    1515
    16 // #define __CFA_DEBUG_PRINT_IO__
    17 // #define __CFA_DEBUG_PRINT_IO_CORE__
    18 
    19 #include "kernel.hfa"
    20 
    21 #if !defined(HAVE_LINUX_IO_URING_H)
    22         void __kernel_io_startup( cluster &, int, bool ) {
    23                 // Nothing to do without io_uring
    24         }
    25 
    26         void __kernel_io_finish_start( cluster & ) {
    27                 // Nothing to do without io_uring
    28         }
    29 
    30         void __kernel_io_prepare_stop( cluster & ) {
    31                 // Nothing to do without io_uring
    32         }
    33 
    34         void __kernel_io_shutdown( cluster &, bool ) {
    35                 // Nothing to do without io_uring
    36         }
    37 
    38 #else
     16#define __cforall_thread__
     17
     18#if defined(__CFA_DEBUG__)
     19        // #define __CFA_DEBUG_PRINT_IO__
     20        // #define __CFA_DEBUG_PRINT_IO_CORE__
     21#endif
     22
     23
     24#if defined(CFA_HAVE_LINUX_IO_URING_H)
     25        #define _GNU_SOURCE         /* See feature_test_macros(7) */
     26        #include <errno.h>
     27        #include <signal.h>
     28        #include <stdint.h>
     29        #include <string.h>
     30        #include <unistd.h>
     31
    3932        extern "C" {
    40                 #define _GNU_SOURCE         /* See feature_test_macros(7) */
    41                 #include <errno.h>
    42                 #include <stdint.h>
    43                 #include <string.h>
    44                 #include <unistd.h>
    45                 #include <sys/mman.h>
     33                #include <sys/epoll.h>
    4634                #include <sys/syscall.h>
    4735
     
    4937        }
    5038
    51         #include "bits/signal.hfa"
    52         #include "kernel_private.hfa"
    53         #include "thread.hfa"
    54 
    55         uint32_t entries_per_cluster() {
    56                 return 256;
    57         }
    58 
    59         static void * __io_poller_slow( void * arg );
    60 
    61         // Weirdly, some systems that do support io_uring don't actually define these
    62         #ifdef __alpha__
    63                 /*
    64                 * alpha is the only exception, all other architectures
    65                 * have common numbers for new system calls.
    66                 */
    67                 #ifndef __NR_io_uring_setup
    68                         #define __NR_io_uring_setup           535
    69                 #endif
    70                 #ifndef __NR_io_uring_enter
    71                         #define __NR_io_uring_enter           536
    72                 #endif
    73                 #ifndef __NR_io_uring_register
    74                         #define __NR_io_uring_register        537
    75                 #endif
    76         #else /* !__alpha__ */
    77                 #ifndef __NR_io_uring_setup
    78                         #define __NR_io_uring_setup           425
    79                 #endif
    80                 #ifndef __NR_io_uring_enter
    81                         #define __NR_io_uring_enter           426
    82                 #endif
    83                 #ifndef __NR_io_uring_register
    84                         #define __NR_io_uring_register        427
    85                 #endif
    86         #endif
    87 
    88         // Fast poller user-thread
    89         // Not using the "thread" keyword because we want to control
    90         // more carefully when to start/stop it
    91         struct __io_poller_fast {
    92                 struct __io_data * ring;
    93                 bool waiting;
    94                 $thread thrd;
    95         };
    96 
    97         void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    98                 this.ring = cltr.io;
    99                 this.waiting = true;
    100                 (this.thrd){ "Fast I/O Poller", cltr };
    101         }
    102         void ^?{}( __io_poller_fast & mutex this );
    103         void main( __io_poller_fast & this );
    104         static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    105         void ^?{}( __io_poller_fast & mutex this ) {}
    106 
    107         struct __submition_data {
    108                 // Head and tail of the ring (associated with array)
    109                 volatile uint32_t * head;
    110                 volatile uint32_t * tail;
    111 
    112                 // The actual kernel ring which uses head/tail
    113                 // indexes into the sqes arrays
    114                 uint32_t * array;
    115 
    116                 // number of entries and mask to go with it
    117                 const uint32_t * num;
    118                 const uint32_t * mask;
    119 
    120                 // Submission flags (Not sure what for)
    121                 uint32_t * flags;
    122 
    123                 // number of sqes not submitted (whatever that means)
    124                 uint32_t * dropped;
    125 
    126                 // Like head/tail but not seen by the kernel
    127                 volatile uint32_t alloc;
    128                 volatile uint32_t ready;
    129 
    130                 __spinlock_t lock;
    131 
    132                 // A buffer of sqes (not the actual ring)
    133                 struct io_uring_sqe * sqes;
    134 
    135                 // The location and size of the mmaped area
    136                 void * ring_ptr;
    137                 size_t ring_sz;
    138 
    139                 // Statistics
    140                 #if !defined(__CFA_NO_STATISTICS__)
    141                         struct {
    142                                 struct {
    143                                         volatile unsigned long long int val;
    144                                         volatile unsigned long long int cnt;
    145                                         volatile unsigned long long int block;
    146                                 } submit_avg;
    147                         } stats;
    148                 #endif
    149         };
    150 
    151         struct __completion_data {
    152                 // Head and tail of the ring
    153                 volatile uint32_t * head;
    154                 volatile uint32_t * tail;
    155 
    156                 // number of entries and mask to go with it
    157                 const uint32_t * mask;
    158                 const uint32_t * num;
    159 
    160                 // number of cqes not submitted (whatever that means)
    161                 uint32_t * overflow;
    162 
    163                 // the kernel ring
    164                 struct io_uring_cqe * cqes;
    165 
    166                 // The location and size of the mmaped area
    167                 void * ring_ptr;
    168                 size_t ring_sz;
    169 
    170                 // Statistics
    171                 #if !defined(__CFA_NO_STATISTICS__)
    172                         struct {
    173                                 struct {
    174                                         unsigned long long int val;
    175                                         unsigned long long int slow_cnt;
    176                                         unsigned long long int fast_cnt;
    177                                 } completed_avg;
    178                         } stats;
    179                 #endif
    180         };
    181 
    182         struct __io_data {
    183                 struct __submition_data submit_q;
    184                 struct __completion_data completion_q;
    185                 uint32_t ring_flags;
    186                 int cltr_flags;
    187                 int fd;
    188                 semaphore submit;
    189                 volatile bool done;
    190                 struct {
    191                         struct {
    192                                 void * stack;
    193                                 pthread_t kthrd;
    194                         } slow;
    195                         __io_poller_fast fast;
    196                         __bin_sem_t sem;
    197                 } poller;
    198         };
    199 
    200 //=============================================================================================
    201 // I/O Startup / Shutdown logic
    202 //=============================================================================================
    203         void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
    204                 this.io = malloc();
    205 
    206                 // Step 1 : call to setup
    207                 struct io_uring_params params;
    208                 memset(&params, 0, sizeof(params));
    209 
    210                 uint32_t nentries = entries_per_cluster();
    211 
    212                 int fd = syscall(__NR_io_uring_setup, nentries, &params );
    213                 if(fd < 0) {
    214                         abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
    215                 }
    216 
    217                 // Step 2 : mmap result
    218                 memset( this.io, 0, sizeof(struct __io_data) );
    219                 struct __submition_data  & sq = this.io->submit_q;
    220                 struct __completion_data & cq = this.io->completion_q;
    221 
    222                 // calculate the right ring size
    223                 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
    224                 cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
    225 
    226                 // Requires features
    227                 #if defined(IORING_FEAT_SINGLE_MMAP)
    228                         // adjust the size according to the parameters
    229                         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    230                                 cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
    231                         }
    232                 #endif
    233 
    234                 // mmap the Submit Queue into existence
    235                 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
    236                 if (sq.ring_ptr == (void*)MAP_FAILED) {
    237                         abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
    238                 }
    239 
    240                 // Requires features
    241                 #if defined(IORING_FEAT_SINGLE_MMAP)
    242                         // mmap the Completion Queue into existence (may or may not be needed)
    243                         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    244                                 cq->ring_ptr = sq->ring_ptr;
    245                         }
    246                         else
    247                 #endif
    248                 {
    249                         // We need multiple call to MMAP
    250                         cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
    251                         if (cq.ring_ptr == (void*)MAP_FAILED) {
    252                                 munmap(sq.ring_ptr, sq.ring_sz);
    253                                 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
    254                         }
    255                 }
    256 
    257                 // mmap the submit queue entries
    258                 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
    259                 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
    260                 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
    261                         munmap(sq.ring_ptr, sq.ring_sz);
    262                         if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
    263                         abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
    264                 }
    265 
    266                 // Get the pointers from the kernel to fill the structure
    267                 // submit queue
    268                 sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
    269                 sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
    270                 sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
    271                 sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
    272                 sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
    273                 sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    274                 sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    275                 sq.alloc = *sq.tail;
    276                 sq.ready = *sq.tail;
    277 
    278                 // completion queue
    279                 cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
    280                 cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
    281                 cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
    282                 cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
    283                 cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
    284                 cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    285 
    286                 // some paranoid checks
    287                 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
    288                 /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
    289                 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
    290                 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
    291 
    292                 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
    293                 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
    294                 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
    295                 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
    296 
    297                 // Update the global ring info
    298                 this.io->ring_flags = params.flags;
    299                 this.io->cltr_flags = io_flags;
    300                 this.io->fd         = fd;
    301                 this.io->done       = false;
    302                 (this.io->submit){ min(*sq.num, *cq.num) };
    303 
    304                 // Initialize statistics
    305                 #if !defined(__CFA_NO_STATISTICS__)
    306                         this.io->submit_q.stats.submit_avg.val   = 0;
    307                         this.io->submit_q.stats.submit_avg.cnt   = 0;
    308                         this.io->submit_q.stats.submit_avg.block = 0;
    309                         this.io->completion_q.stats.completed_avg.val = 0;
    310                         this.io->completion_q.stats.completed_avg.slow_cnt = 0;
    311                         this.io->completion_q.stats.completed_avg.fast_cnt = 0;
    312                 #endif
    313 
    314                 if(!main_cluster) {
    315                         __kernel_io_finish_start( this );
    316                 }
    317         }
    318 
    319         void __kernel_io_finish_start( cluster & this ) {
    320                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    321                         __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
    322                         (this.io->poller.fast){ this };
    323                         __thrd_start( this.io->poller.fast, main );
    324                 }
    325 
    326                 // Create the poller thread
    327                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
    328                 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    329         }
    330 
    331         void __kernel_io_prepare_stop( cluster & this ) {
    332                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
    333                 // Notify the poller thread of the shutdown
    334                 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
    335 
    336                 // Stop the IO Poller
    337                 sigval val = { 1 };
    338                 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
    339                 post( this.io->poller.sem );
    340 
    341                 // Wait for the poller thread to finish
    342                 pthread_join( this.io->poller.slow.kthrd, 0p );
    343                 free( this.io->poller.slow.stack );
    344 
    345                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
    346 
    347                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    348                         with( this.io->poller.fast ) {
    349                                 /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
    350                                 /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
    351                                 /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
    352 
    353                                 // We need to adjust the clean-up based on where the thread is
    354                                 if( thrd.preempted != __NO_PREEMPTION ) {
    355 
    356                                         // This is the tricky case
    357                                         // The thread was preempted and now it is on the ready queue
    358                                         /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
    359                                         /* paranoid */ verify( thrd.next == 1p );                // The thread should be the last on the list
    360                                         /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
    361 
    362                                         // Remove the thread from the ready queue of this cluster
    363                                         this.ready_queue.head = 1p;
    364                                         thrd.next = 0p;
    365 
    366                                         // Fixup the thread state
    367                                         thrd.state = Blocked;
    368                                         thrd.preempted = __NO_PREEMPTION;
    369 
    370                                         // Pretend like the thread was blocked all along
    371                                 }
    372                                 // !!! This is not an else if !!!
    373                                 if( thrd.state == Blocked ) {
    374 
    375                                         // This is the "easy case"
    376                                         // The thread is parked and can easily be moved to active cluster
    377                                         verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
    378                                         thrd.curr_cluster = active_cluster();
    379 
    380                         // unpark the fast io_poller
    381                                         unpark( &thrd __cfaabi_dbg_ctx2 );
    382                                 }
    383                                 else {
    384 
    385                                         // The thread is in a weird state
    386                                         // I don't know what to do here
    387                                         abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
    388                                 }
    389 
    390                         }
    391 
    392                         ^(this.io->poller.fast){};
    393 
    394                         __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
    395                 }
    396         }
    397 
    398         void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
    399                 if(!main_cluster) {
    400                         __kernel_io_prepare_stop( this );
    401                 }
    402 
    403                 // print statistics
    404                 #if !defined(__CFA_NO_STATISTICS__)
    405                         if(this.print_stats) {
    406                                 with(this.io->submit_q.stats, this.io->completion_q.stats) {
    407                                         __cfaabi_bits_print_safe( STDERR_FILENO,
    408                                                 "----- I/O uRing Stats -----\n"
    409                                                 "- total submit calls  : %'15llu\n"
    410                                                 "- avg submit          : %'18.2lf\n"
    411                                                 "- pre-submit block %%  : %'18.2lf\n"
    412                                                 "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
    413                                                 "- avg completion/wait : %'18.2lf\n",
    414                                                 submit_avg.cnt,
    415                                                 ((double)submit_avg.val) / submit_avg.cnt,
    416                                                 (100.0 * submit_avg.block) / submit_avg.cnt,
    417                                                 completed_avg.slow_cnt + completed_avg.fast_cnt,
    418                                                 completed_avg.slow_cnt,  completed_avg.fast_cnt,
    419                                                 ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
    420                                         );
    421                                 }
    422                         }
    423                 #endif
    424 
    425                 // Shutdown the io rings
    426                 struct __submition_data  & sq = this.io->submit_q;
    427                 struct __completion_data & cq = this.io->completion_q;
    428 
    429                 // unmap the submit queue entries
    430                 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
    431 
    432                 // unmap the Submit Queue ring
    433                 munmap(sq.ring_ptr, sq.ring_sz);
    434 
    435                 // unmap the Completion Queue ring, if it is different
    436                 if (cq.ring_ptr != sq.ring_ptr) {
    437                         munmap(cq.ring_ptr, cq.ring_sz);
    438                 }
    439 
    440                 // close the file descriptor
    441                 close(this.io->fd);
    442 
    443                 free( this.io );
     39        #include "stats.hfa"
     40        #include "kernel.hfa"
     41        #include "kernel/fwd.hfa"
     42        #include "io/types.hfa"
     43
     44//=============================================================================================
     45// I/O Syscall
     46//=============================================================================================
     47        static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
     48                bool need_sys_to_submit = false;
     49                bool need_sys_to_complete = false;
     50                unsigned flags = 0;
     51
     52                TO_SUBMIT:
     53                if( to_submit > 0 ) {
     54                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     55                                need_sys_to_submit = true;
     56                                break TO_SUBMIT;
     57                        }
     58                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
     59                                need_sys_to_submit = true;
     60                                flags |= IORING_ENTER_SQ_WAKEUP;
     61                        }
     62                }
     63
     64                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     65                        flags |= IORING_ENTER_GETEVENTS;
     66                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
     67                                need_sys_to_complete = true;
     68                        }
     69                }
     70
     71                int ret = 0;
     72                if( need_sys_to_submit || need_sys_to_complete ) {
     73                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
     74                        if( ret < 0 ) {
     75                                switch((int)errno) {
     76                                case EAGAIN:
     77                                case EINTR:
     78                                        ret = -1;
     79                                        break;
     80                                default:
     81                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     82                                }
     83                        }
     84                }
     85
     86                // Memory barrier
     87                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     88                return ret;
    44489        }
    44590
     
    44792// I/O Polling
    44893//=============================================================================================
    449         struct io_user_data {
    450                 int32_t result;
    451                 $thread * thrd;
    452         };
     94        static unsigned __collect_submitions( struct __io_data & ring );
     95        static uint32_t __release_consumed_submission( struct __io_data & ring );
     96
     97        static inline void process(struct io_uring_cqe & cqe ) {
     98                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
     99                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
     100
     101                data->result = cqe.res;
     102                unpark( data->thrd __cfaabi_dbg_ctx2 );
     103        }
    453104
    454105        // Process a single completion message from the io_uring
    455106        // This is NOT thread-safe
    456         static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
    457                 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     107        static [int, bool] __drain_io( & struct __io_data ring ) {
     108                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
     109
     110                unsigned to_submit = 0;
     111                if( ring.poller_submits ) {
     112                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
     113                        to_submit = __collect_submitions( ring );
     114                }
     115
     116                int ret = __io_uring_enter(ring, to_submit, true);
    458117                if( ret < 0 ) {
    459                         switch((int)errno) {
    460                         case EAGAIN:
    461                         case EINTR:
    462                                 return -EAGAIN;
    463                         default:
    464                                 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    465                         }
    466                 }
     118                        return [0, true];
     119                }
     120
     121                // update statistics
     122                if (to_submit > 0) {
     123                        __STATS__( true,
     124                                if( to_submit > 0 ) {
     125                                        io.submit_q.submit_avg.rdy += to_submit;
     126                                        io.submit_q.submit_avg.csm += ret;
     127                                        io.submit_q.submit_avg.cnt += 1;
     128                                }
     129                        )
     130                }
     131
     132                // Release the consumed SQEs
     133                __release_consumed_submission( ring );
    467134
    468135                // Drain the queue
    469136                unsigned head = *ring.completion_q.head;
    470                 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
     137                unsigned tail = *ring.completion_q.tail;
     138                const uint32_t mask = *ring.completion_q.mask;
    471139
    472140                // Nothing was new return 0
    473141                if (head == tail) {
    474                         return 0;
     142                        return [0, to_submit > 0];
    475143                }
    476144
    477145                uint32_t count = tail - head;
     146                /* paranoid */ verify( count != 0 );
    478147                for(i; count) {
    479                         unsigned idx = (head + i) & (*ring.completion_q.mask);
     148                        unsigned idx = (head + i) & mask;
    480149                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    481150
    482151                        /* paranoid */ verify(&cqe);
    483152
    484                         struct io_user_data * data = (struct io_user_data *)cqe.user_data;
    485                         __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
    486 
    487                         data->result = cqe.res;
    488                         if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    489                         else         { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    490                 }
    491 
    492                 // Allow new submissions to happen
    493                 V(ring.submit, count);
     153                        process( cqe );
     154                }
    494155
    495156                // Mark to the kernel that the cqe has been seen
    496157                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     158                __atomic_thread_fence( __ATOMIC_SEQ_CST );
    497159                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    498160
    499                 return count;
    500         }
    501 
    502         static void * __io_poller_slow( void * arg ) {
    503                 cluster * cltr = (cluster *)arg;
    504                 struct __io_data & ring = *cltr->io;
    505 
    506                 sigset_t mask;
    507                 sigfillset(&mask);
    508                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    509                         abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
    510                 }
    511 
    512                 sigdelset( &mask, SIGUSR1 );
    513 
    514                 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
    515                 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
    516 
    517                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
    518 
    519                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    520                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    521                                 // In the user-thread approach drain and if anything was drained,
    522                                 // batton pass to the user-thread
    523                                 int count = __drain_io( ring, &mask, 1, true );
     161                return [count, count > 0 || to_submit > 0];
     162        }
     163
     164        void main( $io_ctx_thread & this ) {
     165                epoll_event ev;
     166                __ioctx_register( this, ev );
     167
     168                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
     169
     170                int reset = 0;
     171                // Then loop until we need to start
     172                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
     173                        // Drain the io
     174                        int count;
     175                        bool again;
     176                        disable_interrupts();
     177                                [count, again] = __drain_io( *this.ring );
     178
     179                                if(!again) reset++;
    524180
    525181                                // Update statistics
    526                                 #if !defined(__CFA_NO_STATISTICS__)
    527                                         ring.completion_q.stats.completed_avg.val += count;
    528                                         ring.completion_q.stats.completed_avg.slow_cnt += 1;
    529                                 #endif
    530 
    531                                 if(count > 0) {
    532                                         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    533                                         __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
    534                                         wait( ring.poller.sem );
    535                                 }
    536                         }
    537                 }
    538                 else {
    539                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    540                                 //In the naive approach, just poll the io completion queue directly
    541                                 int count = __drain_io( ring, &mask, 1, true );
    542 
    543                                 // Update statistics
    544                                 #if !defined(__CFA_NO_STATISTICS__)
    545                                         ring.completion_q.stats.completed_avg.val += count;
    546                                         ring.completion_q.stats.completed_avg.slow_cnt += 1;
    547                                 #endif
    548                         }
    549                 }
    550 
    551                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
    552 
    553                 return 0p;
    554         }
    555 
    556         void main( __io_poller_fast & this ) {
    557                 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
    558 
    559                 // Start parked
    560                 park( __cfaabi_dbg_ctx );
    561 
    562                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
    563 
    564                 int reset = 0;
    565 
    566                 // Then loop until we need to start
    567                 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    568                         // Drain the io
    569                         this.waiting = false;
    570                         int count = __drain_io( *this.ring, 0p, 0, false );
    571                         reset += count > 0 ? 1 : 0;
    572 
    573                         // Update statistics
    574                         #if !defined(__CFA_NO_STATISTICS__)
    575                                 this.ring->completion_q.stats.completed_avg.val += count;
    576                                 this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
    577                         #endif
    578 
    579                         this.waiting = true;
     182                                __STATS__( true,
     183                                        io.complete_q.completed_avg.val += count;
     184                                        io.complete_q.completed_avg.fast_cnt += 1;
     185                                )
     186                        enable_interrupts( __cfaabi_dbg_ctx );
     187
     188                        // If we got something, just yield and check again
    580189                        if(reset < 5) {
    581                                 // If we got something, just yield and check again
    582190                                yield();
    583191                        }
     192                        // We didn't get anything baton pass to the slow poller
    584193                        else {
    585                                 // We didn't get anything baton pass to the slow poller
    586                                 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
    587                                 post( this.ring->poller.sem );
    588                                 park( __cfaabi_dbg_ctx );
     194                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    589195                                reset = 0;
     196
     197                                // block this thread
     198                                __ioctx_prepare_block( this, ev );
     199                                wait( this.sem );
    590200                        }
    591201                }
     
    599209
    600210// Submition steps :
    601 // 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
    602 //     entries are available. The semaphore make sure that there is no more operations in
    603 //     progress then the number of entries in the buffer. This probably limits concurrency
    604 //     more than necessary since submitted but not completed operations don't need any
    605 //     entries in user space. However, I don't know what happens if we overflow the buffers
    606 //     because too many requests completed at once. This is a safe approach in all cases.
    607 //     Furthermore, with hundreds of entries, this may be okay.
    608 //
    609 // 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
     211// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
    610212//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
    611213//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
    612214//     need to write an allocator that allows allocating concurrently.
    613215//
    614 // 3 - Actually fill the submit entry, this is the only simple and straightforward step.
    615 //
    616 // 4 - Append the entry index to the array and adjust the tail accordingly. This operation
     216// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
     217//
     218// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
    617219//     needs to arrive to two concensus at the same time:
    618220//     A - The order in which entries are listed in the array: no two threads must pick the
     
    622224//
    623225
    624         static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
    625                 // Wait for a spot to be available
    626                 __attribute__((unused)) bool blocked = P(ring.submit);
    627                 #if !defined(__CFA_NO_STATISTICS__)
    628                         __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 1ul64 : 0ul64, __ATOMIC_RELAXED );
    629                 #endif
    630 
    631                 // Allocate the sqe
    632                 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
    633 
    634                 // Validate that we didn't overflow anything
    635                 // Check that nothing overflowed
    636                 /* paranoid */ verify( true );
    637 
    638                 // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
    639                 /* paranoid */ verify( true );
    640 
    641                 // Return the sqe
    642                 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
    643         }
    644 
    645         static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    646                 // get mutual exclusion
    647                 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    648 
    649                 // Append to the list of ready entries
    650                 uint32_t * tail = ring.submit_q.tail;
     226        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
     227                /* paranoid */ verify( data != 0 );
     228
     229                // Prepare the data we need
     230                __attribute((unused)) int len   = 0;
     231                __attribute((unused)) int block = 0;
     232                uint32_t cnt = *ring.submit_q.num;
     233                uint32_t mask = *ring.submit_q.mask;
     234
     235                disable_interrupts();
     236                        uint32_t off = __tls_rand();
     237                enable_interrupts( __cfaabi_dbg_ctx );
     238
     239                // Loop around looking for an available spot
     240                for() {
     241                        // Look through the list starting at some offset
     242                        for(i; cnt) {
     243                                uint64_t expected = 0;
     244                                uint32_t idx = (i + off) & mask;
     245                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     246                                volatile uint64_t * udata = (volatile uint64_t *)&sqe->user_data;
     247
     248                                if( *udata == expected &&
     249                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
     250                                {
     251                                        // update statistics
     252                                        __STATS__( false,
     253                                                io.submit_q.alloc_avg.val   += len;
     254                                                io.submit_q.alloc_avg.block += block;
     255                                                io.submit_q.alloc_avg.cnt   += 1;
     256                                        )
     257
     258
     259                                        // Success return the data
     260                                        return [sqe, idx];
     261                                }
     262                                verify(expected != data);
     263
     264                                len ++;
     265                        }
     266
     267                        block++;
     268                        yield();
     269                }
     270        }
     271
     272        static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
     273                /* paranoid */ verify( idx <= mask   );
     274                /* paranoid */ verify( idx != -1ul32 );
     275
     276                // We need to find a spot in the ready array
     277                __attribute((unused)) int len   = 0;
     278                __attribute((unused)) int block = 0;
     279                uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
     280
     281                disable_interrupts();
     282                        uint32_t off = __tls_rand();
     283                enable_interrupts( __cfaabi_dbg_ctx );
     284
     285                uint32_t picked;
     286                LOOKING: for() {
     287                        for(i; ring.submit_q.ready_cnt) {
     288                                picked = (i + off) & ready_mask;
     289                                uint32_t expected = -1ul32;
     290                                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
     291                                        break LOOKING;
     292                                }
     293                                verify(expected != idx);
     294
     295                                len ++;
     296                        }
     297
     298                        block++;
     299                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     300                                __release_consumed_submission( ring );
     301                                unlock( ring.submit_q.lock );
     302                        }
     303                        else {
     304                                yield();
     305                        }
     306                }
     307
     308                // update statistics
     309                __STATS__( false,
     310                        io.submit_q.look_avg.val   += len;
     311                        io.submit_q.look_avg.block += block;
     312                        io.submit_q.look_avg.cnt   += 1;
     313                )
     314
     315                return picked;
     316        }
     317
     318        void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
     319                __io_data & ring = *ctx->thrd.ring;
     320                // Get now the data we definetely need
     321                volatile uint32_t * const tail = ring.submit_q.tail;
     322                const uint32_t mask  = *ring.submit_q.mask;
     323
     324                // There are 2 submission schemes, check which one we are using
     325                if( ring.poller_submits ) {
     326                        // If the poller thread submits, then we just need to add this to the ready array
     327                        __submit_to_ready_array( ring, idx, mask );
     328
     329                        post( ctx->thrd.sem );
     330
     331                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
     332                }
     333                else if( ring.eager_submits ) {
     334                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
     335
     336                        for() {
     337                                yield();
     338
     339                                // If some one else collected our index, we are done
     340                                #warning ABA problem
     341                                if( ring.submit_q.ready[picked] != idx ) {
     342                                        __STATS__( false,
     343                                                io.submit_q.helped += 1;
     344                                        )
     345                                        return;
     346                                }
     347
     348                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     349                                        __STATS__( false,
     350                                                io.submit_q.leader += 1;
     351                                        )
     352                                        break;
     353                                }
     354
     355                                __STATS__( false,
     356                                        io.submit_q.busy += 1;
     357                                )
     358                        }
     359
     360                        // We got the lock
     361                        unsigned to_submit = __collect_submitions( ring );
     362                        int ret = __io_uring_enter( ring, to_submit, false );
     363                        if( ret < 0 ) {
     364                                unlock(ring.submit_q.lock);
     365                                return;
     366                        }
     367
     368                        /* paranoid */ verify( ret > 0 || to_submit == 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
     369
     370                        // Release the consumed SQEs
     371                        __release_consumed_submission( ring );
     372
     373                        // update statistics
     374                        __STATS__( true,
     375                                io.submit_q.submit_avg.rdy += to_submit;
     376                                io.submit_q.submit_avg.csm += ret;
     377                                io.submit_q.submit_avg.cnt += 1;
     378                        )
     379
     380                        unlock(ring.submit_q.lock);
     381                }
     382                else {
     383                        // get mutual exclusion
     384                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     385
     386                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     387                        /* paranoid */  "index %u already reclaimed\n"
     388                        /* paranoid */  "head %u, prev %u, tail %u\n"
     389                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
     390                        /* paranoid */  idx,
     391                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
     392                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
     393                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
     394                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
     395                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
     396                        /* paranoid */ );
     397
     398                        // Append to the list of ready entries
     399
     400                        /* paranoid */ verify( idx <= mask );
     401                        ring.submit_q.array[ (*tail) & mask ] = idx;
     402                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
     403
     404                        // Submit however, many entries need to be submitted
     405                        int ret = __io_uring_enter( ring, 1, false );
     406                        if( ret < 0 ) {
     407                                switch((int)errno) {
     408                                default:
     409                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
     410                                }
     411                        }
     412
     413                        // update statistics
     414                        __STATS__( false,
     415                                io.submit_q.submit_avg.csm += 1;
     416                                io.submit_q.submit_avg.cnt += 1;
     417                        )
     418
     419                        // Release the consumed SQEs
     420                        __release_consumed_submission( ring );
     421
     422                        unlock(ring.submit_q.lock);
     423
     424                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     425                }
     426        }
     427
     428        static unsigned __collect_submitions( struct __io_data & ring ) {
     429                /* paranoid */ verify( ring.submit_q.ready != 0p );
     430                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
     431
     432                unsigned to_submit = 0;
     433                uint32_t tail = *ring.submit_q.tail;
    651434                const uint32_t mask = *ring.submit_q.mask;
    652435
    653                 ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    654                 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    655 
    656                 // Submit however, many entries need to be submitted
    657                 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
    658                 if( ret < 0 ) {
    659                         switch((int)errno) {
    660                         default:
    661                                 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    662                         }
    663                 }
    664 
    665                 // update statistics
    666                 #if !defined(__CFA_NO_STATISTICS__)
    667                         ring.submit_q.stats.submit_avg.val += 1;
    668                         ring.submit_q.stats.submit_avg.cnt += 1;
    669                 #endif
    670 
    671                 unlock(ring.submit_q.lock);
    672                 // Make sure that idx was submitted
    673                 // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
    674                 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    675         }
    676 
    677         static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
    678                 this.opcode = opcode;
    679                 #if !defined(IOSQE_ASYNC)
    680                         this.flags = 0;
    681                 #else
    682                         this.flags = IOSQE_ASYNC;
    683                 #endif
    684                 this.ioprio = 0;
    685                 this.fd = fd;
    686                 this.off = 0;
    687                 this.addr = 0;
    688                 this.len = 0;
    689                 this.rw_flags = 0;
    690                 this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
    691         }
    692 
    693         static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
    694                 (this){ opcode, fd };
    695                 this.off = off;
    696                 this.addr = (uint64_t)addr;
    697                 this.len = len;
    698         }
    699 
    700 
    701 //=============================================================================================
    702 // I/O Interface
    703 //=============================================================================================
    704 
    705         #define __submit_prelude \
    706                 struct __io_data & ring = *active_cluster()->io; \
    707                 struct io_uring_sqe * sqe; \
    708                 uint32_t idx; \
    709                 [sqe, idx] = __submit_alloc( ring );
    710 
    711         #define __submit_wait \
    712                 io_user_data data = { 0, active_thread() }; \
    713                 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
    714                 sqe->user_data = (uint64_t)&data; \
    715                 __submit( ring, idx ); \
    716                 park( __cfaabi_dbg_ctx ); \
    717                 return data.result;
     436                // Go through the list of ready submissions
     437                for( i; ring.submit_q.ready_cnt ) {
     438                        // replace any submission with the sentinel, to consume it.
     439                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
     440
     441                        // If it was already the sentinel, then we are done
     442                        if( idx == -1ul32 ) continue;
     443
     444                        // If we got a real submission, append it to the list
     445                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
     446                        to_submit++;
     447                }
     448
     449                // Increment the tail based on how many we are ready to submit
     450                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
     451
     452                return to_submit;
     453        }
     454
     455        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
     456                const uint32_t smask = *ring.submit_q.mask;
     457
     458                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     459                uint32_t chead = *ring.submit_q.head;
     460                uint32_t phead = ring.submit_q.prev_head;
     461                ring.submit_q.prev_head = chead;
     462                unlock(ring.submit_q.release_lock);
     463
     464                uint32_t count = chead - phead;
     465                for( i; count ) {
     466                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
     467                        ring.submit_q.sqes[ idx ].user_data = 0;
     468                }
     469                return count;
     470        }
    718471#endif
    719 
    720 // Some forward declarations
    721 extern "C" {
    722         #include <unistd.h>
    723         #include <sys/types.h>
    724         #include <sys/socket.h>
    725         #include <sys/syscall.h>
    726 
    727 #if defined(HAVE_PREADV2)
    728         struct iovec;
    729         extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    730 #endif
    731 #if defined(HAVE_PWRITEV2)
    732         struct iovec;
    733         extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    734 #endif
    735 
    736         extern int fsync(int fd);
    737         extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
    738 
    739         struct msghdr;
    740         struct sockaddr;
    741         extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
    742         extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
    743         extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
    744         extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
    745         extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    746         extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
    747 
    748         extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
    749         extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
    750         extern int madvise(void *addr, size_t length, int advice);
    751 
    752         extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
    753         extern int close(int fd);
    754 
    755         extern ssize_t read (int fd, void *buf, size_t count);
    756 }
    757 
    758 //-----------------------------------------------------------------------------
    759 // Asynchronous operations
    760 #if defined(HAVE_PREADV2)
    761         ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    762                 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
    763                         return preadv2(fd, iov, iovcnt, offset, flags);
    764                 #else
    765                         __submit_prelude
    766 
    767                         (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
    768 
    769                         __submit_wait
    770                 #endif
    771         }
    772 #endif
    773 
    774 #if defined(HAVE_PWRITEV2)
    775         ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    776                 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
    777                         return pwritev2(fd, iov, iovcnt, offset, flags);
    778                 #else
    779                         __submit_prelude
    780 
    781                         (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
    782 
    783                         __submit_wait
    784                 #endif
    785         }
    786 #endif
    787 
    788 int cfa_fsync(int fd) {
    789         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
    790                 return fsync(fd);
    791         #else
    792                 __submit_prelude
    793 
    794                 (*sqe){ IORING_OP_FSYNC, fd };
    795 
    796                 __submit_wait
    797         #endif
    798 }
    799 
    800 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
    801         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
    802                 return sync_file_range(fd, offset, nbytes, flags);
    803         #else
    804                 __submit_prelude
    805 
    806                 (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
    807                 sqe->off = offset;
    808                 sqe->len = nbytes;
    809                 sqe->sync_range_flags = flags;
    810 
    811                 __submit_wait
    812         #endif
    813 }
    814 
    815 
    816 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
    817         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
    818                 return sendmsg(sockfd, msg, flags);
    819         #else
    820                 __submit_prelude
    821 
    822                 (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
    823                 sqe->msg_flags = flags;
    824 
    825                 __submit_wait
    826         #endif
    827 }
    828 
    829 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
    830         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
    831                 return recvmsg(sockfd, msg, flags);
    832         #else
    833                 __submit_prelude
    834 
    835                 (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
    836                 sqe->msg_flags = flags;
    837 
    838                 __submit_wait
    839         #endif
    840 }
    841 
    842 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
    843         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
    844                 return send( sockfd, buf, len, flags );
    845         #else
    846                 __submit_prelude
    847 
    848                 (*sqe){ IORING_OP_SEND, sockfd };
    849                 sqe->addr = (uint64_t)buf;
    850                 sqe->len = len;
    851                 sqe->msg_flags = flags;
    852 
    853                 __submit_wait
    854         #endif
    855 }
    856 
    857 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
    858         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
    859                 return recv( sockfd, buf, len, flags );
    860         #else
    861                 __submit_prelude
    862 
    863                 (*sqe){ IORING_OP_RECV, sockfd };
    864                 sqe->addr = (uint64_t)buf;
    865                 sqe->len = len;
    866                 sqe->msg_flags = flags;
    867 
    868                 __submit_wait
    869         #endif
    870 }
    871 
    872 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
    873         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
    874                 return accept4( sockfd, addr, addrlen, flags );
    875         #else
    876                 __submit_prelude
    877 
    878                 (*sqe){ IORING_OP_ACCEPT, sockfd };
    879                 sqe->addr = addr;
    880                 sqe->addr2 = addrlen;
    881                 sqe->accept_flags = flags;
    882 
    883                 __submit_wait
    884         #endif
    885 }
    886 
    887 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
    888         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
    889                 return connect( sockfd, addr, addrlen );
    890         #else
    891                 __submit_prelude
    892 
    893                 (*sqe){ IORING_OP_CONNECT, sockfd };
    894                 sqe->addr = (uint64_t)addr;
    895                 sqe->off = addrlen;
    896 
    897                 __submit_wait
    898         #endif
    899 }
    900 
    901 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
    902         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
    903                 return fallocate( fd, mode, offset, len );
    904         #else
    905                 __submit_prelude
    906 
    907                 (*sqe){ IORING_OP_FALLOCATE, fd };
    908                 sqe->off = offset;
    909                 sqe->len = length;
    910                 sqe->mode = mode;
    911 
    912                 __submit_wait
    913         #endif
    914 }
    915 
    916 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
    917         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
    918                 return posix_fadvise( fd, offset, len, advice );
    919         #else
    920                 __submit_prelude
    921 
    922                 (*sqe){ IORING_OP_FADVISE, fd };
    923                 sqe->off = (uint64_t)offset;
    924                 sqe->len = length;
    925                 sqe->fadvise_advice = advice;
    926 
    927                 __submit_wait
    928         #endif
    929 }
    930 
    931 int cfa_madvise(void *addr, size_t length, int advice) {
    932         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
    933                 return madvise( addr, length, advice );
    934         #else
    935                 __submit_prelude
    936 
    937                 (*sqe){ IORING_OP_MADVISE, 0 };
    938                 sqe->addr = (uint64_t)addr;
    939                 sqe->len = length;
    940                 sqe->fadvise_advice = advice;
    941 
    942                 __submit_wait
    943         #endif
    944 }
    945 
    946 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
    947         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
    948                 return openat( dirfd, pathname, flags, mode );
    949         #else
    950                 __submit_prelude
    951 
    952                 (*sqe){ IORING_OP_OPENAT, dirfd };
    953                 sqe->addr = (uint64_t)pathname;
    954                 sqe->open_flags = flags;
    955                 sqe->mode = mode;
    956 
    957                 __submit_wait
    958         #endif
    959 }
    960 
    961 int cfa_close(int fd) {
    962         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
    963                 return close( fd );
    964         #else
    965                 __submit_prelude
    966 
    967                 (*sqe){ IORING_OP_CLOSE, fd };
    968 
    969                 __submit_wait
    970         #endif
    971 }
    972 
    973 
    974 ssize_t cfa_read(int fd, void *buf, size_t count) {
    975         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
    976                 return read( fd, buf, count );
    977         #else
    978                 __submit_prelude
    979 
    980                 (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
    981 
    982                 __submit_wait
    983         #endif
    984 }
    985 
    986 ssize_t cfa_write(int fd, void *buf, size_t count) {
    987         #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
    988                 return read( fd, buf, count );
    989         #else
    990                 __submit_prelude
    991 
    992                 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
    993 
    994                 __submit_wait
    995         #endif
    996 }
    997 
    998 //-----------------------------------------------------------------------------
    999 // Check if a function is asynchronous
    1000 
    1001 // Macro magic to reduce the size of the following switch case
    1002 #define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
    1003 #define IS_DEFINED_SECOND(first, second, ...) second
    1004 #define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
    1005 #define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
    1006 
    1007 bool has_user_level_blocking( fptr_t func ) {
    1008         #if defined(HAVE_LINUX_IO_URING_H)
    1009                 #if defined(HAVE_PREADV2)
    1010                         if( /*func == (fptr_t)preadv2 || */
    1011                                 func == (fptr_t)cfa_preadv2 )
    1012                                 #define _CFA_IO_FEATURE_IORING_OP_READV ,
    1013                                 return IS_DEFINED(IORING_OP_READV);
    1014                 #endif
    1015 
    1016                 #if defined(HAVE_PWRITEV2)
    1017                         if( /*func == (fptr_t)pwritev2 || */
    1018                                 func == (fptr_t)cfa_pwritev2 )
    1019                                 #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
    1020                                 return IS_DEFINED(IORING_OP_WRITEV);
    1021                 #endif
    1022 
    1023                 if( /*func == (fptr_t)fsync || */
    1024                         func == (fptr_t)cfa_fsync )
    1025                         #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
    1026                         return IS_DEFINED(IORING_OP_FSYNC);
    1027 
    1028                 if( /*func == (fptr_t)ync_file_range || */
    1029                         func == (fptr_t)cfa_sync_file_range )
    1030                         #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
    1031                         return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
    1032 
    1033                 if( /*func == (fptr_t)sendmsg || */
    1034                         func == (fptr_t)cfa_sendmsg )
    1035                         #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
    1036                         return IS_DEFINED(IORING_OP_SENDMSG);
    1037 
    1038                 if( /*func == (fptr_t)recvmsg || */
    1039                         func == (fptr_t)cfa_recvmsg )
    1040                         #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
    1041                         return IS_DEFINED(IORING_OP_RECVMSG);
    1042 
    1043                 if( /*func == (fptr_t)send || */
    1044                         func == (fptr_t)cfa_send )
    1045                         #define _CFA_IO_FEATURE_IORING_OP_SEND ,
    1046                         return IS_DEFINED(IORING_OP_SEND);
    1047 
    1048                 if( /*func == (fptr_t)recv || */
    1049                         func == (fptr_t)cfa_recv )
    1050                         #define _CFA_IO_FEATURE_IORING_OP_RECV ,
    1051                         return IS_DEFINED(IORING_OP_RECV);
    1052 
    1053                 if( /*func == (fptr_t)accept4 || */
    1054                         func == (fptr_t)cfa_accept4 )
    1055                         #define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
    1056                         return IS_DEFINED(IORING_OP_ACCEPT);
    1057 
    1058                 if( /*func == (fptr_t)connect || */
    1059                         func == (fptr_t)cfa_connect )
    1060                         #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
    1061                         return IS_DEFINED(IORING_OP_CONNECT);
    1062 
    1063                 if( /*func == (fptr_t)fallocate || */
    1064                         func == (fptr_t)cfa_fallocate )
    1065                         #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
    1066                         return IS_DEFINED(IORING_OP_FALLOCATE);
    1067 
    1068                 if( /*func == (fptr_t)posix_fadvise || */
    1069                         func == (fptr_t)cfa_fadvise )
    1070                         #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
    1071                         return IS_DEFINED(IORING_OP_FADVISE);
    1072 
    1073                 if( /*func == (fptr_t)madvise || */
    1074                         func == (fptr_t)cfa_madvise )
    1075                         #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
    1076                         return IS_DEFINED(IORING_OP_MADVISE);
    1077 
    1078                 if( /*func == (fptr_t)openat || */
    1079                         func == (fptr_t)cfa_openat )
    1080                         #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
    1081                         return IS_DEFINED(IORING_OP_OPENAT);
    1082 
    1083                 if( /*func == (fptr_t)close || */
    1084                         func == (fptr_t)cfa_close )
    1085                         #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
    1086                         return IS_DEFINED(IORING_OP_CLOSE);
    1087 
    1088                 if( /*func == (fptr_t)read || */
    1089                         func == (fptr_t)cfa_read )
    1090                         #define _CFA_IO_FEATURE_IORING_OP_READ ,
    1091                         return IS_DEFINED(IORING_OP_READ);
    1092 
    1093                 if( /*func == (fptr_t)write || */
    1094                         func == (fptr_t)cfa_write )
    1095                         #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
    1096                         return IS_DEFINED(IORING_OP_WRITE);
    1097         #endif
    1098 
    1099         return false;
    1100 }
Note: See TracChangeset for help on using the changeset viewer.