Changeset 53ee27e for libcfa/src/concurrency/iocall.cfa
- Timestamp:
- Aug 4, 2020, 5:13:51 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- eafec07
- Parents:
- 3f850d7 (diff), 21b0a23 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/iocall.cfa
r3f850d7 r53ee27e 14 14 // 15 15 16 #define __cforall_thread__ 17 16 18 #include "bits/defs.hfa" 19 #include "kernel.hfa" 17 20 18 21 //============================================================================================= … … 21 24 22 25 #if defined(CFA_HAVE_LINUX_IO_URING_H) 26 #include <assert.h> 23 27 #include <stdint.h> 28 #include <errno.h> 24 29 #include <linux/io_uring.h> 25 30 26 #include "kernel_private.hfa" 31 #include "kernel/fwd.hfa" 32 #include "io/types.hfa" 27 33 28 34 extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ); 29 extern void __submit( struct __io_data & ring, uint32_t idx);35 extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))); 30 36 31 37 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) { … … 52 58 } 53 59 60 static inline io_context * __get_io_context( void ) { 61 cluster * cltr = active_cluster(); 62 /* paranoid */ verifyf( cltr, "No active cluster for io operation\n"); 63 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\n", cltr ); 64 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\n", cltr); 65 return &cltr->io.ctxs[ __tls_rand() % cltr->io.cnt ]; 66 } 67 68 69 #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC) 70 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC) 71 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC) 72 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC) 73 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) 74 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN) 75 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC) 76 #define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC) 77 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) 78 #define REGULAR_FLAGS (IOSQE_FIXED_FILE) 79 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) 80 #define REGULAR_FLAGS (IOSQE_IO_DRAIN) 81 #elif defined(CFA_HAVE_IOSQE_ASYNC) 82 #define REGULAR_FLAGS (IOSQE_ASYNC) 83 #else 84 #define REGULAR_FLAGS (0) 85 #endif 86 87 #if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK) 88 #define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK) 89 #elif defined(CFA_HAVE_IOSQE_IO_LINK) 90 #define LINK_FLAGS (IOSQE_IO_LINK) 91 #elif defined(CFA_HAVE_IOSQE_IO_HARDLINK) 92 #define LINK_FLAGS (IOSQE_IO_HARDLINK) 93 #else 94 #define LINK_FLAGS (0) 95 #endif 96 97 #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED) 98 #define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED) 99 #else 100 #define SPLICE_FLAGS (0) 101 #endif 102 103 54 104 #define __submit_prelude \ 105 if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \ 106 (void)timeout; (void)cancellation; \ 107 if( !context ) context = __get_io_context(); \ 55 108 __io_user_data_t data = { 0, active_thread() }; \ 56 struct __io_data & ring = * data.thrd->curr_cluster->io; \109 struct __io_data & ring = *context->thrd.ring; \ 57 110 struct io_uring_sqe * sqe; \ 58 111 uint32_t idx; \ 59 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); 112 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \ 113 sqe->flags = REGULAR_FLAGS & submit_flags; 60 114 61 115 #define __submit_wait \ 62 116 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \ 63 117 verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \ 64 __submit( ring, idx ); \118 __submit( context, idx ); \ 65 119 park( __cfaabi_dbg_ctx ); \ 120 if( data.result < 0 ) { \ 121 errno = -data.result; \ 122 return -1; \ 123 } \ 66 124 return data.result; 67 125 #endif … … 70 128 // I/O Forwards 71 129 //============================================================================================= 130 #include <time.hfa> 72 131 73 132 // Some forward declarations … … 121 180 // Asynchronous operations 122 181 #if defined(HAVE_PREADV2) 123 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags ) {182 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 124 183 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 125 184 return preadv2(fd, iov, iovcnt, offset, flags); … … 132 191 #endif 133 192 } 134 135 ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 136 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 137 return preadv2(fd, iov, iovcnt, offset, flags); 193 #endif 194 195 #if defined(HAVE_PWRITEV2) 196 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 197 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV) 198 return pwritev2(fd, iov, iovcnt, offset, flags); 138 199 #else 139 200 __submit_prelude 140 201 141 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 142 sqe->flags |= IOSQE_FIXED_FILE; 202 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 143 203 144 204 __submit_wait … … 147 207 #endif 148 208 149 #if defined(HAVE_PWRITEV2) 150 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 151 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV) 152 return pwritev2(fd, iov, iovcnt, offset, flags); 153 #else 154 __submit_prelude 155 156 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 157 158 __submit_wait 159 #endif 160 } 161 #endif 162 163 int cfa_fsync(int fd) { 209 int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 164 210 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC) 165 211 return fsync(fd); … … 173 219 } 174 220 175 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags ) {221 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 176 222 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE) 177 223 return sync_file_range(fd, offset, nbytes, flags); … … 189 235 190 236 191 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags ) {237 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 192 238 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG) 193 239 return sendmsg(sockfd, msg, flags); … … 202 248 } 203 249 204 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags ) {250 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 205 251 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG) 206 252 return recvmsg(sockfd, msg, flags); … … 215 261 } 216 262 217 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags ) {263 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 218 264 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND) 219 265 return send( sockfd, buf, len, flags ); … … 230 276 } 231 277 232 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags ) {278 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 233 279 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV) 234 280 return recv( sockfd, buf, len, flags ); … … 245 291 } 246 292 247 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags ) {293 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 248 294 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT) 249 295 return accept4( sockfd, addr, addrlen, flags ); … … 260 306 } 261 307 262 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen ) {308 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 263 309 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT) 264 310 return connect( sockfd, addr, addrlen ); … … 274 320 } 275 321 276 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len ) {322 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 277 323 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE) 278 324 return fallocate( fd, mode, offset, len ); … … 291 337 } 292 338 293 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice ) {339 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 294 340 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE) 295 341 return posix_fadvise( fd, offset, len, advice ); … … 306 352 } 307 353 308 int cfa_madvise(void *addr, size_t length, int advice ) {354 int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 309 355 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE) 310 356 return madvise( addr, length, advice ); … … 321 367 } 322 368 323 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode ) {369 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 324 370 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT) 325 371 return openat( dirfd, pathname, flags, mode ); … … 336 382 } 337 383 338 int cfa_close(int fd ) {384 int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 339 385 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE) 340 386 return close( fd ); … … 350 396 // Forward declare in case it is not supported 351 397 struct statx; 352 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf ) {398 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 353 399 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX) 354 400 #if defined(__NR_statx) … … 362 408 363 409 (*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf }; 364 sqe-> flags = flags;365 366 __submit_wait 367 #endif 368 } 369 370 ssize_t cfa_read(int fd, void *buf, size_t count ) {410 sqe->statx_flags = flags; 411 412 __submit_wait 413 #endif 414 } 415 416 ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 371 417 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ) 372 418 return read( fd, buf, count ); … … 380 426 } 381 427 382 ssize_t cfa_write(int fd, void *buf, size_t count ) {428 ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 383 429 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE) 384 430 return read( fd, buf, count ); … … 392 438 } 393 439 394 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags ) {440 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 395 441 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 396 442 return splice( fd_in, off_in, fd_out, off_out, len, flags ); … … 413 459 sqe->splice_off_in = (uint64_t)-1; 414 460 } 415 sqe->splice_flags = flags; 416 417 __submit_wait 418 #endif 419 } 420 421 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) { 422 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 423 return splice( fd_in, off_in, fd_out, off_out, len, flags ); 424 #else 425 __submit_prelude 426 427 (*sqe){ IORING_OP_SPLICE, fd_out }; 428 if( off_out ) { 429 sqe->off = *off_out; 430 } 431 else { 432 sqe->off = (uint64_t)-1; 433 } 434 sqe->len = len; 435 sqe->splice_fd_in = fd_in; 436 if( off_in ) { 437 sqe->splice_off_in = *off_in; 438 } 439 else { 440 sqe->splice_off_in = (uint64_t)-1; 441 } 442 sqe->splice_flags = flags | out_flags; 443 sqe->flags = in_flags; 444 445 __submit_wait 446 #endif 447 } 448 449 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) { 461 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags); 462 463 __submit_wait 464 #endif 465 } 466 467 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 450 468 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE) 451 469 return tee( fd_in, fd_out, len, flags ); … … 455 473 (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 }; 456 474 sqe->splice_fd_in = fd_in; 457 sqe->splice_flags = flags;475 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags); 458 476 459 477 __submit_wait … … 562 580 563 581 if( /*func == (fptr_t)splice || */ 564 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice, 565 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice ) 582 func == (fptr_t)cfa_splice ) 566 583 #define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE , 567 584 return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE);
Note: See TracChangeset
for help on using the changeset viewer.