Changeset 92976d9


Ignore:
Timestamp:
Apr 10, 2020, 11:20:31 AM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
7df014f
Parents:
72828a8
Message:

Implemented basic io_uring setup and poller

Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r72828a8 r92976d9  
     1#include "kernel.hfa"
     2
     3#if !defined(HAVE_LINUX_IO_URING_H)
     4        void __kernel_io_startup( cluster & this ) {
     5                // Nothing to do without io_uring
     6        }
     7
     8        void __kernel_io_shutdown( cluster & this ) {
     9                // Nothing to do without io_uring
     10        }
     11
     12        bool is_async( void (*)() ) {
     13                return false;
     14        }
     15
     16#else
     17        extern "C" {
     18                #define _GNU_SOURCE         /* See feature_test_macros(7) */
     19                #include <errno.h>
     20                #include <stdint.h>
     21                #include <string.h>
     22                #include <unistd.h>
     23                #include <sys/mman.h>
     24                #include <sys/syscall.h>
     25
     26                #include <linux/io_uring.h>
     27        }
     28
     29        #include "bits/signal.hfa"
     30        #include "kernel_private.hfa"
     31        #include "thread.hfa"
     32
     33        uint32_t entries_per_cluster() {
     34                return 256;
     35        }
     36
     37        static void * __io_poller( void * arg );
     38
     39//=============================================================================================
     40// I/O Startup / Shutdown logic
     41//=============================================================================================
     42        void __kernel_io_startup( cluster & this ) {
     43                // Step 1 : call to setup
     44                struct io_uring_params params;
     45                memset(&params, 0, sizeof(params));
     46
     47                int fd = syscall(__NR_io_uring_setup, entries_per_cluster(), &params );
     48                if(fd < 0) {
     49                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
     50                }
     51
     52                // Step 2 : mmap result
     53                memset(&this.io, 0, sizeof(struct io_ring));
     54                struct io_uring_sq * sq = &this.io.submit_q;
     55                struct io_uring_cq * cq = &this.io.completion_q;
     56
     57                // calculate the right ring size
     58                sq->ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
     59                cq->ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
     60
     61                // Requires features
     62                // // adjust the size according to the parameters
     63                // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
     64                //      cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
     65                // }
     66
     67                // mmap the Submit Queue into existence
     68                sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
     69                if (sq->ring_ptr == (void*)MAP_FAILED) {
     70                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
     71                }
     72
     73                // mmap the Completion Queue into existence (may or may not be needed)
     74                // Requires features
     75                // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
     76                //      cq->ring_ptr = sq->ring_ptr;
     77                // }
     78                // else {
     79                        // We need multiple call to MMAP
     80                        cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
     81                        if (cq->ring_ptr == (void*)MAP_FAILED) {
     82                                munmap(sq->ring_ptr, sq->ring_sz);
     83                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
     84                        }
     85                // }
     86
     87                // mmap the submit queue entries
     88                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
     89                sq->sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
     90                if (sq->sqes == (struct io_uring_sqe *)MAP_FAILED) {
     91                        munmap(sq->ring_ptr, sq->ring_sz);
     92                        if (cq->ring_ptr != sq->ring_ptr) munmap(cq->ring_ptr, cq->ring_sz);
     93                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
     94                }
     95
     96                // Get the pointers from the kernel to fill the structure
     97                // submit queue
     98                sq->head    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.head;
     99                sq->tail    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.tail;
     100                sq->mask    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_mask;
     101                sq->entries = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_entries;
     102                sq->flags   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.flags;
     103                sq->dropped = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.dropped;
     104                sq->array   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.array;
     105
     106                // completion queue
     107                cq->head     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.head;
     108                cq->tail     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.tail;
     109                cq->mask     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_mask;
     110                cq->entries  = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_entries;
     111                cq->overflow = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.overflow;
     112                cq->cqes = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.cqes;
     113
     114                // Update the global ring info
     115                this.io.flags = params.flags;
     116                this.io.fd    = fd;
     117                this.io.done  = false;
     118
     119                // Create the poller thread
     120                this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
     121        }
     122
     123        void __kernel_io_shutdown( cluster & this ) {
     124                // Stop the IO Poller
     125                // Notify the poller thread of the shutdown
     126                __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
     127                sigval val = { 1 };
     128                pthread_sigqueue( this.io.poller, SIGUSR1, val );
     129
     130                // Wait for the poller thread to finish
     131                pthread_join( this.io.poller, 0p );
     132                free( this.io.stack );
     133
     134                // Shutdown the io rings
     135                struct io_uring_sq & sq = this.io.submit_q;
     136                struct io_uring_cq & cq = this.io.completion_q;
     137
     138                // unmap the submit queue entries
     139                munmap(sq.sqes, *sq.entries * sizeof(struct io_uring_sqe));
     140
     141                // unmap the Submit Queue ring
     142                munmap(sq.ring_ptr, sq.ring_sz);
     143
     144                // unmap the Completion Queue ring, if it is different
     145                if (cq.ring_ptr != sq.ring_ptr) {
     146                        munmap(cq.ring_ptr, cq.ring_sz);
     147                }
     148
     149                // close the file descriptor
     150                close(this.io.fd);
     151        }
     152
     153//=============================================================================================
     154// I/O Polling
     155//=============================================================================================
     156        struct io_user_data {
     157                int32_t result;
     158                $thread * thrd;
     159        };
     160
     161        // Process a single completion message from the io_uring
     162        // This is NOT thread-safe
     163        static bool __io_process(struct io_ring & ring) {
     164                unsigned head = *ring.completion_q.head;
     165                unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
     166
     167                if (head == tail) return false;
     168
     169                unsigned idx = head & (*ring.completion_q.mask);
     170                struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     171
     172                /* paranoid */ verify(&cqe);
     173
     174                struct io_user_data * data = (struct io_user_data *)cqe.user_data;
     175                data->result = cqe.res;
     176                unpark( data->thrd __cfaabi_dbg_ctx2 );
     177
     178                // Mark to the kernel that the cqe has been seen
     179                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     180                __atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELEASE );
     181
     182                return true;
     183        }
     184
     185        static void * __io_poller( void * arg ) {
     186                cluster * cltr = (cluster *)arg;
     187                struct io_ring & ring = cltr->io;
     188
     189                sigset_t mask;
     190                sigfillset(&mask);
     191                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
     192                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
     193                }
     194
     195                sigdelset( &mask, SIGUSR1 );
     196
     197                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
     198                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
     199
     200                LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
     201                        int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
     202                        if( ret < 0 ) {
     203                                switch((int)errno) {
     204                                case EAGAIN:
     205                                case EINTR:
     206                                        continue LOOP;
     207                                default:
     208                                        abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
     209                                }
     210                        }
     211
     212                        // Drain the queue
     213                        while(__io_process(ring)) {}
     214                }
     215
     216                return 0p;
     217        }
     218
     219//=============================================================================================
     220// I/O Submissions
     221//=============================================================================================
     222
     223[* struct io_uring_sqe, uint32_t] io_submit_prelude( struct io_ring & ring ) {
     224        return [0p, 0];
     225}
     226
     227//=============================================================================================
     228// I/O Interface
     229//=============================================================================================
     230
     231/*
     232        extern "C" {
     233                #define __USE_GNU
     234                #include <sys/uio.h>
     235        }
     236
     237        ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     238                #if !defined(IORING_OP_READV)
     239                        return preadv2(fd, iov, iovcnt, offset, flags);
     240                #else
     241                        sqe->opcode = IORING_OP_READV;
     242                        sqe->flags = 0;
     243                        sqe->ioprio = 0;
     244                        sqe->fd = fd;
     245                        sqe->off = offset;
     246                        sqe->addr = (unsigned long) iov;
     247                        sqe->len = iovcnt;
     248                        sqe->rw_flags = 0;
     249                        sqe->user_data = 0;
     250                        sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     251                #endif
     252        }
     253
     254        ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     255                #if !defined(IORING_OP_WRITEV)
     256                        return pwritev2(fd, iov, iovcnt, offset, flags);
     257                #else
     258                        #warning not implemented
     259                #endif
     260        }
     261
     262        bool is_async( void (*func)() ) {
     263                switch((uintptr_t)func) {
     264                case (uintptr_t)preadv2:
     265                case (uintptr_t)async_preadv2:
     266                        #if defined(IORING_OP_READV)
     267                                return true;
     268                        #else
     269                                return false;
     270                        #endif
     271                default:
     272                        return false;
     273                }
     274        }
     275*/
     276
     277#endif
  • libcfa/src/concurrency/kernel.cfa

    r72828a8 r92976d9  
    262262        threads{ __get };
    263263
     264        __kernel_io_startup( this );
     265
    264266        doregister(this);
    265267}
    266268
    267269void ^?{}(cluster & this) {
     270        __kernel_io_shutdown( this );
     271
    268272        unregister(this);
    269273}
     
    808812        ^(*mainThread){};
    809813
     814        ^(*mainCluster){};
     815
    810816        ^(__cfa_dbg_global_clusters.list){};
    811817        ^(__cfa_dbg_global_clusters.lock){};
  • libcfa/src/concurrency/kernel.hfa

    r72828a8 r92976d9  
    1717
    1818#include <stdbool.h>
     19#include <stdint.h>
    1920
    2021#include "invoke.h"
     
    111112
    112113//-----------------------------------------------------------------------------
     114// I/O
     115#if defined(HAVE_LINUX_IO_URING_H)
     116struct io_uring_sq {
     117        uint32_t * head;
     118        uint32_t * tail;
     119        uint32_t * mask;
     120        uint32_t * entries;
     121        uint32_t * flags;
     122        uint32_t * dropped;
     123        uint32_t * array;
     124        struct io_uring_sqe * sqes;
     125
     126        uint32_t sqe_head;
     127        uint32_t sqe_tail;
     128
     129        size_t ring_sz;
     130        void * ring_ptr;
     131};
     132
     133struct io_uring_cq {
     134        volatile uint32_t * head;
     135        volatile uint32_t * tail;
     136        uint32_t * mask;
     137        struct io_uring_cqe * entries;
     138        uint32_t * overflow;
     139        struct io_uring_cqe * cqes;
     140
     141        size_t ring_sz;
     142        void * ring_ptr;
     143};
     144
     145struct io_ring {
     146        struct io_uring_sq submit_q;
     147        struct io_uring_cq completion_q;
     148        uint32_t flags;
     149        int fd;
     150        pthread_t poller;
     151        void * stack;
     152        volatile bool done;
     153};
     154#endif
     155
     156//-----------------------------------------------------------------------------
    113157// Cluster
    114158struct cluster {
     
    141185                cluster * prev;
    142186        } node;
     187
     188        #if defined(HAVE_LINUX_IO_URING_H)
     189                struct io_ring io;
     190        #endif
    143191};
    144192extern Duration default_preemption();
  • libcfa/src/concurrency/kernel_private.hfa

    r72828a8 r92976d9  
    7171
    7272//-----------------------------------------------------------------------------
     73// I/O
     74void __kernel_io_startup ( cluster & );
     75void __kernel_io_shutdown( cluster & );
     76
     77//-----------------------------------------------------------------------------
    7378// Utils
    7479#define KERNEL_STORAGE(T,X) static char storage_##X[sizeof(T)]
Note: See TracChangeset for help on using the changeset viewer.