Ignore:
Timestamp:
Jul 30, 2020, 3:00:19 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
920dca3
Parents:
e0f93e0
Message:

Re-worked IO to use epoll and support multiple io_contexts per cluster.
Also redid how cluster options are handled.
Changed how iofwd calls are passed to support future features and io_contexts rework.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/iocall.cfa

    re0f93e0 rf00b26d4  
    2222#if defined(CFA_HAVE_LINUX_IO_URING_H)
    2323        #include <stdint.h>
     24        #include <errno.h>
    2425        #include <linux/io_uring.h>
    2526
     
    2728
    2829        extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data );
    29         extern void __submit( struct __io_data & ring, uint32_t idx );
     30        extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1)));
    3031
    3132        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
     
    5253        }
    5354
     55
     56
     57      #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
     58                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC)
     59        #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC)
     60                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC)
     61      #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN)
     62                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)
     63      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
     64                #define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC)
     65        #elif defined(CFA_HAVE_IOSQE_FIXED_FILE)
     66                #define REGULAR_FLAGS (IOSQE_FIXED_FILE)
     67      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN)
     68                #define REGULAR_FLAGS (IOSQE_IO_DRAIN)
     69      #elif defined(CFA_HAVE_IOSQE_ASYNC)
     70                #define REGULAR_FLAGS (IOSQE_ASYNC)
     71        #else
     72                #define REGULAR_FLAGS (0)
     73        #endif
     74
     75        #if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     76                #define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK)
     77        #elif defined(CFA_HAVE_IOSQE_IO_LINK)
     78                #define LINK_FLAGS (IOSQE_IO_LINK)
     79        #elif defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     80                #define LINK_FLAGS (IOSQE_IO_HARDLINK)
     81        #else
     82                #define LINK_FLAGS (0)
     83        #endif
     84
     85        #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)
     86                #define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED)
     87        #else
     88                #define SPLICE_FLAGS (0)
     89        #endif
     90
     91
    5492        #define __submit_prelude \
     93                if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \
     94                (void)timeout; (void)cancellation; \
     95                if( !context ) context = __get_io_context(); \
    5596                __io_user_data_t data = { 0, active_thread() }; \
    56                 struct __io_data & ring = *data.thrd->curr_cluster->io; \
     97                struct __io_data & ring = *context->thrd.ring; \
    5798                struct io_uring_sqe * sqe; \
    5899                uint32_t idx; \
    59                 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data );
     100                [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \
     101                sqe->flags = REGULAR_FLAGS & submit_flags;
    60102
    61103        #define __submit_wait \
    62104                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
    63105                verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \
    64                 __submit( ring, idx ); \
     106                __submit( context, idx ); \
    65107                park( __cfaabi_dbg_ctx ); \
     108                if( data.result < 0 ) { \
     109                        errno = -data.result; \
     110                        return -1; \
     111                } \
    66112                return data.result;
    67113#endif
     
    70116// I/O Forwards
    71117//=============================================================================================
     118#include <time.hfa>
    72119
    73120// Some forward declarations
     
    121168// Asynchronous operations
    122169#if defined(HAVE_PREADV2)
    123         ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     170        ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    124171                #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
    125172                        return preadv2(fd, iov, iovcnt, offset, flags);
     
    132179                #endif
    133180        }
    134 
    135         ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    136                 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
    137                         return preadv2(fd, iov, iovcnt, offset, flags);
     181#endif
     182
     183#if defined(HAVE_PWRITEV2)
     184        ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
     185                #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
     186                        return pwritev2(fd, iov, iovcnt, offset, flags);
    138187                #else
    139188                        __submit_prelude
    140189
    141                         (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
    142                         sqe->flags |= IOSQE_FIXED_FILE;
     190                        (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
    143191
    144192                        __submit_wait
     
    147195#endif
    148196
    149 #if defined(HAVE_PWRITEV2)
    150         ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    151                 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
    152                         return pwritev2(fd, iov, iovcnt, offset, flags);
    153                 #else
    154                         __submit_prelude
    155 
    156                         (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
    157 
    158                         __submit_wait
    159                 #endif
    160         }
    161 #endif
    162 
    163 int cfa_fsync(int fd) {
     197int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    164198        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC)
    165199                return fsync(fd);
     
    173207}
    174208
    175 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
     209int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    176210        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE)
    177211                return sync_file_range(fd, offset, nbytes, flags);
     
    189223
    190224
    191 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
     225ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    192226        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG)
    193227                return sendmsg(sockfd, msg, flags);
     
    202236}
    203237
    204 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
     238ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    205239        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG)
    206240                return recvmsg(sockfd, msg, flags);
     
    215249}
    216250
    217 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
     251ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    218252        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND)
    219253                return send( sockfd, buf, len, flags );
     
    230264}
    231265
    232 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
     266ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    233267        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV)
    234268                return recv( sockfd, buf, len, flags );
     
    245279}
    246280
    247 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
     281int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    248282        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT)
    249283                return accept4( sockfd, addr, addrlen, flags );
     
    260294}
    261295
    262 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
     296int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    263297        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT)
    264298                return connect( sockfd, addr, addrlen );
     
    274308}
    275309
    276 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
     310int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    277311        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE)
    278312                return fallocate( fd, mode, offset, len );
     
    289323}
    290324
    291 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
     325int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    292326        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE)
    293327                return posix_fadvise( fd, offset, len, advice );
     
    304338}
    305339
    306 int cfa_madvise(void *addr, size_t length, int advice) {
     340int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    307341        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE)
    308342                return madvise( addr, length, advice );
     
    319353}
    320354
    321 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
     355int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    322356        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT)
    323357                return openat( dirfd, pathname, flags, mode );
     
    334368}
    335369
    336 int cfa_close(int fd) {
     370int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    337371        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE)
    338372                return close( fd );
     
    348382// Forward declare in case it is not supported
    349383struct statx;
    350 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
     384int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    351385        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX)
    352386                #if defined(__NR_statx)
     
    360394
    361395                (*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf };
    362                 sqe->flags = flags;
    363 
    364                 __submit_wait
    365         #endif
    366 }
    367 
    368 ssize_t cfa_read(int fd, void *buf, size_t count) {
     396                sqe->statx_flags = flags;
     397
     398                __submit_wait
     399        #endif
     400}
     401
     402ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    369403        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ)
    370404                return read( fd, buf, count );
     
    378412}
    379413
    380 ssize_t cfa_write(int fd, void *buf, size_t count) {
     414ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    381415        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE)
    382416                return read( fd, buf, count );
     
    390424}
    391425
    392 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
     426ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    393427        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
    394428                return splice( fd_in, off_in, fd_out, off_out, len, flags );
     
    399433                sqe->splice_fd_in  = fd_in;
    400434                sqe->splice_off_in = off_in;
    401                 sqe->splice_flags  = flags;
    402 
    403                 __submit_wait
    404         #endif
    405 }
    406 
    407 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) {
    408         #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
    409                 return splice( fd_in, off_in, fd_out, off_out, len, flags );
    410         #else
    411                 __submit_prelude
    412 
    413                 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out };
    414                 sqe->splice_fd_in  = fd_in;
    415                 sqe->splice_off_in = off_in;
    416                 sqe->splice_flags  = flags | out_flags;
    417                 sqe->flags = in_flags;
    418 
    419                 __submit_wait
    420         #endif
    421 }
    422 
    423 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) {
     435                sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
     436
     437                __submit_wait
     438        #endif
     439}
     440
     441ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    424442        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE)
    425443                return tee( fd_in, fd_out, len, flags );
     
    429447                (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 };
    430448                sqe->splice_fd_in = fd_in;
    431                 sqe->splice_flags = flags;
     449                sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
    432450
    433451                __submit_wait
     
    536554
    537555                if( /*func == (fptr_t)splice || */
    538                         func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice,
    539                         func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice )
     556                        func == (fptr_t)cfa_splice )
    540557                        #define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE ,
    541558                        return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE);
Note: See TracChangeset for help on using the changeset viewer.