Changes in libcfa/src/concurrency/io.cfa [8962722:4069faad]
File:
- libcfa/src/concurrency/io.cfa (1 edited)
Legend:
- Unmodified
- Added
- Removed
libcfa/src/concurrency/io.cfa
r8962722 r4069faad 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // io.cfa -- 8 // 9 // Author : Thierry Delisle 10 // Created On : Thu Apr 23 17:31:00 2020 11 // Last Modified By : 12 // Last Modified On : 13 // Update Count : 14 // 15 16 // #define __CFA_DEBUG_PRINT_IO__ 17 1 18 #include "kernel.hfa" 2 19 3 20 #if !defined(HAVE_LINUX_IO_URING_H) 4 void __kernel_io_startup( cluster & this) {21 void __kernel_io_startup( cluster & ) { 5 22 // Nothing to do without io_uring 6 23 } 7 24 8 void __kernel_io_s hutdown( cluster & this) {25 void __kernel_io_start_thrd( cluster & ) { 9 26 // Nothing to do without io_uring 10 27 } 11 28 12 bool is_async( void (*)() ) { 13 return false; 29 void __kernel_io_stop_thrd ( cluster & ) { 30 // Nothing to do without io_uring 31 } 32 33 void __kernel_io_shutdown( cluster & ) { 34 // Nothing to do without io_uring 14 35 } 15 36 … … 35 56 } 36 57 37 static void * __io_poller( void * arg ); 38 39 // Weirdly, some systems that do support io_uring don't actually define these 40 #ifdef __alpha__ 41 /* 42 * alpha is the only exception, all other architectures 43 * have common numbers for new system calls. 44 */ 45 # ifndef __NR_io_uring_setup 46 # define __NR_io_uring_setup 535 47 # endif 48 # ifndef __NR_io_uring_enter 49 # define __NR_io_uring_enter 536 50 # endif 51 # ifndef __NR_io_uring_register 52 # define __NR_io_uring_register 537 53 # endif 54 #else /* !__alpha__ */ 55 # ifndef __NR_io_uring_setup 56 # define __NR_io_uring_setup 425 57 # endif 58 # ifndef __NR_io_uring_enter 59 # define __NR_io_uring_enter 426 60 # endif 61 # ifndef __NR_io_uring_register 62 # define __NR_io_uring_register 427 63 # endif 64 #endif 65 58 static void * __io_poller_slow( void * arg ); 59 60 // Weirdly, some systems that do support io_uring don't actually define these 61 #ifdef __alpha__ 62 /* 63 * alpha is the only exception, all other architectures 64 * have common numbers for new system calls. 
65 */ 66 #ifndef __NR_io_uring_setup 67 #define __NR_io_uring_setup 535 68 #endif 69 #ifndef __NR_io_uring_enter 70 #define __NR_io_uring_enter 536 71 #endif 72 #ifndef __NR_io_uring_register 73 #define __NR_io_uring_register 537 74 #endif 75 #else /* !__alpha__ */ 76 #ifndef __NR_io_uring_setup 77 #define __NR_io_uring_setup 425 78 #endif 79 #ifndef __NR_io_uring_enter 80 #define __NR_io_uring_enter 426 81 #endif 82 #ifndef __NR_io_uring_register 83 #define __NR_io_uring_register 427 84 #endif 85 #endif 86 87 #if defined(__CFA_IO_POLLING_USER__) 88 void ?{}( __io_poller_fast & this, struct cluster & cltr ) { 89 this.ring = &cltr.io; 90 (this.thrd){ "I/O Poller", cltr }; 91 } 92 void ^?{}( __io_poller_fast & mutex this ); 93 void main( __io_poller_fast & this ); 94 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; } 95 void ^?{}( __io_poller_fast & mutex this ) {} 96 #endif 66 97 67 98 //============================================================================================= 68 99 // I/O Startup / Shutdown logic 69 100 //============================================================================================= 70 void __kernel_io_startup( cluster & this ) {101 void __kernel_io_startup( cluster & this, bool main_cluster ) { 71 102 // Step 1 : call to setup 72 103 struct io_uring_params params; … … 90 121 91 122 // Requires features 92 // // adjust the size according to the parameters 93 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 94 // cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz); 95 // } 123 #if defined(IORING_FEAT_SINGLE_MMAP) 124 // adjust the size according to the parameters 125 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 126 cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz); 127 } 128 #endif 96 129 97 130 // mmap the Submit Queue into existence … … 101 134 } 102 135 103 // mmap the Completion Queue into existence (may or may not be needed)104 136 // Requires features 105 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 106 // cq->ring_ptr = sq->ring_ptr; 107 // } 108 // else { 137 #if defined(IORING_FEAT_SINGLE_MMAP) 138 // mmap the Completion Queue into existence (may or may not be needed) 139 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 140 cq->ring_ptr = sq->ring_ptr; 141 } 142 else 143 #endif 144 { 109 145 // We need multiple call to MMAP 110 146 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); … … 113 149 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno)); 114 150 } 115 //}151 } 116 152 117 153 // mmap the submit queue entries … … 134 170 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 135 171 sq.alloc = *sq.tail; 172 sq.ready = *sq.tail; 136 173 137 174 // completion queue … … 160 197 (this.io.submit){ min(*sq.num, *cq.num) }; 161 198 199 // Initialize statistics 200 #if !defined(__CFA_NO_STATISTICS__) 201 this.io.submit_q.stats.submit_avg.val = 0; 202 this.io.submit_q.stats.submit_avg.cnt = 0; 203 this.io.completion_q.stats.completed_avg.val = 0; 204 this.io.completion_q.stats.completed_avg.cnt = 0; 205 #endif 206 207 if(!main_cluster) { 208 __kernel_io_finish_start( this ); 209 } 210 } 211 212 void __kernel_io_finish_start( cluster & this ) { 213 #if defined(__CFA_IO_POLLING_USER__) 214 __cfadbg_print_safe(io, "Kernel I/O : Creating fast poller for cluter %p\n", &this); 215 (this.io.poller.fast){ "Fast IO Poller", this }; 216 __thrd_start( this.io.poller.fast, main ); 217 
#endif 218 162 219 // Create the poller thread 163 this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this ); 164 } 165 166 void __kernel_io_shutdown( cluster & this ) { 167 // Stop the IO Poller 220 __cfadbg_print_safe(io, "Kernel I/O : Creating slow poller for cluter %p\n", &this); 221 this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this ); 222 } 223 224 void __kernel_io_prepare_stop( cluster & this ) { 225 __cfadbg_print_safe(io, "Kernel I/O : Stopping pollers for cluster\n", &this); 168 226 // Notify the poller thread of the shutdown 169 227 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST); 228 229 // Stop the IO Poller 170 230 sigval val = { 1 }; 171 pthread_sigqueue( this.io.poller, SIGUSR1, val ); 231 pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val ); 232 #if defined(__CFA_IO_POLLING_USER__) 233 post( this.io.poller.sem ); 234 #endif 172 235 173 236 // Wait for the poller thread to finish 174 pthread_join( this.io.poller, 0p ); 175 free( this.io.stack ); 237 pthread_join( this.io.poller.slow.kthrd, 0p ); 238 free( this.io.poller.slow.stack ); 239 240 __cfadbg_print_safe(io, "Kernel I/O : Slow poller stopped for cluster\n", &this); 241 242 #if defined(__CFA_IO_POLLING_USER__) 243 // unpark the fast io_poller 244 unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 ); 245 246 ^(this.io.poller.fast){}; 247 248 __cfadbg_print_safe(io, "Kernel I/O : Fast poller stopped for cluster\n", &this); 249 #endif 250 } 251 252 void __kernel_io_shutdown( cluster & this, bool main_cluster ) { 253 if(!main_cluster) { 254 __kernel_io_prepare_stop( this ); 255 } 256 257 // print statistics 258 #if !defined(__CFA_NO_STATISTICS__) 259 if(this.print_stats) { 260 __cfaabi_bits_print_safe( STDERR_FILENO, 261 "----- I/O uRing Stats -----\n" 262 "- total submit calls : %llu\n" 263 "- avg submit : %lf\n" 264 "- total wait calls : %llu\n" 265 "- avg completion/wait : %lf\n", 266 this.io.submit_q.stats.submit_avg.cnt, 267 ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt, 268 this.io.completion_q.stats.completed_avg.cnt, 269 ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt 270 ); 271 } 272 #endif 176 273 177 274 // Shutdown the io rings … … 204 301 // Process a single completion message from the io_uring 205 302 // This is NOT thread-safe 206 static bool __io_process(struct io_ring & ring) { 303 static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) { 304 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 305 if( ret < 0 ) { 306 switch((int)errno) { 307 case EAGAIN: 308 case EINTR: 309 return -EAGAIN; 310 default: 311 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 312 } 313 } 314 315 // Drain the queue 207 316 unsigned head = *ring.completion_q.head; 208 317 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE); 209 318 210 if (head == tail) return false; 211 212 unsigned idx = head & (*ring.completion_q.mask); 213 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 214 215 /* paranoid */ verify(&cqe); 216 217 struct io_user_data * data = (struct io_user_data *)cqe.user_data; 218 // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd ); 219 220 data->result = cqe.res; 221 __unpark( data->thrd __cfaabi_dbg_ctx2 ); 319 // Nothing was new return 0 320 if 
(head == tail) { 321 #if !defined(__CFA_NO_STATISTICS__) 322 ring.completion_q.stats.completed_avg.cnt += 1; 323 #endif 324 return 0; 325 } 326 327 uint32_t count = tail - head; 328 for(i; count) { 329 unsigned idx = (head + i) & (*ring.completion_q.mask); 330 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 331 332 /* paranoid */ verify(&cqe); 333 334 struct io_user_data * data = (struct io_user_data *)cqe.user_data; 335 __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd ); 336 337 data->result = cqe.res; 338 if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 339 else { __unpark( data->thrd __cfaabi_dbg_ctx2 ); } 340 } 222 341 223 342 // Allow new submissions to happen 224 V(ring.submit );343 V(ring.submit, count); 225 344 226 345 // Mark to the kernel that the cqe has been seen 227 346 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 228 __atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELAXED ); 229 230 return true; 231 } 232 233 static void * __io_poller( void * arg ) { 347 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 348 349 // Update statistics 350 #if !defined(__CFA_NO_STATISTICS__) 351 ring.completion_q.stats.completed_avg.val += count; 352 ring.completion_q.stats.completed_avg.cnt += 1; 353 #endif 354 355 return count; 356 } 357 358 static void * __io_poller_slow( void * arg ) { 234 359 cluster * cltr = (cluster *)arg; 235 360 struct io_ring & ring = cltr->io; … … 246 371 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) ); 247 372 248 LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 249 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8); 250 if( ret < 0 ) { 251 switch((int)errno) { 252 case EAGAIN: 253 case EINTR: 254 continue LOOP; 255 default: 256 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 373 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 374 #if defined(__CFA_IO_POLLING_USER__) 375 376 // In the user-thread approach drain and if anything was drained, 377 // batton pass to the user-thread 378 int count = __drain_io( ring, &mask, 1, true ); 379 if(count > 0) { 380 __cfadbg_print_safe(io, "Kernel I/O : Moving to ring %p to fast poller\n", &ring); 381 __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 ); 382 wait( ring.poller.sem ); 383 } 384 385 #else 386 387 //In the naive approach, just poll the io completion queue directly 388 __drain_io( ring, &mask, 1, true ); 389 390 #endif 391 } 392 393 return 0p; 394 } 395 396 #if defined(__CFA_IO_POLLING_USER__) 397 void main( __io_poller_fast & this ) { 398 // Start parked 399 park( __cfaabi_dbg_ctx ); 400 401 // Then loop until we need to start 402 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) { 403 // Drain the io 404 if(0 > __drain_io( *this.ring, 0p, 0, false )) { 405 // If we got something, just yield and check again 406 yield(); 407 } 408 else { 409 // We didn't get anything baton pass to the slow poller 410 __cfadbg_print_safe(io, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring); 411 post( this.ring->poller.sem ); 412 park( __cfaabi_dbg_ctx ); 257 413 } 258 414 } 259 260 // Drain the queue 261 while(__io_process(ring)) {} 262 } 263 264 return 0p; 265 } 415 } 416 #endif 266 417 267 418 //============================================================================================= … … 293 444 // 294 445 295 static inline [* struct 
io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) { 296 // Wait for a spot to be available 297 P(ring.submit); 298 299 // Allocate the sqe 300 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST); 301 302 // Validate that we didn't overflow anything 303 // Check that nothing overflowed 304 /* paranoid */ verify( true ); 305 306 // Check that it goes head -> tail -> alloc and never head -> alloc -> tail 307 /* paranoid */ verify( true ); 308 309 // Return the sqe 310 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx]; 311 } 312 313 static inline void __submit( struct io_ring & ring, uint32_t idx ) { 314 // get mutual exclusion 315 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 316 317 // Append to the list of ready entries 318 uint32_t * tail = ring.submit_q.tail; 319 const uint32_t mask = *ring.submit_q.mask; 320 321 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 322 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 323 324 // Submit however, many entries need to be submitted 325 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0); 326 // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed io_submit, returned %d\n", ret ); 327 if( ret < 0 ) { 328 switch((int)errno) { 329 default: 330 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 331 } 332 } 333 334 unlock(ring.submit_q.lock); 335 // Make sure that idx was submitted 336 // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us 337 } 338 339 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) { 340 this.opcode = opcode; 341 #if !defined(IOSQE_ASYNC) 342 this.flags = 0; 343 #else 344 this.flags = IOSQE_ASYNC; 345 #endif 346 this.ioprio = 0; 347 this.fd = fd; 348 this.off = 0; 349 this.addr = 0; 350 this.len = 0; 351 this.rw_flags = 0; 352 this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0; 353 } 354 355 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) { 356 (this){ opcode, fd }; 357 this.off = off; 358 this.addr = (uint64_t)addr; 359 this.len = len; 360 } 446 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) { 447 // Wait for a spot to be available 448 P(ring.submit); 449 450 // Allocate the sqe 451 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST); 452 453 // Validate that we didn't overflow anything 454 // Check that nothing overflowed 455 /* paranoid */ verify( true ); 456 457 // Check that it goes head -> tail -> alloc and never head -> alloc -> tail 458 /* paranoid */ verify( true ); 459 460 // Return the sqe 461 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx]; 462 } 463 464 static inline void __submit( struct io_ring & ring, uint32_t idx ) { 465 // get mutual exclusion 466 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 467 468 // Append to the list of ready entries 469 uint32_t * tail = ring.submit_q.tail; 470 const uint32_t mask = *ring.submit_q.mask; 471 472 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 473 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 474 475 // Submit however, many entries need to be submitted 476 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0); 477 if( ret < 0 ) { 478 switch((int)errno) { 479 default: 480 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 481 } 482 } 483 484 // update statistics 485 #if !defined(__CFA_NO_STATISTICS__) 486 
ring.submit_q.stats.submit_avg.val += 1; 487 ring.submit_q.stats.submit_avg.cnt += 1; 488 #endif 489 490 unlock(ring.submit_q.lock); 491 // Make sure that idx was submitted 492 // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us 493 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret ); 494 } 495 496 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) { 497 this.opcode = opcode; 498 #if !defined(IOSQE_ASYNC) 499 this.flags = 0; 500 #else 501 this.flags = IOSQE_ASYNC; 502 #endif 503 this.ioprio = 0; 504 this.fd = fd; 505 this.off = 0; 506 this.addr = 0; 507 this.len = 0; 508 this.rw_flags = 0; 509 this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0; 510 } 511 512 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) { 513 (this){ opcode, fd }; 514 this.off = off; 515 this.addr = (uint64_t)addr; 516 this.len = len; 517 } 518 361 519 362 520 //============================================================================================= 363 521 // I/O Interface 364 522 //============================================================================================= 365 extern "C" {366 #define __USE_GNU367 #define _GNU_SOURCE368 #include <fcntl.h>369 #include <sys/uio.h>370 #include <sys/socket.h>371 #include <sys/stat.h>372 }373 523 374 524 #define __submit_prelude \ … … 385 535 park( __cfaabi_dbg_ctx ); \ 386 536 return data.result; 537 #endif 538 539 // Some forward declarations 540 extern "C" { 541 #include <sys/types.h> 542 struct iovec; 543 extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags); 544 extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags); 545 546 extern int fsync(int fd); 547 extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags); 548 549 struct msghdr; 550 struct sockaddr; 551 extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags); 552 extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags); 553 extern ssize_t send(int sockfd, const void *buf, size_t len, int flags); 554 extern ssize_t recv(int sockfd, void *buf, size_t len, int flags); 555 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 556 extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); 557 558 extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len); 559 extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice); 560 extern int madvise(void *addr, size_t length, int advice); 561 562 extern int openat(int dirfd, const char *pathname, int flags, mode_t mode); 563 extern int close(int fd); 564 565 struct statx; 566 extern int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf); 567 568 extern ssize_t read (int fd, void *buf, size_t count); 569 } 387 570 388 571 //----------------------------------------------------------------------------- 389 572 // Asynchronous operations 390 ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 391 #if !defined(IORING_OP_READV) 392 return preadv2(fd, iov, iovcnt, offset, flags); 393 #else 394 __submit_prelude 395 396 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 397 398 __submit_wait 399 #endif 400 } 401 402 ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, 
off_t offset, int flags) { 403 #if !defined(IORING_OP_WRITEV) 404 return pwritev2(fd, iov, iovcnt, offset, flags); 405 #else 406 __submit_prelude 407 408 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 409 410 __submit_wait 411 #endif 412 } 413 414 int async_fsync(int fd) { 415 #if !defined(IORING_OP_FSYNC) 416 return fsync(fd); 417 #else 418 __submit_prelude 419 420 (*sqe){ IORING_OP_FSYNC, fd }; 421 422 __submit_wait 423 #endif 424 } 425 426 int async_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) { 427 #if !defined(IORING_OP_SYNC_FILE_RANGE) 428 return sync_file_range(fd, offset, nbytes, flags); 429 #else 430 __submit_prelude 431 432 (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd }; 433 sqe->off = offset; 434 sqe->len = nbytes; 435 sqe->sync_range_flags = flags; 436 437 __submit_wait 438 #endif 439 } 440 441 442 ssize_t async_sendmsg(int sockfd, const struct msghdr *msg, int flags) { 443 #if !defined(IORING_OP_SENDMSG) 444 return recv(sockfd, msg, flags); 445 #else 446 __submit_prelude 447 448 (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 }; 449 sqe->msg_flags = flags; 450 451 __submit_wait 452 #endif 453 } 454 455 ssize_t async_recvmsg(int sockfd, struct msghdr *msg, int flags) { 456 #if !defined(IORING_OP_RECVMSG) 457 return recv(sockfd, msg, flags); 458 #else 459 __submit_prelude 460 461 (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 }; 462 sqe->msg_flags = flags; 463 464 __submit_wait 465 #endif 466 } 467 468 ssize_t async_send(int sockfd, const void *buf, size_t len, int flags) { 469 #if !defined(IORING_OP_SEND) 470 return send( sockfd, buf, len, flags ); 471 #else 472 __submit_prelude 473 474 (*sqe){ IORING_OP_SEND, sockfd }; 475 sqe->addr = (uint64_t)buf; 476 sqe->len = len; 477 sqe->msg_flags = flags; 478 479 __submit_wait 480 #endif 481 } 482 483 ssize_t async_recv(int sockfd, void *buf, size_t len, int flags) { 484 #if !defined(IORING_OP_RECV) 485 return recv( sockfd, buf, len, flags ); 486 #else 487 __submit_prelude 488 489 (*sqe){ IORING_OP_RECV, sockfd }; 490 sqe->addr = (uint64_t)buf; 491 sqe->len = len; 492 sqe->msg_flags = flags; 493 494 __submit_wait 495 #endif 496 } 497 498 int async_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) { 499 #if !defined(IORING_OP_ACCEPT) 500 __SOCKADDR_ARG _addr; 501 _addr.__sockaddr__ = addr; 502 return accept4( sockfd, _addr, addrlen, flags ); 503 #else 504 __submit_prelude 505 506 (*sqe){ IORING_OP_ACCEPT, sockfd }; 507 sqe->addr = addr; 508 sqe->addr2 = addrlen; 509 sqe->accept_flags = flags; 510 511 __submit_wait 512 #endif 513 } 514 515 int async_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { 516 #if !defined(IORING_OP_CONNECT) 517 __CONST_SOCKADDR_ARG _addr; 518 _addr.__sockaddr__ = addr; 519 return connect( sockfd, _addr, addrlen ); 520 #else 521 __submit_prelude 522 523 (*sqe){ IORING_OP_CONNECT, sockfd }; 524 sqe->addr = (uint64_t)addr; 525 sqe->off = addrlen; 526 527 __submit_wait 528 #endif 529 } 530 531 int async_fallocate(int fd, int mode, uint64_t offset, uint64_t len) { 532 #if !defined(IORING_OP_FALLOCATE) 533 return fallocate( fd, mode, offset, len ); 534 #else 535 __submit_prelude 536 537 (*sqe){ IORING_OP_FALLOCATE, fd }; 538 sqe->off = offset; 539 sqe->len = length; 540 sqe->mode = mode; 541 542 __submit_wait 543 #endif 544 } 545 546 int async_fadvise(int fd, uint64_t offset, uint64_t len, int advice) { 547 #if !defined(IORING_OP_FADVISE) 548 return posix_fadvise( fd, offset, len, advice ); 549 #else 550 __submit_prelude 551 552 (*sqe){ 
IORING_OP_FADVISE, fd }; 553 sqe->off = (uint64_t)offset; 554 sqe->len = length; 555 sqe->fadvise_advice = advice; 556 557 __submit_wait 558 #endif 559 } 560 561 int async_madvise(void *addr, size_t length, int advice) { 562 #if !defined(IORING_OP_MADVISE) 563 return madvise( addr, length, advice ); 564 #else 565 __submit_prelude 566 567 (*sqe){ IORING_OP_MADVISE, 0 }; 568 sqe->addr = (uint64_t)addr; 569 sqe->len = length; 570 sqe->fadvise_advice = advice; 571 572 __submit_wait 573 #endif 574 } 575 576 int async_openat(int dirfd, const char *pathname, int flags, mode_t mode) { 577 #if !defined(IORING_OP_OPENAT) 578 return openat( dirfd, pathname, flags, mode ); 579 #else 580 __submit_prelude 581 582 (*sqe){ IORING_OP_OPENAT, dirfd }; 583 sqe->addr = (uint64_t)pathname; 584 sqe->open_flags = flags; 585 sqe->mode = mode; 586 587 __submit_wait 588 #endif 589 } 590 591 int async_close(int fd) { 592 #if !defined(IORING_OP_CLOSE) 593 return close( fd ); 594 #else 595 __submit_prelude 596 597 (*sqe){ IORING_OP_CLOSE, fd }; 598 599 __submit_wait 600 #endif 601 } 602 603 int async_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) { 604 #if !defined(IORING_OP_STATX) 605 //return statx( dirfd, pathname, flags, mask, statxbuf ); 606 return syscall( __NR_io_uring_setup, dirfd, pathname, flags, mask, statxbuf ); 607 #else 608 __submit_prelude 609 610 (*sqe){ IORING_OP_STATX, dirfd }; 611 sqe->addr = (uint64_t)pathname; 612 sqe->statx_flags = flags; 613 sqe->len = mask; 614 sqe->off = (uint64_t)statxbuf; 615 616 __submit_wait 617 #endif 618 } 619 620 621 ssize_t async_read(int fd, void *buf, size_t count) { 622 #if !defined(IORING_OP_READ) 623 return read( fd, buf, count ); 624 #else 625 __submit_prelude 626 627 (*sqe){ IORING_OP_READ, fd, buf, count, 0 }; 628 629 __submit_wait 630 #endif 631 } 632 633 ssize_t async_write(int fd, void *buf, size_t count) { 634 #if !defined(IORING_OP_WRITE) 635 return read( fd, buf, count ); 636 #else 637 __submit_prelude 638 639 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 }; 640 641 __submit_wait 642 #endif 643 } 573 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 574 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV) 575 return preadv2(fd, iov, iovcnt, offset, flags); 576 #else 577 __submit_prelude 578 579 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 580 581 __submit_wait 582 #endif 583 } 584 585 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 586 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV) 587 return pwritev2(fd, iov, iovcnt, offset, flags); 588 #else 589 __submit_prelude 590 591 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 592 593 __submit_wait 594 #endif 595 } 596 597 int cfa_fsync(int fd) { 598 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC) 599 return fsync(fd); 600 #else 601 __submit_prelude 602 603 (*sqe){ IORING_OP_FSYNC, fd }; 604 605 __submit_wait 606 #endif 607 } 608 609 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) { 610 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE) 611 return sync_file_range(fd, offset, nbytes, flags); 612 #else 613 __submit_prelude 614 615 (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd }; 616 sqe->off = offset; 617 sqe->len = nbytes; 618 sqe->sync_range_flags = flags; 619 620 __submit_wait 621 #endif 622 } 623 624 625 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) { 
626 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG) 627 return recv(sockfd, msg, flags); 628 #else 629 __submit_prelude 630 631 (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 }; 632 sqe->msg_flags = flags; 633 634 __submit_wait 635 #endif 636 } 637 638 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) { 639 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG) 640 return recv(sockfd, msg, flags); 641 #else 642 __submit_prelude 643 644 (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 }; 645 sqe->msg_flags = flags; 646 647 __submit_wait 648 #endif 649 } 650 651 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) { 652 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND) 653 return send( sockfd, buf, len, flags ); 654 #else 655 __submit_prelude 656 657 (*sqe){ IORING_OP_SEND, sockfd }; 658 sqe->addr = (uint64_t)buf; 659 sqe->len = len; 660 sqe->msg_flags = flags; 661 662 __submit_wait 663 #endif 664 } 665 666 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) { 667 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV) 668 return recv( sockfd, buf, len, flags ); 669 #else 670 __submit_prelude 671 672 (*sqe){ IORING_OP_RECV, sockfd }; 673 sqe->addr = (uint64_t)buf; 674 sqe->len = len; 675 sqe->msg_flags = flags; 676 677 __submit_wait 678 #endif 679 } 680 681 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) { 682 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT) 683 return accept4( sockfd, addr, addrlen, flags ); 684 #else 685 __submit_prelude 686 687 (*sqe){ IORING_OP_ACCEPT, sockfd }; 688 sqe->addr = addr; 689 sqe->addr2 = addrlen; 690 sqe->accept_flags = flags; 691 692 __submit_wait 693 #endif 694 } 695 696 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { 697 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT) 698 return connect( sockfd, addr, addrlen ); 699 #else 700 __submit_prelude 701 702 (*sqe){ IORING_OP_CONNECT, sockfd }; 703 sqe->addr = (uint64_t)addr; 704 sqe->off = addrlen; 705 706 __submit_wait 707 #endif 708 } 709 710 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) { 711 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE) 712 return fallocate( fd, mode, offset, len ); 713 #else 714 __submit_prelude 715 716 (*sqe){ IORING_OP_FALLOCATE, fd }; 717 sqe->off = offset; 718 sqe->len = length; 719 sqe->mode = mode; 720 721 __submit_wait 722 #endif 723 } 724 725 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) { 726 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE) 727 return posix_fadvise( fd, offset, len, advice ); 728 #else 729 __submit_prelude 730 731 (*sqe){ IORING_OP_FADVISE, fd }; 732 sqe->off = (uint64_t)offset; 733 sqe->len = length; 734 sqe->fadvise_advice = advice; 735 736 __submit_wait 737 #endif 738 } 739 740 int cfa_madvise(void *addr, size_t length, int advice) { 741 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE) 742 return madvise( addr, length, advice ); 743 #else 744 __submit_prelude 745 746 (*sqe){ IORING_OP_MADVISE, 0 }; 747 sqe->addr = (uint64_t)addr; 748 sqe->len = length; 749 sqe->fadvise_advice = advice; 750 751 __submit_wait 752 #endif 753 } 754 755 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) { 756 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT) 757 return openat( dirfd, pathname, flags, mode ); 758 #else 759 __submit_prelude 760 761 (*sqe){ 
IORING_OP_OPENAT, dirfd }; 762 sqe->addr = (uint64_t)pathname; 763 sqe->open_flags = flags; 764 sqe->mode = mode; 765 766 __submit_wait 767 #endif 768 } 769 770 int cfa_close(int fd) { 771 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE) 772 return close( fd ); 773 #else 774 __submit_prelude 775 776 (*sqe){ IORING_OP_CLOSE, fd }; 777 778 __submit_wait 779 #endif 780 } 781 782 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) { 783 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_STATX) 784 //return statx( dirfd, pathname, flags, mask, statxbuf ); 785 return syscall( __NR_io_uring_setup, dirfd, pathname, flags, mask, statxbuf ); 786 #else 787 __submit_prelude 788 789 (*sqe){ IORING_OP_STATX, dirfd }; 790 sqe->addr = (uint64_t)pathname; 791 sqe->statx_flags = flags; 792 sqe->len = mask; 793 sqe->off = (uint64_t)statxbuf; 794 795 __submit_wait 796 #endif 797 } 798 799 800 ssize_t cfa_read(int fd, void *buf, size_t count) { 801 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ) 802 return read( fd, buf, count ); 803 #else 804 __submit_prelude 805 806 (*sqe){ IORING_OP_READ, fd, buf, count, 0 }; 807 808 __submit_wait 809 #endif 810 } 811 812 ssize_t cfa_write(int fd, void *buf, size_t count) { 813 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE) 814 return read( fd, buf, count ); 815 #else 816 __submit_prelude 817 818 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 }; 819 820 __submit_wait 821 #endif 822 } 644 823 645 824 //----------------------------------------------------------------------------- … … 647 826 648 827 // Macro magic to reduce the size of the following switch case 649 650 651 652 653 654 bool is_async( fptr_t func ) {655 828 #define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__) 829 #define IS_DEFINED_SECOND(first, second, ...) 
second 830 #define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion 831 #define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true) 832 833 bool has_user_level_blocking( fptr_t func ) { 834 #if defined(HAVE_LINUX_IO_URING_H) 656 835 if( /*func == (fptr_t)preadv2 || */ 657 func == (fptr_t) async_preadv2 )836 func == (fptr_t)cfa_preadv2 ) 658 837 #define _CFA_IO_FEATURE_IORING_OP_READV , 659 838 return IS_DEFINED(IORING_OP_READV); 660 839 661 840 if( /*func == (fptr_t)pwritev2 || */ 662 func == (fptr_t)async_pwritev2 )841 func == (fptr_t)cfa_pwritev2 ) 663 842 #define _CFA_IO_FEATURE_IORING_OP_WRITEV , 664 843 return IS_DEFINED(IORING_OP_WRITEV); 665 844 666 845 if( /*func == (fptr_t)fsync || */ 667 func == (fptr_t)async_fsync )846 func == (fptr_t)cfa_fsync ) 668 847 #define _CFA_IO_FEATURE_IORING_OP_FSYNC , 669 848 return IS_DEFINED(IORING_OP_FSYNC); 670 849 671 850 if( /*func == (fptr_t)ync_file_range || */ 672 func == (fptr_t)async_sync_file_range )851 func == (fptr_t)cfa_sync_file_range ) 673 852 #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE , 674 853 return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE); 675 854 676 855 if( /*func == (fptr_t)sendmsg || */ 677 func == (fptr_t)async_sendmsg )856 func == (fptr_t)cfa_sendmsg ) 678 857 #define _CFA_IO_FEATURE_IORING_OP_SENDMSG , 679 858 return IS_DEFINED(IORING_OP_SENDMSG); 680 859 681 860 if( /*func == (fptr_t)recvmsg || */ 682 func == (fptr_t)async_recvmsg )861 func == (fptr_t)cfa_recvmsg ) 683 862 #define _CFA_IO_FEATURE_IORING_OP_RECVMSG , 684 863 return IS_DEFINED(IORING_OP_RECVMSG); 685 864 686 865 if( /*func == (fptr_t)send || */ 687 func == (fptr_t) async_send )866 func == (fptr_t)cfa_send ) 688 867 #define _CFA_IO_FEATURE_IORING_OP_SEND , 689 868 return IS_DEFINED(IORING_OP_SEND); 690 869 691 870 if( /*func == (fptr_t)recv || */ 692 func == (fptr_t) async_recv )871 func == (fptr_t)cfa_recv ) 693 872 #define _CFA_IO_FEATURE_IORING_OP_RECV , 694 873 return IS_DEFINED(IORING_OP_RECV); 695 874 696 875 if( /*func == (fptr_t)accept4 || */ 697 func == (fptr_t) async_accept4 )876 func == (fptr_t)cfa_accept4 ) 698 877 #define _CFA_IO_FEATURE_IORING_OP_ACCEPT , 699 878 return IS_DEFINED(IORING_OP_ACCEPT); 700 879 701 880 if( /*func == (fptr_t)connect || */ 702 func == (fptr_t) async_connect )881 func == (fptr_t)cfa_connect ) 703 882 #define _CFA_IO_FEATURE_IORING_OP_CONNECT , 704 883 return IS_DEFINED(IORING_OP_CONNECT); 705 884 706 885 if( /*func == (fptr_t)fallocate || */ 707 func == (fptr_t) async_fallocate )886 func == (fptr_t)cfa_fallocate ) 708 887 #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE , 709 888 return IS_DEFINED(IORING_OP_FALLOCATE); 710 889 711 if( /*func == (fptr_t) fadvise || */712 func == (fptr_t) async_fadvise )890 if( /*func == (fptr_t)posix_fadvise || */ 891 func == (fptr_t)cfa_fadvise ) 713 892 #define _CFA_IO_FEATURE_IORING_OP_FADVISE , 714 893 return IS_DEFINED(IORING_OP_FADVISE); 715 894 716 895 if( /*func == (fptr_t)madvise || */ 717 func == (fptr_t) async_madvise )896 func == (fptr_t)cfa_madvise ) 718 897 #define _CFA_IO_FEATURE_IORING_OP_MADVISE , 719 898 return IS_DEFINED(IORING_OP_MADVISE); 720 899 721 900 if( /*func == (fptr_t)openat || */ 722 func == (fptr_t) async_openat )901 func == (fptr_t)cfa_openat ) 723 902 #define _CFA_IO_FEATURE_IORING_OP_OPENAT , 724 903 return IS_DEFINED(IORING_OP_OPENAT); 725 904 726 905 if( /*func == (fptr_t)close || */ 727 func == (fptr_t) async_close )906 func == (fptr_t)cfa_close ) 728 907 #define _CFA_IO_FEATURE_IORING_OP_CLOSE , 729 908 
return IS_DEFINED(IORING_OP_CLOSE); 730 909 731 910 if( /*func == (fptr_t)statx || */ 732 func == (fptr_t) async_statx )911 func == (fptr_t)cfa_statx ) 733 912 #define _CFA_IO_FEATURE_IORING_OP_STATX , 734 913 return IS_DEFINED(IORING_OP_STATX); 735 914 736 915 if( /*func == (fptr_t)read || */ 737 func == (fptr_t)async_read )916 func == (fptr_t)cfa_read ) 738 917 #define _CFA_IO_FEATURE_IORING_OP_READ , 739 918 return IS_DEFINED(IORING_OP_READ); 740 919 741 920 if( /*func == (fptr_t)write || */ 742 func == (fptr_t)async_write )921 func == (fptr_t)cfa_write ) 743 922 #define _CFA_IO_FEATURE_IORING_OP_WRITE , 744 923 return IS_DEFINED(IORING_OP_WRITE); 745 746 return false; 747 } 748 749 #endif 924 #endif 925 926 return false; 927 }
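The new code drives io_uring through raw syscalls and mmap'd rings rather than through liburing. For readers unfamiliar with that interface, here is a minimal, self-contained sketch in plain C (the file above is Cforall) of the same three steps the changeset performs: io_uring_setup, mapping the submission and completion rings, and one submit/drain round trip for a single IORING_OP_NOP. The entry count, the user_data value, the skipped error paths and the omission of the IORING_FEAT_SINGLE_MMAP case are simplifications for illustration, not part of the changeset.

#define _GNU_SOURCE
#include <linux/io_uring.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h>
#include <stdio.h>

// Same fallback the changeset adds for systems whose libc misses the syscall numbers.
#ifndef __NR_io_uring_setup
#define __NR_io_uring_setup 425
#endif
#ifndef __NR_io_uring_enter
#define __NR_io_uring_enter 426
#endif

static int ring_setup(unsigned entries, struct io_uring_params *p) {
	return (int)syscall(__NR_io_uring_setup, entries, p);
}
static int ring_enter(int fd, unsigned submit, unsigned wait, unsigned flags) {
	return (int)syscall(__NR_io_uring_enter, fd, submit, wait, flags, NULL, 0);
}

int main(void) {
	struct io_uring_params p;
	memset(&p, 0, sizeof(p));
	int fd = ring_setup(8, &p);                       // 8 entries: illustrative only
	if (fd < 0) { perror("io_uring_setup"); return 1; }

	// Map the SQ ring, the SQE array and the CQ ring, as __kernel_io_startup does.
	size_t sq_sz  = p.sq_off.array + p.sq_entries * sizeof(uint32_t);
	size_t sqe_sz = p.sq_entries * sizeof(struct io_uring_sqe);
	size_t cq_sz  = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
	char *sq = mmap(0, sq_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
	struct io_uring_sqe *sqes = mmap(0, sqe_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
	char *cq = mmap(0, cq_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
	if (sq == MAP_FAILED || sqes == MAP_FAILED || cq == MAP_FAILED) { perror("mmap"); return 1; }

	// Ring pointers, located through the offsets returned by io_uring_setup.
	uint32_t *sq_tail  = (void *)(sq + p.sq_off.tail);
	uint32_t *sq_mask  = (void *)(sq + p.sq_off.ring_mask);
	uint32_t *sq_array = (void *)(sq + p.sq_off.array);
	uint32_t *cq_head  = (void *)(cq + p.cq_off.head);
	uint32_t *cq_tail  = (void *)(cq + p.cq_off.tail);
	uint32_t *cq_mask  = (void *)(cq + p.cq_off.ring_mask);
	struct io_uring_cqe *cqes = (void *)(cq + p.cq_off.cqes);

	// Fill one SQE and publish it: write its index into the array slot, then
	// advance the tail so the kernel can see it -- the two steps of __submit().
	uint32_t tail = *sq_tail;
	uint32_t idx  = tail & *sq_mask;
	memset(&sqes[idx], 0, sizeof(sqes[idx]));
	sqes[idx].opcode    = IORING_OP_NOP;
	sqes[idx].user_data = 42;                         // the changeset stores an io_user_data pointer here
	sq_array[idx] = idx;
	__atomic_fetch_add(sq_tail, 1, __ATOMIC_SEQ_CST);

	// Submit one entry and wait for one completion in a single syscall.
	if (ring_enter(fd, 1, 1, IORING_ENTER_GETEVENTS) < 0) { perror("io_uring_enter"); return 1; }

	// Batched drain, mirroring __drain_io(): everything between head and the
	// acquire-loaded tail has completed; only then is head advanced.
	uint32_t head  = *cq_head;
	uint32_t ctail = __atomic_load_n(cq_tail, __ATOMIC_ACQUIRE);
	for (uint32_t i = 0; i < ctail - head; i++) {
		struct io_uring_cqe *cqe = &cqes[(head + i) & *cq_mask];
		printf("completed: user_data=%llu res=%d\n", (unsigned long long)cqe->user_data, cqe->res);
	}
	__atomic_fetch_add(cq_head, ctail - head, __ATOMIC_RELAXED);

	close(fd);
	return 0;
}

The batched loop at the end mirrors __drain_io(): everything between the head and the acquire-loaded tail is finished work, and the head index is only advanced once the CQEs have been read, which is what lets the kernel reuse those slots.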
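The rewritten feature probe, has_user_level_blocking(), answers "is this IORING_OP_* macro defined?" inside an expression by pairing the IS_DEFINED macros with one comma-marker macro per operation. Below is a standalone plain-C sketch of the same idiom; the marker names reuse the _CFA_IO_FEATURE_ prefix from the changeset, while the choice of which opcodes are defined is purely illustrative.

#include <stdio.h>
#include <stdbool.h>

#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
#define IS_DEFINED_SECOND(first, second, ...) second
#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
#define IS_DEFINED(macro) IS_DEFINED_APPLY(IS_DEFINED_SECOND, IS_DEFINED_TEST(macro) false, true)

// Pretend the kernel header provides READV but not WRITEV -- illustrative only.
#define IORING_OP_READV 1

// One comma marker per feature that may be queried, as in the changeset.
#define _CFA_IO_FEATURE_IORING_OP_READV  ,
#define _CFA_IO_FEATURE_IORING_OP_WRITEV ,

int main(void) {
	// Defined macro: it expands (to 1) before the token paste, the paste produces
	// _CFA_IO_FEATURE_1, which is not a macro, so the trailing "true" is selected.
	printf("READV  supported: %d\n", IS_DEFINED(IORING_OP_READV));
	// Undefined macro: the paste reaches the comma marker, the argument list
	// shifts by one, and IS_DEFINED_SECOND returns "false".
	printf("WRITEV supported: %d\n", IS_DEFINED(IORING_OP_WRITEV));
	return 0;
}

When the queried name is a defined macro it expands before the token paste, the paste lands on an unrelated identifier, and the trailing true is selected; when it is undefined, the paste reaches the comma marker and the argument list shifts so that IS_DEFINED_SECOND picks false. This is the same argument-shifting trick the Linux kernel uses for IS_ENABLED().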