source: libcfa/src/concurrency/io.cfa @ e1801fc

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since e1801fc was 59f74a2, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 29.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#if defined(__CFA_DEBUG__)
17        // #define __CFA_DEBUG_PRINT_IO__
18        #define __CFA_DEBUG_PRINT_IO_CORE__
19#endif
20
21#include "kernel.hfa"
22#include "bitmanip.hfa"
23
24#if !defined(HAVE_LINUX_IO_URING_H)
25        void __kernel_io_startup( cluster &, unsigned, bool ) {
26                // Nothing to do without io_uring
27        }
28
29        void __kernel_io_finish_start( cluster & ) {
30                // Nothing to do without io_uring
31        }
32
33        void __kernel_io_prepare_stop( cluster & ) {
34                // Nothing to do without io_uring
35        }
36
37        void __kernel_io_shutdown( cluster &, bool ) {
38                // Nothing to do without io_uring
39        }
40
41#else
42        #define _GNU_SOURCE         /* See feature_test_macros(7) */
43        #include <errno.h>
44        #include <stdint.h>
45        #include <string.h>
46        #include <unistd.h>
47        #include <sys/mman.h>
48
49        extern "C" {
50                #include <sys/syscall.h>
51
52                #include <linux/io_uring.h>
53        }
54
55        #include "bits/signal.hfa"
56        #include "kernel_private.hfa"
57        #include "thread.hfa"
58
59        uint32_t entries_per_cluster() {
60                return 256;
61        }
62
63        static void * __io_poller_slow( void * arg );
64
65        // Weirdly, some systems that do support io_uring don't actually define these
66        #ifdef __alpha__
67                /*
68                * alpha is the only exception, all other architectures
69                * have common numbers for new system calls.
70                */
71                #ifndef __NR_io_uring_setup
72                        #define __NR_io_uring_setup           535
73                #endif
74                #ifndef __NR_io_uring_enter
75                        #define __NR_io_uring_enter           536
76                #endif
77                #ifndef __NR_io_uring_register
78                        #define __NR_io_uring_register        537
79                #endif
80        #else /* !__alpha__ */
81                #ifndef __NR_io_uring_setup
82                        #define __NR_io_uring_setup           425
83                #endif
84                #ifndef __NR_io_uring_enter
85                        #define __NR_io_uring_enter           426
86                #endif
87                #ifndef __NR_io_uring_register
88                        #define __NR_io_uring_register        427
89                #endif
90        #endif
91
92        // Fast poller user-thread
93        // Not using the "thread" keyword because we want to control
94        // more carefully when to start/stop it
95        struct __io_poller_fast {
96                struct __io_data * ring;
97                $thread thrd;
98        };
99
100        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
101                this.ring = cltr.io;
102                (this.thrd){ "Fast I/O Poller", cltr };
103        }
104        void ^?{}( __io_poller_fast & mutex this );
105        void main( __io_poller_fast & this );
106        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
107        void ^?{}( __io_poller_fast & mutex this ) {}
108
109        struct __submition_data {
110                // Head and tail of the ring (associated with array)
111                volatile uint32_t * head;
112                volatile uint32_t * tail;
113                volatile uint32_t prev_head;
114
115                // The actual kernel ring which uses head/tail
116                // indexes into the sqes arrays
117                uint32_t * array;
118
119                // number of entries and mask to go with it
120                const uint32_t * num;
121                const uint32_t * mask;
122
123                // Submission flags (Not sure what for)
124                uint32_t * flags;
125
126                // number of sqes not submitted (whatever that means)
127                uint32_t * dropped;
128
129                // Like head/tail but not seen by the kernel
130                volatile uint32_t * ready;
131                uint32_t ready_cnt;
132
133                __spinlock_t lock;
134                __spinlock_t release_lock;
135
136                // A buffer of sqes (not the actual ring)
137                struct io_uring_sqe * sqes;
138
139                // The location and size of the mmaped area
140                void * ring_ptr;
141                size_t ring_sz;
142        };
143
144        struct __completion_data {
145                // Head and tail of the ring
146                volatile uint32_t * head;
147                volatile uint32_t * tail;
148
149                // number of entries and mask to go with it
150                const uint32_t * mask;
151                const uint32_t * num;
152
153                // number of cqes not submitted (whatever that means)
154                uint32_t * overflow;
155
156                // the kernel ring
157                struct io_uring_cqe * cqes;
158
159                // The location and size of the mmaped area
160                void * ring_ptr;
161                size_t ring_sz;
162        };
163
164        struct __io_data {
165                struct __submition_data submit_q;
166                struct __completion_data completion_q;
167                uint32_t ring_flags;
168                int cltr_flags;
169                int fd;
170                semaphore submit;
171                volatile bool done;
172                struct {
173                        struct {
174                                __processor_id_t id;
175                                void * stack;
176                                pthread_t kthrd;
177                                volatile bool blocked;
178                        } slow;
179                        __io_poller_fast fast;
180                        __bin_sem_t sem;
181                } poller;
182        };
183
184//=============================================================================================
185// I/O Startup / Shutdown logic
186//=============================================================================================
187        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
188                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
189                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
190                }
191
192                this.io = malloc();
193
194                // Step 1 : call to setup
195                struct io_uring_params params;
196                memset(&params, 0, sizeof(params));
197                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
198                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
199
200                uint32_t nentries = entries_per_cluster();
201
202                int fd = syscall(__NR_io_uring_setup, nentries, &params );
203                if(fd < 0) {
204                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
205                }
206
207                // Step 2 : mmap result
208                memset( this.io, 0, sizeof(struct __io_data) );
209                struct __submition_data  & sq = this.io->submit_q;
210                struct __completion_data & cq = this.io->completion_q;
211
212                // calculate the right ring size
213                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
214                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
215
216                // Requires features
217                #if defined(IORING_FEAT_SINGLE_MMAP)
218                        // adjust the size according to the parameters
219                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
220                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
221                        }
222                #endif
223
224                // mmap the Submit Queue into existence
225                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
226                if (sq.ring_ptr == (void*)MAP_FAILED) {
227                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
228                }
229
230                // Requires features
231                #if defined(IORING_FEAT_SINGLE_MMAP)
232                        // mmap the Completion Queue into existence (may or may not be needed)
233                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
234                                cq.ring_ptr = sq.ring_ptr;
235                        }
236                        else
237                #endif
238                {
239                        // We need multiple call to MMAP
240                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
241                        if (cq.ring_ptr == (void*)MAP_FAILED) {
242                                munmap(sq.ring_ptr, sq.ring_sz);
243                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
244                        }
245                }
246
247                // mmap the submit queue entries
248                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
249                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
250                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
251                        munmap(sq.ring_ptr, sq.ring_sz);
252                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
253                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
254                }
255
256                // Get the pointers from the kernel to fill the structure
257                // submit queue
258                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
259                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
260                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
261                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
262                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
263                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
264                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
265                sq.prev_head = *sq.head;
266
267                {
268                        const uint32_t num = *sq.num;
269                        for( i; num ) {
270                                sq.sqes[i].user_data = 0ul64;
271                        }
272                }
273
274                (sq.lock){};
275                (sq.release_lock){};
276
277                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
278                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
279                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
280                        sq.ready = alloc_align( 64, sq.ready_cnt );
281                        for(i; sq.ready_cnt) {
282                                sq.ready[i] = -1ul32;
283                        }
284                }
285                else {
286                        sq.ready_cnt = 0;
287                        sq.ready = 0p;
288                }
289
290                // completion queue
291                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
292                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
293                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
294                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
295                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
296                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
297
298                // some paranoid checks
299                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
300                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
301                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
302                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
303
304                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
305                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
306                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
307                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
308
309                // Update the global ring info
310                this.io->ring_flags = params.flags;
311                this.io->cltr_flags = io_flags;
312                this.io->fd         = fd;
313                this.io->done       = false;
314                (this.io->submit){ min(*sq.num, *cq.num) };
315
316                if(!main_cluster) {
317                        __kernel_io_finish_start( this );
318                }
319        }
320
321        void __kernel_io_finish_start( cluster & this ) {
322                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
323                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
324                        (this.io->poller.fast){ this };
325                        __thrd_start( this.io->poller.fast, main );
326                }
327
328                // Create the poller thread
329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
330                this.io->poller.slow.blocked = false;
331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
332        }
333
334        void __kernel_io_prepare_stop( cluster & this ) {
335                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
336                // Notify the poller thread of the shutdown
337                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
338
339                // Stop the IO Poller
340                sigval val = { 1 };
341                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
342                post( this.io->poller.sem );
343
344                // Wait for the poller thread to finish
345                pthread_join( this.io->poller.slow.kthrd, 0p );
346                free( this.io->poller.slow.stack );
347
348                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
349
350                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
351                        with( this.io->poller.fast ) {
352                                /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
353                                /* paranoid */ verify( !ready_mutate_islocked() );
354
355                                // We need to adjust the clean-up based on where the thread is
356                                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
357
358                                        ready_schedule_lock( (struct __processor_id_t *)active_processor() );
359
360                                                // This is the tricky case
361                                                // The thread was preempted and now it is on the ready queue
362                                                // The thread should be the last on the list
363                                                /* paranoid */ verify( thrd.link.next != 0p );
364
365                                                // Remove the thread from the ready queue of this cluster
366                                                __attribute__((unused)) bool removed = remove_head( &this, &thrd );
367                                                /* paranoid */ verify( removed );
368                                                thrd.link.next = 0p;
369                                                thrd.link.prev = 0p;
370                                                __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
371
372                                                // Fixup the thread state
373                                                thrd.state = Blocked;
374                                                thrd.ticket = 0;
375                                                thrd.preempted = __NO_PREEMPTION;
376
377                                        ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
378
379                                        // Pretend like the thread was blocked all along
380                                }
381                                // !!! This is not an else if !!!
382                                if( thrd.state == Blocked ) {
383
384                                        // This is the "easy case"
385                                        // The thread is parked and can easily be moved to active cluster
386                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
387                                        thrd.curr_cluster = active_cluster();
388
389                                        // unpark the fast io_poller
390                                        unpark( &thrd __cfaabi_dbg_ctx2 );
391                                }
392                                else {
393
394                                        // The thread is in a weird state
395                                        // I don't know what to do here
396                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
397                                }
398
399                        }
400
401                        ^(this.io->poller.fast){};
402
403                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
404                }
405        }
406
407        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
408                if(!main_cluster) {
409                        __kernel_io_prepare_stop( this );
410                }
411
412                // Shutdown the io rings
413                struct __submition_data  & sq = this.io->submit_q;
414                struct __completion_data & cq = this.io->completion_q;
415
416                // unmap the submit queue entries
417                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
418
419                // unmap the Submit Queue ring
420                munmap(sq.ring_ptr, sq.ring_sz);
421
422                // unmap the Completion Queue ring, if it is different
423                if (cq.ring_ptr != sq.ring_ptr) {
424                        munmap(cq.ring_ptr, cq.ring_sz);
425                }
426
427                // close the file descriptor
428                close(this.io->fd);
429
430                free( this.io->submit_q.ready ); // Maybe null, doesn't matter
431                free( this.io );
432        }
433
434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
435                bool need_sys_to_submit = false;
436                bool need_sys_to_complete = false;
437                unsigned min_complete = 0;
438                unsigned flags = 0;
439
440
441                TO_SUBMIT:
442                if( to_submit > 0 ) {
443                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
444                                need_sys_to_submit = true;
445                                break TO_SUBMIT;
446                        }
447                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
448                                need_sys_to_submit = true;
449                                flags |= IORING_ENTER_SQ_WAKEUP;
450                        }
451                }
452
453                TO_COMPLETE:
454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
455                        flags |= IORING_ENTER_GETEVENTS;
456                        if( mask ) {
457                                need_sys_to_complete = true;
458                                min_complete = 1;
459                                break TO_COMPLETE;
460                        }
461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
462                                need_sys_to_complete = true;
463                        }
464                }
465
466                int ret = 0;
467                if( need_sys_to_submit || need_sys_to_complete ) {
468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
469                        if( ret < 0 ) {
470                                switch((int)errno) {
471                                case EAGAIN:
472                                case EINTR:
473                                        ret = -1;
474                                        break;
475                                default:
476                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
477                                }
478                        }
479                }
480
481                // Memory barrier
482                __atomic_thread_fence( __ATOMIC_SEQ_CST );
483                return ret;
484        }
485
486//=============================================================================================
487// I/O Polling
488//=============================================================================================
489        static unsigned __collect_submitions( struct __io_data & ring );
490        static uint32_t __release_consumed_submission( struct __io_data & ring );
491
492        // Process a single completion message from the io_uring
493        // This is NOT thread-safe
494        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
495                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
496
497                unsigned to_submit = 0;
498                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
499                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
500                        to_submit = __collect_submitions( ring );
501                }
502
503                int ret = __io_uring_enter(ring, to_submit, true, mask);
504                if( ret < 0 ) {
505                        return [0, true];
506                }
507
508                // update statistics
509                if (to_submit > 0) {
510                        __STATS__( true,
511                                if( to_submit > 0 ) {
512                                        io.submit_q.submit_avg.rdy += to_submit;
513                                        io.submit_q.submit_avg.csm += ret;
514                                        io.submit_q.submit_avg.cnt += 1;
515                                }
516                        )
517                }
518
519                // Release the consumed SQEs
520                __release_consumed_submission( ring );
521
522                // Drain the queue
523                unsigned head = *ring.completion_q.head;
524                unsigned tail = *ring.completion_q.tail;
525                const uint32_t mask = *ring.completion_q.mask;
526
527                // Nothing was new return 0
528                if (head == tail) {
529                        return [0, to_submit > 0];
530                }
531
532                uint32_t count = tail - head;
533                /* paranoid */ verify( count != 0 );
534                for(i; count) {
535                        unsigned idx = (head + i) & mask;
536                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
537
538                        /* paranoid */ verify(&cqe);
539
540                        struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
541                        __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
542
543                        data->result = cqe.res;
544                        if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
545                        else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
546                }
547
548                // Allow new submissions to happen
549                // V(ring.submit, count);
550
551                // Mark to the kernel that the cqe has been seen
552                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
553                __atomic_thread_fence( __ATOMIC_SEQ_CST );
554                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
555
556                return [count, count > 0 || to_submit > 0];
557        }
558
559        static void * __io_poller_slow( void * arg ) {
560                #if !defined( __CFA_NO_STATISTICS__ )
561                        __stats_t local_stats;
562                        __init_stats( &local_stats );
563                        kernelTLS.this_stats = &local_stats;
564                #endif
565
566                cluster * cltr = (cluster *)arg;
567                struct __io_data & ring = *cltr->io;
568
569                ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
570
571                sigset_t mask;
572                sigfillset(&mask);
573                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
574                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
575                }
576
577                sigdelset( &mask, SIGUSR1 );
578
579                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
580                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
581
582                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
583
584                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
585                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
586
587                                __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
588
589                                // In the user-thread approach drain and if anything was drained,
590                                // batton pass to the user-thread
591                                int count;
592                                bool again;
593                                [count, again] = __drain_io( ring, &mask );
594
595                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
596
597                                // Update statistics
598                                __STATS__( true,
599                                        io.complete_q.completed_avg.val += count;
600                                        io.complete_q.completed_avg.slow_cnt += 1;
601                                )
602
603                                if(again) {
604                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
605                                        __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
606                                        wait( ring.poller.sem );
607                                }
608                        }
609                }
610                else {
611                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
612                                //In the naive approach, just poll the io completion queue directly
613                                int count;
614                                bool again;
615                                [count, again] = __drain_io( ring, &mask );
616
617                                // Update statistics
618                                __STATS__( true,
619                                        io.complete_q.completed_avg.val += count;
620                                        io.complete_q.completed_avg.slow_cnt += 1;
621                                )
622                        }
623                }
624
625                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
626
627                unregister( &ring.poller.slow.id );
628
629                #if !defined(__CFA_NO_STATISTICS__)
630                        __tally_stats(cltr->stats, &local_stats);
631                #endif
632
633                return 0p;
634        }
635
636        void main( __io_poller_fast & this ) {
637                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
638
639                // Start parked
640                park( __cfaabi_dbg_ctx );
641
642                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
643
644                int reset = 0;
645
646                // Then loop until we need to start
647                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
648
649                        // Drain the io
650                        int count;
651                        bool again;
652                        disable_interrupts();
653                                [count, again] = __drain_io( *this.ring, 0p );
654
655                                if(!again) reset++;
656
657                                // Update statistics
658                                __STATS__( true,
659                                        io.complete_q.completed_avg.val += count;
660                                        io.complete_q.completed_avg.fast_cnt += 1;
661                                )
662                        enable_interrupts( __cfaabi_dbg_ctx );
663
664                        // If we got something, just yield and check again
665                        if(reset < 5) {
666                                yield();
667                        }
668                        // We didn't get anything baton pass to the slow poller
669                        else {
670                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
671                                reset = 0;
672
673                                // wake up the slow poller
674                                post( this.ring->poller.sem );
675
676                                // park this thread
677                                park( __cfaabi_dbg_ctx );
678                        }
679                }
680
681                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
682        }
683
684        static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
685        static inline void __wake_poller( struct __io_data & ring ) {
686                if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
687
688                sigval val = { 1 };
689                pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
690        }
691
692//=============================================================================================
693// I/O Submissions
694//=============================================================================================
695
696// Submition steps :
697// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
698//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
699//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
700//     need to write an allocator that allows allocating concurrently.
701//
702// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
703//
704// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
705//     needs to arrive to two concensus at the same time:
706//     A - The order in which entries are listed in the array: no two threads must pick the
707//         same index for their entries
708//     B - When can the tail be update for the kernel. EVERY entries in the array between
709//         head and tail must be fully filled and shouldn't ever be touched again.
710//
711
712        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
713                /* paranoid */ verify( data != 0 );
714
715                // Prepare the data we need
716                __attribute((unused)) int len   = 0;
717                __attribute((unused)) int block = 0;
718                uint32_t cnt = *ring.submit_q.num;
719                uint32_t mask = *ring.submit_q.mask;
720
721                disable_interrupts();
722                        uint32_t off = __tls_rand();
723                enable_interrupts( __cfaabi_dbg_ctx );
724
725                // Loop around looking for an available spot
726                for() {
727                        // Look through the list starting at some offset
728                        for(i; cnt) {
729                                uint64_t expected = 0;
730                                uint32_t idx = (i + off) & mask;
731                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
732                                volatile uint64_t * udata = &sqe->user_data;
733
734                                if( *udata == expected &&
735                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
736                                {
737                                        // update statistics
738                                        __STATS__( false,
739                                                io.submit_q.alloc_avg.val   += len;
740                                                io.submit_q.alloc_avg.block += block;
741                                                io.submit_q.alloc_avg.cnt   += 1;
742                                        )
743
744
745                                        // Success return the data
746                                        return [sqe, idx];
747                                }
748                                verify(expected != data);
749
750                                len ++;
751                        }
752
753                        block++;
754                        yield();
755                }
756        }
757
758        static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
759                /* paranoid */ verify( idx <= mask   );
760                /* paranoid */ verify( idx != -1ul32 );
761
762                // We need to find a spot in the ready array
763                __attribute((unused)) int len   = 0;
764                __attribute((unused)) int block = 0;
765                uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
766
767                disable_interrupts();
768                        uint32_t off = __tls_rand();
769                enable_interrupts( __cfaabi_dbg_ctx );
770
771                uint32_t picked;
772                LOOKING: for() {
773                        for(i; ring.submit_q.ready_cnt) {
774                                picked = (i + off) & ready_mask;
775                                uint32_t expected = -1ul32;
776                                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
777                                        break LOOKING;
778                                }
779                                verify(expected != idx);
780
781                                len ++;
782                        }
783
784                        block++;
785                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
786                                __release_consumed_submission( ring );
787                                unlock( ring.submit_q.lock );
788                        }
789                        else {
790                                yield();
791                        }
792                }
793
794                // update statistics
795                __STATS__( false,
796                        io.submit_q.look_avg.val   += len;
797                        io.submit_q.look_avg.block += block;
798                        io.submit_q.look_avg.cnt   += 1;
799                )
800
801                return picked;
802        }
803
804        void __submit( struct __io_data & ring, uint32_t idx ) {
805                // Get now the data we definetely need
806                uint32_t * const tail = ring.submit_q.tail;
807                const uint32_t mask = *ring.submit_q.mask;
808
809                // There are 2 submission schemes, check which one we are using
810                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
811                        // If the poller thread submits, then we just need to add this to the ready array
812                        __submit_to_ready_array( ring, idx, mask );
813
814                        __wake_poller( ring );
815
816                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
817                }
818                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
819                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
820
821                        for() {
822                                yield();
823
824                                // If some one else collected our index, we are done
825                                #warning ABA problem
826                                if( ring.submit_q.ready[picked] != idx ) {
827                                        __STATS__( false,
828                                                io.submit_q.helped += 1;
829                                        )
830                                        return;
831                                }
832
833                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
834                                        __STATS__( false,
835                                                io.submit_q.leader += 1;
836                                        )
837                                        break;
838                                }
839
840                                __STATS__( false,
841                                        io.submit_q.busy += 1;
842                                )
843                        }
844
845                        // We got the lock
846                        unsigned to_submit = __collect_submitions( ring );
847                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
848                        if( ret < 0 ) {
849                                unlock(ring.submit_q.lock);
850                                return;
851                        }
852
853                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
854
855                        // Release the consumed SQEs
856                        __release_consumed_submission( ring );
857
858                        // update statistics
859                        __STATS__( true,
860                                io.submit_q.submit_avg.rdy += to_submit;
861                                io.submit_q.submit_avg.csm += ret;
862                                io.submit_q.submit_avg.cnt += 1;
863                        )
864
865                        unlock(ring.submit_q.lock);
866                }
867                else {
868                        // get mutual exclusion
869                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
870
871                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
872                        /* paranoid */  "index %u already reclaimed\n"
873                        /* paranoid */  "head %u, prev %u, tail %u\n"
874                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
875                        /* paranoid */  idx,
876                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
877                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
878                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
879                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
880                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
881                        /* paranoid */ );
882
883                        // Append to the list of ready entries
884
885                        /* paranoid */ verify( idx <= mask );
886                        ring.submit_q.array[ (*tail) & mask ] = idx;
887                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
888
889                        // Submit however, many entries need to be submitted
890                        int ret = __io_uring_enter( ring, 1, false, 0p );
891                        if( ret < 0 ) {
892                                switch((int)errno) {
893                                default:
894                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
895                                }
896                        }
897
898                        // update statistics
899                        __STATS__( false,
900                                io.submit_q.submit_avg.csm += 1;
901                                io.submit_q.submit_avg.cnt += 1;
902                        )
903
904                        // Release the consumed SQEs
905                        __release_consumed_submission( ring );
906
907                        unlock(ring.submit_q.lock);
908
909                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
910                }
911        }
912
913        static unsigned __collect_submitions( struct __io_data & ring ) {
914                /* paranoid */ verify( ring.submit_q.ready != 0p );
915                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
916
917                unsigned to_submit = 0;
918                uint32_t tail = *ring.submit_q.tail;
919                const uint32_t mask = *ring.submit_q.mask;
920
921                // Go through the list of ready submissions
922                for( i; ring.submit_q.ready_cnt ) {
923                        // replace any submission with the sentinel, to consume it.
924                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
925
926                        // If it was already the sentinel, then we are done
927                        if( idx == -1ul32 ) continue;
928
929                        // If we got a real submission, append it to the list
930                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
931                        to_submit++;
932                }
933
934                // Increment the tail based on how many we are ready to submit
935                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
936
937                return to_submit;
938        }
939
940        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
941                const uint32_t smask = *ring.submit_q.mask;
942
943                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
944                uint32_t chead = *ring.submit_q.head;
945                uint32_t phead = ring.submit_q.prev_head;
946                ring.submit_q.prev_head = chead;
947                unlock(ring.submit_q.release_lock);
948
949                uint32_t count = chead - phead;
950                for( i; count ) {
951                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
952                        ring.submit_q.sqes[ idx ].user_data = 0;
953                }
954                return count;
955        }
956
957//=============================================================================================
958// I/O Submissions
959//=============================================================================================
960
961        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
962                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
963                if( ret < 0 ) {
964                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
965                }
966
967                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
968        }
969#endif
Note: See TracBrowser for help on using the repository browser.