source: libcfa/src/concurrency/io.cfa @ 87e0b015

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 87e0b015 was 87e0b015, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 33.6 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16// #define __CFA_DEBUG_PRINT_IO__
17// #define __CFA_DEBUG_PRINT_IO_CORE__
18
19#include "kernel.hfa"
20
21#if !defined(HAVE_LINUX_IO_URING_H)
22        void __kernel_io_startup( cluster &, int, bool ) {
23                // Nothing to do without io_uring
24        }
25
26        void __kernel_io_finish_start( cluster & ) {
27                // Nothing to do without io_uring
28        }
29
30        void __kernel_io_prepare_stop( cluster & ) {
31                // Nothing to do without io_uring
32        }
33
34        void __kernel_io_shutdown( cluster &, bool ) {
35                // Nothing to do without io_uring
36        }
37
38#else
39        extern "C" {
40                #define _GNU_SOURCE         /* See feature_test_macros(7) */
41                #include <errno.h>
42                #include <stdint.h>
43                #include <string.h>
44                #include <unistd.h>
45                #include <sys/mman.h>
46                #include <sys/syscall.h>
47
48                #include <linux/io_uring.h>
49        }
50
51        #include "bits/signal.hfa"
52        #include "kernel_private.hfa"
53        #include "thread.hfa"
54
55        uint32_t entries_per_cluster() {
56                return 256;
57        }
58
59        static void * __io_poller_slow( void * arg );
60
61        // Weirdly, some systems that do support io_uring don't actually define these
62        #ifdef __alpha__
63                /*
64                * alpha is the only exception, all other architectures
65                * have common numbers for new system calls.
66                */
67                #ifndef __NR_io_uring_setup
68                        #define __NR_io_uring_setup           535
69                #endif
70                #ifndef __NR_io_uring_enter
71                        #define __NR_io_uring_enter           536
72                #endif
73                #ifndef __NR_io_uring_register
74                        #define __NR_io_uring_register        537
75                #endif
76        #else /* !__alpha__ */
77                #ifndef __NR_io_uring_setup
78                        #define __NR_io_uring_setup           425
79                #endif
80                #ifndef __NR_io_uring_enter
81                        #define __NR_io_uring_enter           426
82                #endif
83                #ifndef __NR_io_uring_register
84                        #define __NR_io_uring_register        427
85                #endif
86        #endif
87
88        // Fast poller user-thread
89        // Not using the "thread" keyword because we want to control
90        // more carefully when to start/stop it
91        struct __io_poller_fast {
92                struct __io_data * ring;
93                bool waiting;
94                $thread thrd;
95        };
96
97        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
98                this.ring = cltr.io;
99                this.waiting = true;
100                (this.thrd){ "Fast I/O Poller", cltr };
101        }
102        void ^?{}( __io_poller_fast & mutex this );
103        void main( __io_poller_fast & this );
104        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
105        void ^?{}( __io_poller_fast & mutex this ) {}
106
107        struct __submition_data {
108                // Head and tail of the ring (associated with array)
109                volatile uint32_t * head;
110                volatile uint32_t * tail;
111
112                // The actual kernel ring which uses head/tail
113                // indexes into the sqes arrays
114                uint32_t * array;
115
116                // number of entries and mask to go with it
117                const uint32_t * num;
118                const uint32_t * mask;
119
120                // Submission flags (Not sure what for)
121                uint32_t * flags;
122
123                // number of sqes not submitted (whatever that means)
124                uint32_t * dropped;
125
126                // Like head/tail but not seen by the kernel
127                volatile uint32_t alloc;
128                volatile uint32_t ready;
129
130                __spinlock_t lock;
131
132                // A buffer of sqes (not the actual ring)
133                struct io_uring_sqe * sqes;
134
135                // The location and size of the mmaped area
136                void * ring_ptr;
137                size_t ring_sz;
138
139                // Statistics
140                #if !defined(__CFA_NO_STATISTICS__)
141                        struct {
142                                struct {
143                                        volatile unsigned long long int val;
144                                        volatile unsigned long long int cnt;
145                                        volatile unsigned long long int block;
146                                } submit_avg;
147                        } stats;
148                #endif
149        };
150
151        struct __completion_data {
152                // Head and tail of the ring
153                volatile uint32_t * head;
154                volatile uint32_t * tail;
155
156                // number of entries and mask to go with it
157                const uint32_t * mask;
158                const uint32_t * num;
159
160                // number of cqes not submitted (whatever that means)
161                uint32_t * overflow;
162
163                // the kernel ring
164                struct io_uring_cqe * cqes;
165
166                // The location and size of the mmaped area
167                void * ring_ptr;
168                size_t ring_sz;
169
170                // Statistics
171                #if !defined(__CFA_NO_STATISTICS__)
172                        struct {
173                                struct {
174                                        unsigned long long int val;
175                                        unsigned long long int slow_cnt;
176                                        unsigned long long int fast_cnt;
177                                } completed_avg;
178                        } stats;
179                #endif
180        };
181
182        struct __io_data {
183                struct __submition_data submit_q;
184                struct __completion_data completion_q;
185                uint32_t ring_flags;
186                int cltr_flags;
187                int fd;
188                semaphore submit;
189                volatile bool done;
190                struct {
191                        struct {
192                                void * stack;
193                                pthread_t kthrd;
194                        } slow;
195                        __io_poller_fast fast;
196                        __bin_sem_t sem;
197                } poller;
198        };
199
200//=============================================================================================
201// I/O Startup / Shutdown logic
202//=============================================================================================
203        void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
204                this.io = malloc();
205
206                // Step 1 : call to setup
207                struct io_uring_params params;
208                memset(&params, 0, sizeof(params));
209
210                uint32_t nentries = entries_per_cluster();
211
212                int fd = syscall(__NR_io_uring_setup, nentries, &params );
213                if(fd < 0) {
214                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
215                }
216
217                // Step 2 : mmap result
218                memset( this.io, 0, sizeof(struct __io_data) );
219                struct __submition_data  & sq = this.io->submit_q;
220                struct __completion_data & cq = this.io->completion_q;
221
222                // calculate the right ring size
223                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
224                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
225
226                // Requires features
227                #if defined(IORING_FEAT_SINGLE_MMAP)
228                        // adjust the size according to the parameters
229                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
230                                cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
231                        }
232                #endif
233
234                // mmap the Submit Queue into existence
235                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
236                if (sq.ring_ptr == (void*)MAP_FAILED) {
237                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
238                }
239
240                // Requires features
241                #if defined(IORING_FEAT_SINGLE_MMAP)
242                        // mmap the Completion Queue into existence (may or may not be needed)
243                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
244                                cq->ring_ptr = sq->ring_ptr;
245                        }
246                        else
247                #endif
248                {
249                        // We need multiple call to MMAP
250                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
251                        if (cq.ring_ptr == (void*)MAP_FAILED) {
252                                munmap(sq.ring_ptr, sq.ring_sz);
253                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
254                        }
255                }
256
257                // mmap the submit queue entries
258                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
259                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
260                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
261                        munmap(sq.ring_ptr, sq.ring_sz);
262                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
263                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
264                }
265
266                // Get the pointers from the kernel to fill the structure
267                // submit queue
268                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
269                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
270                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
271                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
272                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
273                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
274                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
275                sq.alloc = *sq.tail;
276                sq.ready = *sq.tail;
277
278                // completion queue
279                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
280                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
281                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
282                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
283                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
284                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
285
286                // some paranoid checks
287                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
288                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
289                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
290                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
291
292                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
293                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
294                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
295                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
296
297                // Update the global ring info
298                this.io->ring_flags = params.flags;
299                this.io->cltr_flags = io_flags;
300                this.io->fd         = fd;
301                this.io->done       = false;
302                (this.io->submit){ min(*sq.num, *cq.num) };
303
304                // Initialize statistics
305                #if !defined(__CFA_NO_STATISTICS__)
306                        this.io->submit_q.stats.submit_avg.val   = 0;
307                        this.io->submit_q.stats.submit_avg.cnt   = 0;
308                        this.io->submit_q.stats.submit_avg.block = 0;
309                        this.io->completion_q.stats.completed_avg.val = 0;
310                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
311                        this.io->completion_q.stats.completed_avg.fast_cnt = 0;
312                #endif
313
314                if(!main_cluster) {
315                        __kernel_io_finish_start( this );
316                }
317        }
318
319        void __kernel_io_finish_start( cluster & this ) {
320                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
321                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
322                        (this.io->poller.fast){ this };
323                        __thrd_start( this.io->poller.fast, main );
324                }
325
326                // Create the poller thread
327                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
328                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
329        }
330
331        void __kernel_io_prepare_stop( cluster & this ) {
332                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
333                // Notify the poller thread of the shutdown
334                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
335
336                // Stop the IO Poller
337                sigval val = { 1 };
338                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
339                post( this.io->poller.sem );
340
341                // Wait for the poller thread to finish
342                pthread_join( this.io->poller.slow.kthrd, 0p );
343                free( this.io->poller.slow.stack );
344
345                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
346
347                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
348                        with( this.io->poller.fast ) {
349                                /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
350                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
351                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
352
353                                // We need to adjust the clean-up based on where the thread is
354                                if( thrd.preempted != __NO_PREEMPTION ) {
355
356                                        // This is the tricky case
357                                        // The thread was preempted and now it is on the ready queue
358                                        /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
359                                        /* paranoid */ verify( thrd.next == 1p );                // The thread should be the last on the list
360                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
361
362                                        // Remove the thread from the ready queue of this cluster
363                                        this.ready_queue.head = 1p;
364                                        thrd.next = 0p;
365
366                                        // Fixup the thread state
367                                        thrd.state = Blocked;
368                                        thrd.preempted = __NO_PREEMPTION;
369
370                                        // Pretend like the thread was blocked all along
371                                }
372                                // !!! This is not an else if !!!
373                                if( thrd.state == Blocked ) {
374
375                                        // This is the "easy case"
376                                        // The thread is parked and can easily be moved to active cluster
377                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
378                                        thrd.curr_cluster = active_cluster();
379
380                        // unpark the fast io_poller
381                                        unpark( &thrd __cfaabi_dbg_ctx2 );
382                                }
383                                else {
384
385                                        // The thread is in a weird state
386                                        // I don't know what to do here
387                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
388                                }
389
390                        }
391
392                        ^(this.io->poller.fast){};
393
394                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
395                }
396        }
397
398        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
399                if(!main_cluster) {
400                        __kernel_io_prepare_stop( this );
401                }
402
403                // print statistics
404                #if !defined(__CFA_NO_STATISTICS__)
405                        if(this.print_stats) {
406                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
407                                        __cfaabi_bits_print_safe( STDERR_FILENO,
408                                                "----- I/O uRing Stats -----\n"
409                                                "- total submit calls  : %'15llu\n"
410                                                "- avg submit          : %'18.2lf\n"
411                                                "- pre-submit block %%  : %'18.2lf\n"
412                                                "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
413                                                "- avg completion/wait : %'18.2lf\n",
414                                                submit_avg.cnt,
415                                                ((double)submit_avg.val) / submit_avg.cnt,
416                                                (100.0 * submit_avg.block) / submit_avg.cnt,
417                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
418                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
419                                                ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
420                                        );
421                                }
422                        }
423                #endif
424
425                // Shutdown the io rings
426                struct __submition_data  & sq = this.io->submit_q;
427                struct __completion_data & cq = this.io->completion_q;
428
429                // unmap the submit queue entries
430                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
431
432                // unmap the Submit Queue ring
433                munmap(sq.ring_ptr, sq.ring_sz);
434
435                // unmap the Completion Queue ring, if it is different
436                if (cq.ring_ptr != sq.ring_ptr) {
437                        munmap(cq.ring_ptr, cq.ring_sz);
438                }
439
440                // close the file descriptor
441                close(this.io->fd);
442
443                free( this.io );
444        }
445
446//=============================================================================================
447// I/O Polling
448//=============================================================================================
449        struct io_user_data {
450                int32_t result;
451                $thread * thrd;
452        };
453
454        // Process a single completion message from the io_uring
455        // This is NOT thread-safe
456        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
457                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
458                if( ret < 0 ) {
459                        switch((int)errno) {
460                        case EAGAIN:
461                        case EINTR:
462                                return -EAGAIN;
463                        default:
464                                abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
465                        }
466                }
467
468                // Drain the queue
469                unsigned head = *ring.completion_q.head;
470                unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
471
472                // Nothing was new return 0
473                if (head == tail) {
474                        return 0;
475                }
476
477                uint32_t count = tail - head;
478                for(i; count) {
479                        unsigned idx = (head + i) & (*ring.completion_q.mask);
480                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
481
482                        /* paranoid */ verify(&cqe);
483
484                        struct io_user_data * data = (struct io_user_data *)cqe.user_data;
485                        __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
486
487                        data->result = cqe.res;
488                        if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
489                        else         { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
490                }
491
492                // Allow new submissions to happen
493                V(ring.submit, count);
494
495                // Mark to the kernel that the cqe has been seen
496                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
497                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
498
499                return count;
500        }
501
502        static void * __io_poller_slow( void * arg ) {
503                cluster * cltr = (cluster *)arg;
504                struct __io_data & ring = *cltr->io;
505
506                sigset_t mask;
507                sigfillset(&mask);
508                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
509                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
510                }
511
512                sigdelset( &mask, SIGUSR1 );
513
514                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
515                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
516
517                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
518
519                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
520                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
521                                // In the user-thread approach drain and if anything was drained,
522                                // batton pass to the user-thread
523                                int count = __drain_io( ring, &mask, 1, true );
524
525                                // Update statistics
526                                #if !defined(__CFA_NO_STATISTICS__)
527                                        ring.completion_q.stats.completed_avg.val += count;
528                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
529                                #endif
530
531                                if(count > 0) {
532                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
533                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
534                                        wait( ring.poller.sem );
535                                }
536                        }
537                }
538                else {
539                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
540                                //In the naive approach, just poll the io completion queue directly
541                                int count = __drain_io( ring, &mask, 1, true );
542
543                                // Update statistics
544                                #if !defined(__CFA_NO_STATISTICS__)
545                                        ring.completion_q.stats.completed_avg.val += count;
546                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
547                                #endif
548                        }
549                }
550
551                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
552
553                return 0p;
554        }
555
556        void main( __io_poller_fast & this ) {
557                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
558
559                // Start parked
560                park( __cfaabi_dbg_ctx );
561
562                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
563
564                int reset = 0;
565
566                // Then loop until we need to start
567                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
568                        // Drain the io
569                        this.waiting = false;
570                        int count = __drain_io( *this.ring, 0p, 0, false );
571                        reset += count > 0 ? 1 : 0;
572
573                        // Update statistics
574                        #if !defined(__CFA_NO_STATISTICS__)
575                                this.ring->completion_q.stats.completed_avg.val += count;
576                                this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
577                        #endif
578
579                        this.waiting = true;
580                        if(reset < 5) {
581                                // If we got something, just yield and check again
582                                yield();
583                        }
584                        else {
585                                // We didn't get anything baton pass to the slow poller
586                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
587                                post( this.ring->poller.sem );
588                                park( __cfaabi_dbg_ctx );
589                                reset = 0;
590                        }
591                }
592
593                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
594        }
595
596//=============================================================================================
597// I/O Submissions
598//=============================================================================================
599
600// Submition steps :
601// 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
602//     entries are available. The semaphore make sure that there is no more operations in
603//     progress then the number of entries in the buffer. This probably limits concurrency
604//     more than necessary since submitted but not completed operations don't need any
605//     entries in user space. However, I don't know what happens if we overflow the buffers
606//     because too many requests completed at once. This is a safe approach in all cases.
607//     Furthermore, with hundreds of entries, this may be okay.
608//
609// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
610//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
611//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
612//     need to write an allocator that allows allocating concurrently.
613//
614// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
615//
616// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
617//     needs to arrive to two concensus at the same time:
618//     A - The order in which entries are listed in the array: no two threads must pick the
619//         same index for their entries
620//     B - When can the tail be update for the kernel. EVERY entries in the array between
621//         head and tail must be fully filled and shouldn't ever be touched again.
622//
623
624        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
625                // Wait for a spot to be available
626                __attribute__((unused)) bool blocked = P(ring.submit);
627                #if !defined(__CFA_NO_STATISTICS__)
628                        __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 1ul64 : 0ul64, __ATOMIC_RELAXED );
629                #endif
630
631                // Allocate the sqe
632                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
633
634                // Validate that we didn't overflow anything
635                // Check that nothing overflowed
636                /* paranoid */ verify( true );
637
638                // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
639                /* paranoid */ verify( true );
640
641                // Return the sqe
642                return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
643        }
644
645        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
646                // get mutual exclusion
647                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
648
649                // Append to the list of ready entries
650                uint32_t * tail = ring.submit_q.tail;
651                const uint32_t mask = *ring.submit_q.mask;
652
653                ring.submit_q.array[ (*tail) & mask ] = idx & mask;
654                __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
655
656                // Submit however, many entries need to be submitted
657                int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
658                if( ret < 0 ) {
659                        switch((int)errno) {
660                        default:
661                                abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
662                        }
663                }
664
665                // update statistics
666                #if !defined(__CFA_NO_STATISTICS__)
667                        ring.submit_q.stats.submit_avg.val += 1;
668                        ring.submit_q.stats.submit_avg.cnt += 1;
669                #endif
670
671                unlock(ring.submit_q.lock);
672                // Make sure that idx was submitted
673                // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
674                __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
675        }
676
677        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
678                this.opcode = opcode;
679                #if !defined(IOSQE_ASYNC)
680                        this.flags = 0;
681                #else
682                        this.flags = IOSQE_ASYNC;
683                #endif
684                this.ioprio = 0;
685                this.fd = fd;
686                this.off = 0;
687                this.addr = 0;
688                this.len = 0;
689                this.rw_flags = 0;
690                this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
691        }
692
693        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
694                (this){ opcode, fd };
695                this.off = off;
696                this.addr = (uint64_t)addr;
697                this.len = len;
698        }
699
700
701//=============================================================================================
702// I/O Interface
703//=============================================================================================
704
705        #define __submit_prelude \
706                struct __io_data & ring = *active_cluster()->io; \
707                struct io_uring_sqe * sqe; \
708                uint32_t idx; \
709                [sqe, idx] = __submit_alloc( ring );
710
711        #define __submit_wait \
712                io_user_data data = { 0, active_thread() }; \
713                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
714                sqe->user_data = (uint64_t)&data; \
715                __submit( ring, idx ); \
716                park( __cfaabi_dbg_ctx ); \
717                return data.result;
718#endif
719
720// Some forward declarations
721extern "C" {
722        #include <unistd.h>
723        #include <sys/types.h>
724        #include <sys/socket.h>
725        #include <sys/syscall.h>
726
727#if defined(HAVE_PREADV2)
728        struct iovec;
729        extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
730#endif
731#if defined(HAVE_PWRITEV2)
732        struct iovec;
733        extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
734#endif
735
736        extern int fsync(int fd);
737        extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
738
739        struct msghdr;
740        struct sockaddr;
741        extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
742        extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
743        extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
744        extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
745        extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
746        extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
747
748        extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
749        extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
750        extern int madvise(void *addr, size_t length, int advice);
751
752        extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
753        extern int close(int fd);
754
755        extern ssize_t read (int fd, void *buf, size_t count);
756}
757
758//-----------------------------------------------------------------------------
759// Asynchronous operations
760#if defined(HAVE_PREADV2)
761        ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
762                #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
763                        return preadv2(fd, iov, iovcnt, offset, flags);
764                #else
765                        __submit_prelude
766
767                        (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
768
769                        __submit_wait
770                #endif
771        }
772#endif
773
774#if defined(HAVE_PWRITEV2)
775        ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
776                #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
777                        return pwritev2(fd, iov, iovcnt, offset, flags);
778                #else
779                        __submit_prelude
780
781                        (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
782
783                        __submit_wait
784                #endif
785        }
786#endif
787
788int cfa_fsync(int fd) {
789        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
790                return fsync(fd);
791        #else
792                __submit_prelude
793
794                (*sqe){ IORING_OP_FSYNC, fd };
795
796                __submit_wait
797        #endif
798}
799
800int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
801        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
802                return sync_file_range(fd, offset, nbytes, flags);
803        #else
804                __submit_prelude
805
806                (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
807                sqe->off = offset;
808                sqe->len = nbytes;
809                sqe->sync_range_flags = flags;
810
811                __submit_wait
812        #endif
813}
814
815
816ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
817        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
818                return sendmsg(sockfd, msg, flags);
819        #else
820                __submit_prelude
821
822                (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
823                sqe->msg_flags = flags;
824
825                __submit_wait
826        #endif
827}
828
829ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
830        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
831                return recvmsg(sockfd, msg, flags);
832        #else
833                __submit_prelude
834
835                (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
836                sqe->msg_flags = flags;
837
838                __submit_wait
839        #endif
840}
841
842ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
843        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
844                return send( sockfd, buf, len, flags );
845        #else
846                __submit_prelude
847
848                (*sqe){ IORING_OP_SEND, sockfd };
849                sqe->addr = (uint64_t)buf;
850                sqe->len = len;
851                sqe->msg_flags = flags;
852
853                __submit_wait
854        #endif
855}
856
857ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
858        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
859                return recv( sockfd, buf, len, flags );
860        #else
861                __submit_prelude
862
863                (*sqe){ IORING_OP_RECV, sockfd };
864                sqe->addr = (uint64_t)buf;
865                sqe->len = len;
866                sqe->msg_flags = flags;
867
868                __submit_wait
869        #endif
870}
871
872int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
873        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
874                return accept4( sockfd, addr, addrlen, flags );
875        #else
876                __submit_prelude
877
878                (*sqe){ IORING_OP_ACCEPT, sockfd };
879                sqe->addr = addr;
880                sqe->addr2 = addrlen;
881                sqe->accept_flags = flags;
882
883                __submit_wait
884        #endif
885}
886
887int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
888        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
889                return connect( sockfd, addr, addrlen );
890        #else
891                __submit_prelude
892
893                (*sqe){ IORING_OP_CONNECT, sockfd };
894                sqe->addr = (uint64_t)addr;
895                sqe->off = addrlen;
896
897                __submit_wait
898        #endif
899}
900
901int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
902        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
903                return fallocate( fd, mode, offset, len );
904        #else
905                __submit_prelude
906
907                (*sqe){ IORING_OP_FALLOCATE, fd };
908                sqe->off = offset;
909                sqe->len = length;
910                sqe->mode = mode;
911
912                __submit_wait
913        #endif
914}
915
916int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
917        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
918                return posix_fadvise( fd, offset, len, advice );
919        #else
920                __submit_prelude
921
922                (*sqe){ IORING_OP_FADVISE, fd };
923                sqe->off = (uint64_t)offset;
924                sqe->len = length;
925                sqe->fadvise_advice = advice;
926
927                __submit_wait
928        #endif
929}
930
931int cfa_madvise(void *addr, size_t length, int advice) {
932        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
933                return madvise( addr, length, advice );
934        #else
935                __submit_prelude
936
937                (*sqe){ IORING_OP_MADVISE, 0 };
938                sqe->addr = (uint64_t)addr;
939                sqe->len = length;
940                sqe->fadvise_advice = advice;
941
942                __submit_wait
943        #endif
944}
945
946int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
947        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
948                return openat( dirfd, pathname, flags, mode );
949        #else
950                __submit_prelude
951
952                (*sqe){ IORING_OP_OPENAT, dirfd };
953                sqe->addr = (uint64_t)pathname;
954                sqe->open_flags = flags;
955                sqe->mode = mode;
956
957                __submit_wait
958        #endif
959}
960
961int cfa_close(int fd) {
962        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
963                return close( fd );
964        #else
965                __submit_prelude
966
967                (*sqe){ IORING_OP_CLOSE, fd };
968
969                __submit_wait
970        #endif
971}
972
973
974ssize_t cfa_read(int fd, void *buf, size_t count) {
975        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
976                return read( fd, buf, count );
977        #else
978                __submit_prelude
979
980                (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
981
982                __submit_wait
983        #endif
984}
985
986ssize_t cfa_write(int fd, void *buf, size_t count) {
987        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
988                return read( fd, buf, count );
989        #else
990                __submit_prelude
991
992                (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
993
994                __submit_wait
995        #endif
996}
997
998//-----------------------------------------------------------------------------
999// Check if a function is asynchronous
1000
1001// Macro magic to reduce the size of the following switch case
1002#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
1003#define IS_DEFINED_SECOND(first, second, ...) second
1004#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
1005#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
1006
1007bool has_user_level_blocking( fptr_t func ) {
1008        #if defined(HAVE_LINUX_IO_URING_H)
1009                #if defined(HAVE_PREADV2)
1010                        if( /*func == (fptr_t)preadv2 || */
1011                                func == (fptr_t)cfa_preadv2 )
1012                                #define _CFA_IO_FEATURE_IORING_OP_READV ,
1013                                return IS_DEFINED(IORING_OP_READV);
1014                #endif
1015
1016                #if defined(HAVE_PWRITEV2)
1017                        if( /*func == (fptr_t)pwritev2 || */
1018                                func == (fptr_t)cfa_pwritev2 )
1019                                #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
1020                                return IS_DEFINED(IORING_OP_WRITEV);
1021                #endif
1022
1023                if( /*func == (fptr_t)fsync || */
1024                        func == (fptr_t)cfa_fsync )
1025                        #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
1026                        return IS_DEFINED(IORING_OP_FSYNC);
1027
1028                if( /*func == (fptr_t)ync_file_range || */
1029                        func == (fptr_t)cfa_sync_file_range )
1030                        #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
1031                        return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
1032
1033                if( /*func == (fptr_t)sendmsg || */
1034                        func == (fptr_t)cfa_sendmsg )
1035                        #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
1036                        return IS_DEFINED(IORING_OP_SENDMSG);
1037
1038                if( /*func == (fptr_t)recvmsg || */
1039                        func == (fptr_t)cfa_recvmsg )
1040                        #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
1041                        return IS_DEFINED(IORING_OP_RECVMSG);
1042
1043                if( /*func == (fptr_t)send || */
1044                        func == (fptr_t)cfa_send )
1045                        #define _CFA_IO_FEATURE_IORING_OP_SEND ,
1046                        return IS_DEFINED(IORING_OP_SEND);
1047
1048                if( /*func == (fptr_t)recv || */
1049                        func == (fptr_t)cfa_recv )
1050                        #define _CFA_IO_FEATURE_IORING_OP_RECV ,
1051                        return IS_DEFINED(IORING_OP_RECV);
1052
1053                if( /*func == (fptr_t)accept4 || */
1054                        func == (fptr_t)cfa_accept4 )
1055                        #define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
1056                        return IS_DEFINED(IORING_OP_ACCEPT);
1057
1058                if( /*func == (fptr_t)connect || */
1059                        func == (fptr_t)cfa_connect )
1060                        #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
1061                        return IS_DEFINED(IORING_OP_CONNECT);
1062
1063                if( /*func == (fptr_t)fallocate || */
1064                        func == (fptr_t)cfa_fallocate )
1065                        #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
1066                        return IS_DEFINED(IORING_OP_FALLOCATE);
1067
1068                if( /*func == (fptr_t)posix_fadvise || */
1069                        func == (fptr_t)cfa_fadvise )
1070                        #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
1071                        return IS_DEFINED(IORING_OP_FADVISE);
1072
1073                if( /*func == (fptr_t)madvise || */
1074                        func == (fptr_t)cfa_madvise )
1075                        #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
1076                        return IS_DEFINED(IORING_OP_MADVISE);
1077
1078                if( /*func == (fptr_t)openat || */
1079                        func == (fptr_t)cfa_openat )
1080                        #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
1081                        return IS_DEFINED(IORING_OP_OPENAT);
1082
1083                if( /*func == (fptr_t)close || */
1084                        func == (fptr_t)cfa_close )
1085                        #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
1086                        return IS_DEFINED(IORING_OP_CLOSE);
1087
1088                if( /*func == (fptr_t)read || */
1089                        func == (fptr_t)cfa_read )
1090                        #define _CFA_IO_FEATURE_IORING_OP_READ ,
1091                        return IS_DEFINED(IORING_OP_READ);
1092
1093                if( /*func == (fptr_t)write || */
1094                        func == (fptr_t)cfa_write )
1095                        #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
1096                        return IS_DEFINED(IORING_OP_WRITE);
1097        #endif
1098
1099        return false;
1100}
Note: See TracBrowser for help on using the repository browser.