source: libcfa/src/concurrency/io.cfa @ f00b26d4

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since f00b26d4 was f00b26d4, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Re-worked IO to use epoll and support multiple io_contexts per cluster.
Also redid how cluster options are handled.
Changed how iofwd calls are passed to support future features and io_contexts rework.

  • Property mode set to 100644
File size: 29.3 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#if defined(__CFA_DEBUG__)
17        // #define __CFA_DEBUG_PRINT_IO__
18        #define __CFA_DEBUG_PRINT_IO_CORE__
19#endif
20
21#include "kernel_private.hfa"
22#include "bitmanip.hfa"
23
24#if !defined(CFA_HAVE_LINUX_IO_URING_H)
25        void __kernel_io_startup() {
26                // Nothing to do without io_uring
27        }
28
29        void __kernel_io_shutdown() {
30                // Nothing to do without io_uring
31        }
32
33        void ?{}(io_context & this, struct cluster & cl) {}
34        void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}
35
36        void ^?{}(io_context & this) {}
37        void ^?{}(io_context & this, bool cluster_context) {}
38
39#else
40        #define _GNU_SOURCE         /* See feature_test_macros(7) */
41        #include <errno.h>
42        #include <stdint.h>
43        #include <string.h>
44        #include <unistd.h>
45
46        extern "C" {
47                #include <sys/epoll.h>
48                #include <sys/mman.h>
49                #include <sys/syscall.h>
50
51                #include <linux/io_uring.h>
52        }
53
54        #include "bits/signal.hfa"
55        #include "kernel_private.hfa"
56        #include "thread.hfa"
57
58        void ?{}(io_context_params & this) {
59                this.num_entries = 256;
60                this.num_ready = 256;
61                this.submit_aff = -1;
62                this.eager_submits = false;
63                this.poller_submits = false;
64                this.poll_submit = false;
65                this.poll_complete = false;
66        }
67
68        static void * __io_poller_slow( void * arg );
69
70        // Weirdly, some systems that do support io_uring don't actually define these
71        #ifdef __alpha__
72                /*
73                * alpha is the only exception, all other architectures
74                * have common numbers for new system calls.
75                */
76                #ifndef __NR_io_uring_setup
77                        #define __NR_io_uring_setup           535
78                #endif
79                #ifndef __NR_io_uring_enter
80                        #define __NR_io_uring_enter           536
81                #endif
82                #ifndef __NR_io_uring_register
83                        #define __NR_io_uring_register        537
84                #endif
85        #else /* !__alpha__ */
86                #ifndef __NR_io_uring_setup
87                        #define __NR_io_uring_setup           425
88                #endif
89                #ifndef __NR_io_uring_enter
90                        #define __NR_io_uring_enter           426
91                #endif
92                #ifndef __NR_io_uring_register
93                        #define __NR_io_uring_register        427
94                #endif
95        #endif
96
97        struct __submition_data {
98                // Head and tail of the ring (associated with array)
99                volatile uint32_t * head;
100                volatile uint32_t * tail;
101                volatile uint32_t prev_head;
102
103                // The actual kernel ring which uses head/tail
104                // indexes into the sqes arrays
105                uint32_t * array;
106
107                // number of entries and mask to go with it
108                const uint32_t * num;
109                const uint32_t * mask;
110
111                // Submission flags (Not sure what for)
112                uint32_t * flags;
113
114                // number of sqes not submitted (whatever that means)
115                uint32_t * dropped;
116
117                // Like head/tail but not seen by the kernel
118                volatile uint32_t * ready;
119                uint32_t ready_cnt;
120
121                __spinlock_t lock;
122                __spinlock_t release_lock;
123
124                // A buffer of sqes (not the actual ring)
125                struct io_uring_sqe * sqes;
126
127                // The location and size of the mmaped area
128                void * ring_ptr;
129                size_t ring_sz;
130        };
131
132        struct __completion_data {
133                // Head and tail of the ring
134                volatile uint32_t * head;
135                volatile uint32_t * tail;
136
137                // number of entries and mask to go with it
138                const uint32_t * mask;
139                const uint32_t * num;
140
141                // number of cqes not submitted (whatever that means)
142                uint32_t * overflow;
143
144                // the kernel ring
145                struct io_uring_cqe * cqes;
146
147                // The location and size of the mmaped area
148                void * ring_ptr;
149                size_t ring_sz;
150        };
151
152        struct __io_data {
153                struct __submition_data submit_q;
154                struct __completion_data completion_q;
155                uint32_t ring_flags;
156                int fd;
157                bool eager_submits:1;
158                bool poller_submits:1;
159        };
160
161//=============================================================================================
162// I/O Startup / Shutdown logic + Master Poller
163//=============================================================================================
164
165// IO Master poller loop forward
166static void * iopoll_loop( __attribute__((unused)) void * args );
167
168static struct {
169        pthread_t     thrd;    // pthread handle to io poller thread
170        void *        stack;   // pthread stack for io poller thread
171        int           epollfd; // file descriptor to the epoll instance
172        volatile bool run;     // Whether or not to continue
173} iopoll;
174
175void __kernel_io_startup() {
176        __cfaabi_dbg_print_safe( "Kernel : Creating EPOLL instance\n" );
177
178        iopoll.epollfd = epoll_create1(0);
179      if (iopoll.epollfd == -1) {
180            abort( "internal error, epoll_create1\n");
181      }
182
183        __cfaabi_dbg_print_safe( "Kernel : Starting io poller thread\n" );
184
185        iopoll.run = true;
186        iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
187}
188
189void __kernel_io_shutdown() {
190        // Notify the io poller thread of the shutdown
191        iopoll.run = false;
192        sigval val = { 1 };
193        pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
194
195        // Wait for the io poller thread to finish
196
197        pthread_join( iopoll.thrd, 0p );
198        free( iopoll.stack );
199
200        int ret = close(iopoll.epollfd);
201      if (ret == -1) {
202            abort( "internal error, close epoll\n");
203      }
204
205        // Io polling is now fully stopped
206
207        __cfaabi_dbg_print_safe( "Kernel : IO poller stopped\n" );
208}
209
210static void * iopoll_loop( __attribute__((unused)) void * args ) {
211        __processor_id_t id;
212        id.id = doregister(&id);
213        __cfaabi_dbg_print_safe( "Kernel : IO poller thread starting\n" );
214
215        // Block signals to control when they arrive
216        sigset_t mask;
217        sigfillset(&mask);
218        if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
219            abort( "internal error, pthread_sigmask" );
220        }
221
222        sigdelset( &mask, SIGUSR1 );
223
224        // Create sufficient events
225        struct epoll_event events[10];
226        // Main loop
227        while( iopoll.run ) {
228                // Wait for events
229                int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
230
231                // Check if an error occured
232            if (nfds == -1) {
233                        if( errno == EINTR ) continue;
234                  abort( "internal error, pthread_sigmask" );
235            }
236
237                for(i; nfds) {
238                        $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
239                        /* paranoid */ verify( io_ctx );
240                        __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
241                        #if !defined( __CFA_NO_STATISTICS__ )
242                                kernelTLS.this_stats = io_ctx->self.curr_cluster->stats;
243                        #endif
244                        __post( io_ctx->sem, &id );
245                }
246        }
247
248        __cfaabi_dbg_print_safe( "Kernel : IO poller thread stopping\n" );
249        unregister(&id);
250        return 0p;
251}
252
253//=============================================================================================
254// I/O Context Constrution/Destruction
255//=============================================================================================
256
257        void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
258        void main( $io_ctx_thread & this );
259        static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
260        void ^?{}( $io_ctx_thread & mutex this ) {}
261
262        static void __io_create ( __io_data & this, const io_context_params & params_in );
263        static void __io_destroy( __io_data & this );
264
265        void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
266                (this.thrd){ cl };
267                this.thrd.ring = malloc();
268                __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
269                __io_create( *this.thrd.ring, params );
270
271                __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
272                this.thrd.done = false;
273                __thrd_start( this.thrd, main );
274
275                __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
276        }
277
278        void ?{}(io_context & this, struct cluster & cl) {
279                io_context_params params;
280                (this){ cl, params };
281        }
282
283        void ^?{}(io_context & this, bool cluster_context) {
284                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);
285
286                // Notify the thread of the shutdown
287                __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);
288
289                // If this is an io_context within a cluster, things get trickier
290                $thread & thrd = this.thrd.self;
291                if( cluster_context ) {
292                        cluster & cltr = *thrd.curr_cluster;
293                        /* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster );
294                        /* paranoid */ verify( !ready_mutate_islocked() );
295
296                        // We need to adjust the clean-up based on where the thread is
297                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
298
299                                ready_schedule_lock( (struct __processor_id_t *)active_processor() );
300
301                                        // This is the tricky case
302                                        // The thread was preempted and now it is on the ready queue
303                                        // The thread should be the last on the list
304                                        /* paranoid */ verify( thrd.link.next != 0p );
305
306                                        // Remove the thread from the ready queue of this cluster
307                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
308                                        /* paranoid */ verify( removed );
309                                        thrd.link.next = 0p;
310                                        thrd.link.prev = 0p;
311                                        __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
312
313                                        // Fixup the thread state
314                                        thrd.state = Blocked;
315                                        thrd.ticket = 0;
316                                        thrd.preempted = __NO_PREEMPTION;
317
318                                ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
319
320                                // Pretend like the thread was blocked all along
321                        }
322                        // !!! This is not an else if !!!
323                        if( thrd.state == Blocked ) {
324
325                                // This is the "easy case"
326                                // The thread is parked and can easily be moved to active cluster
327                                verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
328                                thrd.curr_cluster = active_cluster();
329
330                                // unpark the fast io_poller
331                                unpark( &thrd __cfaabi_dbg_ctx2 );
332                        }
333                        else {
334
335                                // The thread is in a weird state
336                                // I don't know what to do here
337                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
338                        }
339                } else {
340                        unpark( &thrd __cfaabi_dbg_ctx2 );
341                }
342
343                ^(this.thrd){};
344                __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);
345
346                __io_destroy( *this.thrd.ring );
347                __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);
348
349                free(this.thrd.ring);
350        }
351
352        void ^?{}(io_context & this) {
353                ^(this){ false };
354        }
355
356        static void __io_create( __io_data & this, const io_context_params & params_in ) {
357                // Step 1 : call to setup
358                struct io_uring_params params;
359                memset(&params, 0, sizeof(params));
360                if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
361                if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
362
363                uint32_t nentries = params_in.num_entries;
364
365                int fd = syscall(__NR_io_uring_setup, nentries, &params );
366                if(fd < 0) {
367                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
368                }
369
370                // Step 2 : mmap result
371                memset( &this, 0, sizeof(struct __io_data) );
372                struct __submition_data  & sq = this.submit_q;
373                struct __completion_data & cq = this.completion_q;
374
375                // calculate the right ring size
376                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
377                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
378
379                // Requires features
380                #if defined(IORING_FEAT_SINGLE_MMAP)
381                        // adjust the size according to the parameters
382                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
383                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
384                        }
385                #endif
386
387                // mmap the Submit Queue into existence
388                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
389                if (sq.ring_ptr == (void*)MAP_FAILED) {
390                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
391                }
392
393                // Requires features
394                #if defined(IORING_FEAT_SINGLE_MMAP)
395                        // mmap the Completion Queue into existence (may or may not be needed)
396                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
397                                cq.ring_ptr = sq.ring_ptr;
398                        }
399                        else
400                #endif
401                {
402                        // We need multiple call to MMAP
403                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
404                        if (cq.ring_ptr == (void*)MAP_FAILED) {
405                                munmap(sq.ring_ptr, sq.ring_sz);
406                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
407                        }
408                }
409
410                // mmap the submit queue entries
411                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
412                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
413                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
414                        munmap(sq.ring_ptr, sq.ring_sz);
415                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
416                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
417                }
418
419                // Get the pointers from the kernel to fill the structure
420                // submit queue
421                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
422                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
423                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
424                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
425                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
426                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
427                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
428                sq.prev_head = *sq.head;
429
430                {
431                        const uint32_t num = *sq.num;
432                        for( i; num ) {
433                                sq.sqes[i].user_data = 0ul64;
434                        }
435                }
436
437                (sq.lock){};
438                (sq.release_lock){};
439
440                if( params_in.poller_submits || params_in.eager_submits ) {
441                        /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
442                        sq.ready_cnt = max( params_in.num_ready, 8 );
443                        sq.ready = alloc_align( 64, sq.ready_cnt );
444                        for(i; sq.ready_cnt) {
445                                sq.ready[i] = -1ul32;
446                        }
447                }
448                else {
449                        sq.ready_cnt = 0;
450                        sq.ready = 0p;
451                }
452
453                // completion queue
454                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
455                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
456                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
457                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
458                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
459                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
460
461                // some paranoid checks
462                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
463                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
464                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
465                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
466
467                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
468                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
469                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
470                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
471
472                // Update the global ring info
473                this.ring_flags = params.flags;
474                this.fd         = fd;
475                this.eager_submits  = params_in.eager_submits;
476                this.poller_submits = params_in.poller_submits;
477        }
478
479        void __io_destroy( __io_data & this ) {
480                // Shutdown the io rings
481                struct __submition_data  & sq = this.submit_q;
482                struct __completion_data & cq = this.completion_q;
483
484                // unmap the submit queue entries
485                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
486
487                // unmap the Submit Queue ring
488                munmap(sq.ring_ptr, sq.ring_sz);
489
490                // unmap the Completion Queue ring, if it is different
491                if (cq.ring_ptr != sq.ring_ptr) {
492                        munmap(cq.ring_ptr, cq.ring_sz);
493                }
494
495                // close the file descriptor
496                close(this.fd);
497
498                free( this.submit_q.ready ); // Maybe null, doesn't matter
499        }
500
501        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
502                bool need_sys_to_submit = false;
503                bool need_sys_to_complete = false;
504                unsigned flags = 0;
505
506                TO_SUBMIT:
507                if( to_submit > 0 ) {
508                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
509                                need_sys_to_submit = true;
510                                break TO_SUBMIT;
511                        }
512                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
513                                need_sys_to_submit = true;
514                                flags |= IORING_ENTER_SQ_WAKEUP;
515                        }
516                }
517
518                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
519                        flags |= IORING_ENTER_GETEVENTS;
520                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
521                                need_sys_to_complete = true;
522                        }
523                }
524
525                int ret = 0;
526                if( need_sys_to_submit || need_sys_to_complete ) {
527                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
528                        if( ret < 0 ) {
529                                switch((int)errno) {
530                                case EAGAIN:
531                                case EINTR:
532                                        ret = -1;
533                                        break;
534                                default:
535                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
536                                }
537                        }
538                }
539
540                // Memory barrier
541                __atomic_thread_fence( __ATOMIC_SEQ_CST );
542                return ret;
543        }
544
545//=============================================================================================
546// I/O Polling
547//=============================================================================================
548        static unsigned __collect_submitions( struct __io_data & ring );
549        static uint32_t __release_consumed_submission( struct __io_data & ring );
550
551        static inline void process(struct io_uring_cqe & cqe ) {
552                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
553                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
554
555                data->result = cqe.res;
556                unpark( data->thrd __cfaabi_dbg_ctx2 );
557        }
558
559        // Process a single completion message from the io_uring
560        // This is NOT thread-safe
561        static [int, bool] __drain_io( & struct __io_data ring ) {
562                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
563
564                unsigned to_submit = 0;
565                if( ring.poller_submits ) {
566                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
567                        to_submit = __collect_submitions( ring );
568                }
569
570                int ret = __io_uring_enter(ring, to_submit, true);
571                if( ret < 0 ) {
572                        return [0, true];
573                }
574
575                // update statistics
576                if (to_submit > 0) {
577                        __STATS__( true,
578                                if( to_submit > 0 ) {
579                                        io.submit_q.submit_avg.rdy += to_submit;
580                                        io.submit_q.submit_avg.csm += ret;
581                                        io.submit_q.submit_avg.cnt += 1;
582                                }
583                        )
584                }
585
586                // Release the consumed SQEs
587                __release_consumed_submission( ring );
588
589                // Drain the queue
590                unsigned head = *ring.completion_q.head;
591                unsigned tail = *ring.completion_q.tail;
592                const uint32_t mask = *ring.completion_q.mask;
593
594                // Nothing was new return 0
595                if (head == tail) {
596                        return [0, to_submit > 0];
597                }
598
599                uint32_t count = tail - head;
600                /* paranoid */ verify( count != 0 );
601                for(i; count) {
602                        unsigned idx = (head + i) & mask;
603                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
604
605                        /* paranoid */ verify(&cqe);
606
607                        process( cqe );
608                }
609
610                // Mark to the kernel that the cqe has been seen
611                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
612                __atomic_thread_fence( __ATOMIC_SEQ_CST );
613                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
614
615                return [count, count > 0 || to_submit > 0];
616        }
617
618        void main( $io_ctx_thread & this ) {
619                epoll_event ev;
620                ev.events = EPOLLIN | EPOLLONESHOT;
621                ev.data.u64 = (uint64_t)&this;
622                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, this.ring->fd, &ev);
623                if (ret < 0) {
624                        abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
625                }
626
627                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
628
629                int reset = 0;
630                // Then loop until we need to start
631                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
632                        // Drain the io
633                        int count;
634                        bool again;
635                        disable_interrupts();
636                                [count, again] = __drain_io( *this.ring );
637
638                                if(!again) reset++;
639
640                                // Update statistics
641                                __STATS__( true,
642                                        io.complete_q.completed_avg.val += count;
643                                        io.complete_q.completed_avg.fast_cnt += 1;
644                                )
645                        enable_interrupts( __cfaabi_dbg_ctx );
646
647                        // If we got something, just yield and check again
648                        if(reset < 5) {
649                                yield();
650                        }
651                        // We didn't get anything baton pass to the slow poller
652                        else {
653                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
654                                reset = 0;
655
656                                // wake up the slow poller
657                                ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, this.ring->fd, &ev);
658                                if (ret < 0) {
659                                        abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
660                                }
661
662                                // park this thread
663                                wait( this.sem );
664                        }
665                }
666
667                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
668        }
669
670//=============================================================================================
671// I/O Submissions
672//=============================================================================================
673
674// Submition steps :
675// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
676//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
677//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
678//     need to write an allocator that allows allocating concurrently.
679//
680// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
681//
682// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
683//     needs to arrive to two concensus at the same time:
684//     A - The order in which entries are listed in the array: no two threads must pick the
685//         same index for their entries
686//     B - When can the tail be update for the kernel. EVERY entries in the array between
687//         head and tail must be fully filled and shouldn't ever be touched again.
688//
689
690        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
691                /* paranoid */ verify( data != 0 );
692
693                // Prepare the data we need
694                __attribute((unused)) int len   = 0;
695                __attribute((unused)) int block = 0;
696                uint32_t cnt = *ring.submit_q.num;
697                uint32_t mask = *ring.submit_q.mask;
698
699                disable_interrupts();
700                        uint32_t off = __tls_rand();
701                enable_interrupts( __cfaabi_dbg_ctx );
702
703                // Loop around looking for an available spot
704                for() {
705                        // Look through the list starting at some offset
706                        for(i; cnt) {
707                                uint64_t expected = 0;
708                                uint32_t idx = (i + off) & mask;
709                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
710                                volatile uint64_t * udata = &sqe->user_data;
711
712                                if( *udata == expected &&
713                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
714                                {
715                                        // update statistics
716                                        __STATS__( false,
717                                                io.submit_q.alloc_avg.val   += len;
718                                                io.submit_q.alloc_avg.block += block;
719                                                io.submit_q.alloc_avg.cnt   += 1;
720                                        )
721
722
723                                        // Success return the data
724                                        return [sqe, idx];
725                                }
726                                verify(expected != data);
727
728                                len ++;
729                        }
730
731                        block++;
732                        yield();
733                }
734        }
735
736        static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
737                /* paranoid */ verify( idx <= mask   );
738                /* paranoid */ verify( idx != -1ul32 );
739
740                // We need to find a spot in the ready array
741                __attribute((unused)) int len   = 0;
742                __attribute((unused)) int block = 0;
743                uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
744
745                disable_interrupts();
746                        uint32_t off = __tls_rand();
747                enable_interrupts( __cfaabi_dbg_ctx );
748
749                uint32_t picked;
750                LOOKING: for() {
751                        for(i; ring.submit_q.ready_cnt) {
752                                picked = (i + off) & ready_mask;
753                                uint32_t expected = -1ul32;
754                                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
755                                        break LOOKING;
756                                }
757                                verify(expected != idx);
758
759                                len ++;
760                        }
761
762                        block++;
763                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
764                                __release_consumed_submission( ring );
765                                unlock( ring.submit_q.lock );
766                        }
767                        else {
768                                yield();
769                        }
770                }
771
772                // update statistics
773                __STATS__( false,
774                        io.submit_q.look_avg.val   += len;
775                        io.submit_q.look_avg.block += block;
776                        io.submit_q.look_avg.cnt   += 1;
777                )
778
779                return picked;
780        }
781
782        void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
783                __io_data & ring = *ctx->thrd.ring;
784                // Get now the data we definetely need
785                uint32_t * const tail = ring.submit_q.tail;
786                const uint32_t mask  = *ring.submit_q.mask;
787
788                // There are 2 submission schemes, check which one we are using
789                if( ring.poller_submits ) {
790                        // If the poller thread submits, then we just need to add this to the ready array
791                        __submit_to_ready_array( ring, idx, mask );
792
793                        post( ctx->thrd.sem );
794
795                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
796                }
797                else if( ring.eager_submits ) {
798                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
799
800                        for() {
801                                yield();
802
803                                // If some one else collected our index, we are done
804                                #warning ABA problem
805                                if( ring.submit_q.ready[picked] != idx ) {
806                                        __STATS__( false,
807                                                io.submit_q.helped += 1;
808                                        )
809                                        return;
810                                }
811
812                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
813                                        __STATS__( false,
814                                                io.submit_q.leader += 1;
815                                        )
816                                        break;
817                                }
818
819                                __STATS__( false,
820                                        io.submit_q.busy += 1;
821                                )
822                        }
823
824                        // We got the lock
825                        unsigned to_submit = __collect_submitions( ring );
826                        int ret = __io_uring_enter( ring, to_submit, false );
827                        if( ret < 0 ) {
828                                unlock(ring.submit_q.lock);
829                                return;
830                        }
831
832                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
833
834                        // Release the consumed SQEs
835                        __release_consumed_submission( ring );
836
837                        // update statistics
838                        __STATS__( true,
839                                io.submit_q.submit_avg.rdy += to_submit;
840                                io.submit_q.submit_avg.csm += ret;
841                                io.submit_q.submit_avg.cnt += 1;
842                        )
843
844                        unlock(ring.submit_q.lock);
845                }
846                else {
847                        // get mutual exclusion
848                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
849
850                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
851                        /* paranoid */  "index %u already reclaimed\n"
852                        /* paranoid */  "head %u, prev %u, tail %u\n"
853                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
854                        /* paranoid */  idx,
855                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
856                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
857                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
858                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
859                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
860                        /* paranoid */ );
861
862                        // Append to the list of ready entries
863
864                        /* paranoid */ verify( idx <= mask );
865                        ring.submit_q.array[ (*tail) & mask ] = idx;
866                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
867
868                        // Submit however, many entries need to be submitted
869                        int ret = __io_uring_enter( ring, 1, false );
870                        if( ret < 0 ) {
871                                switch((int)errno) {
872                                default:
873                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
874                                }
875                        }
876
877                        // update statistics
878                        __STATS__( false,
879                                io.submit_q.submit_avg.csm += 1;
880                                io.submit_q.submit_avg.cnt += 1;
881                        )
882
883                        // Release the consumed SQEs
884                        __release_consumed_submission( ring );
885
886                        unlock(ring.submit_q.lock);
887
888                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
889                }
890        }
891
892        static unsigned __collect_submitions( struct __io_data & ring ) {
893                /* paranoid */ verify( ring.submit_q.ready != 0p );
894                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
895
896                unsigned to_submit = 0;
897                uint32_t tail = *ring.submit_q.tail;
898                const uint32_t mask = *ring.submit_q.mask;
899
900                // Go through the list of ready submissions
901                for( i; ring.submit_q.ready_cnt ) {
902                        // replace any submission with the sentinel, to consume it.
903                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
904
905                        // If it was already the sentinel, then we are done
906                        if( idx == -1ul32 ) continue;
907
908                        // If we got a real submission, append it to the list
909                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
910                        to_submit++;
911                }
912
913                // Increment the tail based on how many we are ready to submit
914                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
915
916                return to_submit;
917        }
918
919        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
920                const uint32_t smask = *ring.submit_q.mask;
921
922                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
923                uint32_t chead = *ring.submit_q.head;
924                uint32_t phead = ring.submit_q.prev_head;
925                ring.submit_q.prev_head = chead;
926                unlock(ring.submit_q.release_lock);
927
928                uint32_t count = chead - phead;
929                for( i; count ) {
930                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
931                        ring.submit_q.sqes[ idx ].user_data = 0;
932                }
933                return count;
934        }
935
936//=============================================================================================
937// I/O Submissions
938//=============================================================================================
939
940        void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
941                int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
942                if( ret < 0 ) {
943                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
944                }
945
946                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
947        }
948
949        void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
950                for(i; cltr.io.cnt) {
951                        register_fixed_files( cltr.io.ctxs[i], files, count );
952                }
953        }
954#endif
Note: See TracBrowser for help on using the repository browser.