Changes in / [7df014f:0100882]


Ignore:
Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r7df014f r0100882  
    1 #include "kernel.hfa"
    2 
    3 #if !defined(HAVE_LINUX_IO_URING_H)
    4         void __kernel_io_startup( cluster & this ) {
    5                 // Nothing to do without io_uring
    6         }
    7 
    8         void __kernel_io_shutdown( cluster & this ) {
    9                 // Nothing to do without io_uring
    10         }
    11 
    12         bool is_async( void (*)() ) {
    13                 return false;
    14         }
    15 
    16 #else
    17         extern "C" {
    18                 #define _GNU_SOURCE         /* See feature_test_macros(7) */
    19                 #include <errno.h>
    20                 #include <stdint.h>
    21                 #include <string.h>
    22                 #include <unistd.h>
    23                 #include <sys/mman.h>
    24                 #include <sys/syscall.h>
    25 
    26                 #include <linux/io_uring.h>
    27         }
    28 
    29         #include "bits/signal.hfa"
    30         #include "kernel_private.hfa"
    31         #include "thread.hfa"
    32 
    33         uint32_t entries_per_cluster() {
    34                 return 256;
    35         }
    36 
    37         static void * __io_poller( void * arg );
    38 
    39 //=============================================================================================
    40 // I/O Startup / Shutdown logic
    41 //=============================================================================================
    42         void __kernel_io_startup( cluster & this ) {
    43                 // Step 1 : call to setup
    44                 struct io_uring_params params;
    45                 memset(&params, 0, sizeof(params));
    46 
    47                 int fd = syscall(__NR_io_uring_setup, entries_per_cluster(), &params );
    48                 if(fd < 0) {
    49                         abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
    50                 }
    51 
    52                 // Step 2 : mmap result
    53                 memset(&this.io, 0, sizeof(struct io_ring));
    54                 struct io_uring_sq * sq = &this.io.submit_q;
    55                 struct io_uring_cq * cq = &this.io.completion_q;
    56 
    57                 // calculate the right ring size
    58                 sq->ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
    59                 cq->ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
    60 
    61                 // Requires features
    62                 // // adjust the size according to the parameters
    63                 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    64                 //      cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
    65                 // }
    66 
    67                 // mmap the Submit Queue into existence
    68                 sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
    69                 if (sq->ring_ptr == (void*)MAP_FAILED) {
    70                         abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
    71                 }
    72 
    73                 // mmap the Completion Queue into existence (may or may not be needed)
    74                 // Requires features
    75                 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    76                 //      cq->ring_ptr = sq->ring_ptr;
    77                 // }
    78                 // else {
    79                         // We need multiple call to MMAP
    80                         cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
    81                         if (cq->ring_ptr == (void*)MAP_FAILED) {
    82                                 munmap(sq->ring_ptr, sq->ring_sz);
    83                                 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
    84                         }
    85                 // }
    86 
    87                 // mmap the submit queue entries
    88                 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
    89                 sq->sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
    90                 if (sq->sqes == (struct io_uring_sqe *)MAP_FAILED) {
    91                         munmap(sq->ring_ptr, sq->ring_sz);
    92                         if (cq->ring_ptr != sq->ring_ptr) munmap(cq->ring_ptr, cq->ring_sz);
    93                         abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
    94                 }
    95 
    96                 // Get the pointers from the kernel to fill the structure
    97                 // submit queue
    98                 sq->head    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.head;
    99                 sq->tail    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.tail;
    100                 sq->mask    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_mask;
    101                 sq->entries = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_entries;
    102                 sq->flags   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.flags;
    103                 sq->dropped = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.dropped;
    104                 sq->array   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.array;
    105 
    106                 // completion queue
    107                 cq->head     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.head;
    108                 cq->tail     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.tail;
    109                 cq->mask     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_mask;
    110                 cq->entries  = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_entries;
    111                 cq->overflow = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.overflow;
    112                 cq->cqes = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.cqes;
    113 
    114                 // Update the global ring info
    115                 this.io.flags = params.flags;
    116                 this.io.fd    = fd;
    117                 this.io.done  = false;
    118 
    119                 // Create the poller thread
    120                 this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
    121         }
    122 
    123         void __kernel_io_shutdown( cluster & this ) {
    124                 // Stop the IO Poller
    125                 // Notify the poller thread of the shutdown
    126                 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
    127                 sigval val = { 1 };
    128                 pthread_sigqueue( this.io.poller, SIGUSR1, val );
    129 
    130                 // Wait for the poller thread to finish
    131                 pthread_join( this.io.poller, 0p );
    132                 free( this.io.stack );
    133 
    134                 // Shutdown the io rings
    135                 struct io_uring_sq & sq = this.io.submit_q;
    136                 struct io_uring_cq & cq = this.io.completion_q;
    137 
    138                 // unmap the submit queue entries
    139                 munmap(sq.sqes, *sq.entries * sizeof(struct io_uring_sqe));
    140 
    141                 // unmap the Submit Queue ring
    142                 munmap(sq.ring_ptr, sq.ring_sz);
    143 
    144                 // unmap the Completion Queue ring, if it is different
    145                 if (cq.ring_ptr != sq.ring_ptr) {
    146                         munmap(cq.ring_ptr, cq.ring_sz);
    147                 }
    148 
    149                 // close the file descriptor
    150                 close(this.io.fd);
    151         }
    152 
    153 //=============================================================================================
    154 // I/O Polling
    155 //=============================================================================================
    156         struct io_user_data {
    157                 int32_t result;
    158                 $thread * thrd;
    159         };
    160 
    161         // Process a single completion message from the io_uring
    162         // This is NOT thread-safe
    163         static bool __io_process(struct io_ring & ring) {
    164                 unsigned head = *ring.completion_q.head;
    165                 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
    166 
    167                 if (head == tail) return false;
    168 
    169                 unsigned idx = head & (*ring.completion_q.mask);
    170                 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    171 
    172                 /* paranoid */ verify(&cqe);
    173 
    174                 struct io_user_data * data = (struct io_user_data *)cqe.user_data;
    175                 data->result = cqe.res;
    176                 unpark( data->thrd __cfaabi_dbg_ctx2 );
    177 
    178                 // Mark to the kernel that the cqe has been seen
    179                 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    180                 __atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELEASE );
    181 
    182                 return true;
    183         }
    184 
    185         static void * __io_poller( void * arg ) {
    186                 cluster * cltr = (cluster *)arg;
    187                 struct io_ring & ring = cltr->io;
    188 
    189                 sigset_t mask;
    190                 sigfillset(&mask);
    191                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    192                         abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
    193                 }
    194 
    195                 sigdelset( &mask, SIGUSR1 );
    196 
    197                 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
    198                 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
    199 
    200                 LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    201                         int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
    202                         if( ret < 0 ) {
    203                                 switch((int)errno) {
    204                                 case EAGAIN:
    205                                 case EINTR:
    206                                         continue LOOP;
    207                                 default:
    208                                         abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    209                                 }
    210                         }
    211 
    212                         // Drain the queue
    213                         while(__io_process(ring)) {}
    214                 }
    215 
    216                 return 0p;
    217         }
    218 
    219 //=============================================================================================
    220 // I/O Submissions
    221 //=============================================================================================
    222 
    223 [* struct io_uring_sqe, uint32_t] io_submit_prelude( struct io_ring & ring ) {
    224         return [0p, 0];
    225 }
    226 
    227 //=============================================================================================
    228 // I/O Interface
    229 //=============================================================================================
    230 
    231 /*
    232         extern "C" {
    233                 #define __USE_GNU
    234                 #include <sys/uio.h>
    235         }
    236 
    237         ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    238                 #if !defined(IORING_OP_READV)
    239                         return preadv2(fd, iov, iovcnt, offset, flags);
    240                 #else
    241                         sqe->opcode = IORING_OP_READV;
    242                         sqe->flags = 0;
    243                         sqe->ioprio = 0;
    244                         sqe->fd = fd;
    245                         sqe->off = offset;
    246                         sqe->addr = (unsigned long) iov;
    247                         sqe->len = iovcnt;
    248                         sqe->rw_flags = 0;
    249                         sqe->user_data = 0;
    250                         sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
    251                 #endif
    252         }
    253 
    254         ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    255                 #if !defined(IORING_OP_WRITEV)
    256                         return pwritev2(fd, iov, iovcnt, offset, flags);
    257                 #else
    258                         #warning not implemented
    259                 #endif
    260         }
    261 
    262         bool is_async( void (*func)() ) {
    263                 switch((uintptr_t)func) {
    264                 case (uintptr_t)preadv2:
    265                 case (uintptr_t)async_preadv2:
    266                         #if defined(IORING_OP_READV)
    267                                 return true;
    268                         #else
    269                                 return false;
    270                         #endif
    271                 default:
    272                         return false;
    273                 }
    274         }
    275 */
    276 
    277 #endif
  • libcfa/src/concurrency/kernel.cfa

    r7df014f r0100882  
    262262        threads{ __get };
    263263
    264         __kernel_io_startup( this );
    265 
    266264        doregister(this);
    267265}
    268266
    269267void ^?{}(cluster & this) {
    270         __kernel_io_shutdown( this );
    271 
    272268        unregister(this);
    273269}
     
    812808        ^(*mainThread){};
    813809
    814         ^(*mainCluster){};
    815 
    816810        ^(__cfa_dbg_global_clusters.list){};
    817811        ^(__cfa_dbg_global_clusters.lock){};
  • libcfa/src/concurrency/kernel.hfa

    r7df014f r0100882  
    1717
    1818#include <stdbool.h>
    19 #include <stdint.h>
    2019
    2120#include "invoke.h"
     
    112111
    113112//-----------------------------------------------------------------------------
    114 // I/O
    115 #if defined(HAVE_LINUX_IO_URING_H)
    116 struct io_uring_sq {
    117         uint32_t * head;
    118         uint32_t * tail;
    119         uint32_t * mask;
    120         uint32_t * entries;
    121         uint32_t * flags;
    122         uint32_t * dropped;
    123         uint32_t * array;
    124         struct io_uring_sqe * sqes;
    125 
    126         uint32_t sqe_head;
    127         uint32_t sqe_tail;
    128 
    129         size_t ring_sz;
    130         void * ring_ptr;
    131 };
    132 
    133 struct io_uring_cq {
    134         volatile uint32_t * head;
    135         volatile uint32_t * tail;
    136         uint32_t * mask;
    137         struct io_uring_cqe * entries;
    138         uint32_t * overflow;
    139         struct io_uring_cqe * cqes;
    140 
    141         size_t ring_sz;
    142         void * ring_ptr;
    143 };
    144 
    145 struct io_ring {
    146         struct io_uring_sq submit_q;
    147         struct io_uring_cq completion_q;
    148         uint32_t flags;
    149         int fd;
    150         pthread_t poller;
    151         void * stack;
    152         volatile bool done;
    153 };
    154 #endif
    155 
    156 //-----------------------------------------------------------------------------
    157113// Cluster
    158114struct cluster {
     
    185141                cluster * prev;
    186142        } node;
    187 
    188         #if defined(HAVE_LINUX_IO_URING_H)
    189                 struct io_ring io;
    190         #endif
    191143};
    192144extern Duration default_preemption();
  • libcfa/src/concurrency/kernel_private.hfa

    r7df014f r0100882  
    7171
    7272//-----------------------------------------------------------------------------
    73 // I/O
    74 void __kernel_io_startup ( cluster & );
    75 void __kernel_io_shutdown( cluster & );
    76 
    77 //-----------------------------------------------------------------------------
    7873// Utils
    7974#define KERNEL_STORAGE(T,X) static char storage_##X[sizeof(T)]
Note: See TracChangeset for help on using the changeset viewer.