Changes in / [6dba8755:95789be]


Files: 4 added, 20 edited

  • benchmark/io/readv.cfa

    r6dba8755 r95789be  
    1212}
    1313
     14#include <errno.h>
    1415#include <unistd.h>
    1516
    1617#include <clock.hfa>
     18#include <iofwd.hfa>
    1719#include <kernel.hfa>
    1820#include <thread.hfa>
     
    2325
    2426extern bool traceHeapOn();
    25 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    26 extern ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    27 extern void register_fixed_files( cluster &, int *, unsigned count );
    2827
    2928int fd;
     
    3130volatile size_t count = 0;
    3231
    33 unsigned long int buflen = 50;
     32unsigned long int buflen = 512;
    3433bool fixed_file = false;
    3534
     
    4039
    4140int do_read(int fd, struct iovec * iov) {
     41        // extern ssize_t cfa_preadv2(int, const struct iovec *, int, off_t, int, int = 0, Duration = -1`s, io_cancellation * = 0p, io_context * = 0p);
     42        int sflags = 0;
    4243        if(fixed_file) {
    43                 return cfa_preadv2_fixed(fd, iov, 1, 0, 0);
     44                sflags |= CFA_IO_FIXED_FD1;
    4445        }
    45         else {
    46                 return cfa_preadv2(fd, iov, 1, 0, 0);
    47         }
     46        return cfa_preadv2(fd, iov, 1, 0, 0, sflags, -1`s, 0p, 0p);
    4847}
    4948
     
    5251        /* paranoid */ assert( true == __atomic_load_n(&run, __ATOMIC_RELAXED) );
    5352
    54         char data[buflen];
     53        __attribute__((aligned(512)))  char data[buflen];
    5554        struct iovec iov = { data, buflen };
    5655
    5756        while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
    5857                int r = do_read(fd, &iov);
    59                 if(r < 0) abort("%s\n", strerror(-r));
     58                if(r < 0) abort("%s\n", strerror(errno));
    6059
    6160                __atomic_fetch_add( &count, 1, __ATOMIC_SEQ_CST );
     
    6564int main(int argc, char * argv[]) {
    6665        BENCH_DECL
    67         unsigned flags = 0;
     66        unsigned num_io = 1;
     67        io_context_params params;
    6868        int file_flags = 0;
    6969        unsigned sublen = 16;
     
    7474                        BENCH_OPT_LONG
    7575                        {"bufsize",       required_argument, 0, 'b'},
    76                         {"userthread",    no_argument      , 0, 'u'},
    7776                        {"submitthread",  no_argument      , 0, 's'},
    7877                        {"eagersubmit",   no_argument      , 0, 'e'},
    7978                        {"kpollsubmit",   no_argument      , 0, 'k'},
    8079                        {"kpollcomplete", no_argument      , 0, 'i'},
     80                        {"fixed-files",   no_argument      , 0, 'f'},
     81                        {"open-direct",   no_argument      , 0, 'o'},
    8182                        {"submitlength",  required_argument, 0, 'l'},
    8283                        {0, 0, 0, 0}
     
    8485
    8586                int idx = 0;
    86                 int opt = getopt_long(argc, argv, BENCH_OPT_SHORT "b:usekil:", options, &idx);
     87                int opt = getopt_long(argc, argv, BENCH_OPT_SHORT "b:sekil:", options, &idx);
    8788
    8889                const char * arg = optarg ? optarg : "";
     
    100101                                }
    101102                                break;
    102                         case 'u':
    103                                 flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;
    104                                 break;
    105103                        case 's':
    106                                 flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
     104                                params.poller_submits = true;
    107105                                break;
    108106                        case 'e':
    109                                 flags |= CFA_CLUSTER_IO_EAGER_SUBMITS;
     107                                params.eager_submits = true;
    110108                                break;
    111109                        case 'k':
    112                                 flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
     110                                params.poll_submit = true;
     111                        case 'f':
    113112                                fixed_file = true;
    114113                                break;
    115114                        case 'i':
    116                                 flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
     115                                params.poll_complete = true;
     116                        case 'o':
    117117                                file_flags |= O_DIRECT;
    118118                                break;
     
    123123                                        goto usage;
    124124                                }
    125                                 flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);
     125                                // flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);
    126126                                break;
    127127                        default: /* ? */
     
    150150        {
    151151                Time start, end;
    152                 BenchCluster cl = { flags, CFA_STATS_READY_Q | CFA_STATS_IO };
     152                BenchCluster cl = { num_io, params, CFA_STATS_READY_Q | CFA_STATS_IO };
    153153
    154154                if(fixed_file) {
     
    179179                                printf("\nDone\n");
    180180                        }
     181                        printf("Readers closed\n");
    181182                }
    182183                printf("Took %'ld ms\n", (end - start)`ms);
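
    The readv.cfa changes bump the default buffer length to 512, align the buffer to 512 bytes, and add an
    --open-direct option; that combination matters because O_DIRECT I/O generally requires block-aligned
    buffers, offsets and lengths. A minimal plain-C sketch of the same constraint (file name and sizes are
    illustrative, not part of the changeset):

        #define _GNU_SOURCE            /* O_DIRECT */
        #include <errno.h>
        #include <fcntl.h>
        #include <stdio.h>
        #include <stdlib.h>
        #include <string.h>
        #include <sys/uio.h>
        #include <unistd.h>

        int main(void) {
            int fd = open("/tmp/data.bin", O_RDONLY | O_DIRECT);     /* hypothetical test file */
            if (fd < 0) { perror("open"); return 1; }

            void * buf;
            if (posix_memalign(&buf, 512, 512)) { close(fd); return 1; }  /* 512-byte aligned buffer */

            struct iovec iov = { buf, 512 };
            ssize_t r = preadv(fd, &iov, 1, 0);                      /* read 512 bytes at offset 0 */
            if (r < 0) fprintf(stderr, "%s\n", strerror(errno));     /* errno carries the error, not -r */
            else       printf("read %zd bytes\n", r);

            free(buf);
            close(fd);
            return 0;
        }
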
  • libcfa/configure

    r6dba8755 r95789be  
    701701CFA_PREFIX
    702702CFA_NAME
     703AM_T
    703704BUILDLIB_FALSE
    704705BUILDLIB_TRUE
     
    31873188  BUILDLIB_FALSE=
    31883189fi
     3190
     3191
     3192AM_T='$(T)'
    31893193
    31903194
     
    1701717021
    1701817022
     17023
    1701917024for ac_header in linux/io_uring.h
    1702017025do :
     
    1929519300
    1929619301
     19302
     19303fi
     19304
     19305
     19306
     19307        # check support for various io_uring flags
     19308
     19309                ac_fn_c_check_decl "$LINENO" "IOSQE_FIXED_FILE" "ac_cv_have_decl_IOSQE_FIXED_FILE" "#include <linux/io_uring.h>
     19310"
     19311if test "x$ac_cv_have_decl_IOSQE_FIXED_FILE" = xyes; then :
     19312  $as_echo "#define CFA_HAVE_IOSQE_FIXED_FILE 1" >>confdefs.h
     19313
     19314fi
     19315
     19316
     19317                ac_fn_c_check_decl "$LINENO" "IOSQE_IO_DRAIN" "ac_cv_have_decl_IOSQE_IO_DRAIN" "#include <linux/io_uring.h>
     19318"
     19319if test "x$ac_cv_have_decl_IOSQE_IO_DRAIN" = xyes; then :
     19320  $as_echo "#define CFA_HAVE_IOSQE_IO_DRAIN 1" >>confdefs.h
     19321
     19322fi
     19323
     19324
     19325                ac_fn_c_check_decl "$LINENO" "IOSQE_ASYNC" "ac_cv_have_decl_IOSQE_ASYNC" "#include <linux/io_uring.h>
     19326"
     19327if test "x$ac_cv_have_decl_IOSQE_ASYNC" = xyes; then :
     19328  $as_echo "#define CFA_HAVE_IOSQE_ASYNC 1" >>confdefs.h
     19329
     19330fi
     19331
     19332
     19333                ac_fn_c_check_decl "$LINENO" "IOSQE_IO_LINK" "ac_cv_have_decl_IOSQE_IO_LINK" "#include <linux/io_uring.h>
     19334"
     19335if test "x$ac_cv_have_decl_IOSQE_IO_LINK" = xyes; then :
     19336  $as_echo "#define CFA_HAVE_IOSQE_IO_LINK 1" >>confdefs.h
     19337
     19338fi
     19339
     19340
     19341                ac_fn_c_check_decl "$LINENO" "IOSQE_IO_HARDLINK" "ac_cv_have_decl_IOSQE_IO_HARDLINK" "#include <linux/io_uring.h>
     19342"
     19343if test "x$ac_cv_have_decl_IOSQE_IO_HARDLINK" = xyes; then :
     19344  $as_echo "#define CFA_HAVE_IOSQE_IO_HARDLINK 1" >>confdefs.h
     19345
     19346fi
     19347
     19348
     19349                ac_fn_c_check_decl "$LINENO" "SPLICE_F_FD_IN_FIXED" "ac_cv_have_decl_SPLICE_F_FD_IN_FIXED" "#include <linux/io_uring.h>
     19350"
     19351if test "x$ac_cv_have_decl_SPLICE_F_FD_IN_FIXED" = xyes; then :
     19352  $as_echo "#define CFA_HAVE_SPLICE_F_FD_IN_FIXED 1" >>confdefs.h
    1929719353
    1929819354fi
     
    2115821214#! $SHELL
    2115921215# Generated automatically by $as_me ($PACKAGE) $VERSION
    21160 # Libtool was configured on host `(hostname || uname -n) 2>/dev/null | sed 1q`:
    2116121216# NOTE: Changes made to this file will be lost: look at ltmain.sh.
    2116221217
  • libcfa/configure.ac

    r6dba8755 r95789be  
    105105AM_CONDITIONAL([BUILDLIB], [test "x${CONFIG_BUILDLIB}" = "xyes"])
    106106
     107AM_T='$(T)'
     108AC_SUBST(AM_T)
     109
    107110#==============================================================================
    108111#Trasforming cc1 will break compilation
     
    129132#io_uring 5.6 and later uses probes
    130133define(ioring_ops, [IORING_OP_NOP,IORING_OP_READV,IORING_OP_WRITEV,IORING_OP_FSYNC,IORING_OP_READ_FIXED,IORING_OP_WRITE_FIXED,IORING_OP_POLL_ADD,IORING_OP_POLL_REMOVE,IORING_OP_SYNC_FILE_RANGE,IORING_OP_SENDMSG,IORING_OP_RECVMSG,IORING_OP_TIMEOUT,IORING_OP_TIMEOUT_REMOVE,IORING_OP_ACCEPT,IORING_OP_ASYNC_CANCEL,IORING_OP_LINK_TIMEOUT,IORING_OP_CONNECT,IORING_OP_FALLOCATE,IORING_OP_OPENAT,IORING_OP_CLOSE,IORING_OP_FILES_UPDATE,IORING_OP_STATX,IORING_OP_READ,IORING_OP_WRITE,IORING_OP_FADVISE,IORING_OP_MADVISE,IORING_OP_SEND,IORING_OP_RECV,IORING_OP_OPENAT2,IORING_OP_EPOLL_CTL,IORING_OP_SPLICE,IORING_OP_PROVIDE_BUFFERS,IORING_OP_REMOVE_BUFFER])
     134define(ioring_flags, [IOSQE_FIXED_FILE,IOSQE_IO_DRAIN,IOSQE_ASYNC,IOSQE_IO_LINK,IOSQE_IO_HARDLINK,SPLICE_F_FD_IN_FIXED])
    131135
    132136define(ioring_from_decls, [
     
    166170                ioring_from_decls
    167171        ])
     172
     173        # check support for various io_uring flags
     174        m4_foreach([op], [ioring_flags], [
     175                AC_CHECK_DECL(op, [AC_DEFINE([CFA_HAVE_]op)], [], [[#include <linux/io_uring.h>]])
     176        ])
    168177])
    169178AC_CHECK_FUNCS([preadv2 pwritev2])
  • libcfa/prelude/defines.hfa.in

    r6dba8755 r95789be  
    5050#undef CFA_HAVE_IORING_OP_REMOVE_BUFFER
    5151
     52#undef CFA_HAVE_IOSQE_FIXED_FILE
     53#undef CFA_HAVE_IOSQE_IO_DRAIN
     54#undef CFA_HAVE_IOSQE_ASYNC
     55#undef CFA_HAVE_IOSQE_IO_LINK
     56#undef CFA_HAVE_IOSQE_IO_HARDLINK
     57#undef CFA_HAVE_SPLICE_F_FD_IN_FIXED
     58
    5259#undef HAVE_PREADV2
    5360#undef HAVE_PWRITEV2
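
    The new CFA_HAVE_IOSQE_* defines let the runtime build against older kernel headers that lack some
    io_uring submission flags. A hedged sketch of how such a guard is typically consumed in C (the helper
    name is illustrative, not taken from the library):

        #if defined(CFA_HAVE_LINUX_IO_URING_H)
            #include <linux/io_uring.h>
        #endif

        /* illustrative helper: pick the sqe flag for registered ("fixed") files when available */
        static inline unsigned char sqe_fixed_file_flag(void) {
        #if defined(CFA_HAVE_IOSQE_FIXED_FILE)
            return IOSQE_FIXED_FILE;   /* header provides the flag, fixed-file indexing is usable */
        #else
            return 0;                  /* older headers: fall back to plain file descriptors */
        #endif
        }
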
  • libcfa/src/Makefile.am

    r6dba8755 r95789be  
    5151# not all platforms support concurrency, add option do disable it
    5252thread_headers_nosrc = concurrency/invoke.h
    53 thread_headers = concurrency/coroutine.hfa concurrency/thread.hfa concurrency/kernel.hfa concurrency/monitor.hfa concurrency/mutex.hfa
    54 thread_libsrc = concurrency/CtxSwitch-@ARCHITECTURE@.S concurrency/alarm.cfa concurrency/invoke.c concurrency/io.cfa concurrency/iocall.cfa concurrency/preemption.cfa concurrency/ready_queue.cfa concurrency/stats.cfa ${thread_headers:.hfa=.cfa}
     53
     54thread_headers = concurrency/coroutine.hfa concurrency/thread.hfa concurrency/kernel.hfa \
     55                concurrency/monitor.hfa concurrency/mutex.hfa
     56
     57thread_libsrc = concurrency/CtxSwitch-@ARCHITECTURE@.S concurrency/alarm.cfa \
     58                concurrency/invoke.c concurrency/io.cfa concurrency/iocall.cfa \
     59                concurrency/io/setup.cfa \
     60                concurrency/kernel/startup.cfa concurrency/preemption.cfa \
     61                concurrency/ready_queue.cfa concurrency/stats.cfa \
     62                ${thread_headers:.hfa=.cfa}
    5563else
    5664headers =
  • libcfa/src/bits/debug.hfa

    r6dba8755 r95789be  
    1515
    1616#pragma once
     17
     18#include <assert.h>
    1719
    1820#ifdef __CFA_DEBUG__
  • libcfa/src/bits/defs.hfa

    r6dba8755 r95789be  
    1616#pragma once
    1717
    18 #include <stdbool.h>
    19 #include <stddef.h>
    2018#include <stdint.h>
    2119
     
    5452    return ( (unsigned long long)lo)|( ((unsigned long long)hi)<<32 );
    5553}
    56 
    57 // #define __CFA_NO_BIT_TEST_AND_SET__
    58 
    59 #if defined( __i386 )
    60 static inline bool __atomic_bts(volatile unsigned long int * target, unsigned long int bit ) {
    61         #if defined(__CFA_NO_BIT_TEST_AND_SET__)
    62         unsigned long int mask = 1ul << bit;
    63         unsigned long int ret = __atomic_fetch_or(target, mask, (int)__ATOMIC_RELAXED);
    64         return (ret & mask) != 0;
    65     #else
    66         int result = 0;
    67         asm volatile(
    68             "LOCK btsl %[bit], %[target]\n\t"
    69             : "=@ccc" (result)
    70             : [target] "m" (*target), [bit] "r" (bit)
    71         );
    72         return result != 0;
    73     #endif
    74 }
    75 
    76 static inline bool __atomic_btr(volatile unsigned long int * target, unsigned long int bit ) {
    77         #if defined(__CFA_NO_BIT_TEST_AND_SET__)
    78         unsigned long int mask = 1ul << bit;
    79         unsigned long int ret = __atomic_fetch_and(target, ~mask, (int)__ATOMIC_RELAXED);
    80         return (ret & mask) != 0;
    81         #else
    82         int result = 0;
    83         asm volatile(
    84             "LOCK btrl %[bit], %[target]\n\t"
    85             :"=@ccc" (result)
    86             : [target] "m" (*target), [bit] "r" (bit)
    87         );
    88         return result != 0;
    89     #endif
    90 }
    91 #elif defined( __x86_64 )
    92 static inline bool __atomic_bts(volatile unsigned long long int * target, unsigned long long int bit ) {
    93         #if defined(__CFA_NO_BIT_TEST_AND_SET__)
    94         unsigned long long int mask = 1ul << bit;
    95         unsigned long long int ret = __atomic_fetch_or(target, mask, (int)__ATOMIC_RELAXED);
    96         return (ret & mask) != 0;
    97     #else
    98         int result = 0;
    99         asm volatile(
    100             "LOCK btsq %[bit], %[target]\n\t"
    101             : "=@ccc" (result)
    102             : [target] "m" (*target), [bit] "r" (bit)
    103         );
    104         return result != 0;
    105     #endif
    106 }
    107 
    108 static inline bool __atomic_btr(volatile unsigned long long int * target, unsigned long long int bit ) {
    109         #if defined(__CFA_NO_BIT_TEST_AND_SET__)
    110         unsigned long long int mask = 1ul << bit;
    111         unsigned long long int ret = __atomic_fetch_and(target, ~mask, (int)__ATOMIC_RELAXED);
    112         return (ret & mask) != 0;
    113         #else
    114         int result = 0;
    115         asm volatile(
    116             "LOCK btrq %[bit], %[target]\n\t"
    117             :"=@ccc" (result)
    118             : [target] "m" (*target), [bit] "r" (bit)
    119         );
    120         return result != 0;
    121     #endif
    122 }
    123 #elif defined( __ARM_ARCH )
    124     #error __atomic_bts and __atomic_btr not implemented for arm
    125 #else
    126         #error uknown hardware architecture
    127 #endif
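
    The removed __atomic_bts/__atomic_btr helpers paired an x86 LOCK bts/btr path with a portable fallback
    behind __CFA_NO_BIT_TEST_AND_SET__. For reference, the portable form reduces to the GCC/Clang __atomic
    builtins and needs no inline assembly (a sketch restating that fallback, not the library's current code):

        #include <stdbool.h>

        static inline bool atomic_bts(volatile unsigned long * target, unsigned long bit) {
            unsigned long mask = 1ul << bit;
            unsigned long old  = __atomic_fetch_or(target, mask, __ATOMIC_RELAXED);
            return (old & mask) != 0;      /* previous value of the bit */
        }

        static inline bool atomic_btr(volatile unsigned long * target, unsigned long bit) {
            unsigned long mask = 1ul << bit;
            unsigned long old  = __atomic_fetch_and(target, ~mask, __ATOMIC_RELAXED);
            return (old & mask) != 0;      /* previous value of the bit */
        }
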
  • libcfa/src/bits/locks.hfa

    r6dba8755 r95789be  
    164164
    165165        #undef CHECKED
     166
     167        struct $thread;
     168        extern void park( __cfaabi_dbg_ctx_param );
     169        extern void unpark( struct $thread * this __cfaabi_dbg_ctx_param2 );
     170        static inline struct $thread * active_thread ();
     171
     172        // Semaphore which only supports a single thread
     173        struct single_sem {
     174                struct $thread * volatile ptr;
     175        };
     176
     177        static inline {
     178                void  ?{}(single_sem & this) {
     179                        this.ptr = 0p;
     180                }
     181
     182                void ^?{}(single_sem & this) {}
     183
     184                bool wait(single_sem & this) {
     185                        for() {
     186                                struct $thread * expected = this.ptr;
     187                                if(expected == 1p) {
     188                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     189                                                return false;
     190                                        }
     191                                }
     192                                else {
     193                                        /* paranoid */ verify( expected == 0p );
     194                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     195                                                park( __cfaabi_dbg_ctx );
     196                                                return true;
     197                                        }
     198                                }
     199
     200                        }
     201                }
     202
     203                bool post(single_sem & this) {
     204                        for() {
     205                                struct $thread * expected = this.ptr;
     206                                if(expected == 1p) return false;
     207                                if(expected == 0p) {
     208                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     209                                                return false;
     210                                        }
     211                                }
     212                                else {
     213                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     214                                                unpark( expected __cfaabi_dbg_ctx2 );
     215                                                return true;
     216                                        }
     217                                }
     218                        }
     219                }
     220        }
    166221#endif
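
    The new single_sem is a one-waiter binary semaphore encoded in a single pointer: 0p means empty, 1p means
    a post is banked, and any other value is the waiting thread. A rough C11 sketch of the same state machine,
    with park/unpark reduced to a spin flag purely for illustration (names and the spin loop are assumptions,
    not the CFA implementation):

        #include <stdatomic.h>
        #include <stdbool.h>
        #include <stdint.h>

        struct waiter { atomic_bool parked; };           /* stand-in for a parked $thread */

        struct single_sem { atomic_uintptr_t ptr; };     /* 0 = empty, 1 = posted, else waiter * */

        static bool sem_wait(struct single_sem * s, struct waiter * self) {
            for (;;) {
                uintptr_t expected = atomic_load(&s->ptr);
                if (expected == 1) {                     /* a post is already banked */
                    if (atomic_compare_exchange_weak(&s->ptr, &expected, (uintptr_t)0))
                        return false;                    /* consumed it without blocking */
                } else {                                 /* expected == 0: advertise ourselves */
                    atomic_store(&self->parked, true);   /* set before publishing to avoid a lost wake-up */
                    if (atomic_compare_exchange_weak(&s->ptr, &expected, (uintptr_t)self)) {
                        while (atomic_load(&self->parked)) { }   /* "park": spin until posted */
                        return true;
                    }
                }
            }
        }

        static bool sem_post(struct single_sem * s) {
            for (;;) {
                uintptr_t expected = atomic_load(&s->ptr);
                if (expected == 1) return false;         /* already posted */
                if (expected == 0) {
                    if (atomic_compare_exchange_weak(&s->ptr, &expected, (uintptr_t)1))
                        return false;                    /* banked the post for the next wait */
                } else {
                    struct waiter * w = (struct waiter *)expected;
                    if (atomic_compare_exchange_weak(&s->ptr, &expected, (uintptr_t)0)) {
                        atomic_store(&w->parked, false); /* "unpark" the single waiter */
                        return true;
                    }
                }
            }
        }
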
  • libcfa/src/concurrency/alarm.cfa

    r6dba8755 r95789be  
    2323
    2424#include "alarm.hfa"
    25 #include "kernel_private.hfa"
     25#include "kernel/fwd.hfa"
    2626#include "preemption.hfa"
    2727
  • libcfa/src/concurrency/invoke.h

    r6dba8755 r95789be  
    1717#include "bits/defs.hfa"
    1818#include "bits/locks.hfa"
     19#include "kernel/fwd.hfa"
    1920
    2021#ifdef __cforall
     
    2526#ifndef _INVOKE_H_
    2627#define _INVOKE_H_
    27 
    28 #ifdef __ARM_ARCH
    29         // function prototypes are only really used by these macros on ARM
    30         void disable_global_interrupts();
    31         void enable_global_interrupts();
    32 
    33         #define TL_GET( member ) ( { __typeof__( kernelTLS.member ) target; \
    34                 disable_global_interrupts(); \
    35                 target = kernelTLS.member; \
    36                 enable_global_interrupts(); \
    37                 target; } )
    38         #define TL_SET( member, value ) disable_global_interrupts(); \
    39                 kernelTLS.member = value; \
    40                 enable_global_interrupts();
    41 #else
    42         #define TL_GET( member ) kernelTLS.member
    43         #define TL_SET( member, value ) kernelTLS.member = value;
    44 #endif
    45 
    46         #ifdef __cforall
    47         extern "Cforall" {
    48                 extern __attribute__((aligned(128))) thread_local struct KernelThreadData {
    49                         struct $thread    * volatile this_thread;
    50                         struct processor  * volatile this_processor;
    51                         struct __stats_t  * volatile this_stats;
    52 
    53                         struct {
    54                                 volatile unsigned short disable_count;
    55                                 volatile bool enabled;
    56                                 volatile bool in_progress;
    57                         } preemption_state;
    58 
    59                         #if defined(__SIZEOF_INT128__)
    60                                 __uint128_t rand_seed;
    61                         #else
    62                                 uint64_t rand_seed;
    63                         #endif
    64                 } kernelTLS __attribute__ ((tls_model ( "initial-exec" )));
    65         }
    66         #endif
    6728
    6829        struct __stack_context_t {
     
    9859
    9960        enum __Coroutine_State { Halted, Start, Primed, Blocked, Ready, Active };
    100         enum __Preemption_Reason { __NO_PREEMPTION, __ALARM_PREEMPTION, __POLL_PREEMPTION, __MANUAL_PREEMPTION };
    10161
    10262        struct $coroutine {
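
    The kernelTLS block and the TL_GET/TL_SET macros leave invoke.h; judging by the new include, those
    declarations are now expected to come from kernel/fwd.hfa. For orientation, the removed declaration boils
    down to an initial-exec thread-local struct, roughly as follows (a trimmed C restatement of the removed
    code, not the relocated version):

        #include <stdint.h>

        struct KernelThreadData {
            void * volatile this_thread;                 /* currently running $thread */
            void * volatile this_processor;              /* processor executing it */
            void * volatile this_stats;                  /* per-processor statistics */
            struct {
                volatile unsigned short disable_count;
                volatile _Bool enabled;
                volatile _Bool in_progress;
            } preemption_state;
            uint64_t rand_seed;
        };

        /* one instance per kernel thread; initial-exec keeps TLS accesses cheap */
        extern __attribute__((aligned(128))) __thread struct KernelThreadData kernelTLS
                __attribute__((tls_model("initial-exec")));

        #define TL_GET( member ) kernelTLS.member        /* the non-ARM fast path of the removed macro */
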
  • libcfa/src/concurrency/io.cfa

    r6dba8755 r95789be  
    1414//
    1515
     16#define __cforall_thread__
     17
    1618#if defined(__CFA_DEBUG__)
    1719        // #define __CFA_DEBUG_PRINT_IO__
     
    1921#endif
    2022
    21 #include "kernel.hfa"
    22 #include "bitmanip.hfa"
    23 
    24 #if !defined(CFA_HAVE_LINUX_IO_URING_H)
    25         void __kernel_io_startup( cluster &, unsigned, bool ) {
    26                 // Nothing to do without io_uring
    27         }
    28 
    29         void __kernel_io_finish_start( cluster & ) {
    30                 // Nothing to do without io_uring
    31         }
    32 
    33         void __kernel_io_prepare_stop( cluster & ) {
    34                 // Nothing to do without io_uring
    35         }
    36 
    37         void __kernel_io_shutdown( cluster &, bool ) {
    38                 // Nothing to do without io_uring
    39         }
    40 
    41 #else
     23
     24#if defined(CFA_HAVE_LINUX_IO_URING_H)
    4225        #define _GNU_SOURCE         /* See feature_test_macros(7) */
    4326        #include <errno.h>
     27        #include <signal.h>
    4428        #include <stdint.h>
    4529        #include <string.h>
    4630        #include <unistd.h>
    47         #include <sys/mman.h>
    4831
    4932        extern "C" {
     33                #include <sys/epoll.h>
    5034                #include <sys/syscall.h>
    5135
     
    5337        }
    5438
    55         #include "bits/signal.hfa"
    56         #include "kernel_private.hfa"
    57         #include "thread.hfa"
    58 
    59         uint32_t entries_per_cluster() {
    60                 return 256;
    61         }
    62 
    63         static void * __io_poller_slow( void * arg );
    64 
    65         // Weirdly, some systems that do support io_uring don't actually define these
    66         #ifdef __alpha__
    67                 /*
    68                 * alpha is the only exception, all other architectures
    69                 * have common numbers for new system calls.
    70                 */
    71                 #ifndef __NR_io_uring_setup
    72                         #define __NR_io_uring_setup           535
    73                 #endif
    74                 #ifndef __NR_io_uring_enter
    75                         #define __NR_io_uring_enter           536
    76                 #endif
    77                 #ifndef __NR_io_uring_register
    78                         #define __NR_io_uring_register        537
    79                 #endif
    80         #else /* !__alpha__ */
    81                 #ifndef __NR_io_uring_setup
    82                         #define __NR_io_uring_setup           425
    83                 #endif
    84                 #ifndef __NR_io_uring_enter
    85                         #define __NR_io_uring_enter           426
    86                 #endif
    87                 #ifndef __NR_io_uring_register
    88                         #define __NR_io_uring_register        427
    89                 #endif
    90         #endif
    91 
    92         // Fast poller user-thread
    93         // Not using the "thread" keyword because we want to control
    94         // more carefully when to start/stop it
    95         struct __io_poller_fast {
    96                 struct __io_data * ring;
    97                 $thread thrd;
    98         };
    99 
    100         void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    101                 this.ring = cltr.io;
    102                 (this.thrd){ "Fast I/O Poller", cltr };
    103         }
    104         void ^?{}( __io_poller_fast & mutex this );
    105         void main( __io_poller_fast & this );
    106         static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    107         void ^?{}( __io_poller_fast & mutex this ) {}
    108 
    109         struct __submition_data {
    110                 // Head and tail of the ring (associated with array)
    111                 volatile uint32_t * head;
    112                 volatile uint32_t * tail;
    113                 volatile uint32_t prev_head;
    114 
    115                 // The actual kernel ring which uses head/tail
    116                 // indexes into the sqes arrays
    117                 uint32_t * array;
    118 
    119                 // number of entries and mask to go with it
    120                 const uint32_t * num;
    121                 const uint32_t * mask;
    122 
    123                 // Submission flags (Not sure what for)
    124                 uint32_t * flags;
    125 
    126                 // number of sqes not submitted (whatever that means)
    127                 uint32_t * dropped;
    128 
    129                 // Like head/tail but not seen by the kernel
    130                 volatile uint32_t * ready;
    131                 uint32_t ready_cnt;
    132 
    133                 __spinlock_t lock;
    134                 __spinlock_t release_lock;
    135 
    136                 // A buffer of sqes (not the actual ring)
    137                 struct io_uring_sqe * sqes;
    138 
    139                 // The location and size of the mmaped area
    140                 void * ring_ptr;
    141                 size_t ring_sz;
    142         };
    143 
    144         struct __completion_data {
    145                 // Head and tail of the ring
    146                 volatile uint32_t * head;
    147                 volatile uint32_t * tail;
    148 
    149                 // number of entries and mask to go with it
    150                 const uint32_t * mask;
    151                 const uint32_t * num;
    152 
    153                 // number of cqes not submitted (whatever that means)
    154                 uint32_t * overflow;
    155 
    156                 // the kernel ring
    157                 struct io_uring_cqe * cqes;
    158 
    159                 // The location and size of the mmaped area
    160                 void * ring_ptr;
    161                 size_t ring_sz;
    162         };
    163 
    164         struct __io_data {
    165                 struct __submition_data submit_q;
    166                 struct __completion_data completion_q;
    167                 uint32_t ring_flags;
    168                 int cltr_flags;
    169                 int fd;
    170                 semaphore submit;
    171                 volatile bool done;
    172                 struct {
    173                         struct {
    174                                 __processor_id_t id;
    175                                 void * stack;
    176                                 pthread_t kthrd;
    177                                 volatile bool blocked;
    178                         } slow;
    179                         __io_poller_fast fast;
    180                         __bin_sem_t sem;
    181                 } poller;
    182         };
    183 
    184 //=============================================================================================
    185 // I/O Startup / Shutdown logic
    186 //=============================================================================================
    187         void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
    188                 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
    189                         abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
    190                 }
    191 
    192                 this.io = malloc();
    193 
    194                 // Step 1 : call to setup
    195                 struct io_uring_params params;
    196                 memset(&params, 0, sizeof(params));
    197                 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
    198                 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
    199 
    200                 uint32_t nentries = entries_per_cluster();
    201 
    202                 int fd = syscall(__NR_io_uring_setup, nentries, &params );
    203                 if(fd < 0) {
    204                         abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
    205                 }
    206 
    207                 // Step 2 : mmap result
    208                 memset( this.io, 0, sizeof(struct __io_data) );
    209                 struct __submition_data  & sq = this.io->submit_q;
    210                 struct __completion_data & cq = this.io->completion_q;
    211 
    212                 // calculate the right ring size
    213                 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
    214                 cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
    215 
    216                 // Requires features
    217                 #if defined(IORING_FEAT_SINGLE_MMAP)
    218                         // adjust the size according to the parameters
    219                         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    220                                 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
    221                         }
    222                 #endif
    223 
    224                 // mmap the Submit Queue into existence
    225                 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
    226                 if (sq.ring_ptr == (void*)MAP_FAILED) {
    227                         abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
    228                 }
    229 
    230                 // Requires features
    231                 #if defined(IORING_FEAT_SINGLE_MMAP)
    232                         // mmap the Completion Queue into existence (may or may not be needed)
    233                         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    234                                 cq.ring_ptr = sq.ring_ptr;
    235                         }
    236                         else
    237                 #endif
    238                 {
    239                         // We need multiple call to MMAP
    240                         cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
    241                         if (cq.ring_ptr == (void*)MAP_FAILED) {
    242                                 munmap(sq.ring_ptr, sq.ring_sz);
    243                                 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
    244                         }
    245                 }
    246 
    247                 // mmap the submit queue entries
    248                 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
    249                 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
    250                 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
    251                         munmap(sq.ring_ptr, sq.ring_sz);
    252                         if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
    253                         abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
    254                 }
    255 
    256                 // Get the pointers from the kernel to fill the structure
    257                 // submit queue
    258                 sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
    259                 sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
    260                 sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
    261                 sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
    262                 sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
    263                 sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    264                 sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    265                 sq.prev_head = *sq.head;
    266 
    267                 {
    268                         const uint32_t num = *sq.num;
    269                         for( i; num ) {
    270                                 sq.sqes[i].user_data = 0ul64;
    271                         }
    272                 }
    273 
    274                 (sq.lock){};
    275                 (sq.release_lock){};
    276 
    277                 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
    278                         /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
    279                         sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
    280                         sq.ready = alloc_align( 64, sq.ready_cnt );
    281                         for(i; sq.ready_cnt) {
    282                                 sq.ready[i] = -1ul32;
    283                         }
    284                 }
    285                 else {
    286                         sq.ready_cnt = 0;
    287                         sq.ready = 0p;
    288                 }
    289 
    290                 // completion queue
    291                 cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
    292                 cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
    293                 cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
    294                 cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
    295                 cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
    296                 cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    297 
    298                 // some paranoid checks
    299                 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
    300                 /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
    301                 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
    302                 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
    303 
    304                 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
    305                 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
    306                 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
    307                 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
    308 
    309                 // Update the global ring info
    310                 this.io->ring_flags = params.flags;
    311                 this.io->cltr_flags = io_flags;
    312                 this.io->fd         = fd;
    313                 this.io->done       = false;
    314                 (this.io->submit){ min(*sq.num, *cq.num) };
    315 
    316                 if(!main_cluster) {
    317                         __kernel_io_finish_start( this );
    318                 }
    319         }
    320 
    321         void __kernel_io_finish_start( cluster & this ) {
    322                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    323                         __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
    324                         (this.io->poller.fast){ this };
    325                         __thrd_start( this.io->poller.fast, main );
    326                 }
    327 
    328                 // Create the poller thread
    329                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
    330                 this.io->poller.slow.blocked = false;
    331                 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    332         }
    333 
    334         void __kernel_io_prepare_stop( cluster & this ) {
    335                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
    336                 // Notify the poller thread of the shutdown
    337                 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
    338 
    339                 // Stop the IO Poller
    340                 sigval val = { 1 };
    341                 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
    342                 post( this.io->poller.sem );
    343 
    344                 // Wait for the poller thread to finish
    345                 pthread_join( this.io->poller.slow.kthrd, 0p );
    346                 free( this.io->poller.slow.stack );
    347 
    348                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
    349 
    350                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    351                         with( this.io->poller.fast ) {
    352                                 /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
    353                                 /* paranoid */ verify( !ready_mutate_islocked() );
    354 
    355                                 // We need to adjust the clean-up based on where the thread is
    356                                 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    357 
    358                                         ready_schedule_lock( (struct __processor_id_t *)active_processor() );
    359 
    360                                                 // This is the tricky case
    361                                                 // The thread was preempted and now it is on the ready queue
    362                                                 // The thread should be the last on the list
    363                                                 /* paranoid */ verify( thrd.link.next != 0p );
    364 
    365                                                 // Remove the thread from the ready queue of this cluster
    366                                                 __attribute__((unused)) bool removed = remove_head( &this, &thrd );
    367                                                 /* paranoid */ verify( removed );
    368                                                 thrd.link.next = 0p;
    369                                                 thrd.link.prev = 0p;
    370                                                 __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
    371 
    372                                                 // Fixup the thread state
    373                                                 thrd.state = Blocked;
    374                                                 thrd.ticket = 0;
    375                                                 thrd.preempted = __NO_PREEMPTION;
    376 
    377                                         ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
    378 
    379                                         // Pretend like the thread was blocked all along
    380                                 }
    381                                 // !!! This is not an else if !!!
    382                                 if( thrd.state == Blocked ) {
    383 
    384                                         // This is the "easy case"
    385                                         // The thread is parked and can easily be moved to active cluster
    386                                         verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
    387                                         thrd.curr_cluster = active_cluster();
    388 
    389                                         // unpark the fast io_poller
    390                                         unpark( &thrd __cfaabi_dbg_ctx2 );
    391                                 }
    392                                 else {
    393 
    394                                         // The thread is in a weird state
    395                                         // I don't know what to do here
    396                                         abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
    397                                 }
    398 
    399                         }
    400 
    401                         ^(this.io->poller.fast){};
    402 
    403                         __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
    404                 }
    405         }
    406 
    407         void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
    408                 if(!main_cluster) {
    409                         __kernel_io_prepare_stop( this );
    410                 }
    411 
    412                 // Shutdown the io rings
    413                 struct __submition_data  & sq = this.io->submit_q;
    414                 struct __completion_data & cq = this.io->completion_q;
    415 
    416                 // unmap the submit queue entries
    417                 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
    418 
    419                 // unmap the Submit Queue ring
    420                 munmap(sq.ring_ptr, sq.ring_sz);
    421 
    422                 // unmap the Completion Queue ring, if it is different
    423                 if (cq.ring_ptr != sq.ring_ptr) {
    424                         munmap(cq.ring_ptr, cq.ring_sz);
    425                 }
    426 
    427                 // close the file descriptor
    428                 close(this.io->fd);
    429 
    430                 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    431                 free( this.io );
    432         }
    433 
    434         int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
     39        #include "stats.hfa"
     40        #include "kernel.hfa"
     41        #include "kernel/fwd.hfa"
     42        #include "io/types.hfa"
     43
     44//=============================================================================================
     45// I/O Syscall
     46//=============================================================================================
     47        static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    43548                bool need_sys_to_submit = false;
    43649                bool need_sys_to_complete = false;
    437                 unsigned min_complete = 0;
    43850                unsigned flags = 0;
    439 
    44051
    44152                TO_SUBMIT:
     
    45162                }
    45263
    453                 TO_COMPLETE:
    45464                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    45565                        flags |= IORING_ENTER_GETEVENTS;
    456                         if( mask ) {
    457                                 need_sys_to_complete = true;
    458                                 min_complete = 1;
    459                                 break TO_COMPLETE;
    460                         }
    46166                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    46267                                need_sys_to_complete = true;
     
    46671                int ret = 0;
    46772                if( need_sys_to_submit || need_sys_to_complete ) {
    468                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
     73                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
    46974                        if( ret < 0 ) {
    47075                                switch((int)errno) {
     
    49095        static uint32_t __release_consumed_submission( struct __io_data & ring );
    49196
    492         static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
     97        static inline void process(struct io_uring_cqe & cqe ) {
    49398                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
    49499                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
    495100
    496101                data->result = cqe.res;
    497                 if(!id) { unpark(     data->thrd __cfaabi_dbg_ctx2 ); }
    498                 else  { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
     102                unpark( data->thrd __cfaabi_dbg_ctx2 );
    499103        }
    500104
    501105        // Process a single completion message from the io_uring
    502106        // This is NOT thread-safe
    503         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
     107        static [int, bool] __drain_io( & struct __io_data ring ) {
    504108                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
    505109
    506110                unsigned to_submit = 0;
    507                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     111                if( ring.poller_submits ) {
    508112                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    509113                        to_submit = __collect_submitions( ring );
    510114                }
    511115
    512                 int ret = __io_uring_enter(ring, to_submit, true, mask);
     116                int ret = __io_uring_enter(ring, to_submit, true);
    513117                if( ret < 0 ) {
    514118                        return [0, true];
     
    547151                        /* paranoid */ verify(&cqe);
    548152
    549                         process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
    550                 }
    551 
    552                 // Allow new submissions to happen
    553                 // V(ring.submit, count);
     153                        process( cqe );
     154                }
    554155
    555156                // Mark to the kernel that the cqe has been seen
     
    561162        }
    562163
    563         static void * __io_poller_slow( void * arg ) {
    564                 #if !defined( __CFA_NO_STATISTICS__ )
    565                         __stats_t local_stats;
    566                         __init_stats( &local_stats );
    567                         kernelTLS.this_stats = &local_stats;
    568                 #endif
    569 
    570                 cluster * cltr = (cluster *)arg;
    571                 struct __io_data & ring = *cltr->io;
    572 
    573                 ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
    574 
    575                 sigset_t mask;
    576                 sigfillset(&mask);
    577                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    578                         abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
    579                 }
    580 
    581                 sigdelset( &mask, SIGUSR1 );
    582 
    583                 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
    584                 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
    585 
    586                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
    587 
    588                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    589                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    590 
    591                                 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
    592 
    593                                 // In the user-thread approach drain and if anything was drained,
    594                                 // batton pass to the user-thread
    595                                 int count;
    596                                 bool again;
    597                                 [count, again] = __drain_io( ring, &mask );
    598 
    599                                 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
    600 
    601                                 // Update statistics
    602                                 __STATS__( true,
    603                                         io.complete_q.completed_avg.val += count;
    604                                         io.complete_q.completed_avg.slow_cnt += 1;
    605                                 )
    606 
    607                                 if(again) {
    608                                         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    609                                         __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
    610                                         wait( ring.poller.sem );
    611                                 }
    612                         }
    613                 }
    614                 else {
    615                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    616                                 //In the naive approach, just poll the io completion queue directly
    617                                 int count;
    618                                 bool again;
    619                                 [count, again] = __drain_io( ring, &mask );
    620 
    621                                 // Update statistics
    622                                 __STATS__( true,
    623                                         io.complete_q.completed_avg.val += count;
    624                                         io.complete_q.completed_avg.slow_cnt += 1;
    625                                 )
    626                         }
    627                 }
    628 
    629                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
    630 
    631                 unregister( &ring.poller.slow.id );
    632 
    633                 #if !defined(__CFA_NO_STATISTICS__)
    634                         __tally_stats(cltr->stats, &local_stats);
    635                 #endif
    636 
    637                 return 0p;
    638         }
    639 
    640         void main( __io_poller_fast & this ) {
    641                 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
    642 
    643                 // Start parked
    644                 park( __cfaabi_dbg_ctx );
    645 
    646                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
     164        void main( $io_ctx_thread & this ) {
     165                epoll_event ev;
     166                __ioctx_register( this, ev );
     167
     168                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
    647169
    648170                int reset = 0;
    649 
    650171                // Then loop until we need to start
    651                 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    652 
     172                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    653173                        // Drain the io
    654174                        int count;
    655175                        bool again;
    656176                        disable_interrupts();
    657                                 [count, again] = __drain_io( *this.ring, 0p );
     177                                [count, again] = __drain_io( *this.ring );
    658178
    659179                                if(!again) reset++;
     
    672192                        // We didn't get anything baton pass to the slow poller
    673193                        else {
    674                                 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     194                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    675195                                reset = 0;
    676196
    677                                 // wake up the slow poller
    678                                 post( this.ring->poller.sem );
    679 
    680                                 // park this thread
    681                                 park( __cfaabi_dbg_ctx );
     197                                // block this thread
     198                                __ioctx_prepare_block( this, ev );
     199                                wait( this.sem );
    682200                        }
    683201                }
    684202
    685203                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    686         }
    687 
    688         static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
    689         static inline void __wake_poller( struct __io_data & ring ) {
    690                 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
    691 
    692                 sigval val = { 1 };
    693                 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    694204        }
    695205
     
    806316        }
    807317
    808         void __submit( struct __io_data & ring, uint32_t idx ) {
     318        void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
     319                __io_data & ring = *ctx->thrd.ring;
    809320                // Get now the data we definetely need
    810321                uint32_t * const tail = ring.submit_q.tail;
    811                 const uint32_t mask = *ring.submit_q.mask;
     322                const uint32_t mask  = *ring.submit_q.mask;
    812323
    813324                // There are 2 submission schemes, check which one we are using
    814                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     325                if( ring.poller_submits ) {
    815326                        // If the poller thread submits, then we just need to add this to the ready array
    816327                        __submit_to_ready_array( ring, idx, mask );
    817328
    818                         __wake_poller( ring );
     329                        post( ctx->thrd.sem );
    819330
    820331                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    821332                }
    822                 else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
     333                else if( ring.eager_submits ) {
    823334                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
    824335
     
    849360                        // We got the lock
    850361                        unsigned to_submit = __collect_submitions( ring );
    851                         int ret = __io_uring_enter( ring, to_submit, false, 0p );
     362                        int ret = __io_uring_enter( ring, to_submit, false );
    852363                        if( ret < 0 ) {
    853364                                unlock(ring.submit_q.lock);
     
    892403
    893404                        // Submit however many entries need to be submitted
    894                         int ret = __io_uring_enter( ring, 1, false, 0p );
     405                        int ret = __io_uring_enter( ring, 1, false );
    895406                        if( ret < 0 ) {
    896407                                switch((int)errno) {
     
    958469                return count;
    959470        }
    960 
    961 //=============================================================================================
    962 // I/O Submissions
    963 //=============================================================================================
    964 
    965         void register_fixed_files( cluster & cl, int * files, unsigned count ) {
    966                 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
    967                 if( ret < 0 ) {
    968                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    969                 }
    970 
    971                 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
    972         }
    973471#endif
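
On the submission side the cluster-wide CFA_CLUSTER_IO_* flags are gone: each ring carries its own `poller_submits`/`eager_submits` booleans, __submit is handed the owning io_context, and waking the poller is a plain post on that context's semaphore instead of a pthread signal. Condensed from the hunks above (not a verbatim copy; the eager and direct paths keep the locking and errno handling shown in the diff):

        void __submit( struct io_context * ctx, uint32_t idx ) {
                __io_data & ring = *ctx->thrd.ring;
                const uint32_t mask = *ring.submit_q.mask;

                if( ring.poller_submits ) {
                        // publish the sqe index and let the poller thread call io_uring_enter
                        __submit_to_ready_array( ring, idx, mask );
                        post( ctx->thrd.sem );
                }
                else if( ring.eager_submits ) {
                        // batch with concurrent submitters, then one of them enters the kernel
                        __submit_to_ready_array( ring, idx, mask );
                        /* ... take submit_q.lock, __collect_submitions( ring ), __io_uring_enter ... */
                }
                else {
                        // default: submit this one entry directly
                        int ret = __io_uring_enter( ring, 1, false );
                        /* ... errno handling as in the diff ... */
                }
        }
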
  • libcfa/src/concurrency/iocall.cfa

    r6dba8755 r95789be  
    1414//
    1515
     16#define __cforall_thread__
     17
    1618#include "bits/defs.hfa"
    1719
     
    2123
    2224#if defined(CFA_HAVE_LINUX_IO_URING_H)
     25        #include <assert.h>
    2326        #include <stdint.h>
     27        #include <errno.h>
    2428        #include <linux/io_uring.h>
    2529
    26         #include "kernel_private.hfa"
     30        #include "kernel.hfa"
     31        #include "kernel/fwd.hfa"
     32        #include "io/types.hfa"
    2733
    2834        extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data );
    29         extern void __submit( struct __io_data & ring, uint32_t idx );
     35        extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1)));
    3036
    3137        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
     
    5258        }
    5359
     60        static inline io_context * __get_io_context( void ) {
     61                cluster * cltr = active_cluster();
     62                /* paranoid */ verifyf( cltr, "No active cluster for io operation\n");
     63                assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\n", cltr );
     64                /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\n", cltr);
     65                return &cltr->io.ctxs[ __tls_rand() % cltr->io.cnt ];
     66        }
     67
     68
     69      #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
     70                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC)
     71        #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC)
     72                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC)
     73      #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN)
     74                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)
     75      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
     76                #define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC)
     77        #elif defined(CFA_HAVE_IOSQE_FIXED_FILE)
     78                #define REGULAR_FLAGS (IOSQE_FIXED_FILE)
     79      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN)
     80                #define REGULAR_FLAGS (IOSQE_IO_DRAIN)
     81      #elif defined(CFA_HAVE_IOSQE_ASYNC)
     82                #define REGULAR_FLAGS (IOSQE_ASYNC)
     83        #else
     84                #define REGULAR_FLAGS (0)
     85        #endif
     86
     87        #if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     88                #define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK)
     89        #elif defined(CFA_HAVE_IOSQE_IO_LINK)
     90                #define LINK_FLAGS (IOSQE_IO_LINK)
     91        #elif defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     92                #define LINK_FLAGS (IOSQE_IO_HARDLINK)
     93        #else
     94                #define LINK_FLAGS (0)
     95        #endif
     96
     97        #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)
     98                #define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED)
     99        #else
     100                #define SPLICE_FLAGS (0)
     101        #endif
     102
     103
    54104        #define __submit_prelude \
     105                if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \
     106                (void)timeout; (void)cancellation; \
     107                if( !context ) context = __get_io_context(); \
    55108                __io_user_data_t data = { 0, active_thread() }; \
    56                 struct __io_data & ring = *data.thrd->curr_cluster->io; \
     109                struct __io_data & ring = *context->thrd.ring; \
    57110                struct io_uring_sqe * sqe; \
    58111                uint32_t idx; \
    59                 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data );
     112                [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \
     113                sqe->flags = REGULAR_FLAGS & submit_flags;
    60114
    61115        #define __submit_wait \
    62116                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
    63117                verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \
    64                 __submit( ring, idx ); \
     118                __submit( context, idx ); \
    65119                park( __cfaabi_dbg_ctx ); \
     120                if( data.result < 0 ) { \
     121                        errno = -data.result; \
     122                        return -1; \
     123                } \
    66124                return data.result;
    67125#endif
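
Every cfa_* wrapper now funnels through this pair of macros: a context is picked with __get_io_context when the caller does not supply one, sqe->flags is seeded from REGULAR_FLAGS & submit_flags so only locally-supported IOSQE_* bits pass through, link flags are rejected with ENOTSUP, and a negative completion result is turned into the usual errno/-1 convention instead of being returned raw. Expanded by hand for a wrapper such as cfa_fsync, the body looks roughly like this (a sketch of the expansion, not compiler output):

        if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; }
        (void)timeout; (void)cancellation;                        // accepted, but not acted on yet
        if( !context ) context = __get_io_context();              // pick a ring on the active cluster
        __io_user_data_t data = { 0, active_thread() };
        struct __io_data & ring = *context->thrd.ring;
        struct io_uring_sqe * sqe;
        uint32_t idx;
        [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data );
        sqe->flags = REGULAR_FLAGS & submit_flags;                 // mask out unsupported bits

        (*sqe){ IORING_OP_FSYNC, fd };                             // operation-specific setup goes here

        __submit( context, idx );                                  // queue the sqe
        park( __cfaabi_dbg_ctx );                                  // block this user thread until completion
        if( data.result < 0 ) { errno = -data.result; return -1; }
        return data.result;
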
     
    70128// I/O Forwards
    71129//=============================================================================================
     130#include <time.hfa>
    72131
    73132// Some forward declarations
     
    121180// Asynchronous operations
    122181#if defined(HAVE_PREADV2)
    123         ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     182        ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    124183                #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
    125184                        return preadv2(fd, iov, iovcnt, offset, flags);
     
    132191                #endif
    133192        }
    134 
    135         ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    136                 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
    137                         return preadv2(fd, iov, iovcnt, offset, flags);
     193#endif
     194
     195#if defined(HAVE_PWRITEV2)
     196        ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
     197                #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
     198                        return pwritev2(fd, iov, iovcnt, offset, flags);
    138199                #else
    139200                        __submit_prelude
    140201
    141                         (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
    142                         sqe->flags |= IOSQE_FIXED_FILE;
     202                        (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
    143203
    144204                        __submit_wait
     
    147207#endif
    148208
    149 #if defined(HAVE_PWRITEV2)
    150         ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    151                 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
    152                         return pwritev2(fd, iov, iovcnt, offset, flags);
    153                 #else
    154                         __submit_prelude
    155 
    156                         (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
    157 
    158                         __submit_wait
    159                 #endif
    160         }
    161 #endif
    162 
    163 int cfa_fsync(int fd) {
     209int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    164210        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC)
    165211                return fsync(fd);
     
    173219}
    174220
    175 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
     221int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    176222        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE)
    177223                return sync_file_range(fd, offset, nbytes, flags);
     
    189235
    190236
    191 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
     237ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    192238        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG)
    193239                return sendmsg(sockfd, msg, flags);
     
    202248}
    203249
    204 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
     250ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    205251        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG)
    206252                return recvmsg(sockfd, msg, flags);
     
    215261}
    216262
    217 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
     263ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    218264        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND)
    219265                return send( sockfd, buf, len, flags );
     
    230276}
    231277
    232 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
     278ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    233279        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV)
    234280                return recv( sockfd, buf, len, flags );
     
    245291}
    246292
    247 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
     293int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    248294        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT)
    249295                return accept4( sockfd, addr, addrlen, flags );
     
    260306}
    261307
    262 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
     308int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    263309        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT)
    264310                return connect( sockfd, addr, addrlen );
     
    274320}
    275321
    276 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
     322int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    277323        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE)
    278324                return fallocate( fd, mode, offset, len );
     
    289335}
    290336
    291 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
     337int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    292338        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE)
    293339                return posix_fadvise( fd, offset, len, advice );
     
    304350}
    305351
    306 int cfa_madvise(void *addr, size_t length, int advice) {
     352int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    307353        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE)
    308354                return madvise( addr, length, advice );
     
    319365}
    320366
    321 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
     367int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    322368        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT)
    323369                return openat( dirfd, pathname, flags, mode );
     
    334380}
    335381
    336 int cfa_close(int fd) {
     382int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    337383        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE)
    338384                return close( fd );
     
    348394// Forward declare in case it is not supported
    349395struct statx;
    350 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
     396int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    351397        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX)
    352398                #if defined(__NR_statx)
     
    360406
    361407                (*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf };
    362                 sqe->flags = flags;
    363 
    364                 __submit_wait
    365         #endif
    366 }
    367 
    368 ssize_t cfa_read(int fd, void *buf, size_t count) {
     408                sqe->statx_flags = flags;
     409
     410                __submit_wait
     411        #endif
     412}
     413
     414ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    369415        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ)
    370416                return read( fd, buf, count );
     
    378424}
    379425
    380 ssize_t cfa_write(int fd, void *buf, size_t count) {
     426ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    381427        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE)
    382428                return write( fd, buf, count );
     
    390436}
    391437
    392 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
     438ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    393439        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
    394440                return splice( fd_in, off_in, fd_out, off_out, len, flags );
     
    399445                sqe->splice_fd_in  = fd_in;
    400446                sqe->splice_off_in = off_in;
    401                 sqe->splice_flags  = flags;
    402 
    403                 __submit_wait
    404         #endif
    405 }
    406 
    407 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) {
    408         #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
    409                 return splice( fd_in, off_in, fd_out, off_out, len, flags );
    410         #else
    411                 __submit_prelude
    412 
    413                 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out };
    414                 sqe->splice_fd_in  = fd_in;
    415                 sqe->splice_off_in = off_in;
    416                 sqe->splice_flags  = flags | out_flags;
    417                 sqe->flags = in_flags;
    418 
    419                 __submit_wait
    420         #endif
    421 }
    422 
    423 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) {
     447                sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
     448
     449                __submit_wait
     450        #endif
     451}
     452
     453ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    424454        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE)
    425455                return tee( fd_in, fd_out, len, flags );
     
    429459                (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 };
    430460                sqe->splice_fd_in = fd_in;
    431                 sqe->splice_flags = flags;
     461                sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
    432462
    433463                __submit_wait
     
    536566
    537567                if( /*func == (fptr_t)splice || */
    538                         func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice,
    539                         func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice )
     568                        func == (fptr_t)cfa_splice )
    540569                        #define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE ,
    541570                        return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE);
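
The second cfa_splice overload disappears: requesting a fixed (registered) input fd is now done through submit_flags, which the splice and tee paths mask with SPLICE_FLAGS before or-ing into sqe->splice_flags. A hypothetical call site, assuming CFA_IO_FIXED_FD2 is defined for the build (it maps to SPLICE_F_FD_IN_FIXED, see iofwd.hfa below); fixed_in, pipe_out and len are placeholders:

        // fd_in is an index into the registered file table rather than a plain descriptor
        ssize_t moved = cfa_splice( fixed_in, 0p, pipe_out, 0p, len, 0, CFA_IO_FIXED_FD2 );
        if( moved < 0 ) abort( "splice: %s\n", strerror( errno ) );
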
  • libcfa/src/concurrency/iofwd.hfa

    r6dba8755 r95789be  
    1919extern "C" {
    2020        #include <sys/types.h>
     21        #if CFA_HAVE_LINUX_IO_URING_H
     22                #include <linux/io_uring.h>
     23        #endif
    2124}
    2225#include "bits/defs.hfa"
     26#include "time.hfa"
     27
     28#if defined(CFA_HAVE_IOSQE_FIXED_FILE)
     29        #define CFA_IO_FIXED_FD1 IOSQE_FIXED_FILE
     30#endif
     31#if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)
     32        #define CFA_IO_FIXED_FD2 SPLICE_F_FD_IN_FIXED
     33#endif
     34#if defined(CFA_HAVE_IOSQE_IO_DRAIN)
     35        #define CFA_IO_DRAIN IOSQE_IO_DRAIN
     36#endif
     37#if defined(CFA_HAVE_IOSQE_ASYNC)
     38        #define CFA_IO_ASYNC IOSQE_ASYNC
     39#endif
     40
     41struct cluster;
     42struct io_context;
     43struct io_cancellation;
    2344
    2445struct iovec;
     
    2748struct statx;
    2849
    29 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    30 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    31 extern int cfa_fsync(int fd);
    32 extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
    33 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags);
    34 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags);
    35 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags);
    36 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags);
    37 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    38 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
    39 extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len);
    40 extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
    41 extern int cfa_madvise(void *addr, size_t length, int advice);
    42 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode);
    43 extern int cfa_close(int fd);
    44 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
    45 extern ssize_t cfa_read(int fd, void *buf, size_t count);
    46 extern ssize_t cfa_write(int fd, void *buf, size_t count);
    47 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
    48 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags);
     50extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     51extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     52extern int cfa_fsync(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     53extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     54extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     55extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     56extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     57extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     58extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     59extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     60extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     61extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     62extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     63extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     64extern int cfa_close(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     65extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     66extern ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     67extern ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     68extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     69extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
    4970
    5071//-----------------------------------------------------------------------------
    5172// Check if a function blocks only the user thread
    5273bool has_user_level_blocking( fptr_t func );
     74
     75//-----------------------------------------------------------------------------
     76void register_fixed_files( io_context & ctx , int * files, unsigned count );
     77void register_fixed_files( cluster    & cltr, int * files, unsigned count );
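
Because all of the new trailing parameters are defaulted, existing call sites keep compiling unchanged; callers opt in only where they need to. A hypothetical example combining the defaults with the new per-context fixed-file registration, assuming CFA_IO_FIXED_FD1 is available on the build (the_cluster, the file name and the error handling are placeholders; timeout and cancellation are accepted but, per __submit_prelude above, not yet acted on):

        io_context ctx = { the_cluster };                  // a dedicated ring for these operations
        int fds[1] = { open( "data.bin", O_RDONLY ) };
        register_fixed_files( ctx, fds, 1 );               // fds[0] becomes fixed-file index 0

        char buf[512];
        // defaults: no submit flags, no timeout, no cancellation, context chosen automatically
        ssize_t r1 = cfa_read( fds[0], buf, sizeof(buf) );
        // explicit: address registered file index 0 and submit through ctx
        ssize_t r2 = cfa_read( 0, buf, sizeof(buf), CFA_IO_FIXED_FD1, -1`s, 0p, &ctx );
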
  • libcfa/src/concurrency/kernel.cfa

    r6dba8755 r95789be  
    1818
    1919//C Includes
    20 #include <stddef.h>
    2120#include <errno.h>
    22 #include <string.h>
    2321#include <stdio.h>
    24 #include <fenv.h>
    2522#include <signal.h>
    2623#include <unistd.h>
    27 #include <limits.h>                                                                             // PTHREAD_STACK_MIN
    28 #include <sys/mman.h>                                                                   // mprotect
    29 extern "C" {
    30 #include <sys/resource.h>
    31 }
    3224
    3325//CFA Includes
    34 #include "time.hfa"
    3526#include "kernel_private.hfa"
    3627#include "preemption.hfa"
    37 #include "startup.hfa"
    3828
    3929//Private includes
     
    4535// Some assembly required
    4636#if defined( __i386 )
    47         #define CtxGet( ctx )        \
    48                 __asm__ volatile (     \
    49                         "movl %%esp,%0\n"\
    50                         "movl %%ebp,%1\n"\
    51                         : "=rm" (ctx.SP),\
    52                                 "=rm" (ctx.FP) \
    53                 )
    54 
    5537        // mxcr : SSE Status and Control bits (control bits are preserved across function calls)
    5638        // fcw  : X87 FPU control word (preserved across function calls)
     
    7456
    7557#elif defined( __x86_64 )
    76         #define CtxGet( ctx )        \
    77                 __asm__ volatile (     \
    78                         "movq %%rsp,%0\n"\
    79                         "movq %%rbp,%1\n"\
    80                         : "=rm" (ctx.SP),\
    81                                 "=rm" (ctx.FP) \
    82                 )
    83 
    8458        #define __x87_store         \
    8559                uint32_t __mxcr;      \
     
    10276
    10377#elif defined( __ARM_ARCH )
    104 #define CtxGet( ctx ) __asm__ ( \
    105                 "mov %0,%%sp\n"   \
    106                 "mov %1,%%r11\n"   \
    107         : "=rm" (ctx.SP), "=rm" (ctx.FP) )
    10878#else
    10979        #error unknown hardware architecture
    11080#endif
    11181
    112 //-----------------------------------------------------------------------------
    113 //Start and stop routine for the kernel, declared first to make sure they run first
    114 static void __kernel_startup (void) __attribute__(( constructor( STARTUP_PRIORITY_KERNEL ) ));
    115 static void __kernel_shutdown(void) __attribute__(( destructor ( STARTUP_PRIORITY_KERNEL ) ));
     82extern $thread * mainThread;
     83extern processor * mainProcessor;
    11684
    11785//-----------------------------------------------------------------------------
     
    12088static bool __has_next_thread(cluster * this);
    12189static void __run_thread(processor * this, $thread * dst);
    122 static bool __wake_proc(processor *);
    12390static bool __wake_one(struct __processor_id_t * id, cluster * cltr);
    12491static void __halt(processor * this);
    125 
    126 //-----------------------------------------------------------------------------
    127 // Kernel storage
    128 KERNEL_STORAGE(cluster,              mainCluster);
    129 KERNEL_STORAGE(processor,            mainProcessor);
    130 KERNEL_STORAGE($thread,              mainThread);
    131 KERNEL_STORAGE(__stack_t,            mainThreadCtx);
    132 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
    133 #if !defined(__CFA_NO_STATISTICS__)
    134 KERNEL_STORAGE(__stats_t, mainProcStats);
    135 #endif
    136 
    137 cluster              * mainCluster;
    138 processor            * mainProcessor;
    139 $thread              * mainThread;
    140 __scheduler_RWLock_t * __scheduler_lock;
    141 
    142 extern "C" {
    143         struct { __dllist_t(cluster) list; __spinlock_t lock; } __cfa_dbg_global_clusters;
    144 }
    145 
    146 size_t __page_size = 0;
    147 
    148 //-----------------------------------------------------------------------------
    149 // Global state
    150 thread_local struct KernelThreadData kernelTLS __attribute__ ((tls_model ( "initial-exec" ))) @= {
    151         NULL,                                                                                           // cannot use 0p
    152         NULL,
    153         NULL,
    154         { 1, false, false },
    155 };
    156 
    157 //-----------------------------------------------------------------------------
    158 // Struct to steal stack
    159 struct current_stack_info_t {
    160         __stack_t * storage;                                                            // pointer to stack object
    161         void * base;                                                                            // base of stack
    162         void * limit;                                                                           // stack grows towards stack limit
    163         void * context;                                                                         // address of cfa_context_t
    164 };
    165 
    166 void ?{}( current_stack_info_t & this ) {
    167         __stack_context_t ctx;
    168         CtxGet( ctx );
    169         this.base = ctx.FP;
    170 
    171         rlimit r;
    172         getrlimit( RLIMIT_STACK, &r);
    173         size_t size = r.rlim_cur;
    174 
    175         this.limit = (void *)(((intptr_t)this.base) - size);
    176         this.context = &storage_mainThreadCtx;
    177 }
    178 
    179 //-----------------------------------------------------------------------------
    180 // Main thread construction
    181 
    182 void ?{}( $coroutine & this, current_stack_info_t * info) with( this ) {
    183         stack.storage = info->storage;
    184         with(*stack.storage) {
    185                 limit     = info->limit;
    186                 base      = info->base;
    187         }
    188         __attribute__((may_alias)) intptr_t * istorage = (intptr_t*) &stack.storage;
    189         *istorage |= 0x1;
    190         name = "Main Thread";
    191         state = Start;
    192         starter = 0p;
    193         last = 0p;
    194         cancellation = 0p;
    195 }
    196 
    197 void ?{}( $thread & this, current_stack_info_t * info) with( this ) {
    198         ticket = 1;
    199         state = Start;
    200         self_cor{ info };
    201         curr_cor = &self_cor;
    202         curr_cluster = mainCluster;
    203         self_mon.owner = &this;
    204         self_mon.recursion = 1;
    205         self_mon_p = &self_mon;
    206         link.next = 0p;
    207         link.prev = 0p;
    208 
    209         node.next = 0p;
    210         node.prev = 0p;
    211         doregister(curr_cluster, this);
    212 
    213         monitors{ &self_mon_p, 1, (fptr_t)0 };
    214 }
    215 
    216 //-----------------------------------------------------------------------------
    217 // Processor coroutine
    218 void ?{}(processorCtx_t & this) {
    219 
    220 }
    221 
    222 // Construct the processor context of non-main processors
    223 static void ?{}(processorCtx_t & this, processor * proc, current_stack_info_t * info) {
    224         (this.__cor){ info };
    225         this.proc = proc;
    226 }
    227 
    228 static void * __invoke_processor(void * arg);
    229 
    230 static init(processor & this, const char name[], cluster & _cltr) with( this ) {
    231         this.name = name;
    232         this.cltr = &_cltr;
    233         id = -1u;
    234         destroyer = 0p;
    235         do_terminate = false;
    236         preemption_alarm = 0p;
    237         pending_preemption = false;
    238 
    239         #if !defined(__CFA_NO_STATISTICS__)
    240                 print_stats = 0;
    241                 print_halts = false;
    242         #endif
    243 
    244         int target = __atomic_add_fetch( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
    245 
    246         id = doregister((__processor_id_t*)&this);
    247 
    248         // Lock the RWlock so no-one pushes/pops while we are changing the queue
    249         uint_fast32_t last_size = ready_mutate_lock();
    250 
    251                 // Adjust the ready queue size
    252                 ready_queue_grow( cltr, target );
    253 
    254         // Unlock the RWlock
    255         ready_mutate_unlock( last_size );
    256 
    257         __cfadbg_print_safe(runtime_core, "Kernel : core %p created\n", &this);
    258 }
    259 
    260 // Not a ctor, it just preps the destruction but should not destroy members
    261 void deinit(processor & this) {
    262 
    263         int target = __atomic_sub_fetch( &this.cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
    264 
    265         // Lock the RWlock so no-one pushes/pops while we are changing the queue
    266         uint_fast32_t last_size = ready_mutate_lock();
    267 
    268                 // Adjust the ready queue size
    269                 ready_queue_shrink( this.cltr, target );
    270 
    271                 // Make sure we aren't on the idle queue
    272                 unsafe_remove( this.cltr->idles, &this );
    273 
    274         // Unlock the RWlock
    275         ready_mutate_unlock( last_size );
    276 
    277         // Finally we don't need the read_lock any more
    278         unregister((__processor_id_t*)&this);
    279 }
    280 
    281 void ?{}(processor & this, const char name[], cluster & _cltr) {
    282         ( this.idle ){};
    283         ( this.terminated ){ 0 };
    284         ( this.runner ){};
    285         init( this, name, _cltr );
    286 
    287         __cfadbg_print_safe(runtime_core, "Kernel : Starting core %p\n", &this);
    288 
    289         this.stack = __create_pthread( &this.kernel_thread, __invoke_processor, (void *)&this );
    290 
    291 }
    292 
    293 void ^?{}(processor & this) with( this ){
    294         if( ! __atomic_load_n(&do_terminate, __ATOMIC_ACQUIRE) ) {
    295                 __cfadbg_print_safe(runtime_core, "Kernel : core %p signaling termination\n", &this);
    296 
    297                 __atomic_store_n(&do_terminate, true, __ATOMIC_RELAXED);
    298                 __wake_proc( &this );
    299 
    300                 P( terminated );
    301                 verify( kernelTLS.this_processor != &this);
    302         }
    303 
    304         int err = pthread_join( kernel_thread, 0p );
    305         if( err != 0 ) abort("KERNEL ERROR: joining processor %p caused error %s\n", &this, strerror(err));
    306 
    307         free( this.stack );
    308 
    309         deinit( this );
    310 }
    311 
    312 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {
    313         this.name = name;
    314         this.preemption_rate = preemption_rate;
    315         this.nprocessors = 0;
    316         ready_queue{};
    317 
    318         #if !defined(__CFA_NO_STATISTICS__)
    319                 print_stats = 0;
    320                 stats = alloc();
    321                 __init_stats( stats );
    322         #endif
    323 
    324         threads{ __get };
    325 
    326         doregister(this);
    327 
    328         // Lock the RWlock so no-one pushes/pops while we are changing the queue
    329         uint_fast32_t last_size = ready_mutate_lock();
    330 
    331                 // Adjust the ready queue size
    332                 ready_queue_grow( &this, 0 );
    333 
    334         // Unlock the RWlock
    335         ready_mutate_unlock( last_size );
    336 
    337 
    338         __kernel_io_startup( this, io_flags, &this == mainCluster );
    339 }
    340 
    341 void ^?{}(cluster & this) {
    342         __kernel_io_shutdown( this, &this == mainCluster );
    343 
    344         // Lock the RWlock so no-one pushes/pops while we are changing the queue
    345         uint_fast32_t last_size = ready_mutate_lock();
    346 
    347                 // Adjust the ready queue size
    348                 ready_queue_shrink( &this, 0 );
    349 
    350         // Unlock the RWlock
    351         ready_mutate_unlock( last_size );
    352 
    353         #if !defined(__CFA_NO_STATISTICS__)
    354                 if( 0 != this.print_stats ) {
    355                         __print_stats( this.stats, this.print_stats, true, this.name, (void*)&this );
    356                 }
    357                 free( this.stats );
    358         #endif
    359 
    360         unregister(this);
    361 }
     92bool __wake_proc(processor *);
    36293
    36394//=============================================================================================
     
    550281}
    551282
    552 // KERNEL_ONLY
    553 // Context invoker for processors
    554 // This is the entry point for processors (kernel threads)
    555 // It effectively constructs a coroutine by stealing the pthread stack
    556 static void * __invoke_processor(void * arg) {
    557         #if !defined( __CFA_NO_STATISTICS__ )
    558                 __stats_t local_stats;
    559                 __init_stats( &local_stats );
    560                 kernelTLS.this_stats = &local_stats;
    561         #endif
    562 
    563         processor * proc = (processor *) arg;
    564         kernelTLS.this_processor = proc;
    565         kernelTLS.this_thread    = 0p;
    566         kernelTLS.preemption_state.[enabled, disable_count] = [false, 1];
    567         // SKULLDUGGERY: We want to create a context for the processor coroutine
    568         // which is needed for the 2-step context switch. However, there is no reason
    569         // to waste the perfectly valid stack create by pthread.
    570         current_stack_info_t info;
    571         __stack_t ctx;
    572         info.storage = &ctx;
    573         (proc->runner){ proc, &info };
    574 
    575         __cfaabi_dbg_print_safe("Coroutine : created stack %p\n", get_coroutine(proc->runner)->stack.storage);
    576 
    577         //Set global state
    578         kernelTLS.this_thread = 0p;
    579 
    580         //We now have a proper context from which to schedule threads
    581         __cfadbg_print_safe(runtime_core, "Kernel : core %p created (%p, %p)\n", proc, &proc->runner, &ctx);
    582 
    583         // SKULLDUGGERY: Since the coroutine doesn't have its own stack, we can't
    584         // resume it to start it like it normally would, it will just context switch
    585         // back to here. Instead directly call the main since we already are on the
    586         // appropriate stack.
    587         get_coroutine(proc->runner)->state = Active;
    588         main( proc->runner );
    589         get_coroutine(proc->runner)->state = Halted;
    590 
    591         // Main routine of the core returned, the core is now fully terminated
    592         __cfadbg_print_safe(runtime_core, "Kernel : core %p main ended (%p)\n", proc, &proc->runner);
    593 
    594         #if !defined(__CFA_NO_STATISTICS__)
    595                 __tally_stats(proc->cltr->stats, &local_stats);
    596                 if( 0 != proc->print_stats ) {
    597                         __print_stats( &local_stats, proc->print_stats, true, proc->name, (void*)proc );
    598                 }
    599         #endif
    600 
    601         return 0p;
    602 }
    603 
    604 static void Abort( int ret, const char func[] ) {
    605         if ( ret ) {                                                                            // pthread routines return errno values
    606                 abort( "%s : internal error, error(%d) %s.", func, ret, strerror( ret ) );
    607         } // if
    608 } // Abort
    609 
    610 void * __create_pthread( pthread_t * pthread, void * (*start)(void *), void * arg ) {
    611         pthread_attr_t attr;
    612 
    613         Abort( pthread_attr_init( &attr ), "pthread_attr_init" ); // initialize attribute
    614 
    615         size_t stacksize;
    616         // default stack size, normally defined by shell limit
    617         Abort( pthread_attr_getstacksize( &attr, &stacksize ), "pthread_attr_getstacksize" );
    618         assert( stacksize >= PTHREAD_STACK_MIN );
    619 
    620         void * stack;
    621         __cfaabi_dbg_debug_do(
    622                 stack = memalign( __page_size, stacksize + __page_size );
    623                 // pthread has no mechanism to create the guard page in user supplied stack.
    624                 if ( mprotect( stack, __page_size, PROT_NONE ) == -1 ) {
    625                         abort( "mprotect : internal error, mprotect failure, error(%d) %s.", errno, strerror( errno ) );
    626                 } // if
    627         );
    628         __cfaabi_dbg_no_debug_do(
    629                 stack = malloc( stacksize );
    630         );
    631 
    632         Abort( pthread_attr_setstack( &attr, stack, stacksize ), "pthread_attr_setstack" );
    633 
    634         Abort( pthread_create( pthread, &attr, start, arg ), "pthread_create" );
    635         return stack;
    636 }
    637 
    638 // KERNEL_ONLY
    639 static void __kernel_first_resume( processor * this ) {
    640         $thread * src = mainThread;
    641         $coroutine * dst = get_coroutine(this->runner);
    642 
    643         verify( ! kernelTLS.preemption_state.enabled );
    644 
    645         kernelTLS.this_thread->curr_cor = dst;
    646         __stack_prepare( &dst->stack, 65000 );
    647         __cfactx_start(main, dst, this->runner, __cfactx_invoke_coroutine);
    648 
    649         verify( ! kernelTLS.preemption_state.enabled );
    650 
    651         dst->last = &src->self_cor;
    652         dst->starter = dst->starter ? dst->starter : &src->self_cor;
    653 
    654         // make sure the current state is still correct
    655         /* paranoid */ verify(src->state == Ready);
    656 
    657         // context switch to specified coroutine
    658         verify( dst->context.SP );
    659         __cfactx_switch( &src->context, &dst->context );
    660         // when __cfactx_switch returns we are back in the src coroutine
    661 
    662         mainThread->curr_cor = &mainThread->self_cor;
    663 
    664         // make sure the current state has been update
    665         /* paranoid */ verify(src->state == Active);
    666 
    667         verify( ! kernelTLS.preemption_state.enabled );
    668 }
    669 
    670 // KERNEL_ONLY
    671 static void __kernel_last_resume( processor * this ) {
    672         $coroutine * src = &mainThread->self_cor;
    673         $coroutine * dst = get_coroutine(this->runner);
    674 
    675         verify( ! kernelTLS.preemption_state.enabled );
    676         verify( dst->starter == src );
    677         verify( dst->context.SP );
    678 
    679         // SKULLDUGGERY in debug the processors check that the
    680         // stack is still within the limit of the stack limits after running a thread.
    681         // that check doesn't make sense if we context switch to the processor using the
    682         // coroutine semantics. Since this is a special case, use the current context
    683         // info to populate these fields.
    684         __cfaabi_dbg_debug_do(
    685                 __stack_context_t ctx;
    686                 CtxGet( ctx );
    687                 mainThread->context.SP = ctx.SP;
    688                 mainThread->context.FP = ctx.FP;
    689         )
    690 
    691         // context switch to the processor
    692         __cfactx_switch( &src->context, &dst->context );
    693 }
    694 
    695283//-----------------------------------------------------------------------------
    696284// Scheduler routines
     
    834422
    835423//=============================================================================================
    836 // Kernel Setup logic
    837 //=============================================================================================
    838 //-----------------------------------------------------------------------------
    839 // Kernel boot procedures
    840 static void __kernel_startup(void) {
    841         verify( ! kernelTLS.preemption_state.enabled );
    842         __cfadbg_print_safe(runtime_core, "Kernel : Starting\n");
    843 
    844         __page_size = sysconf( _SC_PAGESIZE );
    845 
    846         __cfa_dbg_global_clusters.list{ __get };
    847         __cfa_dbg_global_clusters.lock{};
    848 
    849         // Initialize the global scheduler lock
    850         __scheduler_lock = (__scheduler_RWLock_t*)&storage___scheduler_lock;
    851         (*__scheduler_lock){};
    852 
    853         // Initialize the main cluster
    854         mainCluster = (cluster *)&storage_mainCluster;
    855         (*mainCluster){"Main Cluster"};
    856 
    857         __cfadbg_print_safe(runtime_core, "Kernel : Main cluster ready\n");
    858 
    859         // Start by initializing the main thread
    860         // SKULLDUGGERY: the mainThread steals the process main thread
    861         // which will then be scheduled by the mainProcessor normally
    862         mainThread = ($thread *)&storage_mainThread;
    863         current_stack_info_t info;
    864         info.storage = (__stack_t*)&storage_mainThreadCtx;
    865         (*mainThread){ &info };
    866 
    867         __cfadbg_print_safe(runtime_core, "Kernel : Main thread ready\n");
    868 
    869 
    870 
    871         // Construct the processor context of the main processor
    872         void ?{}(processorCtx_t & this, processor * proc) {
    873                 (this.__cor){ "Processor" };
    874                 this.__cor.starter = 0p;
    875                 this.proc = proc;
    876         }
    877 
    878         void ?{}(processor & this) with( this ) {
    879                 ( this.idle ){};
    880                 ( this.terminated ){ 0 };
    881                 ( this.runner ){};
    882                 init( this, "Main Processor", *mainCluster );
    883                 kernel_thread = pthread_self();
    884 
    885                 runner{ &this };
    886                 __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner);
    887         }
    888 
    889         // Initialize the main processor and the main processor ctx
    890         // (the coroutine that contains the processing control flow)
    891         mainProcessor = (processor *)&storage_mainProcessor;
    892         (*mainProcessor){};
    893 
    894         //initialize the global state variables
    895         kernelTLS.this_processor = mainProcessor;
    896         kernelTLS.this_thread    = mainThread;
    897 
    898         #if !defined( __CFA_NO_STATISTICS__ )
    899                 kernelTLS.this_stats = (__stats_t *)& storage_mainProcStats;
    900                 __init_stats( kernelTLS.this_stats );
    901         #endif
    902 
    903         // Enable preemption
    904         kernel_start_preemption();
    905 
    906         // Add the main thread to the ready queue
    907         // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread
    908         __schedule_thread((__processor_id_t *)mainProcessor, mainThread);
    909 
    910         // SKULLDUGGERY: Force a context switch to the main processor to set the main thread's context to the current UNIX
    911         // context. Hence, the main thread does not begin through __cfactx_invoke_thread, like all other threads. The trick here is that
    912         // mainThread is on the ready queue when this call is made.
    913         __kernel_first_resume( kernelTLS.this_processor );
    914 
    915 
    916         // THE SYSTEM IS NOW COMPLETELY RUNNING
    917 
    918 
    919         // Now that the system is up, finish creating systems that need threading
    920         __kernel_io_finish_start( *mainCluster );
    921 
    922 
    923         __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
    924 
    925         verify( ! kernelTLS.preemption_state.enabled );
    926         enable_interrupts( __cfaabi_dbg_ctx );
    927         verify( TL_GET( preemption_state.enabled ) );
    928 }
    929 
    930 static void __kernel_shutdown(void) {
    931         //Before we start shutting things down, wait for systems that need threading to shutdown
    932         __kernel_io_prepare_stop( *mainCluster );
    933 
    934         /* paranoid */ verify( TL_GET( preemption_state.enabled ) );
    935         disable_interrupts();
    936         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    937 
    938         __cfadbg_print_safe(runtime_core, "\n--------------------------------------------------\nKernel : Shutting down\n");
    939 
    940         // SKULLDUGGERY: Notify the mainProcessor it needs to terminates.
    941         // When its coroutine terminates, it return control to the mainThread
    942         // which is currently here
    943         __atomic_store_n(&mainProcessor->do_terminate, true, __ATOMIC_RELEASE);
    944         __kernel_last_resume( kernelTLS.this_processor );
    945         mainThread->self_cor.state = Halted;
    946 
    947         // THE SYSTEM IS NOW COMPLETELY STOPPED
    948 
    949         // Disable preemption
    950         kernel_stop_preemption();
    951 
    952         // Destroy the main processor and its context in reverse order of construction
    953         // These were manually constructed so we need manually destroy them
    954         void ^?{}(processor & this) with( this ){
    955                 deinit( this );
    956 
    957                 /* paranoid */ verify( this.do_terminate == true );
    958                 __cfaabi_dbg_print_safe("Kernel : destroyed main processor context %p\n", &runner);
    959         }
    960 
    961         ^(*mainProcessor){};
    962 
    963         // Final step, destroy the main thread since it is no longer needed
    964 
    965         // Since we provided a stack to this taxk it will not destroy anything
    966         /* paranoid */ verify(mainThread->self_cor.stack.storage == (__stack_t*)(((uintptr_t)&storage_mainThreadCtx)| 0x1));
    967         ^(*mainThread){};
    968 
    969         ^(*mainCluster){};
    970 
    971         ^(*__scheduler_lock){};
    972 
    973         ^(__cfa_dbg_global_clusters.list){};
    974         ^(__cfa_dbg_global_clusters.lock){};
    975 
    976         __cfadbg_print_safe(runtime_core, "Kernel : Shutdown complete\n");
    977 }
    978 
    979 //=============================================================================================
    980424// Kernel Idle Sleep
    981425//=============================================================================================
     
    997441
    998442// Unconditionally wake a thread
    999 static bool __wake_proc(processor * this) {
     443bool __wake_proc(processor * this) {
    1000444        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
    1001445
     
    1173617
    1174618//-----------------------------------------------------------------------------
    1175 // Global Queues
    1176 void doregister( cluster     & cltr ) {
    1177         lock      ( __cfa_dbg_global_clusters.lock __cfaabi_dbg_ctx2);
    1178         push_front( __cfa_dbg_global_clusters.list, cltr );
    1179         unlock    ( __cfa_dbg_global_clusters.lock );
    1180 }
    1181 
    1182 void unregister( cluster     & cltr ) {
    1183         lock  ( __cfa_dbg_global_clusters.lock __cfaabi_dbg_ctx2);
    1184         remove( __cfa_dbg_global_clusters.list, cltr );
    1185         unlock( __cfa_dbg_global_clusters.lock );
    1186 }
    1187 
    1188 void doregister( cluster * cltr, $thread & thrd ) {
    1189         lock      (cltr->thread_list_lock __cfaabi_dbg_ctx2);
    1190         cltr->nthreads += 1;
    1191         push_front(cltr->threads, thrd);
    1192         unlock    (cltr->thread_list_lock);
    1193 }
    1194 
    1195 void unregister( cluster * cltr, $thread & thrd ) {
    1196         lock  (cltr->thread_list_lock __cfaabi_dbg_ctx2);
    1197         remove(cltr->threads, thrd );
    1198         cltr->nthreads -= 1;
    1199         unlock(cltr->thread_list_lock);
    1200 }
    1201 
    1202 //-----------------------------------------------------------------------------
    1203619// Debug
    1204620__cfaabi_dbg_debug_do(
  • libcfa/src/concurrency/kernel.hfa

    r6dba8755 r95789be  
    1616#pragma once
    1717
    18 #include <stdbool.h>
    19 #include <stdint.h>
    20 
    2118#include "invoke.h"
    2219#include "time_t.hfa"
     
    2623
    2724extern "C" {
    28 #include <pthread.h>
    29 #include <semaphore.h>
     25#include <bits/pthreadtypes.h>
    3026}
    3127
     
    129125struct __io_data;
    130126
    131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) // 0x01
    132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02
    133 #define CFA_CLUSTER_IO_EAGER_SUBMITS         (1 << 2) // 0x04
    134 #define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   (1 << 3) // 0x08
    135 #define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10
    136 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
    137 
     127// IO poller user-thread
     128// Not using the "thread" keyword because we want to control
     129// more carefully when to start/stop it
     130struct $io_ctx_thread {
     131        struct __io_data * ring;
     132        single_sem sem;
     133        volatile bool done;
     134        $thread self;
     135};
     136
     137
     138struct io_context {
     139        $io_ctx_thread thrd;
     140};
     141
     142struct io_context_params {
     143        int num_entries;
     144        int num_ready;
     145        int submit_aff;
     146        bool eager_submits:1;
     147        bool poller_submits:1;
     148        bool poll_submit:1;
     149        bool poll_complete:1;
     150};
     151
     152void  ?{}(io_context_params & this);
     153
     154void  ?{}(io_context & this, struct cluster & cl);
     155void  ?{}(io_context & this, struct cluster & cl, const io_context_params & params);
     156void ^?{}(io_context & this);
     157
     158struct io_cancellation {
     159        uint32_t target;
     160};
     161
     162static inline void  ?{}(io_cancellation & this) { this.target = -1u; }
     163static inline void ^?{}(io_cancellation & this) {}
     164bool cancel(io_cancellation & this);
    138165
    139166//-----------------------------------------------------------------------------
     
    206233        } node;
    207234
    208         struct __io_data * io;
     235        struct {
     236                io_context * ctxs;
     237                unsigned cnt;
     238        } io;
    209239
    210240        #if !defined(__CFA_NO_STATISTICS__)
     
    215245extern Duration default_preemption();
    216246
    217 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);
     247void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params);
    218248void ^?{}(cluster & this);
    219249
    220 static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
    221 static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
    222 static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
    223 static inline void ?{} (cluster & this, unsigned flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
    224 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
    225 static inline void ?{} (cluster & this, const char name[], unsigned flags)        { this{name, default_preemption(), flags}; }
     250static inline void ?{} (cluster & this)                                            { io_context_params default_params;    this{"Anonymous Cluster", default_preemption(), 1, default_params}; }
     251static inline void ?{} (cluster & this, Duration preemption_rate)                  { io_context_params default_params;    this{"Anonymous Cluster", preemption_rate, 1, default_params}; }
     252static inline void ?{} (cluster & this, const char name[])                         { io_context_params default_params;    this{name, default_preemption(), 1, default_params}; }
     253static inline void ?{} (cluster & this, unsigned num_io)                           { io_context_params default_params;    this{"Anonymous Cluster", default_preemption(), num_io, default_params}; }
     254static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io) { io_context_params default_params;    this{"Anonymous Cluster", preemption_rate, num_io, default_params}; }
     255static inline void ?{} (cluster & this, const char name[], unsigned num_io)        { io_context_params default_params;    this{name, default_preemption(), num_io, default_params}; }
     256static inline void ?{} (cluster & this, const io_context_params & io_params)                                            { this{"Anonymous Cluster", default_preemption(), 1, io_params}; }
     257static inline void ?{} (cluster & this, Duration preemption_rate, const io_context_params & io_params)                  { this{"Anonymous Cluster", preemption_rate, 1, io_params}; }
     258static inline void ?{} (cluster & this, const char name[], const io_context_params & io_params)                         { this{name, default_preemption(), 1, io_params}; }
     259static inline void ?{} (cluster & this, unsigned num_io, const io_context_params & io_params)                           { this{"Anonymous Cluster", default_preemption(), num_io, io_params}; }
     260static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io, const io_context_params & io_params) { this{"Anonymous Cluster", preemption_rate, num_io, io_params}; }
     261static inline void ?{} (cluster & this, const char name[], unsigned num_io, const io_context_params & io_params)        { this{name, default_preemption(), num_io, io_params}; }
    226262
    227263static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
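The io_context_params structure and the num_io constructor arguments above replace the removed CFA_CLUSTER_IO_* bit flags; the bool bit-fields appear to correspond to the old POLLER_THREAD_SUBMITS, EAGER_SUBMITS, KERNEL_POLL_SUBMITS and KERNEL_POLL_COMPLETES options. A minimal usage sketch, not part of the changeset: field values are purely illustrative, and treating num_entries as the ring size handed to io_uring is an assumption.

    // hypothetical example: a cluster with two io_contexts and explicit parameters
    io_context_params params;             // default-constructed via ?{}(io_context_params &)
    params.num_entries    = 256;          // presumed ring size for each io_context
    params.poller_submits = true;         // let the poller thread perform the submits
    cluster cl = { "io-cluster", 2u, params };   // name, num_io, io_context_params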
  • libcfa/src/concurrency/kernel_private.hfa

    r6dba8755 r95789be  
    2222#include "stats.hfa"
    2323
    24 #include "bits/random.hfa"
    25 
    26 
    2724//-----------------------------------------------------------------------------
    2825// Scheduler
     
    5350
    5451
    55 struct event_kernel_t {
    56         alarm_list_t alarms;
    57         __spinlock_t lock;
    58 };
    59 
    60 extern event_kernel_t * event_kernel;
    61 
    62 struct __cfa_kernel_preemption_state_t {
    63         bool enabled;
    64         bool in_progress;
    65         unsigned short disable_count;
    66 };
    67 
    68 extern volatile thread_local __cfa_kernel_preemption_state_t preemption_state __attribute__ ((tls_model ( "initial-exec" )));
    69 
    7052extern cluster * mainCluster;
    7153
     
    8466void __unpark( struct __processor_id_t *, $thread * thrd __cfaabi_dbg_ctx_param2 );
    8567
    86 //-----------------------------------------------------------------------------
    87 // I/O
    88 void __kernel_io_startup     ( cluster &, unsigned, bool );
    89 void __kernel_io_finish_start( cluster & );
    90 void __kernel_io_prepare_stop( cluster & );
    91 void __kernel_io_shutdown    ( cluster &, bool );
     68static inline bool __post(single_sem & this, struct __processor_id_t * id) {
     69        for() {
     70                struct $thread * expected = this.ptr;
     71                if(expected == 1p) return false;
     72                if(expected == 0p) {
     73                        if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     74                                return false;
     75                        }
     76                }
     77                else {
     78                        if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     79                                __unpark( id, expected __cfaabi_dbg_ctx2 );
     80                                return true;
     81                        }
     82                }
     83        }
     84}
    9285
    9386//-----------------------------------------------------------------------------
    9487// Utils
    95 #define KERNEL_STORAGE(T,X) __attribute((aligned(__alignof__(T)))) static char storage_##X[sizeof(T)]
    96 
    97 static inline uint64_t __tls_rand() {
    98         #if defined(__SIZEOF_INT128__)
    99                 return __lehmer64( kernelTLS.rand_seed );
    100         #else
    101                 return __xorshift64( kernelTLS.rand_seed );
    102         #endif
    103 }
    104 
    105 
    106 void doregister( struct cluster & cltr );
    107 void unregister( struct cluster & cltr );
    108 
    10988void doregister( struct cluster * cltr, struct $thread & thrd );
    11089void unregister( struct cluster * cltr, struct $thread & thrd );
     90
     91//-----------------------------------------------------------------------------
     92// I/O
     93void ^?{}(io_context & this, bool );
    11194
    11295//=======================================================================
     
    280263void ready_queue_shrink(struct cluster * cltr, int target);
    281264
    282 //-----------------------------------------------------------------------
    283 // IO user data
    284 struct __io_user_data_t {
    285         int32_t result;
    286         $thread * thrd;
    287 };
    288 
    289 //-----------------------------------------------------------------------
    290 // Static functions called at the end of each thread to register statistics
    291 #if !defined(__CFA_NO_STATISTICS__)
    292         static inline struct __stats_t * __tls_stats() {
    293                 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    294                 /* paranoid */ verify( kernelTLS.this_stats );
    295                 return kernelTLS.this_stats;
    296         }
    297 
    298         #define __STATS__(in_kernel, ...) { \
    299                 if( !(in_kernel) ) disable_interrupts(); \
    300                 with( *__tls_stats() ) { \
    301                         __VA_ARGS__ \
    302                 } \
    303                 if( !(in_kernel) ) enable_interrupts( __cfaabi_dbg_ctx ); \
    304         }
    305 #else
    306         #define __STATS__(in_kernel, ...)
    307 #endif
    308265
    309266// Local Variables: //
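The new __post() helper above appears to implement a three-state protocol on single_sem: 0p means the semaphore is empty, 1p means a post is already pending, and any other value is the blocked thread waiting on it. The following standalone C sketch (not part of the changeset; the waiter type and wake() are stand-ins for $thread and __unpark()) restates the same CAS loop for readers unfamiliar with the 0p/1p notation:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct waiter { const char * name; } waiter;   // stand-in for $thread

    static void wake(waiter * w) {                          // stand-in for __unpark()
            printf("waking %s\n", w->name);
    }

    #define TOKEN ((waiter *)1)                             // plays the role of the 1p sentinel

    // Mirrors __post(): returns true only when an actual waiter was woken.
    static bool post(waiter ** slot) {
            for (;;) {
                    waiter * expected = *slot;
                    if (expected == TOKEN) return false;    // a post is already pending
                    if (expected == NULL) {
                            // no waiter yet: leave a token so a later wait can return immediately
                            if (__atomic_compare_exchange_n(slot, &expected, TOKEN, false,
                                            __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
                                    return false;
                    } else {
                            // a thread is parked: claim the slot and wake it
                            if (__atomic_compare_exchange_n(slot, &expected, NULL, false,
                                            __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                                    wake(expected);
                                    return true;
                            }
                    }
            }
    }

    int main(void) {
            waiter w = { "poller" };
            waiter * slot = &w;     // pretend a thread is parked on the semaphore
            post(&slot);            // wakes it and empties the slot
            post(&slot);            // slot empty: leaves a token instead
            return 0;
    }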
  • libcfa/src/concurrency/preemption.cfa

    r6dba8755 r95789be  
    2626
    2727#include "bits/signal.hfa"
     28#include "kernel_private.hfa"
    2829
    2930#if !defined(__CFA_DEFAULT_PREEMPTION__)
     
    293294// Startup routine to activate preemption
    294295// Called from kernel_startup
    295 void kernel_start_preemption() {
     296void __kernel_alarm_startup() {
    296297        __cfaabi_dbg_print_safe( "Kernel : Starting preemption\n" );
    297298
     
    315316// Shutdown routine to deactivate preemption
    316317// Called from kernel_shutdown
    317 void kernel_stop_preemption() {
     318void __kernel_alarm_shutdown() {
    318319        __cfaabi_dbg_print_safe( "Kernel : Preemption stopping\n" );
    319320
  • libcfa/src/concurrency/preemption.hfa

    r6dba8755 r95789be  
    1616#pragma once
    1717
     18#include "bits/locks.hfa"
    1819#include "alarm.hfa"
    19 #include "kernel_private.hfa"
    2020
    21 void kernel_start_preemption();
    22 void kernel_stop_preemption();
     21struct event_kernel_t {
     22        alarm_list_t alarms;
     23        __spinlock_t lock;
     24};
     25
     26extern event_kernel_t * event_kernel;
     27
    2328void update_preemption( processor * this, Duration duration );
    2429
  • libcfa/src/concurrency/thread.hfa

    r6dba8755 r95789be  
    8484
    8585//-----------------------------------------------------------------------------
    86 // Thread getters
    87 static inline struct $thread * active_thread () { return TL_GET( this_thread ); }
    88 
    89 //-----------------------------------------------------------------------------
    9086// Scheduler API
    9187
     
    106102bool force_yield( enum __Preemption_Reason );
    107103
    108 static inline void yield() {
    109         force_yield(__MANUAL_PREEMPTION);
    110 }
    111 
    112 // Yield: yield N times
    113 static inline void yield( unsigned times ) {
    114         for( times ) {
    115                 yield();
    116         }
    117 }
    118 
    119104//----------
    120105// sleep: force thread to block and be rescheduled after Duration duration
  • src/cfa.make

    r6dba8755 r95789be  
    1 CFACOMPILE = $(CFACC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CFAFLAGS) $(CFAFLAGS) $(AM_CFLAGS) $(CFLAGS)
    2 LTCFACOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
     1AM_T_CFA = $(am__t_CFA_@AM_T@)
     2am__t_CFA_ =
     3am__t_CFA_0 =
      4am__t_CFA_1 = /usr/bin/time --quiet -f "$@ %E" # trailing space is necessary
     5
     6
     7CFACOMPILE = $(AM_T_CFA)$(CFACC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CFAFLAGS) $(CFAFLAGS) $(AM_CFLAGS) $(CFLAGS)
     8LTCFACOMPILE = $(AM_T_CFA)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
    39        $(LIBTOOLFLAGS) --mode=compile $(CFACC) $(DEFS) \
    410        $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CFAFLAGS) $(AM_CFLAGS) $(CFAFLAGS) $(CFLAGS)
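The cfa.make hunk above threads an optional /usr/bin/time prefix into the compile commands through the AM_T_CFA indirection, so each object file can report its own build time when the corresponding automake conditional is enabled. A generic standalone sketch of the same pattern for a plain GNU make rule (not part of the changeset; TIME_BUILD and TIMING are made-up names):

    # run as `make TIME_BUILD=1` to print "<target> <elapsed>" for every compile
    TIME_BUILD ?=
    timing_    =
    timing_1   = /usr/bin/time -f "$@ %E" # trailing space is significant
    TIMING     = $(timing_$(TIME_BUILD))

    %.o: %.c
            $(TIMING)$(CC) $(CFLAGS) -c -o $@ $<   # recipe line must start with a tab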