source: libcfa/src/concurrency/io.cfa @ 04b73b6

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 04b73b6 was 5751a56, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Changed handling of io_uring support to handle op codes in configure.
Kernel probing for op codes, not supported yet.

  • Property mode set to 100644
File size: 29.9 KB
RevLine 
[ecf6b46]1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
[20ab637]16#if defined(__CFA_DEBUG__)
17        // #define __CFA_DEBUG_PRINT_IO__
[5751a56]18        // #define __CFA_DEBUG_PRINT_IO_CORE__
[20ab637]19#endif
[4069faad]20
[92976d9]21#include "kernel.hfa"
[5c581cc]22#include "bitmanip.hfa"
[92976d9]23
[5751a56]24#if !defined(CFA_HAVE_LINUX_IO_URING_H)
[dd4e2d7]25        void __kernel_io_startup( cluster &, unsigned, bool ) {
[92976d9]26                // Nothing to do without io_uring
27        }
28
[3f7d0b4]29        void __kernel_io_finish_start( cluster & ) {
[f6660520]30                // Nothing to do without io_uring
31        }
32
[3f7d0b4]33        void __kernel_io_prepare_stop( cluster & ) {
[f6660520]34                // Nothing to do without io_uring
35        }
36
[3f7d0b4]37        void __kernel_io_shutdown( cluster &, bool ) {
[92976d9]38                // Nothing to do without io_uring
39        }
40
41#else
[31bb2e1]42        #define _GNU_SOURCE         /* See feature_test_macros(7) */
43        #include <errno.h>
44        #include <stdint.h>
45        #include <string.h>
46        #include <unistd.h>
47        #include <sys/mman.h>
48
[92976d9]49        extern "C" {
50                #include <sys/syscall.h>
51
52                #include <linux/io_uring.h>
53        }
54
55        #include "bits/signal.hfa"
56        #include "kernel_private.hfa"
57        #include "thread.hfa"
58
59        uint32_t entries_per_cluster() {
60                return 256;
61        }
62
[f6660520]63        static void * __io_poller_slow( void * arg );
64
65        // Weirdly, some systems that do support io_uring don't actually define these
66        #ifdef __alpha__
67                /*
68                * alpha is the only exception, all other architectures
69                * have common numbers for new system calls.
70                */
71                #ifndef __NR_io_uring_setup
72                        #define __NR_io_uring_setup           535
73                #endif
74                #ifndef __NR_io_uring_enter
75                        #define __NR_io_uring_enter           536
76                #endif
77                #ifndef __NR_io_uring_register
78                        #define __NR_io_uring_register        537
79                #endif
80        #else /* !__alpha__ */
81                #ifndef __NR_io_uring_setup
82                        #define __NR_io_uring_setup           425
83                #endif
84                #ifndef __NR_io_uring_enter
85                        #define __NR_io_uring_enter           426
86                #endif
87                #ifndef __NR_io_uring_register
88                        #define __NR_io_uring_register        427
89                #endif
90        #endif
91
[61dd73d]92        // Fast poller user-thread
93        // Not using the "thread" keyword because we want to control
94        // more carefully when to start/stop it
95        struct __io_poller_fast {
96                struct __io_data * ring;
97                $thread thrd;
98        };
99
100        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
101                this.ring = cltr.io;
102                (this.thrd){ "Fast I/O Poller", cltr };
103        }
104        void ^?{}( __io_poller_fast & mutex this );
105        void main( __io_poller_fast & this );
106        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
107        void ^?{}( __io_poller_fast & mutex this ) {}
108
109        struct __submition_data {
110                // Head and tail of the ring (associated with array)
111                volatile uint32_t * head;
112                volatile uint32_t * tail;
[34b61882]113                volatile uint32_t prev_head;
[61dd73d]114
115                // The actual kernel ring which uses head/tail
116                // indexes into the sqes arrays
117                uint32_t * array;
118
119                // number of entries and mask to go with it
120                const uint32_t * num;
121                const uint32_t * mask;
122
123                // Submission flags (Not sure what for)
124                uint32_t * flags;
125
126                // number of sqes not submitted (whatever that means)
127                uint32_t * dropped;
128
129                // Like head/tail but not seen by the kernel
[5dadc9b]130                volatile uint32_t * ready;
131                uint32_t ready_cnt;
[61dd73d]132
133                __spinlock_t lock;
[732b406]134                __spinlock_t release_lock;
[61dd73d]135
136                // A buffer of sqes (not the actual ring)
137                struct io_uring_sqe * sqes;
138
139                // The location and size of the mmaped area
140                void * ring_ptr;
141                size_t ring_sz;
142        };
143
144        struct __completion_data {
145                // Head and tail of the ring
146                volatile uint32_t * head;
147                volatile uint32_t * tail;
148
149                // number of entries and mask to go with it
150                const uint32_t * mask;
151                const uint32_t * num;
152
153                // number of cqes not submitted (whatever that means)
154                uint32_t * overflow;
155
156                // the kernel ring
157                struct io_uring_cqe * cqes;
158
159                // The location and size of the mmaped area
160                void * ring_ptr;
161                size_t ring_sz;
162        };
163
164        struct __io_data {
165                struct __submition_data submit_q;
166                struct __completion_data completion_q;
[b6f2b213]167                uint32_t ring_flags;
168                int cltr_flags;
[61dd73d]169                int fd;
170                semaphore submit;
171                volatile bool done;
172                struct {
173                        struct {
[13c5e19]174                                __processor_id_t id;
[61dd73d]175                                void * stack;
176                                pthread_t kthrd;
[5c581cc]177                                volatile bool blocked;
[61dd73d]178                        } slow;
179                        __io_poller_fast fast;
180                        __bin_sem_t sem;
181                } poller;
182        };
[185efe6]183
[92976d9]184//=============================================================================================
185// I/O Startup / Shutdown logic
186//=============================================================================================
[dd4e2d7]187        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
[e46c753]188                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
189                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
190                }
191
[61dd73d]192                this.io = malloc();
193
[92976d9]194                // Step 1 : call to setup
195                struct io_uring_params params;
196                memset(&params, 0, sizeof(params));
[47746a2]197                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
198                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
[92976d9]199
[2d8f7b0]200                uint32_t nentries = entries_per_cluster();
201
202                int fd = syscall(__NR_io_uring_setup, nentries, &params );
[92976d9]203                if(fd < 0) {
204                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
205                }
206
207                // Step 2 : mmap result
[61dd73d]208                memset( this.io, 0, sizeof(struct __io_data) );
209                struct __submition_data  & sq = this.io->submit_q;
210                struct __completion_data & cq = this.io->completion_q;
[92976d9]211
212                // calculate the right ring size
[2d8f7b0]213                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
214                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
[92976d9]215
216                // Requires features
[d384787]217                #if defined(IORING_FEAT_SINGLE_MMAP)
218                        // adjust the size according to the parameters
219                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
[fb98462]220                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
[d384787]221                        }
222                #endif
[92976d9]223
224                // mmap the Submit Queue into existence
[2d8f7b0]225                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
226                if (sq.ring_ptr == (void*)MAP_FAILED) {
[92976d9]227                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
228                }
229
230                // Requires features
[d384787]231                #if defined(IORING_FEAT_SINGLE_MMAP)
232                        // mmap the Completion Queue into existence (may or may not be needed)
233                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
[fb98462]234                                cq.ring_ptr = sq.ring_ptr;
[d384787]235                        }
236                        else
237                #endif
238                {
[92976d9]239                        // We need multiple call to MMAP
[2d8f7b0]240                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
241                        if (cq.ring_ptr == (void*)MAP_FAILED) {
242                                munmap(sq.ring_ptr, sq.ring_sz);
[92976d9]243                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
244                        }
[d384787]245                }
[92976d9]246
247                // mmap the submit queue entries
248                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
[2d8f7b0]249                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
250                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
251                        munmap(sq.ring_ptr, sq.ring_sz);
252                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
[92976d9]253                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
254                }
255
256                // Get the pointers from the kernel to fill the structure
257                // submit queue
[2d8f7b0]258                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
259                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
260                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
261                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
262                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
263                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
264                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
[34b61882]265                sq.prev_head = *sq.head;
[6f121b8]266
267                {
268                        const uint32_t num = *sq.num;
269                        for( i; num ) {
270                                sq.sqes[i].user_data = 0ul64;
271                        }
272                }
[5dadc9b]273
[47746a2]274                (sq.lock){};
[732b406]275                (sq.release_lock){};
[47746a2]276
[e46c753]277                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
[5c581cc]278                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
[dd4e2d7]279                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
[0335620]280                        sq.ready = alloc_align( 64, sq.ready_cnt );
[5dadc9b]281                        for(i; sq.ready_cnt) {
282                                sq.ready[i] = -1ul32;
283                        }
284                }
285                else {
286                        sq.ready_cnt = 0;
287                        sq.ready = 0p;
288                }
[92976d9]289
290                // completion queue
[2d8f7b0]291                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
292                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
293                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
294                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
295                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
296                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
297
298                // some paranoid checks
299                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
300                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
301                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
302                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
303
304                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
305                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
306                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
307                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
[92976d9]308
309                // Update the global ring info
[b6f2b213]310                this.io->ring_flags = params.flags;
311                this.io->cltr_flags = io_flags;
312                this.io->fd         = fd;
313                this.io->done       = false;
[61dd73d]314                (this.io->submit){ min(*sq.num, *cq.num) };
[92976d9]315
[f6660520]316                if(!main_cluster) {
317                        __kernel_io_finish_start( this );
318                }
319        }
320
321        void __kernel_io_finish_start( cluster & this ) {
[b6f2b213]322                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
323                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
324                        (this.io->poller.fast){ this };
325                        __thrd_start( this.io->poller.fast, main );
326                }
[f6660520]327
[92976d9]328                // Create the poller thread
[20ab637]329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
[5c581cc]330                this.io->poller.slow.blocked = false;
[61dd73d]331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
[92976d9]332        }
333
[f6660520]334        void __kernel_io_prepare_stop( cluster & this ) {
[0a805f2]335                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
[92976d9]336                // Notify the poller thread of the shutdown
[61dd73d]337                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
[f6660520]338
339                // Stop the IO Poller
[92976d9]340                sigval val = { 1 };
[61dd73d]341                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
342                post( this.io->poller.sem );
[92976d9]343
344                // Wait for the poller thread to finish
[61dd73d]345                pthread_join( this.io->poller.slow.kthrd, 0p );
346                free( this.io->poller.slow.stack );
[f6660520]347
[0a805f2]348                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
[4069faad]349
[b6f2b213]350                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
[05cfa4d]351                        with( this.io->poller.fast ) {
[13c5e19]352                                /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
353                                /* paranoid */ verify( !ready_mutate_islocked() );
[05cfa4d]354
355                                // We need to adjust the clean-up based on where the thread is
[5dadc9b]356                                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
[05cfa4d]357
[13c5e19]358                                        ready_schedule_lock( (struct __processor_id_t *)active_processor() );
[2f1cb37]359
[13c5e19]360                                                // This is the tricky case
361                                                // The thread was preempted and now it is on the ready queue
362                                                // The thread should be the last on the list
363                                                /* paranoid */ verify( thrd.link.next != 0p );
[05cfa4d]364
[13c5e19]365                                                // Remove the thread from the ready queue of this cluster
366                                                __attribute__((unused)) bool removed = remove_head( &this, &thrd );
367                                                /* paranoid */ verify( removed );
368                                                thrd.link.next = 0p;
369                                                thrd.link.prev = 0p;
370                                                __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
[05cfa4d]371
[13c5e19]372                                                // Fixup the thread state
373                                                thrd.state = Blocked;
374                                                thrd.ticket = 0;
375                                                thrd.preempted = __NO_PREEMPTION;
376
377                                        ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
[05cfa4d]378
379                                        // Pretend like the thread was blocked all along
380                                }
381                                // !!! This is not an else if !!!
382                                if( thrd.state == Blocked ) {
[6502a2b]383
[05cfa4d]384                                        // This is the "easy case"
385                                        // The thread is parked and can easily be moved to active cluster
386                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
387                                        thrd.curr_cluster = active_cluster();
[6502a2b]388
[13c5e19]389                                        // unpark the fast io_poller
[05cfa4d]390                                        unpark( &thrd __cfaabi_dbg_ctx2 );
391                                }
392                                else {
393
394                                        // The thread is in a weird state
395                                        // I don't know what to do here
396                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
397                                }
398
399                        }
[f6660520]400
[61dd73d]401                        ^(this.io->poller.fast){};
[4069faad]402
[0a805f2]403                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
[b6f2b213]404                }
[f6660520]405        }
406
407        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
408                if(!main_cluster) {
409                        __kernel_io_prepare_stop( this );
410                }
[92976d9]411
412                // Shutdown the io rings
[61dd73d]413                struct __submition_data  & sq = this.io->submit_q;
414                struct __completion_data & cq = this.io->completion_q;
[92976d9]415
416                // unmap the submit queue entries
[2d8f7b0]417                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
[92976d9]418
419                // unmap the Submit Queue ring
420                munmap(sq.ring_ptr, sq.ring_sz);
421
422                // unmap the Completion Queue ring, if it is different
423                if (cq.ring_ptr != sq.ring_ptr) {
424                        munmap(cq.ring_ptr, cq.ring_sz);
425                }
426
427                // close the file descriptor
[61dd73d]428                close(this.io->fd);
429
[5dadc9b]430                free( this.io->submit_q.ready ); // Maybe null, doesn't matter
[61dd73d]431                free( this.io );
[92976d9]432        }
433
[20ab637]434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
435                bool need_sys_to_submit = false;
436                bool need_sys_to_complete = false;
437                unsigned min_complete = 0;
438                unsigned flags = 0;
439
440
441                TO_SUBMIT:
442                if( to_submit > 0 ) {
443                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
444                                need_sys_to_submit = true;
445                                break TO_SUBMIT;
446                        }
447                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
448                                need_sys_to_submit = true;
449                                flags |= IORING_ENTER_SQ_WAKEUP;
450                        }
451                }
452
453                TO_COMPLETE:
454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
455                        flags |= IORING_ENTER_GETEVENTS;
456                        if( mask ) {
457                                need_sys_to_complete = true;
458                                min_complete = 1;
459                                break TO_COMPLETE;
460                        }
461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
462                                need_sys_to_complete = true;
463                        }
464                }
465
466                int ret = 0;
467                if( need_sys_to_submit || need_sys_to_complete ) {
468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
469                        if( ret < 0 ) {
470                                switch((int)errno) {
471                                case EAGAIN:
472                                case EINTR:
473                                        ret = -1;
474                                        break;
475                                default:
476                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
477                                }
478                        }
479                }
480
481                // Memory barrier
482                __atomic_thread_fence( __ATOMIC_SEQ_CST );
483                return ret;
484        }
485
[92976d9]486//=============================================================================================
487// I/O Polling
488//=============================================================================================
[1d5e4711]489        static unsigned __collect_submitions( struct __io_data & ring );
[34b61882]490        static uint32_t __release_consumed_submission( struct __io_data & ring );
[1d5e4711]491
[5751a56]492        static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
493                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
494                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
495
496                data->result = cqe.res;
497                if(!id) { unpark(     data->thrd __cfaabi_dbg_ctx2 ); }
498                else  { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
499        }
500
[92976d9]501        // Process a single completion message from the io_uring
502        // This is NOT thread-safe
[20ab637]503        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
[e46c753]504                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
505
[5dadc9b]506                unsigned to_submit = 0;
507                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
508                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
[e46c753]509                        to_submit = __collect_submitions( ring );
[5dadc9b]510                }
511
[20ab637]512                int ret = __io_uring_enter(ring, to_submit, true, mask);
513                if( ret < 0 ) {
514                        return [0, true];
515                }
[1d5e4711]516
[20ab637]517                // update statistics
518                if (to_submit > 0) {
[1d5e4711]519                        __STATS__( true,
520                                if( to_submit > 0 ) {
521                                        io.submit_q.submit_avg.rdy += to_submit;
522                                        io.submit_q.submit_avg.csm += ret;
523                                        io.submit_q.submit_avg.cnt += 1;
524                                }
525                        )
[6f121b8]526                }
527
[20ab637]528                // Release the consumed SQEs
529                __release_consumed_submission( ring );
[6f121b8]530
[d384787]531                // Drain the queue
[92976d9]532                unsigned head = *ring.completion_q.head;
[6f121b8]533                unsigned tail = *ring.completion_q.tail;
534                const uint32_t mask = *ring.completion_q.mask;
535
[d384787]536                // Nothing was new return 0
537                if (head == tail) {
[e46c753]538                        return [0, to_submit > 0];
[d384787]539                }
[92976d9]540
[d384787]541                uint32_t count = tail - head;
[1d5e4711]542                /* paranoid */ verify( count != 0 );
[d384787]543                for(i; count) {
[6f121b8]544                        unsigned idx = (head + i) & mask;
[d384787]545                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
[92976d9]546
[d384787]547                        /* paranoid */ verify(&cqe);
[92976d9]548
[5751a56]549                        process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
[d384787]550                }
[2d8f7b0]551
552                // Allow new submissions to happen
[6f121b8]553                // V(ring.submit, count);
[92976d9]554
555                // Mark to the kernel that the cqe has been seen
556                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
[6f121b8]557                __atomic_thread_fence( __ATOMIC_SEQ_CST );
[d384787]558                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
[92976d9]559
[5dadc9b]560                return [count, count > 0 || to_submit > 0];
[92976d9]561        }
562
[f6660520]563        static void * __io_poller_slow( void * arg ) {
[13c5e19]564                #if !defined( __CFA_NO_STATISTICS__ )
565                        __stats_t local_stats;
566                        __init_stats( &local_stats );
567                        kernelTLS.this_stats = &local_stats;
568                #endif
569
[92976d9]570                cluster * cltr = (cluster *)arg;
[61dd73d]571                struct __io_data & ring = *cltr->io;
[92976d9]572
[13c5e19]573                ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
574
[92976d9]575                sigset_t mask;
576                sigfillset(&mask);
577                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
578                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
579                }
580
581                sigdelset( &mask, SIGUSR1 );
582
583                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
584                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
585
[1539bbd]586                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
587
[b6f2b213]588                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
589                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
[5dadc9b]590
[5c581cc]591                                __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
592
[f6660520]593                                // In the user-thread approach drain and if anything was drained,
594                                // batton pass to the user-thread
[5dadc9b]595                                int count;
596                                bool again;
[20ab637]597                                [count, again] = __drain_io( ring, &mask );
[5c581cc]598
599                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
[3c039b0]600
601                                // Update statistics
[47746a2]602                                __STATS__( true,
603                                        io.complete_q.completed_avg.val += count;
604                                        io.complete_q.completed_avg.slow_cnt += 1;
605                                )
[3c039b0]606
[5dadc9b]607                                if(again) {
[0a805f2]608                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
[13c5e19]609                                        __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
[f6660520]610                                        wait( ring.poller.sem );
611                                }
[b6f2b213]612                        }
613                }
614                else {
615                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
[f6660520]616                                //In the naive approach, just poll the io completion queue directly
[5dadc9b]617                                int count;
618                                bool again;
[20ab637]619                                [count, again] = __drain_io( ring, &mask );
[3c039b0]620
621                                // Update statistics
[47746a2]622                                __STATS__( true,
623                                        io.complete_q.completed_avg.val += count;
624                                        io.complete_q.completed_avg.slow_cnt += 1;
625                                )
[b6f2b213]626                        }
[92976d9]627                }
628
[1539bbd]629                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
630
[13c5e19]631                unregister( &ring.poller.slow.id );
632
[df40a56]633                #if !defined(__CFA_NO_STATISTICS__)
634                        __tally_stats(cltr->stats, &local_stats);
635                #endif
636
[92976d9]637                return 0p;
638        }
639
[61dd73d]640        void main( __io_poller_fast & this ) {
[b6f2b213]641                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
642
[61dd73d]643                // Start parked
644                park( __cfaabi_dbg_ctx );
[f6660520]645
[61dd73d]646                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
[1539bbd]647
[4e74466]648                int reset = 0;
649
[61dd73d]650                // Then loop until we need to start
651                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
[5dadc9b]652
[61dd73d]653                        // Drain the io
[5dadc9b]654                        int count;
655                        bool again;
[13c5e19]656                        disable_interrupts();
[20ab637]657                                [count, again] = __drain_io( *this.ring, 0p );
[5dadc9b]658
[13c5e19]659                                if(!again) reset++;
[3c039b0]660
[13c5e19]661                                // Update statistics
[47746a2]662                                __STATS__( true,
663                                        io.complete_q.completed_avg.val += count;
664                                        io.complete_q.completed_avg.fast_cnt += 1;
665                                )
[13c5e19]666                        enable_interrupts( __cfaabi_dbg_ctx );
[3c039b0]667
[5dadc9b]668                        // If we got something, just yield and check again
[4e74466]669                        if(reset < 5) {
[61dd73d]670                                yield();
671                        }
[5dadc9b]672                        // We didn't get anything baton pass to the slow poller
[61dd73d]673                        else {
674                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
[5dadc9b]675                                reset = 0;
676
677                                // wake up the slow poller
[61dd73d]678                                post( this.ring->poller.sem );
[5dadc9b]679
680                                // park this thread
[61dd73d]681                                park( __cfaabi_dbg_ctx );
[f6660520]682                        }
683                }
[61dd73d]684
685                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
686        }
[f6660520]687
[0335620]688        static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
[5dadc9b]689        static inline void __wake_poller( struct __io_data & ring ) {
[5c581cc]690                if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
691
692                sigval val = { 1 };
693                pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
[5dadc9b]694        }
695
[92976d9]696//=============================================================================================
697// I/O Submissions
698//=============================================================================================
699
[2d8f7b0]700// Submition steps :
[e46c753]701// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
[2d8f7b0]702//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
703//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
704//     need to write an allocator that allows allocating concurrently.
705//
[e46c753]706// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
[2d8f7b0]707//
[e46c753]708// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
[2d8f7b0]709//     needs to arrive to two concensus at the same time:
710//     A - The order in which entries are listed in the array: no two threads must pick the
711//         same index for their entries
712//     B - When can the tail be update for the kernel. EVERY entries in the array between
713//         head and tail must be fully filled and shouldn't ever be touched again.
714//
715
[31bb2e1]716        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
[e46c753]717                /* paranoid */ verify( data != 0 );
[13c5e19]718
[6f121b8]719                // Prepare the data we need
720                __attribute((unused)) int len   = 0;
721                __attribute((unused)) int block = 0;
722                uint32_t cnt = *ring.submit_q.num;
723                uint32_t mask = *ring.submit_q.mask;
[8ae4165]724
725                disable_interrupts();
726                        uint32_t off = __tls_rand();
727                enable_interrupts( __cfaabi_dbg_ctx );
[6f121b8]728
729                // Loop around looking for an available spot
[13c5e19]730                for() {
[6f121b8]731                        // Look through the list starting at some offset
732                        for(i; cnt) {
733                                uint64_t expected = 0;
734                                uint32_t idx = (i + off) & mask;
735                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
736                                volatile uint64_t * udata = &sqe->user_data;
737
738                                if( *udata == expected &&
739                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
740                                {
741                                        // update statistics
[47746a2]742                                        __STATS__( false,
743                                                io.submit_q.alloc_avg.val   += len;
744                                                io.submit_q.alloc_avg.block += block;
745                                                io.submit_q.alloc_avg.cnt   += 1;
746                                        )
[6f121b8]747
[13c5e19]748
[6f121b8]749                                        // Success return the data
750                                        return [sqe, idx];
751                                }
752                                verify(expected != data);
[2489d31]753
[6f121b8]754                                len ++;
755                        }
[2489d31]756
[6f121b8]757                        block++;
758                        yield();
759                }
[2489d31]760        }
761
[df40a56]762        static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
763                /* paranoid */ verify( idx <= mask   );
764                /* paranoid */ verify( idx != -1ul32 );
765
766                // We need to find a spot in the ready array
767                __attribute((unused)) int len   = 0;
768                __attribute((unused)) int block = 0;
769                uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
770
771                disable_interrupts();
772                        uint32_t off = __tls_rand();
773                enable_interrupts( __cfaabi_dbg_ctx );
774
775                uint32_t picked;
776                LOOKING: for() {
777                        for(i; ring.submit_q.ready_cnt) {
778                                picked = (i + off) & ready_mask;
779                                uint32_t expected = -1ul32;
780                                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
781                                        break LOOKING;
782                                }
783                                verify(expected != idx);
784
785                                len ++;
786                        }
787
788                        block++;
[34b61882]789                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
790                                __release_consumed_submission( ring );
791                                unlock( ring.submit_q.lock );
792                        }
793                        else {
794                                yield();
795                        }
[df40a56]796                }
797
798                // update statistics
[47746a2]799                __STATS__( false,
800                        io.submit_q.look_avg.val   += len;
801                        io.submit_q.look_avg.block += block;
802                        io.submit_q.look_avg.cnt   += 1;
803                )
[df40a56]804
805                return picked;
806        }
807
[31bb2e1]808        void __submit( struct __io_data & ring, uint32_t idx ) {
[5dadc9b]809                // Get now the data we definetely need
810                uint32_t * const tail = ring.submit_q.tail;
[2489d31]811                const uint32_t mask = *ring.submit_q.mask;
812
[5dadc9b]813                // There are 2 submission schemes, check which one we are using
814                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
815                        // If the poller thread submits, then we just need to add this to the ready array
[df40a56]816                        __submit_to_ready_array( ring, idx, mask );
[5dadc9b]817
818                        __wake_poller( ring );
819
[dd4e2d7]820                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
[2d8f7b0]821                }
[e46c753]822                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
823                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
824
825                        for() {
826                                yield();
827
828                                // If some one else collected our index, we are done
[8bb239d]829                                #warning ABA problem
[e46c753]830                                if( ring.submit_q.ready[picked] != idx ) {
[47746a2]831                                        __STATS__( false,
832                                                io.submit_q.helped += 1;
833                                        )
[e46c753]834                                        return;
835                                }
836
837                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
[47746a2]838                                        __STATS__( false,
839                                                io.submit_q.leader += 1;
840                                        )
[e46c753]841                                        break;
842                                }
[8bb239d]843
[47746a2]844                                __STATS__( false,
845                                        io.submit_q.busy += 1;
846                                )
[e46c753]847                        }
848
849                        // We got the lock
850                        unsigned to_submit = __collect_submitions( ring );
[20ab637]851                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
[e46c753]852                        if( ret < 0 ) {
[20ab637]853                                unlock(ring.submit_q.lock);
854                                return;
[e46c753]855                        }
856
[20ab637]857                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
[e46c753]858
859                        // Release the consumed SQEs
[34b61882]860                        __release_consumed_submission( ring );
[e46c753]861
862                        // update statistics
[47746a2]863                        __STATS__( true,
864                                io.submit_q.submit_avg.rdy += to_submit;
865                                io.submit_q.submit_avg.csm += ret;
866                                io.submit_q.submit_avg.cnt += 1;
867                        )
[e46c753]868
869                        unlock(ring.submit_q.lock);
870                }
[5dadc9b]871                else {
872                        // get mutual exclusion
873                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
[2489d31]874
[20ab637]875                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
876                        /* paranoid */  "index %u already reclaimed\n"
877                        /* paranoid */  "head %u, prev %u, tail %u\n"
878                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
879                        /* paranoid */  idx,
880                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
881                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
882                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
883                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
884                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
885                        /* paranoid */ );
886
[5dadc9b]887                        // Append to the list of ready entries
888
889                        /* paranoid */ verify( idx <= mask );
[20ab637]890                        ring.submit_q.array[ (*tail) & mask ] = idx;
[5dadc9b]891                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
[d384787]892
[5dadc9b]893                        // Submit however, many entries need to be submitted
[20ab637]894                        int ret = __io_uring_enter( ring, 1, false, 0p );
[5dadc9b]895                        if( ret < 0 ) {
896                                switch((int)errno) {
897                                default:
898                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
899                                }
900                        }
[d384787]901
[5dadc9b]902                        // update statistics
[47746a2]903                        __STATS__( false,
904                                io.submit_q.submit_avg.csm += 1;
905                                io.submit_q.submit_avg.cnt += 1;
906                        )
[5dadc9b]907
[34b61882]908                        // Release the consumed SQEs
909                        __release_consumed_submission( ring );
[7bfc849]910
[5dadc9b]911                        unlock(ring.submit_q.lock);
[dd4e2d7]912
913                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
[5dadc9b]914                }
[2489d31]915        }
[e46c753]916
917        static unsigned __collect_submitions( struct __io_data & ring ) {
918                /* paranoid */ verify( ring.submit_q.ready != 0p );
919                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
920
921                unsigned to_submit = 0;
922                uint32_t tail = *ring.submit_q.tail;
923                const uint32_t mask = *ring.submit_q.mask;
924
925                // Go through the list of ready submissions
926                for( i; ring.submit_q.ready_cnt ) {
927                        // replace any submission with the sentinel, to consume it.
928                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
929
930                        // If it was already the sentinel, then we are done
931                        if( idx == -1ul32 ) continue;
932
933                        // If we got a real submission, append it to the list
934                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
935                        to_submit++;
936                }
937
938                // Increment the tail based on how many we are ready to submit
939                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
940
941                return to_submit;
942        }
[34b61882]943
944        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
945                const uint32_t smask = *ring.submit_q.mask;
[732b406]946
947                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
[34b61882]948                uint32_t chead = *ring.submit_q.head;
949                uint32_t phead = ring.submit_q.prev_head;
950                ring.submit_q.prev_head = chead;
[732b406]951                unlock(ring.submit_q.release_lock);
952
[34b61882]953                uint32_t count = chead - phead;
954                for( i; count ) {
955                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
956                        ring.submit_q.sqes[ idx ].user_data = 0;
957                }
958                return count;
959        }
[20ab637]960
961//=============================================================================================
962// I/O Submissions
963//=============================================================================================
964
965        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
966                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
967                if( ret < 0 ) {
968                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
969                }
970
971                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
972        }
[47746a2]973#endif
Note: See TracBrowser for help on using the repository browser.