- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/iocall.cfa
r3e2b9c9 r5751a56 14 14 // 15 15 16 #define __cforall_thread__17 18 16 #include "bits/defs.hfa" 19 17 … … 23 21 24 22 #if defined(CFA_HAVE_LINUX_IO_URING_H) 25 #include <assert.h>26 23 #include <stdint.h> 27 #include <errno.h>28 24 #include <linux/io_uring.h> 29 25 30 #include "kernel.hfa" 31 #include "kernel/fwd.hfa" 32 #include "io/types.hfa" 26 #include "kernel_private.hfa" 33 27 34 28 extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ); 35 extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1)));29 extern void __submit( struct __io_data & ring, uint32_t idx ); 36 30 37 31 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) { … … 58 52 } 59 53 60 static inline io_context * __get_io_context( void ) {61 cluster * cltr = active_cluster();62 /* paranoid */ verifyf( cltr, "No active cluster for io operation\n");63 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\n", cltr );64 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\n", cltr);65 return &cltr->io.ctxs[ __tls_rand() % cltr->io.cnt ];66 }67 68 69 #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)70 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC)71 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC)72 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC)73 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN)74 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)75 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)76 #define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC)77 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE)78 #define REGULAR_FLAGS (IOSQE_FIXED_FILE)79 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN)80 #define REGULAR_FLAGS (IOSQE_IO_DRAIN)81 #elif defined(CFA_HAVE_IOSQE_ASYNC)82 #define REGULAR_FLAGS (IOSQE_ASYNC)83 #else84 #define REGULAR_FLAGS (0)85 #endif86 87 #if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK)88 #define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK)89 #elif defined(CFA_HAVE_IOSQE_IO_LINK)90 #define LINK_FLAGS (IOSQE_IO_LINK)91 #elif defined(CFA_HAVE_IOSQE_IO_HARDLINK)92 #define LINK_FLAGS (IOSQE_IO_HARDLINK)93 #else94 #define LINK_FLAGS (0)95 #endif96 97 #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)98 #define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED)99 #else100 #define SPLICE_FLAGS (0)101 #endif102 103 104 54 #define __submit_prelude \ 105 if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \106 (void)timeout; (void)cancellation; \107 if( !context ) context = __get_io_context(); \108 55 __io_user_data_t data = { 0, active_thread() }; \ 109 struct __io_data & ring = * context->thrd.ring; \56 struct __io_data & ring = *data.thrd->curr_cluster->io; \ 110 57 struct io_uring_sqe * sqe; \ 111 58 uint32_t idx; \ 112 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \ 113 sqe->flags = REGULAR_FLAGS & submit_flags; 59 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); 114 60 115 61 #define __submit_wait \ 116 62 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \ 117 63 verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \ 118 __submit( context, idx ); \64 __submit( ring, idx ); \ 119 65 park( __cfaabi_dbg_ctx ); \ 120 if( data.result < 0 ) { \121 errno = -data.result; \122 return -1; \123 } \124 66 return data.result; 125 67 #endif … … 128 70 // I/O Forwards 129 71 //============================================================================================= 130 #include <time.hfa>131 72 132 73 // Some forward declarations … … 180 121 // Asynchronous operations 181 122 #if defined(HAVE_PREADV2) 182 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {123 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 183 124 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 184 125 return preadv2(fd, iov, iovcnt, offset, flags); … … 191 132 #endif 192 133 } 134 135 ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 136 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 137 return preadv2(fd, iov, iovcnt, offset, flags); 138 #else 139 __submit_prelude 140 141 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 142 sqe->flags |= IOSQE_FIXED_FILE; 143 144 __submit_wait 145 #endif 146 } 193 147 #endif 194 148 195 149 #if defined(HAVE_PWRITEV2) 196 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {150 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 197 151 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV) 198 152 return pwritev2(fd, iov, iovcnt, offset, flags); … … 207 161 #endif 208 162 209 int cfa_fsync(int fd , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {163 int cfa_fsync(int fd) { 210 164 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC) 211 165 return fsync(fd); … … 219 173 } 220 174 221 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {175 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) { 222 176 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE) 223 177 return sync_file_range(fd, offset, nbytes, flags); … … 235 189 236 190 237 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {191 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) { 238 192 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG) 239 193 return sendmsg(sockfd, msg, flags); … … 248 202 } 249 203 250 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {204 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) { 251 205 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG) 252 206 return recvmsg(sockfd, msg, flags); … … 261 215 } 262 216 263 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {217 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) { 264 218 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND) 265 219 return send( sockfd, buf, len, flags ); … … 276 230 } 277 231 278 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {232 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) { 279 233 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV) 280 234 return recv( sockfd, buf, len, flags ); … … 291 245 } 292 246 293 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {247 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) { 294 248 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT) 295 249 return accept4( sockfd, addr, addrlen, flags ); … … 306 260 } 307 261 308 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {262 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { 309 263 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT) 310 264 return connect( sockfd, addr, addrlen ); … … 320 274 } 321 275 322 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {276 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) { 323 277 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE) 324 278 return fallocate( fd, mode, offset, len ); … … 335 289 } 336 290 337 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {291 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) { 338 292 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE) 339 293 return posix_fadvise( fd, offset, len, advice ); … … 350 304 } 351 305 352 int cfa_madvise(void *addr, size_t length, int advice , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {306 int cfa_madvise(void *addr, size_t length, int advice) { 353 307 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE) 354 308 return madvise( addr, length, advice ); … … 365 319 } 366 320 367 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {321 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) { 368 322 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT) 369 323 return openat( dirfd, pathname, flags, mode ); … … 380 334 } 381 335 382 int cfa_close(int fd , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {336 int cfa_close(int fd) { 383 337 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE) 384 338 return close( fd ); … … 394 348 // Forward declare in case it is not supported 395 349 struct statx; 396 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {350 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) { 397 351 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX) 398 352 #if defined(__NR_statx) … … 406 360 407 361 (*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf }; 408 sqe-> statx_flags = flags;409 410 __submit_wait 411 #endif 412 } 413 414 ssize_t cfa_read(int fd, void *buf, size_t count , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {362 sqe->flags = flags; 363 364 __submit_wait 365 #endif 366 } 367 368 ssize_t cfa_read(int fd, void *buf, size_t count) { 415 369 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ) 416 370 return read( fd, buf, count ); … … 424 378 } 425 379 426 ssize_t cfa_write(int fd, void *buf, size_t count , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {380 ssize_t cfa_write(int fd, void *buf, size_t count) { 427 381 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE) 428 382 return read( fd, buf, count ); … … 436 390 } 437 391 438 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags , int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {392 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) { 439 393 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 440 394 return splice( fd_in, off_in, fd_out, off_out, len, flags ); … … 445 399 sqe->splice_fd_in = fd_in; 446 400 sqe->splice_off_in = off_in; 447 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags); 448 449 __submit_wait 450 #endif 451 } 452 453 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 401 sqe->splice_flags = flags; 402 403 __submit_wait 404 #endif 405 } 406 407 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) { 408 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 409 return splice( fd_in, off_in, fd_out, off_out, len, flags ); 410 #else 411 __submit_prelude 412 413 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out }; 414 sqe->splice_fd_in = fd_in; 415 sqe->splice_off_in = off_in; 416 sqe->splice_flags = flags | out_flags; 417 sqe->flags = in_flags; 418 419 __submit_wait 420 #endif 421 } 422 423 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) { 454 424 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE) 455 425 return tee( fd_in, fd_out, len, flags ); … … 459 429 (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 }; 460 430 sqe->splice_fd_in = fd_in; 461 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags);431 sqe->splice_flags = flags; 462 432 463 433 __submit_wait … … 566 536 567 537 if( /*func == (fptr_t)splice || */ 568 func == (fptr_t)cfa_splice ) 538 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice, 539 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice ) 569 540 #define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE , 570 541 return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE);
Note: See TracChangeset
for help on using the changeset viewer.