Changes in libcfa/src/concurrency/io.cfa [4069faad:8962722]
libcfa/src/concurrency/io.cfa (modified) (12 diffs)
Legend:
  "-" prefix : line present only in r4069faad (removed)
  "+" prefix : line present only in r8962722 (added)
  no prefix  : line unmodified, common to both revisions
--- libcfa/src/concurrency/io.cfa (r4069faad)
+++ libcfa/src/concurrency/io.cfa (r8962722)
@@ r4069faad:1  r8962722:1 @@
- //
- // Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
- //
- // The contents of this file are covered under the licence agreement in the
- // file "LICENCE" distributed with Cforall.
- //
- // io.cfa --
- //
- // Author           : Thierry Delisle
- // Created On       : Thu Apr 23 17:31:00 2020
- // Last Modified By :
- // Last Modified On :
- // Update Count     :
- //
-
- // #define __CFA_DEBUG_PRINT_IO__
-
  #include "kernel.hfa"

  #if !defined(HAVE_LINUX_IO_URING_H)
-     void __kernel_io_startup( cluster & ) {
+     void __kernel_io_startup( cluster & this ) {
          // Nothing to do without io_uring
      }

-     void __kernel_io_start_thrd( cluster & ) {
+     void __kernel_io_shutdown( cluster & this ) {
          // Nothing to do without io_uring
      }

-     void __kernel_io_stop_thrd ( cluster & ) {
-         // Nothing to do without io_uring
-     }
-
-     void __kernel_io_shutdown( cluster & ) {
-         // Nothing to do without io_uring
+     bool is_async( void (*)() ) {
+         return false;
      }

@@ r4069faad:56  r8962722:35 @@
  }

- static void * __io_poller_slow( void * arg );
+ static void * __io_poller( void * arg );

  // Weirdly, some systems that do support io_uring don't actually define these
  #ifdef __alpha__
      /*
       * alpha is the only exception, all other architectures
       * have common numbers for new system calls.
       */
      #ifndef __NR_io_uring_setup
      #define __NR_io_uring_setup       535
      #endif
      #ifndef __NR_io_uring_enter
      #define __NR_io_uring_enter       536
      #endif
      #ifndef __NR_io_uring_register
      #define __NR_io_uring_register    537
      #endif
  #else /* !__alpha__ */
      #ifndef __NR_io_uring_setup
      #define __NR_io_uring_setup       425
      #endif
      #ifndef __NR_io_uring_enter
      #define __NR_io_uring_enter       426
      #endif
      #ifndef __NR_io_uring_register
      #define __NR_io_uring_register    427
      #endif
  #endif
(the syscall-number block above appears on both sides of the changeset; the two copies differ only in the whitespace of the inner #ifndef/#define/#endif lines)

- #if defined(__CFA_IO_POLLING_USER__)
-     void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
-         this.ring = &cltr.io;
-         (this.thrd){ "I/O Poller", cltr };
-     }
-     void ^?{}( __io_poller_fast & mutex this );
-     void main( __io_poller_fast & this );
-     static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
-     void ^?{}( __io_poller_fast & mutex this ) {}
- #endif

  //=============================================================================================
  // I/O Startup / Shutdown logic
  //=============================================================================================
- void __kernel_io_startup( cluster & this, bool main_cluster ) {
+ void __kernel_io_startup( cluster & this ) {
      // Step 1 : call to setup
      struct io_uring_params params;

@@ r4069faad:121  r8962722:90 @@

      // Requires features
-     #if defined(IORING_FEAT_SINGLE_MMAP)
-         // adjust the size according to the parameters
-         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
-             cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
-         }
-     #endif
+     // // adjust the size according to the parameters
+     // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
+     //     cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
+     // }

      // mmap the Submit Queue into existence

@@ r4069faad:134  r8962722:101 @@
      }

+     // mmap the Completion Queue into existence (may or may not be needed)
      // Requires features
-     #if defined(IORING_FEAT_SINGLE_MMAP)
-         // mmap the Completion Queue into existence (may or may not be needed)
-         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
-             cq->ring_ptr = sq->ring_ptr;
-         }
-         else
-     #endif
-     {
+     // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
+     //     cq->ring_ptr = sq->ring_ptr;
+     // }
+     // else {
          // We need multiple call to MMAP
          cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);

@@ r4069faad:149  r8962722:113 @@
          abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
      }
-     }
+     // }

      // mmap the submit queue entries

@@ r4069faad:170  r8962722:134 @@
      sq.array = (uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
      sq.alloc = *sq.tail;
-     sq.ready = *sq.tail;

      // completion queue

@@ r4069faad:197  r8962722:160 @@
      (this.io.submit){ min(*sq.num, *cq.num) };

-     // Initialize statistics
-     #if !defined(__CFA_NO_STATISTICS__)
-         this.io.submit_q.stats.submit_avg.val = 0;
-         this.io.submit_q.stats.submit_avg.cnt = 0;
-         this.io.completion_q.stats.completed_avg.val = 0;
-         this.io.completion_q.stats.completed_avg.cnt = 0;
-     #endif
-
-     if(!main_cluster) {
-         __kernel_io_finish_start( this );
-     }
- }
-
- void __kernel_io_finish_start( cluster & this ) {
-     #if defined(__CFA_IO_POLLING_USER__)
-         __cfadbg_print_safe(io, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
-         (this.io.poller.fast){ "Fast IO Poller", this };
-         __thrd_start( this.io.poller.fast, main );
-     #endif
-
      // Create the poller thread
-     __cfadbg_print_safe(io, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
-     this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
- }
+     this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
+ }
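For readers unfamiliar with the raw io_uring ABI that __kernel_io_startup drives, the following stand-alone C sketch (not part of the changeset; it assumes a kernel and glibc new enough to provide <linux/io_uring.h>, __NR_io_uring_setup and IORING_FEAT_SINGLE_MMAP) performs the same sequence with plain syscalls: io_uring_setup, then the ring and SQE mmaps, including the single-mmap shortcut that the hunk above toggles between an #if and a commented-out block.

    #include <linux/io_uring.h>
    #include <sys/mman.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>

    int main(void) {
        struct io_uring_params params = {0};

        // Step 1: create the ring; the kernel fills in sq_off/cq_off and features.
        int fd = syscall(__NR_io_uring_setup, 256, &params);
        if (fd < 0) { perror("io_uring_setup"); exit(1); }

        // Ring sizes are derived from the returned offsets.
        size_t sq_sz = params.sq_off.array + params.sq_entries * sizeof(unsigned);
        size_t cq_sz = params.cq_off.cqes  + params.cq_entries * sizeof(struct io_uring_cqe);

        // IORING_FEAT_SINGLE_MMAP: one mapping of max(sq_sz, cq_sz) covers both rings.
        if (params.features & IORING_FEAT_SINGLE_MMAP) {
            if (cq_sz > sq_sz) sq_sz = cq_sz;
            cq_sz = sq_sz;
        }

        // Step 2: mmap the submit-queue ring (head, tail, mask, flags, index array).
        char *sq_ptr = mmap(0, sq_sz, PROT_READ | PROT_WRITE,
                            MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
        if (sq_ptr == MAP_FAILED) { perror("mmap sq ring"); exit(1); }

        // Step 3: mmap the completion-queue ring, unless it shares the SQ mapping.
        char *cq_ptr = sq_ptr;
        if (!(params.features & IORING_FEAT_SINGLE_MMAP)) {
            cq_ptr = mmap(0, cq_sz, PROT_READ | PROT_WRITE,
                          MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
            if (cq_ptr == MAP_FAILED) { perror("mmap cq ring"); exit(1); }
        }

        // Step 4: mmap the submit-queue entries themselves.
        struct io_uring_sqe *sqes = mmap(0, params.sq_entries * sizeof(struct io_uring_sqe),
                                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                                         fd, IORING_OFF_SQES);
        if (sqes == MAP_FAILED) { perror("mmap sqes"); exit(1); }

        printf("ring fd %d: sq @%p, cq @%p, sqes @%p, %u/%u entries\n",
               fd, (void *)sq_ptr, (void *)cq_ptr, (void *)sqes,
               params.sq_entries, params.cq_entries);
        return 0;
    }

Teardown is simply the reverse: munmap each mapping and close the ring file descriptor.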
- void __kernel_io_prepare_stop( cluster & this ) {
-     __cfadbg_print_safe(io, "Kernel I/O : Stopping pollers for cluster\n", &this);
+ void __kernel_io_shutdown( cluster & this ) {
+     // Stop the IO Poller
      // Notify the poller thread of the shutdown
      __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
-
-     // Stop the IO Poller
      sigval val = { 1 };
-     pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
-     #if defined(__CFA_IO_POLLING_USER__)
-         post( this.io.poller.sem );
-     #endif
+     pthread_sigqueue( this.io.poller, SIGUSR1, val );

      // Wait for the poller thread to finish
-     pthread_join( this.io.poller.slow.kthrd, 0p );
-     free( this.io.poller.slow.stack );
-
-     __cfadbg_print_safe(io, "Kernel I/O : Slow poller stopped for cluster\n", &this);
-
-     #if defined(__CFA_IO_POLLING_USER__)
-         // unpark the fast io_poller
-         unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
-
-         ^(this.io.poller.fast){};
-
-         __cfadbg_print_safe(io, "Kernel I/O : Fast poller stopped for cluster\n", &this);
-     #endif
- }
-
- void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
-     if(!main_cluster) {
-         __kernel_io_prepare_stop( this );
-     }
-
-     // print statistics
-     #if !defined(__CFA_NO_STATISTICS__)
-         if(this.print_stats) {
-             __cfaabi_bits_print_safe( STDERR_FILENO,
-                 "----- I/O uRing Stats -----\n"
-                 "- total submit calls   : %llu\n"
-                 "- avg submit           : %lf\n"
-                 "- total wait calls     : %llu\n"
-                 "- avg completion/wait  : %lf\n",
-                 this.io.submit_q.stats.submit_avg.cnt,
-                 ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
-                 this.io.completion_q.stats.completed_avg.cnt,
-                 ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
-             );
-         }
-     #endif
+     pthread_join( this.io.poller, 0p );
+     free( this.io.stack );

      // Shutdown the io rings
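The shutdown handshake above relies on a POSIX detail worth spelling out: the poller is normally blocked inside io_uring_enter, so setting io.done alone would never be noticed; the queued SIGUSR1 makes the blocking syscall fail with EINTR, after which the loop re-checks the flag. A small stand-alone C sketch of the same pattern (hypothetical names, not CFA runtime code; a read() on an idle pipe stands in for the blocking io_uring_enter):

    #define _GNU_SOURCE
    #include <errno.h>
    #include <pthread.h>
    #include <signal.h>
    #include <stdatomic.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    static atomic_bool done;
    static int pipefd[2];

    static void on_usr1(int sig) { (void)sig; }     // handler body is irrelevant: we only want the EINTR

    static void *poller(void *arg) {
        (void)arg;
        char c;
        while (!atomic_load(&done)) {
            if (read(pipefd[0], &c, 1) < 0 && errno == EINTR)
                continue;                           // interrupted: loop around and re-check done
        }
        return 0;
    }

    int main(void) {
        if (pipe(pipefd)) return 1;

        struct sigaction sa;
        memset(&sa, 0, sizeof(sa));
        sa.sa_handler = on_usr1;                    // no SA_RESTART, so read() returns EINTR
        sigemptyset(&sa.sa_mask);
        sigaction(SIGUSR1, &sa, 0);

        pthread_t thrd;
        pthread_create(&thrd, 0, poller, 0);
        sleep(1);

        atomic_store(&done, 1);                     // 1. publish the shutdown flag
        union sigval val = { .sival_int = 1 };
        pthread_sigqueue(thrd, SIGUSR1, val);       // 2. kick the thread out of its syscall
        pthread_join(thrd, 0);                      // 3. wait for it to exit
        puts("poller stopped");
        return 0;
    }

The sigset argument that the poller loop later passes to io_uring_enter exists so a signal can be unblocked only for the duration of the wait, pselect-style; the toy above does not bother and simply tolerates the narrow race between the flag check and the blocking call.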
@@ r4069faad:301  r8962722:204 @@
  // Process a single completion message from the io_uring
  // This is NOT thread-safe
- static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
-     int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
-     if( ret < 0 ) {
-         switch((int)errno) {
-         case EAGAIN:
-         case EINTR:
-             return -EAGAIN;
-         default:
-             abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
-         }
-     }
-
-     // Drain the queue
+ static bool __io_process(struct io_ring & ring) {
      unsigned head = *ring.completion_q.head;
      unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);

-     // Nothing was new return 0
-     if (head == tail) {
-         #if !defined(__CFA_NO_STATISTICS__)
-             ring.completion_q.stats.completed_avg.cnt += 1;
-         #endif
-         return 0;
-     }
-
-     uint32_t count = tail - head;
-     for(i; count) {
-         unsigned idx = (head + i) & (*ring.completion_q.mask);
-         struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
-
-         /* paranoid */ verify(&cqe);
-
-         struct io_user_data * data = (struct io_user_data *)cqe.user_data;
-         __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
-
-         data->result = cqe.res;
-         if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
-         else           { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
-     }
+     if (head == tail) return false;
+
+     unsigned idx = head & (*ring.completion_q.mask);
+     struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+
+     /* paranoid */ verify(&cqe);
+
+     struct io_user_data * data = (struct io_user_data *)cqe.user_data;
+     // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
+
+     data->result = cqe.res;
+     __unpark( data->thrd __cfaabi_dbg_ctx2 );

      // Allow new submissions to happen
-     V(ring.submit, count);
+     V(ring.submit);

      // Mark to the kernel that the cqe has been seen
      // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-     __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
-
-     // Update statistics
-     #if !defined(__CFA_NO_STATISTICS__)
-         ring.completion_q.stats.completed_avg.val += count;
-         ring.completion_q.stats.completed_avg.cnt += 1;
-     #endif
-
-     return count;
- }
-
- static void * __io_poller_slow( void * arg ) {
+     __atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELAXED );
+
+     return true;
+ }
+
+ static void * __io_poller( void * arg ) {
      cluster * cltr = (cluster *)arg;
      struct io_ring & ring = cltr->io;

@@ r4069faad:371  r8962722:246 @@
      verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );

-     while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
-         #if defined(__CFA_IO_POLLING_USER__)
-
-             // In the user-thread approach drain and if anything was drained,
-             // batton pass to the user-thread
-             int count = __drain_io( ring, &mask, 1, true );
-             if(count > 0) {
-                 __cfadbg_print_safe(io, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
-                 __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
-                 wait( ring.poller.sem );
-             }
-
-         #else
-
-             //In the naive approach, just poll the io completion queue directly
-             __drain_io( ring, &mask, 1, true );
-
-         #endif
-     }
-
-     return 0p;
- }
-
- #if defined(__CFA_IO_POLLING_USER__)
-     void main( __io_poller_fast & this ) {
-         // Start parked
-         park( __cfaabi_dbg_ctx );
-
-         // Then loop until we need to start
-         while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
-             // Drain the io
-             if(0 > __drain_io( *this.ring, 0p, 0, false )) {
-                 // If we got something, just yield and check again
-                 yield();
-             }
-             else {
-                 // We didn't get anything baton pass to the slow poller
-                 __cfadbg_print_safe(io, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
-                 post( this.ring->poller.sem );
-                 park( __cfaabi_dbg_ctx );
-             }
-         }
-     }
- #endif
+     LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
+         int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
+         if( ret < 0 ) {
+             switch((int)errno) {
+             case EAGAIN:
+             case EINTR:
+                 continue LOOP;
+             default:
+                 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
+             }
+         }
+
+         // Drain the queue
+         while(__io_process(ring)) {}
+     }
+
+     return 0p;
+ }
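Independent of the one-entry-per-call versus batched difference between __io_process and the removed __drain_io, both sides follow the standard io_uring completion-ring protocol. A generic C11 sketch of that protocol, using hypothetical helper names (cq_view, cq_drain) that are not part of the changeset:

    #include <linux/io_uring.h>
    #include <stdatomic.h>
    #include <stdint.h>

    struct cq_view {
        _Atomic uint32_t    *head;     // consumer index, advanced by user space
        _Atomic uint32_t    *tail;     // producer index, advanced by the kernel
        const uint32_t      *mask;     // ring_entries - 1
        struct io_uring_cqe *cqes;
    };

    // Consume every completion currently published; returns how many were handled.
    unsigned cq_drain(struct cq_view *cq, void (*handle)(struct io_uring_cqe *)) {
        uint32_t head  = atomic_load_explicit(cq->head, memory_order_relaxed);
        uint32_t tail  = atomic_load_explicit(cq->tail, memory_order_acquire);  // pairs with the kernel's release store
        uint32_t count = tail - head;                                            // unsigned arithmetic handles wrap-around

        for (uint32_t i = 0; i < count; i++) {
            struct io_uring_cqe *cqe = &cq->cqes[(head + i) & *cq->mask];
            handle(cqe);               // the CFA code reads cqe->user_data / cqe->res and unparks the waiting thread
        }

        // Publish the new head only after every CQE has been read, so the kernel
        // cannot overwrite an entry that has not been consumed yet.
        atomic_store_explicit(cq->head, head + count, memory_order_release);
        return count;
    }

Note the design shift visible in the diff: the new __io_process consumes a single entry per call and is driven in a loop by __io_poller, whereas the removed __drain_io consumed the whole [head, tail) range at once, which is also why the bulk V(ring.submit, count) release becomes a single V(ring.submit).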
@@ r4069faad:444  r8962722:293 @@
  //

(the block below appears on both sides of the changeset; apart from the lines marked "-" and "+", the two copies differ only in whitespace)

  static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
      // Wait for a spot to be available
      P(ring.submit);

      // Allocate the sqe
      uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);

      // Validate that we didn't overflow anything
      // Check that nothing overflowed
      /* paranoid */ verify( true );

      // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
      /* paranoid */ verify( true );

      // Return the sqe
      return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
  }

  static inline void __submit( struct io_ring & ring, uint32_t idx ) {
      // get mutual exclusion
      lock(ring.submit_q.lock __cfaabi_dbg_ctx2);

      // Append to the list of ready entries
      uint32_t * tail = ring.submit_q.tail;
      const uint32_t mask = *ring.submit_q.mask;

      ring.submit_q.array[ (*tail) & mask ] = idx & mask;
      __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);

      // Submit however, many entries need to be submitted
      int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
+     // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed io_submit, returned %d\n", ret );
      if( ret < 0 ) {
          switch((int)errno) {
          default:
              abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
          }
      }

-     // update statistics
-     #if !defined(__CFA_NO_STATISTICS__)
-         ring.submit_q.stats.submit_avg.val += 1;
-         ring.submit_q.stats.submit_avg.cnt += 1;
-     #endif
-
      unlock(ring.submit_q.lock);
      // Make sure that idx was submitted
      // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
-     __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
  }

  static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
      this.opcode = opcode;
      #if !defined(IOSQE_ASYNC)
          this.flags = 0;
      #else
          this.flags = IOSQE_ASYNC;
      #endif
      this.ioprio = 0;
      this.fd = fd;
      this.off = 0;
      this.addr = 0;
      this.len = 0;
      this.rw_flags = 0;
      this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
  }

  static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
      (this){ opcode, fd };
      this.off = off;
      this.addr = (uint64_t)addr;
      this.len = len;
  }
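The submission side splits the work in two, and the hunk above keeps that split: __submit_alloc reserves a slot (gated by the ring.submit counting semaphore) and __submit publishes its index and calls io_uring_enter. A stand-alone C sketch of the same two steps, with hypothetical names (sq_view, sq_alloc, sq_submit) and with the semaphore and the submit_q.lock serialisation deliberately elided:

    #include <linux/io_uring.h>
    #include <stdatomic.h>
    #include <stdint.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    struct sq_view {
        _Atomic uint32_t    *tail;     // shared with the kernel
        const uint32_t      *mask;     // ring_entries - 1
        uint32_t            *array;    // indirection array: one slot index per ring position
        struct io_uring_sqe *sqes;     // the mmap'd SQE slots
        _Atomic uint32_t     alloc;    // private, monotonic allocation counter
        int                  fd;
    };

    // Step 1: reserve a free SQE slot and hand it to the caller to fill in.
    struct io_uring_sqe *sq_alloc(struct sq_view *sq, uint32_t *idx) {
        *idx = atomic_fetch_add(&sq->alloc, 1);
        return &sq->sqes[*idx & *sq->mask];
    }

    // Step 2: publish the filled slot and tell the kernel one entry is ready.
    int sq_submit(struct sq_view *sq, uint32_t idx) {
        uint32_t tail = atomic_load_explicit(sq->tail, memory_order_relaxed);
        sq->array[tail & *sq->mask] = idx & *sq->mask;
        atomic_store_explicit(sq->tail, tail + 1, memory_order_release);  // kernel reads tail with acquire

        // to_submit = 1, min_complete = 0, no flags: submit without waiting.
        return syscall(__NR_io_uring_enter, sq->fd, 1, 0, 0, (void *)0, 0);
    }

This is the pattern the wrapper functions later in the changeset expand to through __submit_prelude and __submit_wait: allocate, fill the io_uring_sqe (opcode, fd, addr, len, off), submit, then park until the poller delivers the matching completion.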

  //=============================================================================================
  // I/O Interface
  //=============================================================================================
+ extern "C" {
+     #define __USE_GNU
+     #define _GNU_SOURCE
+     #include <fcntl.h>
+     #include <sys/uio.h>
+     #include <sys/socket.h>
+     #include <sys/stat.h>
+ }

  #define __submit_prelude \

@@ r4069faad:535  r8962722:385 @@
      park( __cfaabi_dbg_ctx ); \
      return data.result;
- #endif
-
- // Some forward declarations
- extern "C" {
-     #include <sys/types.h>
-     struct iovec;
-     extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
-     extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
-
-     extern int fsync(int fd);
-     extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
-
-     struct msghdr;
-     struct sockaddr;
-     extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
-     extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
-     extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
-     extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
-     extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-     extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
-
-     extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
-     extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
-     extern int madvise(void *addr, size_t length, int advice);
-
-     extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
-     extern int close(int fd);
-
-     struct statx;
-     extern int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
-
-     extern ssize_t read (int fd, void *buf, size_t count);
- }

  //-----------------------------------------------------------------------------
  // Asynchronous operations
(Both revisions define the same family of asynchronous wrappers, one per io_uring opcode. In r4069faad they are named cfa_* and each io_uring path is guarded by "#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_...)"; in r8962722 they are named async_* and guarded by "#if !defined(IORING_OP_...)" only. The first wrapper is shown in full; the remaining ones follow the identical pattern and are summarised after it.)

- ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
-     #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
+ ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
+     #if !defined(IORING_OP_READV)
          return preadv2(fd, iov, iovcnt, offset, flags);
      #else
          __submit_prelude

          (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };

          __submit_wait
      #endif
  }
(The remaining wrappers, identical in both revisions apart from the cfa_/async_ prefix and the guard noted above; for each one the synchronous fallback and the io_uring submission are listed.)

  pwritev2(fd, iov, iovcnt, offset, flags)   -> fallback pwritev2(fd, iov, iovcnt, offset, flags)
      (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };

  fsync(fd)                                  -> fallback fsync(fd)
      (*sqe){ IORING_OP_FSYNC, fd };

  sync_file_range(fd, offset, nbytes, flags) -> fallback sync_file_range(fd, offset, nbytes, flags)
      (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd }; sqe->off = offset; sqe->len = nbytes; sqe->sync_range_flags = flags;

  sendmsg(sockfd, msg, flags)                -> fallback recv(sockfd, msg, flags)   [sic, in both revisions]
      (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 }; sqe->msg_flags = flags;

  recvmsg(sockfd, msg, flags)                -> fallback recv(sockfd, msg, flags)
      (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 }; sqe->msg_flags = flags;

  send(sockfd, buf, len, flags)              -> fallback send(sockfd, buf, len, flags)
      (*sqe){ IORING_OP_SEND, sockfd }; sqe->addr = (uint64_t)buf; sqe->len = len; sqe->msg_flags = flags;

  recv(sockfd, buf, len, flags)              -> fallback recv(sockfd, buf, len, flags)
      (*sqe){ IORING_OP_RECV, sockfd }; sqe->addr = (uint64_t)buf; sqe->len = len; sqe->msg_flags = flags;
  fallocate(fd, mode, offset, len)           -> fallback fallocate(fd, mode, offset, len)
      (*sqe){ IORING_OP_FALLOCATE, fd }; sqe->off = offset; sqe->len = length; sqe->mode = mode;

  fadvise(fd, offset, len, advice)           -> fallback posix_fadvise(fd, offset, len, advice)
      (*sqe){ IORING_OP_FADVISE, fd }; sqe->off = (uint64_t)offset; sqe->len = length; sqe->fadvise_advice = advice;

  madvise(addr, length, advice)              -> fallback madvise(addr, length, advice)
      (*sqe){ IORING_OP_MADVISE, 0 }; sqe->addr = (uint64_t)addr; sqe->len = length; sqe->fadvise_advice = advice;

  openat(dirfd, pathname, flags, mode)       -> fallback openat(dirfd, pathname, flags, mode)
      (*sqe){ IORING_OP_OPENAT, dirfd }; sqe->addr = (uint64_t)pathname; sqe->open_flags = flags; sqe->mode = mode;

  close(fd)                                  -> fallback close(fd)
      (*sqe){ IORING_OP_CLOSE, fd };

  statx(dirfd, pathname, flags, mask, statxbuf)
                                             -> fallback syscall( __NR_io_uring_setup, dirfd, pathname, flags, mask, statxbuf )
                                                (the direct "return statx(...)" call is commented out in both revisions)
      (*sqe){ IORING_OP_STATX, dirfd }; sqe->addr = (uint64_t)pathname; sqe->statx_flags = flags; sqe->len = mask; sqe->off = (uint64_t)statxbuf;

  read(fd, buf, count)                       -> fallback read(fd, buf, count)
      (*sqe){ IORING_OP_READ, fd, buf, count, 0 };

  write(fd, buf, count)                      -> fallback read(fd, buf, count)   [sic, in both revisions]
      (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };

(The only wrappers whose fallback branches themselves change are accept4 and connect, which gain the glibc transparent-union argument types in r8962722.)

- int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
-     #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
-         return accept4( sockfd, addr, addrlen, flags );
+ int async_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
+     #if !defined(IORING_OP_ACCEPT)
+         __SOCKADDR_ARG _addr;
+         _addr.__sockaddr__ = addr;
+         return accept4( sockfd, _addr, addrlen, flags );
      #else
          __submit_prelude

          (*sqe){ IORING_OP_ACCEPT, sockfd };
          sqe->addr = addr;
          sqe->addr2 = addrlen;
          sqe->accept_flags = flags;

          __submit_wait
      #endif
  }

- int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
-     #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
-         return connect( sockfd, addr, addrlen );
+ int async_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
+     #if !defined(IORING_OP_CONNECT)
+         __CONST_SOCKADDR_ARG _addr;
+         _addr.__sockaddr__ = addr;
+         return connect( sockfd, _addr, addrlen );
      #else
          __submit_prelude

          (*sqe){ IORING_OP_CONNECT, sockfd };
          sqe->addr = (uint64_t)addr;
          sqe->off = addrlen;

          __submit_wait
      #endif
  }
@@ r4069faad:828  r8962722:649 @@
  // Macro magic to reduce the size of the following switch case
  #define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
  #define IS_DEFINED_SECOND(first, second, ...) second
  #define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
  #define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND, IS_DEFINED_TEST(macro) false, true)

(the four #define lines above appear on both sides of the changeset with identical token content)

- bool has_user_level_blocking( fptr_t func ) {
-     #if defined(HAVE_LINUX_IO_URING_H)
+ bool is_async( fptr_t func ) {
+
      if( /*func == (fptr_t)preadv2 || */
-         func == (fptr_t)cfa_preadv2 )
+         func == (fptr_t)async_preadv2 )
          #define _CFA_IO_FEATURE_IORING_OP_READV ,
          return IS_DEFINED(IORING_OP_READV);

(the function continues with one such test per wrapper -- WRITEV, FSYNC, SYNC_FILE_RANGE, SENDMSG, RECVMSG, SEND, RECV, ACCEPT, CONNECT, FALLOCATE, FADVISE, MADVISE, OPENAT, CLOSE, STATX, READ, WRITE; in every case the only change is the function-pointer comparison, cfa_* in r4069faad versus async_* in r8962722, plus one comment, "/*func == (fptr_t)posix_fadvise || */" becoming "/*func == (fptr_t)fadvise || */")
@@ r4069faad:922  r8962722:743 @@
          #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
          return IS_DEFINED(IORING_OP_WRITE);
-     #endif
-
-     return false;
- }
+
+     return false;
+ }
+
+ #endif
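The IS_DEFINED macro trio kept by both revisions is a preprocessor trick worth unpacking: the sentinel "#define _CFA_IO_FEATURE_X ," placed before each test only takes effect when X is not itself a macro, because the token paste in IS_DEFINED_TEST otherwise sees X's expansion instead of its name. A stand-alone C demonstration with hypothetical feature names (FEATURE_A, FEATURE_B) that are not part of the changeset:

    #include <stdbool.h>
    #include <stdio.h>

    #define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
    #define IS_DEFINED_SECOND(first, second, ...) second
    #define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
    #define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND, IS_DEFINED_TEST(macro) false, true)

    #define FEATURE_A 42          // FEATURE_A is a macro, like an opcode #defined by a kernel header
                                  // FEATURE_B is deliberately left undefined

    // One sentinel per feature name, exactly as in is_async()/has_user_level_blocking():
    #define _CFA_IO_FEATURE_FEATURE_A ,
    #define _CFA_IO_FEATURE_FEATURE_B ,

    int main(void) {
        // FEATURE_A expands to 42 before the paste, so the sentinel is never reached -> true.
        printf("FEATURE_A defined: %d\n", IS_DEFINED(FEATURE_A));   // prints 1
        // FEATURE_B stays a bare identifier, the paste hits the sentinel's comma -> false.
        printf("FEATURE_B defined: %d\n", IS_DEFINED(FEATURE_B));   // prints 0
        return 0;
    }

This is why the function defines each sentinel unconditionally right before its return statement: IS_DEFINED(IORING_OP_X) then reports whether the io_uring header actually #defines that opcode, without needing a separate #if per operation.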