- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/iocall.cfa
r5751a56 r3e2b9c9 14 14 // 15 15 16 #define __cforall_thread__ 17 16 18 #include "bits/defs.hfa" 17 19 … … 21 23 22 24 #if defined(CFA_HAVE_LINUX_IO_URING_H) 25 #include <assert.h> 23 26 #include <stdint.h> 27 #include <errno.h> 24 28 #include <linux/io_uring.h> 25 29 26 #include "kernel_private.hfa" 30 #include "kernel.hfa" 31 #include "kernel/fwd.hfa" 32 #include "io/types.hfa" 27 33 28 34 extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ); 29 extern void __submit( struct __io_data & ring, uint32_t idx);35 extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))); 30 36 31 37 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) { … … 52 58 } 53 59 60 static inline io_context * __get_io_context( void ) { 61 cluster * cltr = active_cluster(); 62 /* paranoid */ verifyf( cltr, "No active cluster for io operation\n"); 63 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\n", cltr ); 64 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\n", cltr); 65 return &cltr->io.ctxs[ __tls_rand() % cltr->io.cnt ]; 66 } 67 68 69 #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC) 70 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC) 71 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC) 72 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC) 73 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) 74 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN) 75 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC) 76 #define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC) 77 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) 78 #define REGULAR_FLAGS (IOSQE_FIXED_FILE) 79 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) 80 #define REGULAR_FLAGS (IOSQE_IO_DRAIN) 81 #elif defined(CFA_HAVE_IOSQE_ASYNC) 82 #define REGULAR_FLAGS (IOSQE_ASYNC) 83 #else 84 #define REGULAR_FLAGS (0) 85 #endif 86 87 #if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK) 88 #define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK) 89 #elif defined(CFA_HAVE_IOSQE_IO_LINK) 90 #define LINK_FLAGS (IOSQE_IO_LINK) 91 #elif defined(CFA_HAVE_IOSQE_IO_HARDLINK) 92 #define LINK_FLAGS (IOSQE_IO_HARDLINK) 93 #else 94 #define LINK_FLAGS (0) 95 #endif 96 97 #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED) 98 #define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED) 99 #else 100 #define SPLICE_FLAGS (0) 101 #endif 102 103 54 104 #define __submit_prelude \ 105 if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \ 106 (void)timeout; (void)cancellation; \ 107 if( !context ) context = __get_io_context(); \ 55 108 __io_user_data_t data = { 0, active_thread() }; \ 56 struct __io_data & ring = * data.thrd->curr_cluster->io; \109 struct __io_data & ring = *context->thrd.ring; \ 57 110 struct io_uring_sqe * sqe; \ 58 111 uint32_t idx; \ 59 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); 112 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \ 113 sqe->flags = REGULAR_FLAGS & submit_flags; 60 114 61 115 #define __submit_wait \ 62 116 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \ 63 117 verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \ 64 __submit( ring, idx ); \118 __submit( context, idx ); \ 65 119 park( __cfaabi_dbg_ctx ); \ 120 if( data.result < 0 ) { \ 121 errno = -data.result; \ 122 return -1; \ 123 } \ 66 124 return data.result; 67 125 #endif … … 70 128 // I/O Forwards 71 129 //============================================================================================= 130 #include <time.hfa> 72 131 73 132 // Some forward declarations … … 121 180 // Asynchronous operations 122 181 #if defined(HAVE_PREADV2) 123 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags ) {182 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 124 183 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 125 184 return preadv2(fd, iov, iovcnt, offset, flags); … … 132 191 #endif 133 192 } 134 135 ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 136 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 137 return preadv2(fd, iov, iovcnt, offset, flags); 193 #endif 194 195 #if defined(HAVE_PWRITEV2) 196 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 197 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV) 198 return pwritev2(fd, iov, iovcnt, offset, flags); 138 199 #else 139 200 __submit_prelude 140 201 141 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 142 sqe->flags |= IOSQE_FIXED_FILE; 202 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 143 203 144 204 __submit_wait … … 147 207 #endif 148 208 149 #if defined(HAVE_PWRITEV2) 150 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 151 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV) 152 return pwritev2(fd, iov, iovcnt, offset, flags); 153 #else 154 __submit_prelude 155 156 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 157 158 __submit_wait 159 #endif 160 } 161 #endif 162 163 int cfa_fsync(int fd) { 209 int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 164 210 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC) 165 211 return fsync(fd); … … 173 219 } 174 220 175 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags ) {221 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 176 222 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE) 177 223 return sync_file_range(fd, offset, nbytes, flags); … … 189 235 190 236 191 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags ) {237 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 192 238 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG) 193 239 return sendmsg(sockfd, msg, flags); … … 202 248 } 203 249 204 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags ) {250 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 205 251 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG) 206 252 return recvmsg(sockfd, msg, flags); … … 215 261 } 216 262 217 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags ) {263 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 218 264 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND) 219 265 return send( sockfd, buf, len, flags ); … … 230 276 } 231 277 232 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags ) {278 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 233 279 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV) 234 280 return recv( sockfd, buf, len, flags ); … … 245 291 } 246 292 247 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags ) {293 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 248 294 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT) 249 295 return accept4( sockfd, addr, addrlen, flags ); … … 260 306 } 261 307 262 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen ) {308 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 263 309 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT) 264 310 return connect( sockfd, addr, addrlen ); … … 274 320 } 275 321 276 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len ) {322 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 277 323 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE) 278 324 return fallocate( fd, mode, offset, len ); … … 289 335 } 290 336 291 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice ) {337 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 292 338 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE) 293 339 return posix_fadvise( fd, offset, len, advice ); … … 304 350 } 305 351 306 int cfa_madvise(void *addr, size_t length, int advice ) {352 int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 307 353 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE) 308 354 return madvise( addr, length, advice ); … … 319 365 } 320 366 321 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode ) {367 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 322 368 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT) 323 369 return openat( dirfd, pathname, flags, mode ); … … 334 380 } 335 381 336 int cfa_close(int fd ) {382 int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 337 383 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE) 338 384 return close( fd ); … … 348 394 // Forward declare in case it is not supported 349 395 struct statx; 350 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf ) {396 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 351 397 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX) 352 398 #if defined(__NR_statx) … … 360 406 361 407 (*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf }; 362 sqe-> flags = flags;363 364 __submit_wait 365 #endif 366 } 367 368 ssize_t cfa_read(int fd, void *buf, size_t count ) {408 sqe->statx_flags = flags; 409 410 __submit_wait 411 #endif 412 } 413 414 ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 369 415 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ) 370 416 return read( fd, buf, count ); … … 378 424 } 379 425 380 ssize_t cfa_write(int fd, void *buf, size_t count ) {426 ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 381 427 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE) 382 428 return read( fd, buf, count ); … … 390 436 } 391 437 392 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags ) {438 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 393 439 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 394 440 return splice( fd_in, off_in, fd_out, off_out, len, flags ); … … 399 445 sqe->splice_fd_in = fd_in; 400 446 sqe->splice_off_in = off_in; 401 sqe->splice_flags = flags; 402 403 __submit_wait 404 #endif 405 } 406 407 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) { 408 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 409 return splice( fd_in, off_in, fd_out, off_out, len, flags ); 410 #else 411 __submit_prelude 412 413 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out }; 414 sqe->splice_fd_in = fd_in; 415 sqe->splice_off_in = off_in; 416 sqe->splice_flags = flags | out_flags; 417 sqe->flags = in_flags; 418 419 __submit_wait 420 #endif 421 } 422 423 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) { 447 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags); 448 449 __submit_wait 450 #endif 451 } 452 453 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 424 454 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE) 425 455 return tee( fd_in, fd_out, len, flags ); … … 429 459 (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 }; 430 460 sqe->splice_fd_in = fd_in; 431 sqe->splice_flags = flags;461 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags); 432 462 433 463 __submit_wait … … 536 566 537 567 if( /*func == (fptr_t)splice || */ 538 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice, 539 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice ) 568 func == (fptr_t)cfa_splice ) 540 569 #define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE , 541 570 return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE);
Note: See TracChangeset
for help on using the changeset viewer.