Changeset 22f94a4 for libcfa/src/concurrency/io.cfa
- Timestamp: Aug 11, 2020, 4:40:15 PM (5 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 0d070ca
- Parents: 07d867b (diff), 129674b (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- File: 1 edited
  - libcfa/src/concurrency/io.cfa (modified) (5 diffs)
libcfa/src/concurrency/io.cfa
r07d867b r22f94a4 14 14 // 15 15 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 18 19 #include "kernel.hfa" 20 21 #if !defined(HAVE_LINUX_IO_URING_H) 22 void __kernel_io_startup( cluster &, int, bool ) { 23 // Nothing to do without io_uring 24 } 25 26 void __kernel_io_finish_start( cluster & ) { 27 // Nothing to do without io_uring 28 } 29 30 void __kernel_io_prepare_stop( cluster & ) { 31 // Nothing to do without io_uring 32 } 33 34 void __kernel_io_shutdown( cluster &, bool ) { 35 // Nothing to do without io_uring 36 } 37 38 #else 16 #define __cforall_thread__ 17 18 #if defined(__CFA_DEBUG__) 19 // #define __CFA_DEBUG_PRINT_IO__ 20 // #define __CFA_DEBUG_PRINT_IO_CORE__ 21 #endif 22 23 24 #if defined(CFA_HAVE_LINUX_IO_URING_H) 25 #define _GNU_SOURCE /* See feature_test_macros(7) */ 26 #include <errno.h> 27 #include <signal.h> 28 #include <stdint.h> 29 #include <string.h> 30 #include <unistd.h> 31 39 32 extern "C" { 40 #define _GNU_SOURCE /* See feature_test_macros(7) */ 41 #include <errno.h> 42 #include <stdint.h> 43 #include <string.h> 44 #include <unistd.h> 45 #include <sys/mman.h> 33 #include <sys/epoll.h> 46 34 #include <sys/syscall.h> 47 35 … … 49 37 } 50 38 51 #include "bits/signal.hfa" 52 #include "kernel_private.hfa" 53 #include "thread.hfa" 54 55 uint32_t entries_per_cluster() { 56 return 256; 57 } 58 59 static void * __io_poller_slow( void * arg ); 60 61 // Weirdly, some systems that do support io_uring don't actually define these 62 #ifdef __alpha__ 63 /* 64 * alpha is the only exception, all other architectures 65 * have common numbers for new system calls. 66 */ 67 #ifndef __NR_io_uring_setup 68 #define __NR_io_uring_setup 535 69 #endif 70 #ifndef __NR_io_uring_enter 71 #define __NR_io_uring_enter 536 72 #endif 73 #ifndef __NR_io_uring_register 74 #define __NR_io_uring_register 537 75 #endif 76 #else /* !__alpha__ */ 77 #ifndef __NR_io_uring_setup 78 #define __NR_io_uring_setup 425 79 #endif 80 #ifndef __NR_io_uring_enter 81 #define __NR_io_uring_enter 426 82 #endif 83 #ifndef __NR_io_uring_register 84 #define __NR_io_uring_register 427 85 #endif 86 #endif 87 88 // Fast poller user-thread 89 // Not using the "thread" keyword because we want to control 90 // more carefully when to start/stop it 91 struct __io_poller_fast { 92 struct __io_data * ring; 93 bool waiting; 94 $thread thrd; 95 }; 96 97 void ?{}( __io_poller_fast & this, struct cluster & cltr ) { 98 this.ring = cltr.io; 99 this.waiting = true; 100 (this.thrd){ "Fast I/O Poller", cltr }; 101 } 102 void ^?{}( __io_poller_fast & mutex this ); 103 void main( __io_poller_fast & this ); 104 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; } 105 void ^?{}( __io_poller_fast & mutex this ) {} 106 107 struct __submition_data { 108 // Head and tail of the ring (associated with array) 109 volatile uint32_t * head; 110 volatile uint32_t * tail; 111 112 // The actual kernel ring which uses head/tail 113 // indexes into the sqes arrays 114 uint32_t * array; 115 116 // number of entries and mask to go with it 117 const uint32_t * num; 118 const uint32_t * mask; 119 120 // Submission flags (Not sure what for) 121 uint32_t * flags; 122 123 // number of sqes not submitted (whatever that means) 124 uint32_t * dropped; 125 126 // Like head/tail but not seen by the kernel 127 volatile uint32_t alloc; 128 volatile uint32_t ready; 129 130 __spinlock_t lock; 131 132 // A buffer of sqes (not the actual ring) 133 struct io_uring_sqe * sqes; 134 135 // The location and size of 
the mmaped area 136 void * ring_ptr; 137 size_t ring_sz; 138 139 // Statistics 140 #if !defined(__CFA_NO_STATISTICS__) 141 struct { 142 struct { 143 volatile unsigned long long int val; 144 volatile unsigned long long int cnt; 145 volatile unsigned long long int block; 146 } submit_avg; 147 } stats; 148 #endif 149 }; 150 151 struct __completion_data { 152 // Head and tail of the ring 153 volatile uint32_t * head; 154 volatile uint32_t * tail; 155 156 // number of entries and mask to go with it 157 const uint32_t * mask; 158 const uint32_t * num; 159 160 // number of cqes not submitted (whatever that means) 161 uint32_t * overflow; 162 163 // the kernel ring 164 struct io_uring_cqe * cqes; 165 166 // The location and size of the mmaped area 167 void * ring_ptr; 168 size_t ring_sz; 169 170 // Statistics 171 #if !defined(__CFA_NO_STATISTICS__) 172 struct { 173 struct { 174 unsigned long long int val; 175 unsigned long long int slow_cnt; 176 unsigned long long int fast_cnt; 177 } completed_avg; 178 } stats; 179 #endif 180 }; 181 182 struct __io_data { 183 struct __submition_data submit_q; 184 struct __completion_data completion_q; 185 uint32_t ring_flags; 186 int cltr_flags; 187 int fd; 188 semaphore submit; 189 volatile bool done; 190 struct { 191 struct { 192 void * stack; 193 pthread_t kthrd; 194 } slow; 195 __io_poller_fast fast; 196 __bin_sem_t sem; 197 } poller; 198 }; 199 200 //============================================================================================= 201 // I/O Startup / Shutdown logic 202 //============================================================================================= 203 void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) { 204 this.io = malloc(); 205 206 // Step 1 : call to setup 207 struct io_uring_params params; 208 memset(¶ms, 0, sizeof(params)); 209 210 uint32_t nentries = entries_per_cluster(); 211 212 int fd = syscall(__NR_io_uring_setup, nentries, ¶ms ); 213 if(fd < 0) { 214 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno)); 215 } 216 217 // Step 2 : mmap result 218 memset( this.io, 0, sizeof(struct __io_data) ); 219 struct __submition_data & sq = this.io->submit_q; 220 struct __completion_data & cq = this.io->completion_q; 221 222 // calculate the right ring size 223 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) ); 224 cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe)); 225 226 // Requires features 227 #if defined(IORING_FEAT_SINGLE_MMAP) 228 // adjust the size according to the parameters 229 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 230 cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz); 231 } 232 #endif 233 234 // mmap the Submit Queue into existence 235 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); 236 if (sq.ring_ptr == (void*)MAP_FAILED) { 237 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno)); 238 } 239 240 // Requires features 241 #if defined(IORING_FEAT_SINGLE_MMAP) 242 // mmap the Completion Queue into existence (may or may not be needed) 243 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 244 cq->ring_ptr = sq->ring_ptr; 245 } 246 else 247 #endif 248 { 249 // We need multiple call to MMAP 250 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); 251 if (cq.ring_ptr == (void*)MAP_FAILED) { 252 munmap(sq.ring_ptr, sq.ring_sz); 253 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", 
strerror(errno)); 254 } 255 } 256 257 // mmap the submit queue entries 258 size_t size = params.sq_entries * sizeof(struct io_uring_sqe); 259 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES); 260 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) { 261 munmap(sq.ring_ptr, sq.ring_sz); 262 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz); 263 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno)); 264 } 265 266 // Get the pointers from the kernel to fill the structure 267 // submit queue 268 sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 269 sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 270 sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 271 sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 272 sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 273 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 274 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 275 sq.alloc = *sq.tail; 276 sq.ready = *sq.tail; 277 278 // completion queue 279 cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head); 280 cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail); 281 cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask); 282 cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries); 283 cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow); 284 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes); 285 286 // some paranoid checks 287 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask ); 288 /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num ); 289 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head ); 290 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail ); 291 292 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask ); 293 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num ); 294 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head ); 295 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail ); 296 297 // Update the global ring info 298 this.io->ring_flags = params.flags; 299 this.io->cltr_flags = io_flags; 300 this.io->fd = fd; 301 this.io->done = false; 302 (this.io->submit){ min(*sq.num, *cq.num) }; 303 304 // Initialize statistics 305 #if !defined(__CFA_NO_STATISTICS__) 306 this.io->submit_q.stats.submit_avg.val = 0; 307 this.io->submit_q.stats.submit_avg.cnt = 0; 308 this.io->submit_q.stats.submit_avg.block = 0; 309 this.io->completion_q.stats.completed_avg.val = 0; 310 this.io->completion_q.stats.completed_avg.slow_cnt = 0; 311 this.io->completion_q.stats.completed_avg.fast_cnt = 0; 312 #endif 313 314 if(!main_cluster) { 315 __kernel_io_finish_start( this ); 316 } 317 } 318 319 void __kernel_io_finish_start( cluster & this ) { 320 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 321 
__cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this); 322 (this.io->poller.fast){ this }; 323 __thrd_start( this.io->poller.fast, main ); 324 } 325 326 // Create the poller thread 327 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this); 328 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); 329 } 330 331 void __kernel_io_prepare_stop( cluster & this ) { 332 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this); 333 // Notify the poller thread of the shutdown 334 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST); 335 336 // Stop the IO Poller 337 sigval val = { 1 }; 338 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val ); 339 post( this.io->poller.sem ); 340 341 // Wait for the poller thread to finish 342 pthread_join( this.io->poller.slow.kthrd, 0p ); 343 free( this.io->poller.slow.stack ); 344 345 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this); 346 347 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 348 with( this.io->poller.fast ) { 349 /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call 350 /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster ); 351 /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster ); 352 353 // We need to adjust the clean-up based on where the thread is 354 if( thrd.preempted != __NO_PREEMPTION ) { 355 356 // This is the tricky case 357 // The thread was preempted and now it is on the ready queue 358 /* paranoid */ verify( thrd.state == Active ); // The thread better be in this state 359 /* paranoid */ verify( thrd.next == 1p ); // The thread should be the last on the list 360 /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list 361 362 // Remove the thread from the ready queue of this cluster 363 this.ready_queue.head = 1p; 364 thrd.next = 0p; 365 366 // Fixup the thread state 367 thrd.state = Blocked; 368 thrd.preempted = __NO_PREEMPTION; 369 370 // Pretend like the thread was blocked all along 371 } 372 // !!! This is not an else if !!! 
373 if( thrd.state == Blocked ) { 374 375 // This is the "easy case" 376 // The thread is parked and can easily be moved to active cluster 377 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 378 thrd.curr_cluster = active_cluster(); 379 380 // unpark the fast io_poller 381 unpark( &thrd __cfaabi_dbg_ctx2 ); 382 } 383 else { 384 385 // The thread is in a weird state 386 // I don't know what to do here 387 abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n"); 388 } 389 390 } 391 392 ^(this.io->poller.fast){}; 393 394 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this); 395 } 396 } 397 398 void __kernel_io_shutdown( cluster & this, bool main_cluster ) { 399 if(!main_cluster) { 400 __kernel_io_prepare_stop( this ); 401 } 402 403 // print statistics 404 #if !defined(__CFA_NO_STATISTICS__) 405 if(this.print_stats) { 406 with(this.io->submit_q.stats, this.io->completion_q.stats) { 407 __cfaabi_bits_print_safe( STDERR_FILENO, 408 "----- I/O uRing Stats -----\n" 409 "- total submit calls : %'15llu\n" 410 "- avg submit : %'18.2lf\n" 411 "- pre-submit block %% : %'18.2lf\n" 412 "- total wait calls : %'15llu (%'llu slow, %'llu fast)\n" 413 "- avg completion/wait : %'18.2lf\n", 414 submit_avg.cnt, 415 ((double)submit_avg.val) / submit_avg.cnt, 416 (100.0 * submit_avg.block) / submit_avg.cnt, 417 completed_avg.slow_cnt + completed_avg.fast_cnt, 418 completed_avg.slow_cnt, completed_avg.fast_cnt, 419 ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt) 420 ); 421 } 422 } 423 #endif 424 425 // Shutdown the io rings 426 struct __submition_data & sq = this.io->submit_q; 427 struct __completion_data & cq = this.io->completion_q; 428 429 // unmap the submit queue entries 430 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe)); 431 432 // unmap the Submit Queue ring 433 munmap(sq.ring_ptr, sq.ring_sz); 434 435 // unmap the Completion Queue ring, if it is different 436 if (cq.ring_ptr != sq.ring_ptr) { 437 munmap(cq.ring_ptr, cq.ring_sz); 438 } 439 440 // close the file descriptor 441 close(this.io->fd); 442 443 free( this.io ); 39 #include "stats.hfa" 40 #include "kernel.hfa" 41 #include "kernel/fwd.hfa" 42 #include "io/types.hfa" 43 44 //============================================================================================= 45 // I/O Syscall 46 //============================================================================================= 47 static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { 48 bool need_sys_to_submit = false; 49 bool need_sys_to_complete = false; 50 unsigned flags = 0; 51 52 TO_SUBMIT: 53 if( to_submit > 0 ) { 54 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 55 need_sys_to_submit = true; 56 break TO_SUBMIT; 57 } 58 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 59 need_sys_to_submit = true; 60 flags |= IORING_ENTER_SQ_WAKEUP; 61 } 62 } 63 64 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 65 flags |= IORING_ENTER_GETEVENTS; 66 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 67 need_sys_to_complete = true; 68 } 69 } 70 71 int ret = 0; 72 if( need_sys_to_submit || need_sys_to_complete ) { 73 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8); 74 if( ret < 0 ) { 75 switch((int)errno) { 76 case EAGAIN: 77 case EINTR: 78 ret = -1; 79 break; 80 default: 81 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 82 } 83 } 84 } 85 86 // Memory barrier 87 
__atomic_thread_fence( __ATOMIC_SEQ_CST ); 88 return ret; 444 89 } 445 90 … … 447 92 // I/O Polling 448 93 //============================================================================================= 449 struct io_user_data { 450 int32_t result; 451 $thread * thrd; 452 }; 94 static unsigned __collect_submitions( struct __io_data & ring ); 95 static uint32_t __release_consumed_submission( struct __io_data & ring ); 96 97 static inline void process(struct io_uring_cqe & cqe ) { 98 struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data; 99 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd ); 100 101 data->result = cqe.res; 102 unpark( data->thrd __cfaabi_dbg_ctx2 ); 103 } 453 104 454 105 // Process a single completion message from the io_uring 455 106 // This is NOT thread-safe 456 static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) { 457 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 107 static [int, bool] __drain_io( & struct __io_data ring ) { 108 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 109 110 unsigned to_submit = 0; 111 if( ring.poller_submits ) { 112 // If the poller thread also submits, then we need to aggregate the submissions which are ready 113 to_submit = __collect_submitions( ring ); 114 } 115 116 int ret = __io_uring_enter(ring, to_submit, true); 458 117 if( ret < 0 ) { 459 switch((int)errno) { 460 case EAGAIN: 461 case EINTR: 462 return -EAGAIN; 463 default: 464 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 465 } 466 } 118 return [0, true]; 119 } 120 121 // update statistics 122 if (to_submit > 0) { 123 __STATS__( true, 124 if( to_submit > 0 ) { 125 io.submit_q.submit_avg.rdy += to_submit; 126 io.submit_q.submit_avg.csm += ret; 127 io.submit_q.submit_avg.cnt += 1; 128 } 129 ) 130 } 131 132 // Release the consumed SQEs 133 __release_consumed_submission( ring ); 467 134 468 135 // Drain the queue 469 136 unsigned head = *ring.completion_q.head; 470 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE); 137 unsigned tail = *ring.completion_q.tail; 138 const uint32_t mask = *ring.completion_q.mask; 471 139 472 140 // Nothing was new return 0 473 141 if (head == tail) { 474 return 0;142 return [0, to_submit > 0]; 475 143 } 476 144 477 145 uint32_t count = tail - head; 146 /* paranoid */ verify( count != 0 ); 478 147 for(i; count) { 479 unsigned idx = (head + i) & (*ring.completion_q.mask);148 unsigned idx = (head + i) & mask; 480 149 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 481 150 482 151 /* paranoid */ verify(&cqe); 483 152 484 struct io_user_data * data = (struct io_user_data *)cqe.user_data; 485 __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd ); 486 487 data->result = cqe.res; 488 if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 489 else { __unpark( data->thrd __cfaabi_dbg_ctx2 ); } 490 } 491 492 // Allow new submissions to happen 493 V(ring.submit, count); 153 process( cqe ); 154 } 494 155 495 156 // Mark to the kernel that the cqe has been seen 496 157 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 
158 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 497 159 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 498 160 499 return count; 500 } 501 502 static void * __io_poller_slow( void * arg ) { 503 cluster * cltr = (cluster *)arg; 504 struct __io_data & ring = *cltr->io; 505 506 sigset_t mask; 507 sigfillset(&mask); 508 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) { 509 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" ); 510 } 511 512 sigdelset( &mask, SIGUSR1 ); 513 514 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) ); 515 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) ); 516 517 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring); 518 519 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 520 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 521 // In the user-thread approach drain and if anything was drained, 522 // batton pass to the user-thread 523 int count = __drain_io( ring, &mask, 1, true ); 161 return [count, count > 0 || to_submit > 0]; 162 } 163 164 void main( $io_ctx_thread & this ) { 165 epoll_event ev; 166 __ioctx_register( this, ev ); 167 168 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring); 169 170 int reset = 0; 171 // Then loop until we need to start 172 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 173 // Drain the io 174 int count; 175 bool again; 176 disable_interrupts(); 177 [count, again] = __drain_io( *this.ring ); 178 179 if(!again) reset++; 524 180 525 181 // Update statistics 526 #if !defined(__CFA_NO_STATISTICS__) 527 ring.completion_q.stats.completed_avg.val += count; 528 ring.completion_q.stats.completed_avg.slow_cnt += 1; 529 #endif 530 531 if(count > 0) { 532 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring); 533 __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 ); 534 wait( ring.poller.sem ); 535 } 536 } 537 } 538 else { 539 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 540 //In the naive approach, just poll the io completion queue directly 541 int count = __drain_io( ring, &mask, 1, true ); 542 543 // Update statistics 544 #if !defined(__CFA_NO_STATISTICS__) 545 ring.completion_q.stats.completed_avg.val += count; 546 ring.completion_q.stats.completed_avg.slow_cnt += 1; 547 #endif 548 } 549 } 550 551 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring); 552 553 return 0p; 554 } 555 556 void main( __io_poller_fast & this ) { 557 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ); 558 559 // Start parked 560 park( __cfaabi_dbg_ctx ); 561 562 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring); 563 564 int reset = 0; 565 566 // Then loop until we need to start 567 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) { 568 // Drain the io 569 this.waiting = false; 570 int count = __drain_io( *this.ring, 0p, 0, false ); 571 reset += count > 0 ? 
1 : 0; 572 573 // Update statistics 574 #if !defined(__CFA_NO_STATISTICS__) 575 this.ring->completion_q.stats.completed_avg.val += count; 576 this.ring->completion_q.stats.completed_avg.fast_cnt += 1; 577 #endif 578 579 this.waiting = true; 182 __STATS__( true, 183 io.complete_q.completed_avg.val += count; 184 io.complete_q.completed_avg.fast_cnt += 1; 185 ) 186 enable_interrupts( __cfaabi_dbg_ctx ); 187 188 // If we got something, just yield and check again 580 189 if(reset < 5) { 581 // If we got something, just yield and check again582 190 yield(); 583 191 } 192 // We didn't get anything baton pass to the slow poller 584 193 else { 585 // We didn't get anything baton pass to the slow poller 586 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring); 587 post( this.ring->poller.sem ); 588 park( __cfaabi_dbg_ctx ); 194 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self); 589 195 reset = 0; 196 197 // block this thread 198 __ioctx_prepare_block( this, ev ); 199 wait( this.sem ); 590 200 } 591 201 } … … 599 209 600 210 // Submition steps : 601 // 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure 602 // entries are available. The semaphore make sure that there is no more operations in 603 // progress then the number of entries in the buffer. This probably limits concurrency 604 // more than necessary since submitted but not completed operations don't need any 605 // entries in user space. However, I don't know what happens if we overflow the buffers 606 // because too many requests completed at once. This is a safe approach in all cases. 607 // Furthermore, with hundreds of entries, this may be okay. 608 // 609 // 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones 211 // 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones 610 212 // listed in sq.array are visible by the kernel. For those not listed, the kernel does not 611 213 // offer any assurance that an entry is not being filled by multiple flags. Therefore, we 612 214 // need to write an allocator that allows allocating concurrently. 613 215 // 614 // 3- Actually fill the submit entry, this is the only simple and straightforward step.615 // 616 // 4- Append the entry index to the array and adjust the tail accordingly. This operation216 // 2 - Actually fill the submit entry, this is the only simple and straightforward step. 217 // 218 // 3 - Append the entry index to the array and adjust the tail accordingly. This operation 617 219 // needs to arrive to two concensus at the same time: 618 220 // A - The order in which entries are listed in the array: no two threads must pick the … … 622 224 // 623 225 624 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) { 625 // Wait for a spot to be available 626 __attribute__((unused)) bool blocked = P(ring.submit); 627 #if !defined(__CFA_NO_STATISTICS__) 628 __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 
1ul64 : 0ul64, __ATOMIC_RELAXED ); 629 #endif 630 631 // Allocate the sqe 632 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST); 633 634 // Validate that we didn't overflow anything 635 // Check that nothing overflowed 636 /* paranoid */ verify( true ); 637 638 // Check that it goes head -> tail -> alloc and never head -> alloc -> tail 639 /* paranoid */ verify( true ); 640 641 // Return the sqe 642 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx]; 643 } 644 645 static inline void __submit( struct __io_data & ring, uint32_t idx ) { 646 // get mutual exclusion 647 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 648 649 // Append to the list of ready entries 650 uint32_t * tail = ring.submit_q.tail; 226 [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) { 227 /* paranoid */ verify( data != 0 ); 228 229 // Prepare the data we need 230 __attribute((unused)) int len = 0; 231 __attribute((unused)) int block = 0; 232 uint32_t cnt = *ring.submit_q.num; 233 uint32_t mask = *ring.submit_q.mask; 234 235 disable_interrupts(); 236 uint32_t off = __tls_rand(); 237 enable_interrupts( __cfaabi_dbg_ctx ); 238 239 // Loop around looking for an available spot 240 for() { 241 // Look through the list starting at some offset 242 for(i; cnt) { 243 uint64_t expected = 0; 244 uint32_t idx = (i + off) & mask; 245 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 246 volatile uint64_t * udata = (volatile uint64_t *)&sqe->user_data; 247 248 if( *udata == expected && 249 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) 250 { 251 // update statistics 252 __STATS__( false, 253 io.submit_q.alloc_avg.val += len; 254 io.submit_q.alloc_avg.block += block; 255 io.submit_q.alloc_avg.cnt += 1; 256 ) 257 258 259 // Success return the data 260 return [sqe, idx]; 261 } 262 verify(expected != data); 263 264 len ++; 265 } 266 267 block++; 268 yield(); 269 } 270 } 271 272 static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) { 273 /* paranoid */ verify( idx <= mask ); 274 /* paranoid */ verify( idx != -1ul32 ); 275 276 // We need to find a spot in the ready array 277 __attribute((unused)) int len = 0; 278 __attribute((unused)) int block = 0; 279 uint32_t ready_mask = ring.submit_q.ready_cnt - 1; 280 281 disable_interrupts(); 282 uint32_t off = __tls_rand(); 283 enable_interrupts( __cfaabi_dbg_ctx ); 284 285 uint32_t picked; 286 LOOKING: for() { 287 for(i; ring.submit_q.ready_cnt) { 288 picked = (i + off) & ready_mask; 289 uint32_t expected = -1ul32; 290 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 291 break LOOKING; 292 } 293 verify(expected != idx); 294 295 len ++; 296 } 297 298 block++; 299 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) { 300 __release_consumed_submission( ring ); 301 unlock( ring.submit_q.lock ); 302 } 303 else { 304 yield(); 305 } 306 } 307 308 // update statistics 309 __STATS__( false, 310 io.submit_q.look_avg.val += len; 311 io.submit_q.look_avg.block += block; 312 io.submit_q.look_avg.cnt += 1; 313 ) 314 315 return picked; 316 } 317 318 void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) { 319 __io_data & ring = *ctx->thrd.ring; 320 // Get now the data we definetely need 321 volatile uint32_t * const tail = ring.submit_q.tail; 322 const uint32_t mask = *ring.submit_q.mask; 323 324 // There are 2 
submission schemes, check which one we are using 325 if( ring.poller_submits ) { 326 // If the poller thread submits, then we just need to add this to the ready array 327 __submit_to_ready_array( ring, idx, mask ); 328 329 post( ctx->thrd.sem ); 330 331 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 332 } 333 else if( ring.eager_submits ) { 334 uint32_t picked = __submit_to_ready_array( ring, idx, mask ); 335 336 for() { 337 yield(); 338 339 // If some one else collected our index, we are done 340 #warning ABA problem 341 if( ring.submit_q.ready[picked] != idx ) { 342 __STATS__( false, 343 io.submit_q.helped += 1; 344 ) 345 return; 346 } 347 348 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) { 349 __STATS__( false, 350 io.submit_q.leader += 1; 351 ) 352 break; 353 } 354 355 __STATS__( false, 356 io.submit_q.busy += 1; 357 ) 358 } 359 360 // We got the lock 361 unsigned to_submit = __collect_submitions( ring ); 362 int ret = __io_uring_enter( ring, to_submit, false ); 363 if( ret < 0 ) { 364 unlock(ring.submit_q.lock); 365 return; 366 } 367 368 /* paranoid */ verify( ret > 0 || to_submit == 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 369 370 // Release the consumed SQEs 371 __release_consumed_submission( ring ); 372 373 // update statistics 374 __STATS__( true, 375 io.submit_q.submit_avg.rdy += to_submit; 376 io.submit_q.submit_avg.csm += ret; 377 io.submit_q.submit_avg.cnt += 1; 378 ) 379 380 unlock(ring.submit_q.lock); 381 } 382 else { 383 // get mutual exclusion 384 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 385 386 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0, 387 /* paranoid */ "index %u already reclaimed\n" 388 /* paranoid */ "head %u, prev %u, tail %u\n" 389 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 390 /* paranoid */ idx, 391 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 392 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 393 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 394 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 395 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 396 /* paranoid */ ); 397 398 // Append to the list of ready entries 399 400 /* paranoid */ verify( idx <= mask ); 401 ring.submit_q.array[ (*tail) & mask ] = idx; 402 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 403 404 // Submit however, many entries need to be submitted 405 int ret = __io_uring_enter( ring, 1, false ); 406 if( ret < 0 ) { 407 switch((int)errno) { 408 default: 409 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 410 } 411 } 412 413 // update statistics 414 __STATS__( false, 415 io.submit_q.submit_avg.csm += 1; 416 io.submit_q.submit_avg.cnt += 1; 417 ) 418 419 // Release the consumed SQEs 420 __release_consumed_submission( ring ); 421 422 unlock(ring.submit_q.lock); 423 424 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret ); 425 } 426 } 427 428 static unsigned __collect_submitions( struct __io_data & ring ) { 429 /* paranoid */ verify( ring.submit_q.ready != 0p ); 430 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 ); 431 432 unsigned to_submit = 0; 433 uint32_t tail = *ring.submit_q.tail; 651 434 const uint32_t mask = *ring.submit_q.mask; 652 435 653 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 654 __atomic_fetch_add(tail, 1ul32, 
__ATOMIC_SEQ_CST); 655 656 // Submit however, many entries need to be submitted 657 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0); 658 if( ret < 0 ) { 659 switch((int)errno) { 660 default: 661 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 662 } 663 } 664 665 // update statistics 666 #if !defined(__CFA_NO_STATISTICS__) 667 ring.submit_q.stats.submit_avg.val += 1; 668 ring.submit_q.stats.submit_avg.cnt += 1; 669 #endif 670 671 unlock(ring.submit_q.lock); 672 // Make sure that idx was submitted 673 // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us 674 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret ); 675 } 676 677 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) { 678 this.opcode = opcode; 679 #if !defined(IOSQE_ASYNC) 680 this.flags = 0; 681 #else 682 this.flags = IOSQE_ASYNC; 683 #endif 684 this.ioprio = 0; 685 this.fd = fd; 686 this.off = 0; 687 this.addr = 0; 688 this.len = 0; 689 this.rw_flags = 0; 690 this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0; 691 } 692 693 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) { 694 (this){ opcode, fd }; 695 this.off = off; 696 this.addr = (uint64_t)addr; 697 this.len = len; 698 } 699 700 701 //============================================================================================= 702 // I/O Interface 703 //============================================================================================= 704 705 #define __submit_prelude \ 706 struct __io_data & ring = *active_cluster()->io; \ 707 struct io_uring_sqe * sqe; \ 708 uint32_t idx; \ 709 [sqe, idx] = __submit_alloc( ring ); 710 711 #define __submit_wait \ 712 io_user_data data = { 0, active_thread() }; \ 713 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \ 714 sqe->user_data = (uint64_t)&data; \ 715 __submit( ring, idx ); \ 716 park( __cfaabi_dbg_ctx ); \ 717 return data.result; 436 // Go through the list of ready submissions 437 for( i; ring.submit_q.ready_cnt ) { 438 // replace any submission with the sentinel, to consume it. 
439 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 440 441 // If it was already the sentinel, then we are done 442 if( idx == -1ul32 ) continue; 443 444 // If we got a real submission, append it to the list 445 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 446 to_submit++; 447 } 448 449 // Increment the tail based on how many we are ready to submit 450 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 451 452 return to_submit; 453 } 454 455 static uint32_t __release_consumed_submission( struct __io_data & ring ) { 456 const uint32_t smask = *ring.submit_q.mask; 457 458 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 459 uint32_t chead = *ring.submit_q.head; 460 uint32_t phead = ring.submit_q.prev_head; 461 ring.submit_q.prev_head = chead; 462 unlock(ring.submit_q.release_lock); 463 464 uint32_t count = chead - phead; 465 for( i; count ) { 466 uint32_t idx = ring.submit_q.array[ (phead + i) & smask ]; 467 ring.submit_q.sqes[ idx ].user_data = 0; 468 } 469 return count; 470 } 718 471 #endif 719 720 // Some forward declarations721 extern "C" {722 #include <unistd.h>723 #include <sys/types.h>724 #include <sys/socket.h>725 #include <sys/syscall.h>726 727 #if defined(HAVE_PREADV2)728 struct iovec;729 extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);730 #endif731 #if defined(HAVE_PWRITEV2)732 struct iovec;733 extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);734 #endif735 736 extern int fsync(int fd);737 extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);738 739 struct msghdr;740 struct sockaddr;741 extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);742 extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);743 extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);744 extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);745 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);746 extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);747 748 extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);749 extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);750 extern int madvise(void *addr, size_t length, int advice);751 752 extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);753 extern int close(int fd);754 755 extern ssize_t read (int fd, void *buf, size_t count);756 }757 758 //-----------------------------------------------------------------------------759 // Asynchronous operations760 #if defined(HAVE_PREADV2)761 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {762 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)763 return preadv2(fd, iov, iovcnt, offset, flags);764 #else765 __submit_prelude766 767 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };768 769 __submit_wait770 #endif771 }772 #endif773 774 #if defined(HAVE_PWRITEV2)775 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {776 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)777 return pwritev2(fd, iov, iovcnt, offset, flags);778 #else779 __submit_prelude780 781 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };782 783 __submit_wait784 #endif785 }786 #endif787 788 int cfa_fsync(int fd) {789 #if !defined(HAVE_LINUX_IO_URING_H) 
|| !defined(IORING_OP_FSYNC)790 return fsync(fd);791 #else792 __submit_prelude793 794 (*sqe){ IORING_OP_FSYNC, fd };795 796 __submit_wait797 #endif798 }799 800 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {801 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)802 return sync_file_range(fd, offset, nbytes, flags);803 #else804 __submit_prelude805 806 (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };807 sqe->off = offset;808 sqe->len = nbytes;809 sqe->sync_range_flags = flags;810 811 __submit_wait812 #endif813 }814 815 816 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {817 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)818 return sendmsg(sockfd, msg, flags);819 #else820 __submit_prelude821 822 (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };823 sqe->msg_flags = flags;824 825 __submit_wait826 #endif827 }828 829 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {830 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)831 return recvmsg(sockfd, msg, flags);832 #else833 __submit_prelude834 835 (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };836 sqe->msg_flags = flags;837 838 __submit_wait839 #endif840 }841 842 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {843 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)844 return send( sockfd, buf, len, flags );845 #else846 __submit_prelude847 848 (*sqe){ IORING_OP_SEND, sockfd };849 sqe->addr = (uint64_t)buf;850 sqe->len = len;851 sqe->msg_flags = flags;852 853 __submit_wait854 #endif855 }856 857 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {858 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)859 return recv( sockfd, buf, len, flags );860 #else861 __submit_prelude862 863 (*sqe){ IORING_OP_RECV, sockfd };864 sqe->addr = (uint64_t)buf;865 sqe->len = len;866 sqe->msg_flags = flags;867 868 __submit_wait869 #endif870 }871 872 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {873 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)874 return accept4( sockfd, addr, addrlen, flags );875 #else876 __submit_prelude877 878 (*sqe){ IORING_OP_ACCEPT, sockfd };879 sqe->addr = addr;880 sqe->addr2 = addrlen;881 sqe->accept_flags = flags;882 883 __submit_wait884 #endif885 }886 887 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {888 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)889 return connect( sockfd, addr, addrlen );890 #else891 __submit_prelude892 893 (*sqe){ IORING_OP_CONNECT, sockfd };894 sqe->addr = (uint64_t)addr;895 sqe->off = addrlen;896 897 __submit_wait898 #endif899 }900 901 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {902 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)903 return fallocate( fd, mode, offset, len );904 #else905 __submit_prelude906 907 (*sqe){ IORING_OP_FALLOCATE, fd };908 sqe->off = offset;909 sqe->len = length;910 sqe->mode = mode;911 912 __submit_wait913 #endif914 }915 916 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {917 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)918 return posix_fadvise( fd, offset, len, advice );919 #else920 __submit_prelude921 922 (*sqe){ IORING_OP_FADVISE, fd };923 sqe->off = (uint64_t)offset;924 sqe->len = length;925 sqe->fadvise_advice = advice;926 927 __submit_wait928 #endif929 }930 931 int cfa_madvise(void *addr, size_t length, int advice) {932 #if 
!defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)933 return madvise( addr, length, advice );934 #else935 __submit_prelude936 937 (*sqe){ IORING_OP_MADVISE, 0 };938 sqe->addr = (uint64_t)addr;939 sqe->len = length;940 sqe->fadvise_advice = advice;941 942 __submit_wait943 #endif944 }945 946 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {947 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)948 return openat( dirfd, pathname, flags, mode );949 #else950 __submit_prelude951 952 (*sqe){ IORING_OP_OPENAT, dirfd };953 sqe->addr = (uint64_t)pathname;954 sqe->open_flags = flags;955 sqe->mode = mode;956 957 __submit_wait958 #endif959 }960 961 int cfa_close(int fd) {962 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)963 return close( fd );964 #else965 __submit_prelude966 967 (*sqe){ IORING_OP_CLOSE, fd };968 969 __submit_wait970 #endif971 }972 973 974 ssize_t cfa_read(int fd, void *buf, size_t count) {975 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)976 return read( fd, buf, count );977 #else978 __submit_prelude979 980 (*sqe){ IORING_OP_READ, fd, buf, count, 0 };981 982 __submit_wait983 #endif984 }985 986 ssize_t cfa_write(int fd, void *buf, size_t count) {987 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)988 return read( fd, buf, count );989 #else990 __submit_prelude991 992 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };993 994 __submit_wait995 #endif996 }997 998 //-----------------------------------------------------------------------------999 // Check if a function is asynchronous1000 1001 // Macro magic to reduce the size of the following switch case1002 #define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)1003 #define IS_DEFINED_SECOND(first, second, ...) second1004 #define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion1005 #define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)1006 1007 bool has_user_level_blocking( fptr_t func ) {1008 #if defined(HAVE_LINUX_IO_URING_H)1009 #if defined(HAVE_PREADV2)1010 if( /*func == (fptr_t)preadv2 || */1011 func == (fptr_t)cfa_preadv2 )1012 #define _CFA_IO_FEATURE_IORING_OP_READV ,1013 return IS_DEFINED(IORING_OP_READV);1014 #endif1015 1016 #if defined(HAVE_PWRITEV2)1017 if( /*func == (fptr_t)pwritev2 || */1018 func == (fptr_t)cfa_pwritev2 )1019 #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,1020 return IS_DEFINED(IORING_OP_WRITEV);1021 #endif1022 1023 if( /*func == (fptr_t)fsync || */1024 func == (fptr_t)cfa_fsync )1025 #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,1026 return IS_DEFINED(IORING_OP_FSYNC);1027 1028 if( /*func == (fptr_t)ync_file_range || */1029 func == (fptr_t)cfa_sync_file_range )1030 #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,1031 return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);1032 1033 if( /*func == (fptr_t)sendmsg || */1034 func == (fptr_t)cfa_sendmsg )1035 #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,1036 return IS_DEFINED(IORING_OP_SENDMSG);1037 1038 if( /*func == (fptr_t)recvmsg || */1039 func == (fptr_t)cfa_recvmsg )1040 #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,1041 return IS_DEFINED(IORING_OP_RECVMSG);1042 1043 if( /*func == (fptr_t)send || */1044 func == (fptr_t)cfa_send )1045 #define _CFA_IO_FEATURE_IORING_OP_SEND ,1046 return IS_DEFINED(IORING_OP_SEND);1047 1048 if( /*func == (fptr_t)recv || */1049 func == (fptr_t)cfa_recv )1050 #define _CFA_IO_FEATURE_IORING_OP_RECV ,1051 return IS_DEFINED(IORING_OP_RECV);1052 1053 if( /*func == (fptr_t)accept4 || */1054 func == (fptr_t)cfa_accept4 )1055 
#define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,1056 return IS_DEFINED(IORING_OP_ACCEPT);1057 1058 if( /*func == (fptr_t)connect || */1059 func == (fptr_t)cfa_connect )1060 #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,1061 return IS_DEFINED(IORING_OP_CONNECT);1062 1063 if( /*func == (fptr_t)fallocate || */1064 func == (fptr_t)cfa_fallocate )1065 #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,1066 return IS_DEFINED(IORING_OP_FALLOCATE);1067 1068 if( /*func == (fptr_t)posix_fadvise || */1069 func == (fptr_t)cfa_fadvise )1070 #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,1071 return IS_DEFINED(IORING_OP_FADVISE);1072 1073 if( /*func == (fptr_t)madvise || */1074 func == (fptr_t)cfa_madvise )1075 #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,1076 return IS_DEFINED(IORING_OP_MADVISE);1077 1078 if( /*func == (fptr_t)openat || */1079 func == (fptr_t)cfa_openat )1080 #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,1081 return IS_DEFINED(IORING_OP_OPENAT);1082 1083 if( /*func == (fptr_t)close || */1084 func == (fptr_t)cfa_close )1085 #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,1086 return IS_DEFINED(IORING_OP_CLOSE);1087 1088 if( /*func == (fptr_t)read || */1089 func == (fptr_t)cfa_read )1090 #define _CFA_IO_FEATURE_IORING_OP_READ ,1091 return IS_DEFINED(IORING_OP_READ);1092 1093 if( /*func == (fptr_t)write || */1094 func == (fptr_t)cfa_write )1095 #define _CFA_IO_FEATURE_IORING_OP_WRITE ,1096 return IS_DEFINED(IORING_OP_WRITE);1097 #endif1098 1099 return false;1100 }
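The merged __drain_io above walks the completion ring by hand: it reads head, tail and mask, hands each cqe's result back to the request recorded in user_data, and only then publishes the new head after a full memory fence. Below is a minimal C sketch of that drain pattern, using hypothetical stand-in types (cq_view, request) and a caller-supplied unpark callback in place of the changeset's __completion_data, __io_user_data_t and unpark(); it is an illustration of the technique, not the file's actual code.

#include <stdint.h>

struct io_uring_cqe_view {            // stand-in for struct io_uring_cqe
        uint64_t user_data;           // pointer to the waiting request record
        int32_t  res;                 // operation result
        uint32_t flags;
};

struct request {                      // stand-in for __io_user_data_t
        int32_t result;
        void *  thrd;                 // thread to unpark once the result is in
};

struct cq_view {                      // stand-in for __completion_data
        volatile uint32_t * head;     // consumer index, advanced here
        volatile uint32_t * tail;     // producer index, advanced by the kernel
        const uint32_t    * mask;     // ring size - 1
        struct io_uring_cqe_view * cqes;
};

// Drain every completion currently visible in the ring, then release the
// slots back to the kernel by bumping the head index.
static unsigned drain_completions( struct cq_view * cq, void (*unpark)( void * thrd ) ) {
        uint32_t head = *cq->head;
        uint32_t tail = *cq->tail;
        const uint32_t mask = *cq->mask;

        uint32_t count = tail - head;          // indices are free-running; wrap with mask
        for( uint32_t i = 0; i < count; i++ ) {
                struct io_uring_cqe_view * cqe = &cq->cqes[ (head + i) & mask ];
                struct request * req = (struct request *)(uintptr_t)cqe->user_data;
                req->result = cqe->res;        // publish the result, then wake the waiter
                unpark( req->thrd );
        }

        // The kernel must not see the advanced head before every cqe above was read.
        __atomic_thread_fence( __ATOMIC_SEQ_CST );
        __atomic_fetch_add( cq->head, count, __ATOMIC_RELAXED );
        return count;
}

The fence before the head update mirrors the new file's comment that the kernel should only observe the new head index after all CQEs in the batch have been read.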
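On the submit side, the merge drops the counting semaphore and instead treats user_data == 0 as "sqe slot free", claiming a slot with a compare-and-swap while scanning from a per-thread random offset. A sketch of that allocation step under the same assumptions, with hypothetical stand-ins (sq_view, alloc_sqe) and sched_yield() in place of CFA's yield():

#include <stdint.h>
#include <sched.h>                    // sched_yield

struct io_uring_sqe_view {            // stand-in for struct io_uring_sqe
        uint64_t user_data;           // 0 marks a free slot in this scheme
        // remaining sqe fields elided
};

struct sq_view {                      // stand-in for __submition_data
        const uint32_t * num;         // number of sqe slots
        const uint32_t * mask;        // num - 1
        struct io_uring_sqe_view * sqes;
};

// Claim a free sqe by CAS-ing its user_data from 0 to the caller's non-zero
// request pointer; retry forever if the ring is momentarily full.
static uint32_t alloc_sqe( struct sq_view * sq, uint64_t data, uint32_t off ) {
        const uint32_t cnt  = *sq->num;
        const uint32_t mask = *sq->mask;

        for(;;) {
                for( uint32_t i = 0; i < cnt; i++ ) {
                        uint32_t idx = (i + off) & mask;
                        volatile uint64_t * slot = &sq->sqes[idx].user_data;
                        uint64_t expected = 0;

                        if( *slot == 0 &&
                            __atomic_compare_exchange_n( slot, &expected, data, false,
                                                         __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
                                return idx;    // slot claimed: fill the sqe, then submit idx
                        }
                }
                sched_yield();                 // ring full: let completions free some slots
        }
}

The random starting offset keeps concurrent allocators from all contending on the same low-numbered slots; the changeset draws it from __tls_rand() with interrupts disabled, which this sketch leaves to the caller.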