source: libcfa/src/concurrency/io.cfa @ 20ab637

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 20ab637 was 20ab637, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added quick and dirty support for fixed files reads.
Added support for kernel side polling.

  • Property mode set to 100644
File size: 29.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#if defined(__CFA_DEBUG__)
17        // #define __CFA_DEBUG_PRINT_IO__
18        #define __CFA_DEBUG_PRINT_IO_CORE__
19#endif
20
21#include "kernel.hfa"
22#include "bitmanip.hfa"
23
24#if !defined(HAVE_LINUX_IO_URING_H)
25        void __kernel_io_startup( cluster &, unsigned, bool ) {
26                // Nothing to do without io_uring
27        }
28
29        void __kernel_io_finish_start( cluster & ) {
30                // Nothing to do without io_uring
31        }
32
33        void __kernel_io_prepare_stop( cluster & ) {
34                // Nothing to do without io_uring
35        }
36
37        void __kernel_io_shutdown( cluster &, bool ) {
38                // Nothing to do without io_uring
39        }
40
41#else
42        #define _GNU_SOURCE         /* See feature_test_macros(7) */
43        #include <errno.h>
44        #include <stdint.h>
45        #include <string.h>
46        #include <unistd.h>
47        #include <sys/mman.h>
48
49        extern "C" {
50                #include <sys/syscall.h>
51
52                #include <linux/io_uring.h>
53        }
54
55        #include "bits/signal.hfa"
56        #include "kernel_private.hfa"
57        #include "thread.hfa"
58
59        uint32_t entries_per_cluster() {
60                return 256;
61        }
62
63        static void * __io_poller_slow( void * arg );
64
65        // Weirdly, some systems that do support io_uring don't actually define these
66        #ifdef __alpha__
67                /*
68                * alpha is the only exception, all other architectures
69                * have common numbers for new system calls.
70                */
71                #ifndef __NR_io_uring_setup
72                        #define __NR_io_uring_setup           535
73                #endif
74                #ifndef __NR_io_uring_enter
75                        #define __NR_io_uring_enter           536
76                #endif
77                #ifndef __NR_io_uring_register
78                        #define __NR_io_uring_register        537
79                #endif
80        #else /* !__alpha__ */
81                #ifndef __NR_io_uring_setup
82                        #define __NR_io_uring_setup           425
83                #endif
84                #ifndef __NR_io_uring_enter
85                        #define __NR_io_uring_enter           426
86                #endif
87                #ifndef __NR_io_uring_register
88                        #define __NR_io_uring_register        427
89                #endif
90        #endif
91
92        // Fast poller user-thread
93        // Not using the "thread" keyword because we want to control
94        // more carefully when to start/stop it
95        struct __io_poller_fast {
96                struct __io_data * ring;
97                $thread thrd;
98        };
99
100        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
101                this.ring = cltr.io;
102                (this.thrd){ "Fast I/O Poller", cltr };
103        }
104        void ^?{}( __io_poller_fast & mutex this );
105        void main( __io_poller_fast & this );
106        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
107        void ^?{}( __io_poller_fast & mutex this ) {}
108
109        struct __submition_data {
110                // Head and tail of the ring (associated with array)
111                volatile uint32_t * head;
112                volatile uint32_t * tail;
113                volatile uint32_t prev_head;
114
115                // The actual kernel ring which uses head/tail
116                // indexes into the sqes arrays
117                uint32_t * array;
118
119                // number of entries and mask to go with it
120                const uint32_t * num;
121                const uint32_t * mask;
122
123                // Submission flags (Not sure what for)
124                uint32_t * flags;
125
126                // number of sqes not submitted (whatever that means)
127                uint32_t * dropped;
128
129                // Like head/tail but not seen by the kernel
130                volatile uint32_t * ready;
131                uint32_t ready_cnt;
132
133                __spinlock_t lock;
134                __spinlock_t release_lock;
135
136                // A buffer of sqes (not the actual ring)
137                struct io_uring_sqe * sqes;
138
139                // The location and size of the mmaped area
140                void * ring_ptr;
141                size_t ring_sz;
142        };
143
144        struct __completion_data {
145                // Head and tail of the ring
146                volatile uint32_t * head;
147                volatile uint32_t * tail;
148
149                // number of entries and mask to go with it
150                const uint32_t * mask;
151                const uint32_t * num;
152
153                // number of cqes not submitted (whatever that means)
154                uint32_t * overflow;
155
156                // the kernel ring
157                struct io_uring_cqe * cqes;
158
159                // The location and size of the mmaped area
160                void * ring_ptr;
161                size_t ring_sz;
162        };
163
164        struct __io_data {
165                struct __submition_data submit_q;
166                struct __completion_data completion_q;
167                uint32_t ring_flags;
168                int cltr_flags;
169                int fd;
170                semaphore submit;
171                volatile bool done;
172                struct {
173                        struct {
174                                __processor_id_t id;
175                                void * stack;
176                                pthread_t kthrd;
177                                volatile bool blocked;
178                        } slow;
179                        __io_poller_fast fast;
180                        __bin_sem_t sem;
181                } poller;
182        };
183
184//=============================================================================================
185// I/O Startup / Shutdown logic
186//=============================================================================================
187        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
188                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
189                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
190                }
191
192                this.io = malloc();
193
194                // Step 1 : call to setup
195                struct io_uring_params params;
196                memset(&params, 0, sizeof(params));
197                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
198                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
199
200                uint32_t nentries = entries_per_cluster();
201
202                int fd = syscall(__NR_io_uring_setup, nentries, &params );
203                if(fd < 0) {
204                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
205                }
206
207                // Step 2 : mmap result
208                memset( this.io, 0, sizeof(struct __io_data) );
209                struct __submition_data  & sq = this.io->submit_q;
210                struct __completion_data & cq = this.io->completion_q;
211
212                // calculate the right ring size
213                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
214                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
215
216                // Requires features
217                #if defined(IORING_FEAT_SINGLE_MMAP)
218                        // adjust the size according to the parameters
219                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
220                                cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
221                        }
222                #endif
223
224                // mmap the Submit Queue into existence
225                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
226                if (sq.ring_ptr == (void*)MAP_FAILED) {
227                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
228                }
229
230                // Requires features
231                #if defined(IORING_FEAT_SINGLE_MMAP)
232                        // mmap the Completion Queue into existence (may or may not be needed)
233                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
234                                cq->ring_ptr = sq->ring_ptr;
235                        }
236                        else
237                #endif
238                {
239                        // We need multiple call to MMAP
240                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
241                        if (cq.ring_ptr == (void*)MAP_FAILED) {
242                                munmap(sq.ring_ptr, sq.ring_sz);
243                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
244                        }
245                }
246
247                // mmap the submit queue entries
248                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
249                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
250                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
251                        munmap(sq.ring_ptr, sq.ring_sz);
252                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
253                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
254                }
255
256                // Get the pointers from the kernel to fill the structure
257                // submit queue
258                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
259                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
260                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
261                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
262                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
263                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
264                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
265                sq.prev_head = *sq.head;
266
267                {
268                        const uint32_t num = *sq.num;
269                        for( i; num ) {
270                                sq.sqes[i].user_data = 0ul64;
271                        }
272                }
273
274                (sq.lock){};
275                (sq.release_lock){};
276
277                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
278                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
279                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
280                        sq.ready = alloc_align( 64, sq.ready_cnt );
281                        for(i; sq.ready_cnt) {
282                                sq.ready[i] = -1ul32;
283                        }
284                }
285                else {
286                        sq.ready_cnt = 0;
287                        sq.ready = 0p;
288                }
289
290                // completion queue
291                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
292                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
293                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
294                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
295                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
296                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
297
298                // some paranoid checks
299                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
300                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
301                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
302                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
303
304                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
305                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
306                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
307                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
308
309                // Update the global ring info
310                this.io->ring_flags = params.flags;
311                this.io->cltr_flags = io_flags;
312                this.io->fd         = fd;
313                this.io->done       = false;
314                (this.io->submit){ min(*sq.num, *cq.num) };
315
316                if(!main_cluster) {
317                        __kernel_io_finish_start( this );
318                }
319        }
320
321        void __kernel_io_finish_start( cluster & this ) {
322                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
323                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
324                        (this.io->poller.fast){ this };
325                        __thrd_start( this.io->poller.fast, main );
326                }
327
328                // Create the poller thread
329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
330                this.io->poller.slow.blocked = false;
331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
332        }
333
334        void __kernel_io_prepare_stop( cluster & this ) {
335                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
336                // Notify the poller thread of the shutdown
337                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
338
339                // Stop the IO Poller
340                sigval val = { 1 };
341                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
342                post( this.io->poller.sem );
343
344                // Wait for the poller thread to finish
345                pthread_join( this.io->poller.slow.kthrd, 0p );
346                free( this.io->poller.slow.stack );
347
348                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
349
350                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
351                        with( this.io->poller.fast ) {
352                                /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
353                                /* paranoid */ verify( !ready_mutate_islocked() );
354
355                                // We need to adjust the clean-up based on where the thread is
356                                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
357
358                                        ready_schedule_lock( (struct __processor_id_t *)active_processor() );
359
360                                                // This is the tricky case
361                                                // The thread was preempted and now it is on the ready queue
362                                                // The thread should be the last on the list
363                                                /* paranoid */ verify( thrd.link.next != 0p );
364
365                                                // Remove the thread from the ready queue of this cluster
366                                                __attribute__((unused)) bool removed = remove_head( &this, &thrd );
367                                                /* paranoid */ verify( removed );
368                                                thrd.link.next = 0p;
369                                                thrd.link.prev = 0p;
370                                                __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
371
372                                                // Fixup the thread state
373                                                thrd.state = Blocked;
374                                                thrd.ticket = 0;
375                                                thrd.preempted = __NO_PREEMPTION;
376
377                                        ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
378
379                                        // Pretend like the thread was blocked all along
380                                }
381                                // !!! This is not an else if !!!
382                                if( thrd.state == Blocked ) {
383
384                                        // This is the "easy case"
385                                        // The thread is parked and can easily be moved to active cluster
386                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
387                                        thrd.curr_cluster = active_cluster();
388
389                                        // unpark the fast io_poller
390                                        unpark( &thrd __cfaabi_dbg_ctx2 );
391                                }
392                                else {
393
394                                        // The thread is in a weird state
395                                        // I don't know what to do here
396                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
397                                }
398
399                        }
400
401                        ^(this.io->poller.fast){};
402
403                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
404                }
405        }
406
407        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
408                if(!main_cluster) {
409                        __kernel_io_prepare_stop( this );
410                }
411
412                // Shutdown the io rings
413                struct __submition_data  & sq = this.io->submit_q;
414                struct __completion_data & cq = this.io->completion_q;
415
416                // unmap the submit queue entries
417                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
418
419                // unmap the Submit Queue ring
420                munmap(sq.ring_ptr, sq.ring_sz);
421
422                // unmap the Completion Queue ring, if it is different
423                if (cq.ring_ptr != sq.ring_ptr) {
424                        munmap(cq.ring_ptr, cq.ring_sz);
425                }
426
427                // close the file descriptor
428                close(this.io->fd);
429
430                free( this.io->submit_q.ready ); // Maybe null, doesn't matter
431                free( this.io );
432        }
433
434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
435                bool need_sys_to_submit = false;
436                bool need_sys_to_complete = false;
437                unsigned min_complete = 0;
438                unsigned flags = 0;
439
440
441                TO_SUBMIT:
442                if( to_submit > 0 ) {
443                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
444                                need_sys_to_submit = true;
445                                break TO_SUBMIT;
446                        }
447                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
448                                need_sys_to_submit = true;
449                                flags |= IORING_ENTER_SQ_WAKEUP;
450                        }
451                }
452
453                TO_COMPLETE:
454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
455                        flags |= IORING_ENTER_GETEVENTS;
456                        if( mask ) {
457                                need_sys_to_complete = true;
458                                min_complete = 1;
459                                break TO_COMPLETE;
460                        }
461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
462                                need_sys_to_complete = true;
463                        }
464                }
465
466                int ret = 0;
467                if( need_sys_to_submit || need_sys_to_complete ) {
468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
469                        if( ret < 0 ) {
470                                switch((int)errno) {
471                                case EAGAIN:
472                                case EINTR:
473                                        ret = -1;
474                                        break;
475                                default:
476                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
477                                }
478                        }
479                }
480
481                // Memory barrier
482                __atomic_thread_fence( __ATOMIC_SEQ_CST );
483                return ret;
484        }
485
486//=============================================================================================
487// I/O Polling
488//=============================================================================================
489        static unsigned __collect_submitions( struct __io_data & ring );
490        static uint32_t __release_consumed_submission( struct __io_data & ring );
491
492        // Process a single completion message from the io_uring
493        // This is NOT thread-safe
494        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
495                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
496                const uint32_t smask = *ring.submit_q.mask;
497
498                unsigned to_submit = 0;
499                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
500                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
501                        to_submit = __collect_submitions( ring );
502                }
503
504                int ret = __io_uring_enter(ring, to_submit, true, mask);
505                if( ret < 0 ) {
506                        return [0, true];
507                }
508
509                // update statistics
510                if (to_submit > 0) {
511                        __STATS__( true,
512                                if( to_submit > 0 ) {
513                                        io.submit_q.submit_avg.rdy += to_submit;
514                                        io.submit_q.submit_avg.csm += ret;
515                                        io.submit_q.submit_avg.cnt += 1;
516                                }
517                        )
518                }
519
520                // Release the consumed SQEs
521                __release_consumed_submission( ring );
522
523                // Drain the queue
524                unsigned head = *ring.completion_q.head;
525                unsigned tail = *ring.completion_q.tail;
526                const uint32_t mask = *ring.completion_q.mask;
527
528                // Nothing was new return 0
529                if (head == tail) {
530                        return [0, to_submit > 0];
531                }
532
533                uint32_t count = tail - head;
534                /* paranoid */ verify( count != 0 );
535                for(i; count) {
536                        unsigned idx = (head + i) & mask;
537                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
538
539                        /* paranoid */ verify(&cqe);
540
541                        struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
542                        __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
543
544                        data->result = cqe.res;
545                        if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
546                        else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
547                }
548
549                // Allow new submissions to happen
550                // V(ring.submit, count);
551
552                // Mark to the kernel that the cqe has been seen
553                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
554                __atomic_thread_fence( __ATOMIC_SEQ_CST );
555                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
556
557                return [count, count > 0 || to_submit > 0];
558        }
559
560        static void * __io_poller_slow( void * arg ) {
561                #if !defined( __CFA_NO_STATISTICS__ )
562                        __stats_t local_stats;
563                        __init_stats( &local_stats );
564                        kernelTLS.this_stats = &local_stats;
565                #endif
566
567                cluster * cltr = (cluster *)arg;
568                struct __io_data & ring = *cltr->io;
569
570                ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
571
572                sigset_t mask;
573                sigfillset(&mask);
574                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
575                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
576                }
577
578                sigdelset( &mask, SIGUSR1 );
579
580                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
581                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
582
583                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
584
585                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
586                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
587
588                                __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
589
590                                // In the user-thread approach drain and if anything was drained,
591                                // batton pass to the user-thread
592                                int count;
593                                bool again;
594                                [count, again] = __drain_io( ring, &mask );
595
596                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
597
598                                // Update statistics
599                                __STATS__( true,
600                                        io.complete_q.completed_avg.val += count;
601                                        io.complete_q.completed_avg.slow_cnt += 1;
602                                )
603
604                                if(again) {
605                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
606                                        __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
607                                        wait( ring.poller.sem );
608                                }
609                        }
610                }
611                else {
612                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
613                                //In the naive approach, just poll the io completion queue directly
614                                int count;
615                                bool again;
616                                [count, again] = __drain_io( ring, &mask );
617
618                                // Update statistics
619                                __STATS__( true,
620                                        io.complete_q.completed_avg.val += count;
621                                        io.complete_q.completed_avg.slow_cnt += 1;
622                                )
623                        }
624                }
625
626                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
627
628                unregister( &ring.poller.slow.id );
629
630                #if !defined(__CFA_NO_STATISTICS__)
631                        __tally_stats(cltr->stats, &local_stats);
632                #endif
633
634                return 0p;
635        }
636
637        void main( __io_poller_fast & this ) {
638                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
639
640                // Start parked
641                park( __cfaabi_dbg_ctx );
642
643                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
644
645                int reset = 0;
646
647                // Then loop until we need to start
648                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
649
650                        // Drain the io
651                        int count;
652                        bool again;
653                        disable_interrupts();
654                                [count, again] = __drain_io( *this.ring, 0p );
655
656                                if(!again) reset++;
657
658                                // Update statistics
659                                __STATS__( true,
660                                        io.complete_q.completed_avg.val += count;
661                                        io.complete_q.completed_avg.fast_cnt += 1;
662                                )
663                        enable_interrupts( __cfaabi_dbg_ctx );
664
665                        // If we got something, just yield and check again
666                        if(reset < 5) {
667                                yield();
668                        }
669                        // We didn't get anything baton pass to the slow poller
670                        else {
671                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
672                                reset = 0;
673
674                                // wake up the slow poller
675                                post( this.ring->poller.sem );
676
677                                // park this thread
678                                park( __cfaabi_dbg_ctx );
679                        }
680                }
681
682                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
683        }
684
685        static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
686        static inline void __wake_poller( struct __io_data & ring ) {
687                if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
688
689                sigval val = { 1 };
690                pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
691        }
692
693//=============================================================================================
694// I/O Submissions
695//=============================================================================================
696
697// Submition steps :
698// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
699//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
700//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
701//     need to write an allocator that allows allocating concurrently.
702//
703// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
704//
705// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
706//     needs to arrive to two concensus at the same time:
707//     A - The order in which entries are listed in the array: no two threads must pick the
708//         same index for their entries
709//     B - When can the tail be update for the kernel. EVERY entries in the array between
710//         head and tail must be fully filled and shouldn't ever be touched again.
711//
712
713        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
714                /* paranoid */ verify( data != 0 );
715
716                // Prepare the data we need
717                __attribute((unused)) int len   = 0;
718                __attribute((unused)) int block = 0;
719                uint32_t cnt = *ring.submit_q.num;
720                uint32_t mask = *ring.submit_q.mask;
721
722                disable_interrupts();
723                        uint32_t off = __tls_rand();
724                enable_interrupts( __cfaabi_dbg_ctx );
725
726                // Loop around looking for an available spot
727                for() {
728                        // Look through the list starting at some offset
729                        for(i; cnt) {
730                                uint64_t expected = 0;
731                                uint32_t idx = (i + off) & mask;
732                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
733                                volatile uint64_t * udata = &sqe->user_data;
734
735                                if( *udata == expected &&
736                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
737                                {
738                                        // update statistics
739                                        __STATS__( false,
740                                                io.submit_q.alloc_avg.val   += len;
741                                                io.submit_q.alloc_avg.block += block;
742                                                io.submit_q.alloc_avg.cnt   += 1;
743                                        )
744
745
746                                        // Success return the data
747                                        return [sqe, idx];
748                                }
749                                verify(expected != data);
750
751                                len ++;
752                        }
753
754                        block++;
755                        yield();
756                }
757        }
758
759        static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
760                /* paranoid */ verify( idx <= mask   );
761                /* paranoid */ verify( idx != -1ul32 );
762
763                // We need to find a spot in the ready array
764                __attribute((unused)) int len   = 0;
765                __attribute((unused)) int block = 0;
766                uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
767
768                disable_interrupts();
769                        uint32_t off = __tls_rand();
770                enable_interrupts( __cfaabi_dbg_ctx );
771
772                uint32_t picked;
773                LOOKING: for() {
774                        for(i; ring.submit_q.ready_cnt) {
775                                picked = (i + off) & ready_mask;
776                                uint32_t expected = -1ul32;
777                                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
778                                        break LOOKING;
779                                }
780                                verify(expected != idx);
781
782                                len ++;
783                        }
784
785                        block++;
786                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
787                                __release_consumed_submission( ring );
788                                unlock( ring.submit_q.lock );
789                        }
790                        else {
791                                yield();
792                        }
793                }
794
795                // update statistics
796                __STATS__( false,
797                        io.submit_q.look_avg.val   += len;
798                        io.submit_q.look_avg.block += block;
799                        io.submit_q.look_avg.cnt   += 1;
800                )
801
802                return picked;
803        }
804
805        void __submit( struct __io_data & ring, uint32_t idx ) {
806                // Get now the data we definetely need
807                uint32_t * const tail = ring.submit_q.tail;
808                const uint32_t mask = *ring.submit_q.mask;
809
810                // There are 2 submission schemes, check which one we are using
811                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
812                        // If the poller thread submits, then we just need to add this to the ready array
813                        __submit_to_ready_array( ring, idx, mask );
814
815                        __wake_poller( ring );
816
817                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
818                }
819                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
820                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
821
822                        for() {
823                                yield();
824
825                                // If some one else collected our index, we are done
826                                #warning ABA problem
827                                if( ring.submit_q.ready[picked] != idx ) {
828                                        __STATS__( false,
829                                                io.submit_q.helped += 1;
830                                        )
831                                        return;
832                                }
833
834                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
835                                        __STATS__( false,
836                                                io.submit_q.leader += 1;
837                                        )
838                                        break;
839                                }
840
841                                __STATS__( false,
842                                        io.submit_q.busy += 1;
843                                )
844                        }
845
846                        // We got the lock
847                        unsigned to_submit = __collect_submitions( ring );
848                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
849                        if( ret < 0 ) {
850                                unlock(ring.submit_q.lock);
851                                return;
852                        }
853
854                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
855
856                        // Release the consumed SQEs
857                        __release_consumed_submission( ring );
858
859                        // update statistics
860                        __STATS__( true,
861                                io.submit_q.submit_avg.rdy += to_submit;
862                                io.submit_q.submit_avg.csm += ret;
863                                io.submit_q.submit_avg.cnt += 1;
864                        )
865
866                        unlock(ring.submit_q.lock);
867                }
868                else {
869                        // get mutual exclusion
870                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
871
872                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
873                        /* paranoid */  "index %u already reclaimed\n"
874                        /* paranoid */  "head %u, prev %u, tail %u\n"
875                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
876                        /* paranoid */  idx,
877                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
878                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
879                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
880                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
881                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
882                        /* paranoid */ );
883
884                        // Append to the list of ready entries
885
886                        /* paranoid */ verify( idx <= mask );
887                        ring.submit_q.array[ (*tail) & mask ] = idx;
888                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
889
890                        // Submit however, many entries need to be submitted
891                        int ret = __io_uring_enter( ring, 1, false, 0p );
892                        if( ret < 0 ) {
893                                switch((int)errno) {
894                                default:
895                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
896                                }
897                        }
898
899                        // update statistics
900                        __STATS__( false,
901                                io.submit_q.submit_avg.csm += 1;
902                                io.submit_q.submit_avg.cnt += 1;
903                        )
904
905                        // Release the consumed SQEs
906                        __release_consumed_submission( ring );
907
908                        unlock(ring.submit_q.lock);
909
910                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
911                }
912        }
913
914        static unsigned __collect_submitions( struct __io_data & ring ) {
915                /* paranoid */ verify( ring.submit_q.ready != 0p );
916                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
917
918                unsigned to_submit = 0;
919                uint32_t tail = *ring.submit_q.tail;
920                const uint32_t mask = *ring.submit_q.mask;
921
922                // Go through the list of ready submissions
923                for( i; ring.submit_q.ready_cnt ) {
924                        // replace any submission with the sentinel, to consume it.
925                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
926
927                        // If it was already the sentinel, then we are done
928                        if( idx == -1ul32 ) continue;
929
930                        // If we got a real submission, append it to the list
931                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
932                        to_submit++;
933                }
934
935                // Increment the tail based on how many we are ready to submit
936                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
937
938                return to_submit;
939        }
940
941        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
942                const uint32_t smask = *ring.submit_q.mask;
943
944                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
945                uint32_t chead = *ring.submit_q.head;
946                uint32_t phead = ring.submit_q.prev_head;
947                ring.submit_q.prev_head = chead;
948                unlock(ring.submit_q.release_lock);
949
950                uint32_t count = chead - phead;
951                for( i; count ) {
952                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
953                        ring.submit_q.sqes[ idx ].user_data = 0;
954                }
955                return count;
956        }
957
958//=============================================================================================
959// I/O Submissions
960//=============================================================================================
961
962        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
963                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
964                if( ret < 0 ) {
965                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
966                }
967
968                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
969        }
970#endif
Note: See TracBrowser for help on using the repository browser.