source: libcfa/src/concurrency/io.cfa@ f6660520

Last change on this file was f6660520, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Added new implementation of io_uring that uses user-thread

1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#include "kernel.hfa"
17
18#if !defined(HAVE_LINUX_IO_URING_H)
19 void __kernel_io_startup( cluster &, bool ) {
20 // Nothing to do without io_uring
21 }
22
23 void __kernel_io_finish_start( cluster & ) {
24 // Nothing to do without io_uring
25 }
26
27 void __kernel_io_prepare_stop( cluster & ) {
28 // Nothing to do without io_uring
29 }
30
31 void __kernel_io_shutdown( cluster &, bool ) {
32 // Nothing to do without io_uring
33 }
34
35#else
36 extern "C" {
37 #define _GNU_SOURCE /* See feature_test_macros(7) */
38 #include <errno.h>
39 #include <stdint.h>
40 #include <string.h>
41 #include <unistd.h>
42 #include <sys/mman.h>
43 #include <sys/syscall.h>
44
45 #include <linux/io_uring.h>
46 }
47
48 #include "bits/signal.hfa"
49 #include "kernel_private.hfa"
50 #include "thread.hfa"
51
52 uint32_t entries_per_cluster() {
53 return 256;
54 }
55
56 static void * __io_poller_slow( void * arg );
57
58 // Weirdly, some systems that do support io_uring don't actually define these
59 #ifdef __alpha__
60 /*
61 * alpha is the only exception, all other architectures
62 * have common numbers for new system calls.
63 */
64 #ifndef __NR_io_uring_setup
65 #define __NR_io_uring_setup 535
66 #endif
67 #ifndef __NR_io_uring_enter
68 #define __NR_io_uring_enter 536
69 #endif
70 #ifndef __NR_io_uring_register
71 #define __NR_io_uring_register 537
72 #endif
73 #else /* !__alpha__ */
74 #ifndef __NR_io_uring_setup
75 #define __NR_io_uring_setup 425
76 #endif
77 #ifndef __NR_io_uring_enter
78 #define __NR_io_uring_enter 426
79 #endif
80 #ifndef __NR_io_uring_register
81 #define __NR_io_uring_register 427
82 #endif
83 #endif
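 // Note (explanatory, not from the original source): older libcs ship no io_uring wrappers, so the
 // calls below go through syscall(2) directly using the numbers defined above. A minimal sketch of
 // what such a wrapper would look like, using a hypothetical helper name:
 //
 //    static inline int sys_io_uring_setup( unsigned entries, struct io_uring_params * p ) {
 //       return syscall( __NR_io_uring_setup, entries, p );
 //    }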
84
85 #if defined(__CFA_IO_POLLING_USER__)
86 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
87 this.ring = &cltr.io;
88 (this.thrd){ "I/O Poller", cltr };
89 }
90 void ^?{}( __io_poller_fast & mutex this );
91 void main( __io_poller_fast & this );
92 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
93 void ^?{}( __io_poller_fast & mutex this ) {}
94 #endif
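 // When __CFA_IO_POLLING_USER__ is defined two pollers cooperate: the "slow" poller is a kernel
 // pthread that blocks in io_uring_enter waiting for completions, while the "fast" poller is a
 // user thread (__io_poller_fast) that drains completions without blocking a kernel thread; the
 // two baton pass through ring.poller.sem (see __io_poller_slow and main( __io_poller_fast & )).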
95
96//=============================================================================================
97// I/O Startup / Shutdown logic
98//=============================================================================================
99 void __kernel_io_startup( cluster & this, bool main_cluster ) {
100 // Step 1 : call to setup
101 struct io_uring_params params;
102 memset(&params, 0, sizeof(params));
103
104 uint32_t nentries = entries_per_cluster();
105
106 int fd = syscall(__NR_io_uring_setup, nentries, &params );
107 if(fd < 0) {
108 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
109 }
110
111 // Step 2 : mmap result
112 memset(&this.io, 0, sizeof(struct io_ring));
113 struct io_uring_sq & sq = this.io.submit_q;
114 struct io_uring_cq & cq = this.io.completion_q;
115
116 // calculate the right ring size
117 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
118 cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe));
119
120 // Requires features
121 #if defined(IORING_FEAT_SINGLE_MMAP)
122 // adjust the size according to the parameters
123 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
124 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
125 }
126 #endif
127
128 // mmap the Submit Queue into existence
129 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
130 if (sq.ring_ptr == (void*)MAP_FAILED) {
131 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
132 }
133
134 // Requires features
135 #if defined(IORING_FEAT_SINGLE_MMAP)
136 // mmap the Completion Queue into existence (may or may not be needed)
137 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
138 cq.ring_ptr = sq.ring_ptr;
139 }
140 else
141 #endif
142 {
143 // We need a separate mmap call for the Completion Queue ring
144 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
145 if (cq.ring_ptr == (void*)MAP_FAILED) {
146 munmap(sq.ring_ptr, sq.ring_sz);
147 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
148 }
149 }
150
151 // mmap the submit queue entries
152 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
153 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
154 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
155 munmap(sq.ring_ptr, sq.ring_sz);
156 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
157 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
158 }
159
160 // Get the pointers from the kernel to fill the structure
161 // submit queue
162 sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
163 sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
164 sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
165 sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
166 sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
167 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
168 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
169 sq.alloc = *sq.tail;
170 sq.ready = *sq.tail;
171
172 // completion queue
173 cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
174 cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
175 cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
176 cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
177 cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
178 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
179
180 // some paranoid checks
181 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
182 /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
183 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
184 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
185
186 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
187 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
188 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
189 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
190
191 // Update the global ring info
192 this.io.flags = params.flags;
193 this.io.fd = fd;
194 this.io.done = false;
195 (this.io.submit){ min(*sq.num, *cq.num) };
196
197 // Initialize statistics
198 #if !defined(__CFA_NO_STATISTICS__)
199 this.io.submit_q.stats.submit_avg.val = 0;
200 this.io.submit_q.stats.submit_avg.cnt = 0;
201 this.io.completion_q.stats.completed_avg.val = 0;
202 this.io.completion_q.stats.completed_avg.cnt = 0;
203 #endif
204
205 if(!main_cluster) {
206 __kernel_io_finish_start( this );
207 }
208 }
209
210 void __kernel_io_finish_start( cluster & this ) {
211 #if defined(__CFA_IO_POLLING_USER__)
212 (this.io.poller.fast){ this };
213 __thrd_start( this.io.poller.fast, main );
214 #endif
215
216 // Create the poller thread
217 this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
218 }
219
220 void __kernel_io_prepare_stop( cluster & this ) {
221 // Notify the poller thread of the shutdown
222 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
223
224 // Stop the IO Poller
225 sigval val = { 1 };
226 pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
227 #if defined(__CFA_IO_POLLING_USER__)
228 post( this.io.poller.sem );
229 #endif
230
231 // Wait for the poller thread to finish
232 pthread_join( this.io.poller.slow.kthrd, 0p );
233 free( this.io.poller.slow.stack );
234
235 #if defined(__CFA_IO_POLLING_USER__)
236 // unpark the fast io_poller
237 unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
238
239 ^(this.io.poller.fast){};
240 #endif
241 }
242
243 void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
244 if(!main_cluster) {
245 __kernel_io_prepare_stop( this );
246 }
247
248 // print statistics
249 #if !defined(__CFA_NO_STATISTICS__)
250 if(this.print_stats) {
251 __cfaabi_bits_print_safe( STDERR_FILENO,
252 "----- I/O uRing Stats -----\n"
253 "- total submit calls : %llu\n"
254 "- avg submit : %lf\n"
255 "- total wait calls : %llu\n"
256 "- avg completion/wait : %lf\n",
257 this.io.submit_q.stats.submit_avg.cnt,
258 ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
259 this.io.completion_q.stats.completed_avg.cnt,
260 ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
261 );
262 }
263 #endif
264
265 // Shutdown the io rings
266 struct io_uring_sq & sq = this.io.submit_q;
267 struct io_uring_cq & cq = this.io.completion_q;
268
269 // unmap the submit queue entries
270 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
271
272 // unmap the Submit Queue ring
273 munmap(sq.ring_ptr, sq.ring_sz);
274
275 // unmap the Completion Queue ring, if it is different
276 if (cq.ring_ptr != sq.ring_ptr) {
277 munmap(cq.ring_ptr, cq.ring_sz);
278 }
279
280 // close the file descriptor
281 close(this.io.fd);
282 }
283
284//=============================================================================================
285// I/O Polling
286//=============================================================================================
287 struct io_user_data {
288 int32_t result;
289 $thread * thrd;
290 };
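 // The address of a stack-allocated io_user_data is stored in sqe->user_data at submission and
 // read back from cqe->user_data by __drain_io, which fills in the result and unparks the thread.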
291
292 // Drain the available completion messages from the io_uring instance
293 // This is NOT thread-safe
294 static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
295 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
296 if( ret < 0 ) {
297 switch((int)errno) {
298 case EAGAIN:
299 case EINTR:
300 return -EAGAIN;
301 default:
302 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
303 }
304 }
305
306 // Drain the queue
307 unsigned head = *ring.completion_q.head;
308 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
309
310 // Nothing new, return 0
311 if (head == tail) {
312 #if !defined(__CFA_NO_STATISTICS__)
313 ring.completion_q.stats.completed_avg.cnt += 1;
314 #endif
315 return 0;
316 }
317
318 uint32_t count = tail - head;
319 for(i; count) {
320 unsigned idx = (head + i) & (*ring.completion_q.mask);
321 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
322
323 /* paranoid */ verify(&cqe);
324
325 struct io_user_data * data = (struct io_user_data *)cqe.user_data;
326 // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
327
328 data->result = cqe.res;
329 if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
330 else { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
331 }
332
333 // Allow new submissions to happen
334 V(ring.submit, count);
335
336 // Mark to the kernel that the cqe has been seen
337 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
338 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELEASE );
339
340 // Update statistics
341 #if !defined(__CFA_NO_STATISTICS__)
342 ring.completion_q.stats.completed_avg.val += count;
343 ring.completion_q.stats.completed_avg.cnt += 1;
344 #endif
345
346 return count;
347 }
348
349 static void * __io_poller_slow( void * arg ) {
350 cluster * cltr = (cluster *)arg;
351 struct io_ring & ring = cltr->io;
352
353 sigset_t mask;
354 sigfillset(&mask);
355 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
356 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
357 }
358
359 sigdelset( &mask, SIGUSR1 );
360
361 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
362 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
363
364 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
365 #if defined(__CFA_IO_POLLING_USER__)
366
367 // In the user-thread approach, drain and, if anything was drained,
368 // baton pass to the user-thread
369 int count = __drain_io( ring, &mask, 1, true );
370 if(count > 0) {
371 __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
372 wait( ring.poller.sem );
373 }
374
375 #else
376
377 // In the naive approach, just poll the io completion queue directly
378 __drain_io( ring, &mask, 1, true );
379
380 #endif
381 }
382
383 return 0p;
384 }
385
386 #if defined(__CFA_IO_POLLING_USER__)
387 void main( __io_poller_fast & this ) {
388 // Start parked
389 park( __cfaabi_dbg_ctx );
390
391 // Then loop until we need to stop
392 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
393 // Drain the io
394 if(0 < __drain_io( *this.ring, 0p, 0, false )) {
395 // If we got something, just yield and check again
396 yield();
397 }
398 else {
399 // We didn't get anything, baton pass to the slow poller
400 post( this.ring->poller.sem );
401 park( __cfaabi_dbg_ctx );
402 }
403 }
404 }
405 #endif
406
407//=============================================================================================
408// I/O Submissions
409//=============================================================================================
410
411// Submission steps :
412// 1 - Make sure we don't overflow any of the buffers: P(ring.submit) guarantees that
413// entries are available. The semaphore makes sure that there are never more operations in
414// progress than there are entries in the buffer. This probably limits concurrency
415// more than necessary since submitted but not completed operations don't need any
416// entries in user space. However, I don't know what happens if we overflow the buffers
417// because too many requests completed at once. This is a safe approach in all cases.
418// Furthermore, with hundreds of entries, this may be okay.
419//
420// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
421// listed in sq.array are visible to the kernel. For those not listed, the kernel does not
422// offer any assurance that an entry is not being filled by multiple threads. Therefore, we
423// need to write an allocator that allows allocating concurrently.
424//
425// 3 - Actually fill the submit entry; this is the only simple and straightforward step.
426//
427// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
428// needs to reach two consensus points at the same time:
429// A - The order in which entries are listed in the array: no two threads may pick the
430// same index for their entries.
431// B - When the tail can be updated for the kernel: EVERY entry in the array between
432// head and tail must be fully filled and should never be touched again.
433// A sketch of the resulting call pattern is given right after this comment.
434
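// As an illustration only (not part of the interface), the call pattern produced by the helpers
// below for a single readv-style operation, with fd/iov/iovcnt/offset as hypothetical arguments:
//
//    struct io_uring_sqe * sqe;
//    uint32_t idx;
//    [sqe, idx] = __submit_alloc( ring );                  // steps 1 & 2 : reserve and allocate
//    (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };   // step 3      : fill the entry
//    __submit( ring, idx );                                // step 4      : publish and notify
//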
435 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
436 // Wait for a spot to be available
437 P(ring.submit);
438
439 // Allocate the sqe
440 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
441
442 // Validate that we didn't overflow anything
443 // Check that nothing overflowed
444 /* paranoid */ verify( true );
445
446 // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
447 /* paranoid */ verify( true );
448
449 // Return the sqe
450 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
451 }
452
453 static inline void __submit( struct io_ring & ring, uint32_t idx ) {
454 // get mutual exclusion
455 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
456
457 // Append to the list of ready entries
458 uint32_t * tail = ring.submit_q.tail;
459 const uint32_t mask = *ring.submit_q.mask;
460
461 ring.submit_q.array[ (*tail) & mask ] = idx & mask;
462 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
463
464 // Submit however many entries need to be submitted
465 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
466 // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed io_submit, returned %d\n", ret );
467 if( ret < 0 ) {
468 switch((int)errno) {
469 default:
470 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
471 }
472 }
473
474 // update statistics
475 #if !defined(__CFA_NO_STATISTICS__)
476 ring.submit_q.stats.submit_avg.val += 1;
477 ring.submit_q.stats.submit_avg.cnt += 1;
478 #endif
479
480 unlock(ring.submit_q.lock);
481 // Make sure that idx was submitted
482 // Be careful not to get a false positive if we cycled the entire list or if someone else submitted for us
483 }
484
485 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
486 this.opcode = opcode;
487 #if !defined(IOSQE_ASYNC)
488 this.flags = 0;
489 #else
490 this.flags = IOSQE_ASYNC;
491 #endif
492 this.ioprio = 0;
493 this.fd = fd;
494 this.off = 0;
495 this.addr = 0;
496 this.len = 0;
497 this.rw_flags = 0;
498 this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
499 }
500
501 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
502 (this){ opcode, fd };
503 this.off = off;
504 this.addr = (uint64_t)addr;
505 this.len = len;
506 }
507
508
509//=============================================================================================
510// I/O Interface
511//=============================================================================================
512
513 #define __submit_prelude \
514 struct io_ring & ring = active_cluster()->io; \
515 struct io_uring_sqe * sqe; \
516 uint32_t idx; \
517 [sqe, idx] = __submit_alloc( ring );
518
519 #define __submit_wait \
520 io_user_data data = { 0, active_thread() }; \
521 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
522 sqe->user_data = (uint64_t)&data; \
523 __submit( ring, idx ); \
524 park( __cfaabi_dbg_ctx ); \
525 return data.result;
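 // Putting the two macros together, a wrapper such as cfa_read (below) expands to roughly the
 // following sketch (illustrative only, assuming IORING_OP_READ is available):
 //
 //    struct io_ring & ring = active_cluster()->io;
 //    struct io_uring_sqe * sqe; uint32_t idx;
 //    [sqe, idx] = __submit_alloc( ring );
 //    (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
 //    io_user_data data = { 0, active_thread() };
 //    sqe->user_data = (uint64_t)&data;
 //    __submit( ring, idx );
 //    park( __cfaabi_dbg_ctx );      // unparked by __drain_io once the cqe is processed
 //    return data.result;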
526#endif
527
528// Some forward declarations
529extern "C" {
530 #include <sys/types.h>
531 struct iovec;
532 extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
533 extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
534
535 extern int fsync(int fd);
536 extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
537
538 struct msghdr;
539 struct sockaddr;
540 extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
541 extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
542 extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
543 extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
544 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
545 extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
546
547 extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
548 extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
549 extern int madvise(void *addr, size_t length, int advice);
550
551 extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
552 extern int close(int fd);
553
554 struct statx;
555 extern int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
556
557 extern ssize_t read (int fd, void *buf, size_t count);
	extern ssize_t write(int fd, const void *buf, size_t count);
558}
559
560//-----------------------------------------------------------------------------
561// Asynchronous operations
562ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
563 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
564 return preadv2(fd, iov, iovcnt, offset, flags);
565 #else
566 __submit_prelude
567
568 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
569
570 __submit_wait
571 #endif
572}
573
574ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
575 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
576 return pwritev2(fd, iov, iovcnt, offset, flags);
577 #else
578 __submit_prelude
579
580 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
581
582 __submit_wait
583 #endif
584}
585
586int cfa_fsync(int fd) {
587 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
588 return fsync(fd);
589 #else
590 __submit_prelude
591
592 (*sqe){ IORING_OP_FSYNC, fd };
593
594 __submit_wait
595 #endif
596}
597
598int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
599 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
600 return sync_file_range(fd, offset, nbytes, flags);
601 #else
602 __submit_prelude
603
604 (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
605 sqe->off = offset;
606 sqe->len = nbytes;
607 sqe->sync_range_flags = flags;
608
609 __submit_wait
610 #endif
611}
612
613
614ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
615 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
616 return sendmsg(sockfd, msg, flags);
617 #else
618 __submit_prelude
619
620 (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
621 sqe->msg_flags = flags;
622
623 __submit_wait
624 #endif
625}
626
627ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
628 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
629 return recvmsg(sockfd, msg, flags);
630 #else
631 __submit_prelude
632
633 (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
634 sqe->msg_flags = flags;
635
636 __submit_wait
637 #endif
638}
639
640ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
641 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
642 return send( sockfd, buf, len, flags );
643 #else
644 __submit_prelude
645
646 (*sqe){ IORING_OP_SEND, sockfd };
647 sqe->addr = (uint64_t)buf;
648 sqe->len = len;
649 sqe->msg_flags = flags;
650
651 __submit_wait
652 #endif
653}
654
655ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
656 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
657 return recv( sockfd, buf, len, flags );
658 #else
659 __submit_prelude
660
661 (*sqe){ IORING_OP_RECV, sockfd };
662 sqe->addr = (uint64_t)buf;
663 sqe->len = len;
664 sqe->msg_flags = flags;
665
666 __submit_wait
667 #endif
668}
669
670int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
671 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
672 return accept4( sockfd, addr, addrlen, flags );
673 #else
674 __submit_prelude
675
676 (*sqe){ IORING_OP_ACCEPT, sockfd };
677 sqe->addr = (uint64_t)addr;
678 sqe->addr2 = (uint64_t)addrlen;
679 sqe->accept_flags = flags;
680
681 __submit_wait
682 #endif
683}
684
685int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
686 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
687 return connect( sockfd, addr, addrlen );
688 #else
689 __submit_prelude
690
691 (*sqe){ IORING_OP_CONNECT, sockfd };
692 sqe->addr = (uint64_t)addr;
693 sqe->off = addrlen;
694
695 __submit_wait
696 #endif
697}
698
699int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
700 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
701 return fallocate( fd, mode, offset, len );
702 #else
703 __submit_prelude
704
705 (*sqe){ IORING_OP_FALLOCATE, fd };
706 sqe->off = offset;
707 sqe->len = mode;
708 sqe->addr = len; // fallocate passes the size in the addr field and the mode in the len field
709
710 __submit_wait
711 #endif
712}
713
714int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
715 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
716 return posix_fadvise( fd, offset, len, advice );
717 #else
718 __submit_prelude
719
720 (*sqe){ IORING_OP_FADVISE, fd };
721 sqe->off = (uint64_t)offset;
722 sqe->len = len;
723 sqe->fadvise_advice = advice;
724
725 __submit_wait
726 #endif
727}
728
729int cfa_madvise(void *addr, size_t length, int advice) {
730 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
731 return madvise( addr, length, advice );
732 #else
733 __submit_prelude
734
735 (*sqe){ IORING_OP_MADVISE, 0 };
736 sqe->addr = (uint64_t)addr;
737 sqe->len = length;
738 sqe->fadvise_advice = advice;
739
740 __submit_wait
741 #endif
742}
743
744int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
745 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
746 return openat( dirfd, pathname, flags, mode );
747 #else
748 __submit_prelude
749
750 (*sqe){ IORING_OP_OPENAT, dirfd };
751 sqe->addr = (uint64_t)pathname;
752 sqe->open_flags = flags;
753 sqe->len = mode; // openat passes the mode in the len field
754
755 __submit_wait
756 #endif
757}
758
759int cfa_close(int fd) {
760 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
761 return close( fd );
762 #else
763 __submit_prelude
764
765 (*sqe){ IORING_OP_CLOSE, fd };
766
767 __submit_wait
768 #endif
769}
770
771int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
772 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_STATX)
773 //return statx( dirfd, pathname, flags, mask, statxbuf );
774 return syscall( __NR_statx, dirfd, pathname, flags, mask, statxbuf );
775 #else
776 __submit_prelude
777
778 (*sqe){ IORING_OP_STATX, dirfd };
779 sqe->addr = (uint64_t)pathname;
780 sqe->statx_flags = flags;
781 sqe->len = mask;
782 sqe->off = (uint64_t)statxbuf;
783
784 __submit_wait
785 #endif
786}
787
788
789ssize_t cfa_read(int fd, void *buf, size_t count) {
790 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
791 return read( fd, buf, count );
792 #else
793 __submit_prelude
794
795 (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
796
797 __submit_wait
798 #endif
799}
800
801ssize_t cfa_write(int fd, void *buf, size_t count) {
802 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
803 return write( fd, buf, count );
804 #else
805 __submit_prelude
806
807 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
808
809 __submit_wait
810 #endif
811}
812
813//-----------------------------------------------------------------------------
814// Check if a function is asynchronous
815
816// Macro magic to reduce the size of the following sequence of if statements
817#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
818#define IS_DEFINED_SECOND(first, second, ...) second
819#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
820#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
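// How the trick works (explanatory note, not from the original source): if the opcode is
// available as a preprocessor macro, it is expanded inside IS_DEFINED's argument, IS_DEFINED_TEST
// then pastes an unknown _CFA_IO_FEATURE_* token, and the second argument seen by
// IS_DEFINED_SECOND is the trailing "true". If the opcode is not a macro, the sentinel
// _CFA_IO_FEATURE_<opcode> (defined as a lone comma before each return below) injects an extra
// argument separator, shifting "false" into the second position.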
821
822bool has_user_level_blocking( fptr_t func ) {
823 #if defined(HAVE_LINUX_IO_URING_H)
824 if( /*func == (fptr_t)preadv2 || */
825 func == (fptr_t)cfa_preadv2 )
826 #define _CFA_IO_FEATURE_IORING_OP_READV ,
827 return IS_DEFINED(IORING_OP_READV);
828
829 if( /*func == (fptr_t)pwritev2 || */
830 func == (fptr_t)cfa_pwritev2 )
831 #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
832 return IS_DEFINED(IORING_OP_WRITEV);
833
834 if( /*func == (fptr_t)fsync || */
835 func == (fptr_t)cfa_fsync )
836 #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
837 return IS_DEFINED(IORING_OP_FSYNC);
838
839 if( /*func == (fptr_t)sync_file_range || */
840 func == (fptr_t)cfa_sync_file_range )
841 #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
842 return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
843
844 if( /*func == (fptr_t)sendmsg || */
845 func == (fptr_t)cfa_sendmsg )
846 #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
847 return IS_DEFINED(IORING_OP_SENDMSG);
848
849 if( /*func == (fptr_t)recvmsg || */
850 func == (fptr_t)cfa_recvmsg )
851 #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
852 return IS_DEFINED(IORING_OP_RECVMSG);
853
854 if( /*func == (fptr_t)send || */
855 func == (fptr_t)cfa_send )
856 #define _CFA_IO_FEATURE_IORING_OP_SEND ,
857 return IS_DEFINED(IORING_OP_SEND);
858
859 if( /*func == (fptr_t)recv || */
860 func == (fptr_t)cfa_recv )
861 #define _CFA_IO_FEATURE_IORING_OP_RECV ,
862 return IS_DEFINED(IORING_OP_RECV);
863
864 if( /*func == (fptr_t)accept4 || */
865 func == (fptr_t)cfa_accept4 )
866 #define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
867 return IS_DEFINED(IORING_OP_ACCEPT);
868
869 if( /*func == (fptr_t)connect || */
870 func == (fptr_t)cfa_connect )
871 #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
872 return IS_DEFINED(IORING_OP_CONNECT);
873
874 if( /*func == (fptr_t)fallocate || */
875 func == (fptr_t)cfa_fallocate )
876 #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
877 return IS_DEFINED(IORING_OP_FALLOCATE);
878
879 if( /*func == (fptr_t)posix_fadvise || */
880 func == (fptr_t)cfa_fadvise )
881 #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
882 return IS_DEFINED(IORING_OP_FADVISE);
883
884 if( /*func == (fptr_t)madvise || */
885 func == (fptr_t)cfa_madvise )
886 #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
887 return IS_DEFINED(IORING_OP_MADVISE);
888
889 if( /*func == (fptr_t)openat || */
890 func == (fptr_t)cfa_openat )
891 #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
892 return IS_DEFINED(IORING_OP_OPENAT);
893
894 if( /*func == (fptr_t)close || */
895 func == (fptr_t)cfa_close )
896 #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
897 return IS_DEFINED(IORING_OP_CLOSE);
898
899 if( /*func == (fptr_t)statx || */
900 func == (fptr_t)cfa_statx )
901 #define _CFA_IO_FEATURE_IORING_OP_STATX ,
902 return IS_DEFINED(IORING_OP_STATX);
903
904 if( /*func == (fptr_t)read || */
905 func == (fptr_t)cfa_read )
906 #define _CFA_IO_FEATURE_IORING_OP_READ ,
907 return IS_DEFINED(IORING_OP_READ);
908
909 if( /*func == (fptr_t)write || */
910 func == (fptr_t)cfa_write )
911 #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
912 return IS_DEFINED(IORING_OP_WRITE);
913 #endif
914
915 return false;
916}