source: libcfa/src/concurrency/io.cfa @ 5751a56

arm-ehjacob/cs343-translationnew-astnew-ast-unique-expr
Last change on this file since 5751a56 was 5751a56, checked in by Thierry Delisle <tdelisle@…>, 17 months ago

Changed handling of io_uring support to handle op codes in configure.
Kernel probing for op codes, not supported yet.

  • Property mode set to 100644
File size: 29.9 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#if defined(__CFA_DEBUG__)
17        // #define __CFA_DEBUG_PRINT_IO__
18        // #define __CFA_DEBUG_PRINT_IO_CORE__
19#endif
20
21#include "kernel.hfa"
22#include "bitmanip.hfa"
23
24#if !defined(CFA_HAVE_LINUX_IO_URING_H)
25        void __kernel_io_startup( cluster &, unsigned, bool ) {
26                // Nothing to do without io_uring
27        }
28
29        void __kernel_io_finish_start( cluster & ) {
30                // Nothing to do without io_uring
31        }
32
33        void __kernel_io_prepare_stop( cluster & ) {
34                // Nothing to do without io_uring
35        }
36
37        void __kernel_io_shutdown( cluster &, bool ) {
38                // Nothing to do without io_uring
39        }
40
41#else
42        #define _GNU_SOURCE         /* See feature_test_macros(7) */
43        #include <errno.h>
44        #include <stdint.h>
45        #include <string.h>
46        #include <unistd.h>
47        #include <sys/mman.h>
48
49        extern "C" {
50                #include <sys/syscall.h>
51
52                #include <linux/io_uring.h>
53        }
54
55        #include "bits/signal.hfa"
56        #include "kernel_private.hfa"
57        #include "thread.hfa"
58
59        uint32_t entries_per_cluster() {
60                return 256;
61        }
62
63        static void * __io_poller_slow( void * arg );
64
65        // Weirdly, some systems that do support io_uring don't actually define these
66        #ifdef __alpha__
67                /*
68                * alpha is the only exception, all other architectures
69                * have common numbers for new system calls.
70                */
71                #ifndef __NR_io_uring_setup
72                        #define __NR_io_uring_setup           535
73                #endif
74                #ifndef __NR_io_uring_enter
75                        #define __NR_io_uring_enter           536
76                #endif
77                #ifndef __NR_io_uring_register
78                        #define __NR_io_uring_register        537
79                #endif
80        #else /* !__alpha__ */
81                #ifndef __NR_io_uring_setup
82                        #define __NR_io_uring_setup           425
83                #endif
84                #ifndef __NR_io_uring_enter
85                        #define __NR_io_uring_enter           426
86                #endif
87                #ifndef __NR_io_uring_register
88                        #define __NR_io_uring_register        427
89                #endif
90        #endif
91
92        // Fast poller user-thread
93        // Not using the "thread" keyword because we want to control
94        // more carefully when to start/stop it
95        struct __io_poller_fast {
96                struct __io_data * ring;
97                $thread thrd;
98        };
99
100        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
101                this.ring = cltr.io;
102                (this.thrd){ "Fast I/O Poller", cltr };
103        }
104        void ^?{}( __io_poller_fast & mutex this );
105        void main( __io_poller_fast & this );
106        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
107        void ^?{}( __io_poller_fast & mutex this ) {}
108
109        struct __submition_data {
110                // Head and tail of the ring (associated with array)
111                volatile uint32_t * head;
112                volatile uint32_t * tail;
113                volatile uint32_t prev_head;
114
115                // The actual kernel ring which uses head/tail
116                // indexes into the sqes arrays
117                uint32_t * array;
118
119                // number of entries and mask to go with it
120                const uint32_t * num;
121                const uint32_t * mask;
122
123                // Submission flags (Not sure what for)
124                uint32_t * flags;
125
126                // number of sqes not submitted (whatever that means)
127                uint32_t * dropped;
128
129                // Like head/tail but not seen by the kernel
130                volatile uint32_t * ready;
131                uint32_t ready_cnt;
132
133                __spinlock_t lock;
134                __spinlock_t release_lock;
135
136                // A buffer of sqes (not the actual ring)
137                struct io_uring_sqe * sqes;
138
139                // The location and size of the mmaped area
140                void * ring_ptr;
141                size_t ring_sz;
142        };
143
144        struct __completion_data {
145                // Head and tail of the ring
146                volatile uint32_t * head;
147                volatile uint32_t * tail;
148
149                // number of entries and mask to go with it
150                const uint32_t * mask;
151                const uint32_t * num;
152
153                // number of cqes not submitted (whatever that means)
154                uint32_t * overflow;
155
156                // the kernel ring
157                struct io_uring_cqe * cqes;
158
159                // The location and size of the mmaped area
160                void * ring_ptr;
161                size_t ring_sz;
162        };
163
164        struct __io_data {
165                struct __submition_data submit_q;
166                struct __completion_data completion_q;
167                uint32_t ring_flags;
168                int cltr_flags;
169                int fd;
170                semaphore submit;
171                volatile bool done;
172                struct {
173                        struct {
174                                __processor_id_t id;
175                                void * stack;
176                                pthread_t kthrd;
177                                volatile bool blocked;
178                        } slow;
179                        __io_poller_fast fast;
180                        __bin_sem_t sem;
181                } poller;
182        };
183
184//=============================================================================================
185// I/O Startup / Shutdown logic
186//=============================================================================================
187        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
188                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
189                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
190                }
191
192                this.io = malloc();
193
194                // Step 1 : call to setup
195                struct io_uring_params params;
196                memset(&params, 0, sizeof(params));
197                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
198                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
199
200                uint32_t nentries = entries_per_cluster();
201
202                int fd = syscall(__NR_io_uring_setup, nentries, &params );
203                if(fd < 0) {
204                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
205                }
206
207                // Step 2 : mmap result
208                memset( this.io, 0, sizeof(struct __io_data) );
209                struct __submition_data  & sq = this.io->submit_q;
210                struct __completion_data & cq = this.io->completion_q;
211
212                // calculate the right ring size
213                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
214                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
215
216                // Requires features
217                #if defined(IORING_FEAT_SINGLE_MMAP)
218                        // adjust the size according to the parameters
219                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
220                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
221                        }
222                #endif
223
224                // mmap the Submit Queue into existence
225                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
226                if (sq.ring_ptr == (void*)MAP_FAILED) {
227                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
228                }
229
230                // Requires features
231                #if defined(IORING_FEAT_SINGLE_MMAP)
232                        // mmap the Completion Queue into existence (may or may not be needed)
233                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
234                                cq.ring_ptr = sq.ring_ptr;
235                        }
236                        else
237                #endif
238                {
239                        // We need multiple call to MMAP
240                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
241                        if (cq.ring_ptr == (void*)MAP_FAILED) {
242                                munmap(sq.ring_ptr, sq.ring_sz);
243                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
244                        }
245                }
246
247                // mmap the submit queue entries
248                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
249                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
250                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
251                        munmap(sq.ring_ptr, sq.ring_sz);
252                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
253                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
254                }
255
256                // Get the pointers from the kernel to fill the structure
257                // submit queue
258                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
259                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
260                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
261                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
262                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
263                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
264                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
265                sq.prev_head = *sq.head;
266
267                {
268                        const uint32_t num = *sq.num;
269                        for( i; num ) {
270                                sq.sqes[i].user_data = 0ul64;
271                        }
272                }
273
274                (sq.lock){};
275                (sq.release_lock){};
276
277                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
278                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
279                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
280                        sq.ready = alloc_align( 64, sq.ready_cnt );
281                        for(i; sq.ready_cnt) {
282                                sq.ready[i] = -1ul32;
283                        }
284                }
285                else {
286                        sq.ready_cnt = 0;
287                        sq.ready = 0p;
288                }
289
290                // completion queue
291                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
292                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
293                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
294                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
295                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
296                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
297
298                // some paranoid checks
299                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
300                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
301                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
302                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
303
304                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
305                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
306                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
307                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
308
309                // Update the global ring info
310                this.io->ring_flags = params.flags;
311                this.io->cltr_flags = io_flags;
312                this.io->fd         = fd;
313                this.io->done       = false;
314                (this.io->submit){ min(*sq.num, *cq.num) };
315
316                if(!main_cluster) {
317                        __kernel_io_finish_start( this );
318                }
319        }
320
321        void __kernel_io_finish_start( cluster & this ) {
322                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
323                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
324                        (this.io->poller.fast){ this };
325                        __thrd_start( this.io->poller.fast, main );
326                }
327
328                // Create the poller thread
329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
330                this.io->poller.slow.blocked = false;
331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
332        }
333
334        void __kernel_io_prepare_stop( cluster & this ) {
335                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
336                // Notify the poller thread of the shutdown
337                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
338
339                // Stop the IO Poller
340                sigval val = { 1 };
341                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
342                post( this.io->poller.sem );
343
344                // Wait for the poller thread to finish
345                pthread_join( this.io->poller.slow.kthrd, 0p );
346                free( this.io->poller.slow.stack );
347
348                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
349
350                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
351                        with( this.io->poller.fast ) {
352                                /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
353                                /* paranoid */ verify( !ready_mutate_islocked() );
354
355                                // We need to adjust the clean-up based on where the thread is
356                                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
357
358                                        ready_schedule_lock( (struct __processor_id_t *)active_processor() );
359
360                                                // This is the tricky case
361                                                // The thread was preempted and now it is on the ready queue
362                                                // The thread should be the last on the list
363                                                /* paranoid */ verify( thrd.link.next != 0p );
364
365                                                // Remove the thread from the ready queue of this cluster
366                                                __attribute__((unused)) bool removed = remove_head( &this, &thrd );
367                                                /* paranoid */ verify( removed );
368                                                thrd.link.next = 0p;
369                                                thrd.link.prev = 0p;
370                                                __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
371
372                                                // Fixup the thread state
373                                                thrd.state = Blocked;
374                                                thrd.ticket = 0;
375                                                thrd.preempted = __NO_PREEMPTION;
376
377                                        ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
378
379                                        // Pretend like the thread was blocked all along
380                                }
381                                // !!! This is not an else if !!!
382                                if( thrd.state == Blocked ) {
383
384                                        // This is the "easy case"
385                                        // The thread is parked and can easily be moved to active cluster
386                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
387                                        thrd.curr_cluster = active_cluster();
388
389                                        // unpark the fast io_poller
390                                        unpark( &thrd __cfaabi_dbg_ctx2 );
391                                }
392                                else {
393
394                                        // The thread is in a weird state
395                                        // I don't know what to do here
396                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
397                                }
398
399                        }
400
401                        ^(this.io->poller.fast){};
402
403                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
404                }
405        }
406
407        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
408                if(!main_cluster) {
409                        __kernel_io_prepare_stop( this );
410                }
411
412                // Shutdown the io rings
413                struct __submition_data  & sq = this.io->submit_q;
414                struct __completion_data & cq = this.io->completion_q;
415
416                // unmap the submit queue entries
417                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
418
419                // unmap the Submit Queue ring
420                munmap(sq.ring_ptr, sq.ring_sz);
421
422                // unmap the Completion Queue ring, if it is different
423                if (cq.ring_ptr != sq.ring_ptr) {
424                        munmap(cq.ring_ptr, cq.ring_sz);
425                }
426
427                // close the file descriptor
428                close(this.io->fd);
429
430                free( this.io->submit_q.ready ); // Maybe null, doesn't matter
431                free( this.io );
432        }
433
434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
435                bool need_sys_to_submit = false;
436                bool need_sys_to_complete = false;
437                unsigned min_complete = 0;
438                unsigned flags = 0;
439
440
441                TO_SUBMIT:
442                if( to_submit > 0 ) {
443                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
444                                need_sys_to_submit = true;
445                                break TO_SUBMIT;
446                        }
447                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
448                                need_sys_to_submit = true;
449                                flags |= IORING_ENTER_SQ_WAKEUP;
450                        }
451                }
452
453                TO_COMPLETE:
454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
455                        flags |= IORING_ENTER_GETEVENTS;
456                        if( mask ) {
457                                need_sys_to_complete = true;
458                                min_complete = 1;
459                                break TO_COMPLETE;
460                        }
461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
462                                need_sys_to_complete = true;
463                        }
464                }
465
466                int ret = 0;
467                if( need_sys_to_submit || need_sys_to_complete ) {
468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
469                        if( ret < 0 ) {
470                                switch((int)errno) {
471                                case EAGAIN:
472                                case EINTR:
473                                        ret = -1;
474                                        break;
475                                default:
476                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
477                                }
478                        }
479                }
480
481                // Memory barrier
482                __atomic_thread_fence( __ATOMIC_SEQ_CST );
483                return ret;
484        }
485
486//=============================================================================================
487// I/O Polling
488//=============================================================================================
489        static unsigned __collect_submitions( struct __io_data & ring );
490        static uint32_t __release_consumed_submission( struct __io_data & ring );
491
492        static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
493                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
494                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
495
496                data->result = cqe.res;
497                if(!id) { unpark(     data->thrd __cfaabi_dbg_ctx2 ); }
498                else  { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
499        }
500
501        // Process a single completion message from the io_uring
502        // This is NOT thread-safe
503        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
504                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
505
506                unsigned to_submit = 0;
507                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
508                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
509                        to_submit = __collect_submitions( ring );
510                }
511
512                int ret = __io_uring_enter(ring, to_submit, true, mask);
513                if( ret < 0 ) {
514                        return [0, true];
515                }
516
517                // update statistics
518                if (to_submit > 0) {
519                        __STATS__( true,
520                                if( to_submit > 0 ) {
521                                        io.submit_q.submit_avg.rdy += to_submit;
522                                        io.submit_q.submit_avg.csm += ret;
523                                        io.submit_q.submit_avg.cnt += 1;
524                                }
525                        )
526                }
527
528                // Release the consumed SQEs
529                __release_consumed_submission( ring );
530
531                // Drain the queue
532                unsigned head = *ring.completion_q.head;
533                unsigned tail = *ring.completion_q.tail;
534                const uint32_t mask = *ring.completion_q.mask;
535
536                // Nothing was new return 0
537                if (head == tail) {
538                        return [0, to_submit > 0];
539                }
540
541                uint32_t count = tail - head;
542                /* paranoid */ verify( count != 0 );
543                for(i; count) {
544                        unsigned idx = (head + i) & mask;
545                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
546
547                        /* paranoid */ verify(&cqe);
548
549                        process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
550                }
551
552                // Allow new submissions to happen
553                // V(ring.submit, count);
554
555                // Mark to the kernel that the cqe has been seen
556                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
557                __atomic_thread_fence( __ATOMIC_SEQ_CST );
558                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
559
560                return [count, count > 0 || to_submit > 0];
561        }
562
563        static void * __io_poller_slow( void * arg ) {
564                #if !defined( __CFA_NO_STATISTICS__ )
565                        __stats_t local_stats;
566                        __init_stats( &local_stats );
567                        kernelTLS.this_stats = &local_stats;
568                #endif
569
570                cluster * cltr = (cluster *)arg;
571                struct __io_data & ring = *cltr->io;
572
573                ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
574
575                sigset_t mask;
576                sigfillset(&mask);
577                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
578                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
579                }
580
581                sigdelset( &mask, SIGUSR1 );
582
583                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
584                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
585
586                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
587
588                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
589                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
590
591                                __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
592
593                                // In the user-thread approach drain and if anything was drained,
594                                // batton pass to the user-thread
595                                int count;
596                                bool again;
597                                [count, again] = __drain_io( ring, &mask );
598
599                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
600
601                                // Update statistics
602                                __STATS__( true,
603                                        io.complete_q.completed_avg.val += count;
604                                        io.complete_q.completed_avg.slow_cnt += 1;
605                                )
606
607                                if(again) {
608                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
609                                        __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
610                                        wait( ring.poller.sem );
611                                }
612                        }
613                }
614                else {
615                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
616                                //In the naive approach, just poll the io completion queue directly
617                                int count;
618                                bool again;
619                                [count, again] = __drain_io( ring, &mask );
620
621                                // Update statistics
622                                __STATS__( true,
623                                        io.complete_q.completed_avg.val += count;
624                                        io.complete_q.completed_avg.slow_cnt += 1;
625                                )
626                        }
627                }
628
629                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
630
631                unregister( &ring.poller.slow.id );
632
633                #if !defined(__CFA_NO_STATISTICS__)
634                        __tally_stats(cltr->stats, &local_stats);
635                #endif
636
637                return 0p;
638        }
639
640        void main( __io_poller_fast & this ) {
641                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
642
643                // Start parked
644                park( __cfaabi_dbg_ctx );
645
646                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
647
648                int reset = 0;
649
650                // Then loop until we need to start
651                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
652
653                        // Drain the io
654                        int count;
655                        bool again;
656                        disable_interrupts();
657                                [count, again] = __drain_io( *this.ring, 0p );
658
659                                if(!again) reset++;
660
661                                // Update statistics
662                                __STATS__( true,
663                                        io.complete_q.completed_avg.val += count;
664                                        io.complete_q.completed_avg.fast_cnt += 1;
665                                )
666                        enable_interrupts( __cfaabi_dbg_ctx );
667
668                        // If we got something, just yield and check again
669                        if(reset < 5) {
670                                yield();
671                        }
672                        // We didn't get anything baton pass to the slow poller
673                        else {
674                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
675                                reset = 0;
676
677                                // wake up the slow poller
678                                post( this.ring->poller.sem );
679
680                                // park this thread
681                                park( __cfaabi_dbg_ctx );
682                        }
683                }
684
685                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
686        }
687
688        static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
689        static inline void __wake_poller( struct __io_data & ring ) {
690                if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
691
692                sigval val = { 1 };
693                pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
694        }
695
696//=============================================================================================
697// I/O Submissions
698//=============================================================================================
699
700// Submition steps :
701// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
702//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
703//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
704//     need to write an allocator that allows allocating concurrently.
705//
706// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
707//
708// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
709//     needs to arrive to two concensus at the same time:
710//     A - The order in which entries are listed in the array: no two threads must pick the
711//         same index for their entries
712//     B - When can the tail be update for the kernel. EVERY entries in the array between
713//         head and tail must be fully filled and shouldn't ever be touched again.
714//
715
716        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
717                /* paranoid */ verify( data != 0 );
718
719                // Prepare the data we need
720                __attribute((unused)) int len   = 0;
721                __attribute((unused)) int block = 0;
722                uint32_t cnt = *ring.submit_q.num;
723                uint32_t mask = *ring.submit_q.mask;
724
725                disable_interrupts();
726                        uint32_t off = __tls_rand();
727                enable_interrupts( __cfaabi_dbg_ctx );
728
729                // Loop around looking for an available spot
730                for() {
731                        // Look through the list starting at some offset
732                        for(i; cnt) {
733                                uint64_t expected = 0;
734                                uint32_t idx = (i + off) & mask;
735                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
736                                volatile uint64_t * udata = &sqe->user_data;
737
738                                if( *udata == expected &&
739                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
740                                {
741                                        // update statistics
742                                        __STATS__( false,
743                                                io.submit_q.alloc_avg.val   += len;
744                                                io.submit_q.alloc_avg.block += block;
745                                                io.submit_q.alloc_avg.cnt   += 1;
746                                        )
747
748
749                                        // Success return the data
750                                        return [sqe, idx];
751                                }
752                                verify(expected != data);
753
754                                len ++;
755                        }
756
757                        block++;
758                        yield();
759                }
760        }
761
762        static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
763                /* paranoid */ verify( idx <= mask   );
764                /* paranoid */ verify( idx != -1ul32 );
765
766                // We need to find a spot in the ready array
767                __attribute((unused)) int len   = 0;
768                __attribute((unused)) int block = 0;
769                uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
770
771                disable_interrupts();
772                        uint32_t off = __tls_rand();
773                enable_interrupts( __cfaabi_dbg_ctx );
774
775                uint32_t picked;
776                LOOKING: for() {
777                        for(i; ring.submit_q.ready_cnt) {
778                                picked = (i + off) & ready_mask;
779                                uint32_t expected = -1ul32;
780                                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
781                                        break LOOKING;
782                                }
783                                verify(expected != idx);
784
785                                len ++;
786                        }
787
788                        block++;
789                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
790                                __release_consumed_submission( ring );
791                                unlock( ring.submit_q.lock );
792                        }
793                        else {
794                                yield();
795                        }
796                }
797
798                // update statistics
799                __STATS__( false,
800                        io.submit_q.look_avg.val   += len;
801                        io.submit_q.look_avg.block += block;
802                        io.submit_q.look_avg.cnt   += 1;
803                )
804
805                return picked;
806        }
807
808        void __submit( struct __io_data & ring, uint32_t idx ) {
809                // Get now the data we definetely need
810                uint32_t * const tail = ring.submit_q.tail;
811                const uint32_t mask = *ring.submit_q.mask;
812
813                // There are 2 submission schemes, check which one we are using
814                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
815                        // If the poller thread submits, then we just need to add this to the ready array
816                        __submit_to_ready_array( ring, idx, mask );
817
818                        __wake_poller( ring );
819
820                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
821                }
822                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
823                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
824
825                        for() {
826                                yield();
827
828                                // If some one else collected our index, we are done
829                                #warning ABA problem
830                                if( ring.submit_q.ready[picked] != idx ) {
831                                        __STATS__( false,
832                                                io.submit_q.helped += 1;
833                                        )
834                                        return;
835                                }
836
837                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
838                                        __STATS__( false,
839                                                io.submit_q.leader += 1;
840                                        )
841                                        break;
842                                }
843
844                                __STATS__( false,
845                                        io.submit_q.busy += 1;
846                                )
847                        }
848
849                        // We got the lock
850                        unsigned to_submit = __collect_submitions( ring );
851                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
852                        if( ret < 0 ) {
853                                unlock(ring.submit_q.lock);
854                                return;
855                        }
856
857                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
858
859                        // Release the consumed SQEs
860                        __release_consumed_submission( ring );
861
862                        // update statistics
863                        __STATS__( true,
864                                io.submit_q.submit_avg.rdy += to_submit;
865                                io.submit_q.submit_avg.csm += ret;
866                                io.submit_q.submit_avg.cnt += 1;
867                        )
868
869                        unlock(ring.submit_q.lock);
870                }
871                else {
872                        // get mutual exclusion
873                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
874
875                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
876                        /* paranoid */  "index %u already reclaimed\n"
877                        /* paranoid */  "head %u, prev %u, tail %u\n"
878                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
879                        /* paranoid */  idx,
880                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
881                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
882                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
883                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
884                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
885                        /* paranoid */ );
886
887                        // Append to the list of ready entries
888
889                        /* paranoid */ verify( idx <= mask );
890                        ring.submit_q.array[ (*tail) & mask ] = idx;
891                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
892
893                        // Submit however, many entries need to be submitted
894                        int ret = __io_uring_enter( ring, 1, false, 0p );
895                        if( ret < 0 ) {
896                                switch((int)errno) {
897                                default:
898                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
899                                }
900                        }
901
902                        // update statistics
903                        __STATS__( false,
904                                io.submit_q.submit_avg.csm += 1;
905                                io.submit_q.submit_avg.cnt += 1;
906                        )
907
908                        // Release the consumed SQEs
909                        __release_consumed_submission( ring );
910
911                        unlock(ring.submit_q.lock);
912
913                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
914                }
915        }
916
917        static unsigned __collect_submitions( struct __io_data & ring ) {
918                /* paranoid */ verify( ring.submit_q.ready != 0p );
919                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
920
921                unsigned to_submit = 0;
922                uint32_t tail = *ring.submit_q.tail;
923                const uint32_t mask = *ring.submit_q.mask;
924
925                // Go through the list of ready submissions
926                for( i; ring.submit_q.ready_cnt ) {
927                        // replace any submission with the sentinel, to consume it.
928                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
929
930                        // If it was already the sentinel, then we are done
931                        if( idx == -1ul32 ) continue;
932
933                        // If we got a real submission, append it to the list
934                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
935                        to_submit++;
936                }
937
938                // Increment the tail based on how many we are ready to submit
939                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
940
941                return to_submit;
942        }
943
944        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
945                const uint32_t smask = *ring.submit_q.mask;
946
947                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
948                uint32_t chead = *ring.submit_q.head;
949                uint32_t phead = ring.submit_q.prev_head;
950                ring.submit_q.prev_head = chead;
951                unlock(ring.submit_q.release_lock);
952
953                uint32_t count = chead - phead;
954                for( i; count ) {
955                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
956                        ring.submit_q.sqes[ idx ].user_data = 0;
957                }
958                return count;
959        }
960
961//=============================================================================================
962// I/O Submissions
963//=============================================================================================
964
965        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
966                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
967                if( ret < 0 ) {
968                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
969                }
970
971                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
972        }
973#endif
Note: See TracBrowser for help on using the repository browser.