Changes in / [6dba8755:95789be]
Files: 4 added, 20 edited
benchmark/io/readv.cfa
- Add #include <errno.h> and <iofwd.hfa>; drop the local extern declarations of cfa_preadv2, cfa_preadv2_fixed and register_fixed_files.
- Raise the default buffer length from 50 to 512 bytes and align each reader's buffer to 512 bytes (__attribute__((aligned(512)))), which O_DIRECT reads typically require.
- do_read() now builds a submit-flag word (CFA_IO_FIXED_FD1 when fixed files are requested) and always calls the unified cfa_preadv2(fd, iov, 1, 0, 0, sflags, -1`s, 0p, 0p) instead of choosing between cfa_preadv2 and cfa_preadv2_fixed (sketched below).
- Failed reads report strerror(errno) instead of strerror(-r), matching the wrappers' new errno-based convention.
- main() replaces the packed "unsigned flags" word with unsigned num_io = 1 and an io_context_params structure: -s sets poller_submits, -e sets eager_submits, -k sets poll_submit (and falls through to also request fixed files), -i sets poll_complete (and falls through to also request O_DIRECT). The -u/--userthread option is gone; new -f/--fixed-files and -o/--open-direct options set those two modes independently, and the submit-length-to-flags encoding for -l is commented out.
- The benchmark cluster is now built as BenchCluster cl = { num_io, params, CFA_STATS_READY_Q | CFA_STATS_IO }, and "Readers closed" is printed once the reader threads have been joined.
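With the unified interface the benchmark's read path collapses to a single call; a lightly commented sketch of the new do_read (the call signature comes from the new iofwd.hfa, and the trailing arguments may also be left to their defaults):

    static int do_read( int fd, struct iovec * iov ) {
        int sflags = 0;
        if( fixed_file ) {
            // with CFA_IO_FIXED_FD1, io_uring treats the descriptor argument as
            // an index into the set previously passed to register_fixed_files()
            sflags |= CFA_IO_FIXED_FD1;
        }
        // one iovec, offset 0, no preadv2 flags; timeout, cancellation and
        // io_context are left at their defaults (-1`s, 0p, 0p)
        return cfa_preadv2( fd, iov, 1, 0, 0, sflags, -1`s, 0p, 0p );
    }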
libcfa/configure
- Regenerated: the new AM_T output variable is listed among the substituted variables and set with AM_T='$(T)'.
- Adds generated checks for the io_uring submission flags: ac_fn_c_check_decl probes <linux/io_uring.h> for IOSQE_FIXED_FILE, IOSQE_IO_DRAIN, IOSQE_ASYNC, IOSQE_IO_LINK, IOSQE_IO_HARDLINK and SPLICE_F_FD_IN_FIXED, defining the matching CFA_HAVE_* macro in confdefs.h for each flag that is declared.
- The libtool "configured on host" comment line is dropped from the generated script.
libcfa/configure.ac
- Define and substitute AM_T='$(T)' (AC_SUBST(AM_T)) right after the BUILDLIB conditional.
- Add an ioring_flags m4 list (IOSQE_FIXED_FILE, IOSQE_IO_DRAIN, IOSQE_ASYNC, IOSQE_IO_LINK, IOSQE_IO_HARDLINK, SPLICE_F_FD_IN_FIXED) alongside the existing ioring_ops list.
- After the io_uring opcode checks, iterate over ioring_flags with m4_foreach, calling AC_CHECK_DECL against <linux/io_uring.h> and AC_DEFINE(CFA_HAVE_<flag>) for every flag the installed headers declare.
libcfa/prelude/defines.hfa.in
- Add #undef templates for the new feature macros CFA_HAVE_IOSQE_FIXED_FILE, CFA_HAVE_IOSQE_IO_DRAIN, CFA_HAVE_IOSQE_ASYNC, CFA_HAVE_IOSQE_IO_LINK, CFA_HAVE_IOSQE_IO_HARDLINK and CFA_HAVE_SPLICE_F_FD_IN_FIXED, next to the existing CFA_HAVE_IORING_OP_* and HAVE_PREADV2/HAVE_PWRITEV2 entries, so configure can substitute them (usage pattern sketched below).
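These macros let library code compile its flag handling down to nothing on systems whose io_uring headers predate a given flag. A trimmed sketch of the pattern (the real cascade in iocall.cfa also covers IOSQE_IO_DRAIN and every combination):

    // build a mask of the sqe flags this particular build knows about
    #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC)
        #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC)
    #elif defined(CFA_HAVE_IOSQE_FIXED_FILE)
        #define REGULAR_FLAGS (IOSQE_FIXED_FILE)
    #elif defined(CFA_HAVE_IOSQE_ASYNC)
        #define REGULAR_FLAGS (IOSQE_ASYNC)
    #else
        #define REGULAR_FLAGS (0)
    #endif

    // at submission time only the bits the kernel headers declared survive:
    // sqe->flags = REGULAR_FLAGS & submit_flags;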
libcfa/src/Makefile.am
- Reflow the thread_headers and thread_libsrc assignments onto backslash-continued lines and add the new translation units concurrency/io/setup.cfa and concurrency/kernel/startup.cfa to thread_libsrc.
libcfa/src/bits/debug.hfa
- Add #include <assert.h>.
libcfa/src/bits/defs.hfa
- Drop #include <stdbool.h> and #include <stddef.h>.
- Remove the x86/x86-64 inline-assembly __atomic_bts/__atomic_btr bit test-and-set helpers (including the __CFA_NO_BIT_TEST_AND_SET__ fallback and the ARM/unknown-architecture #error stubs).
libcfa/src/bits/locks.hfa
- Forward declare $thread, park(), unpark() and active_thread().
- Add a new single_sem type: a semaphore specialised for a single waiting thread, implemented as a CAS loop over a $thread pointer where 0p means empty, 1p means a pending post, and any other value is the parked waiter. wait() consumes a pending post without blocking (returning false), otherwise publishes the current thread and parks (returning true); post() leaves a token when nobody is waiting (returning false), otherwise unparks the recorded waiter (returning true). Usage is sketched below.
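A rough usage sketch of the new primitive (everything apart from single_sem, wait and post is illustrative):

    single_sem signal;                    // starts empty (internal pointer == 0p)

    // waiting side: consume a pending post, or park until one arrives
    void wait_for_work() {
        bool parked = wait( signal );     // false => a post was already pending
        // ... the post has been consumed either way ...
    }

    // signalling side: wake the single registered waiter, or leave a token
    void hand_off_work() {
        bool woke = post( signal );       // true only if a parked thread was woken
    }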
libcfa/src/concurrency/alarm.cfa
- Replace #include "kernel_private.hfa" with #include "kernel/fwd.hfa".
libcfa/src/concurrency/invoke.h
- Include the new "kernel/fwd.hfa".
- Remove the ARM interrupt-enable/disable prototypes and the TL_GET/TL_SET macros, the thread-local KernelThreadData (kernelTLS) declaration, and the __Preemption_Reason enumeration; that per-kernel-thread state is now picked up through the new include rather than declared here.
libcfa/src/concurrency/io.cfa
- Define __cforall_thread__ at the top of the translation unit.
- The file is now entirely guarded by CFA_HAVE_LINUX_IO_URING_H; the empty fallback stubs for __kernel_io_startup/__kernel_io_finish_start/__kernel_io_prepare_stop/__kernel_io_shutdown are gone.
- Includes are reworked: add <signal.h>, <sys/epoll.h>, "stats.hfa", "kernel.hfa", "kernel/fwd.hfa" and "io/types.hfa"; drop <sys/mman.h>, "bits/signal.hfa", "kernel_private.hfa", "thread.hfa" and "bitmanip.hfa".
- All ring setup and teardown leaves this file: the __NR_io_uring_* syscall-number fallbacks, entries_per_cluster(), the __submition_data/__completion_data/__io_data structure definitions, the __io_poller_fast user thread, the slow-poller pthread (__io_poller_slow), __wake_poller(), __kernel_io_startup/finish_start/prepare_stop/shutdown and register_fixed_files() are all removed, matching the new concurrency/io/setup.cfa and io/types.hfa sources added to the build.
- __io_uring_enter() becomes static, loses its sigset_t mask parameter, and always passes min_complete = 0 and a null sigset to the syscall.
- process() no longer takes a __processor_id_t and simply unparks the waiting thread; __drain_io() likewise loses its mask parameter and tests ring.poller_submits rather than the old CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS cluster flag.
- The slow/fast poller pair is replaced by a single poller: main($io_ctx_thread &) registers the ring with epoll through __ioctx_register(), drains completions in a loop, and when repeatedly idle arms __ioctx_prepare_block() and blocks on wait(this.sem) instead of handing the baton to a slow poller.
- __submit() now takes an io_context *: with poller_submits it appends the index to the ready array and posts the context's semaphore, with eager_submits it keeps the lock-and-batch path, and both use the simplified __io_uring_enter(ring, n, false). The three modes are outlined below.
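In outline, the reworked submission path picks between the three modes as follows (a simplified paraphrase of __submit; the lowercase helper names stand in for the real ready-array and locking code):

    void __submit( io_context * ctx, uint32_t idx ) {      // simplified sketch
        __io_data & ring = *ctx->thrd.ring;
        if( ring.poller_submits ) {
            // hand the sqe index to the per-context poller thread and wake it;
            // the poller batches everything that is ready into one syscall
            queue_ready( ring, idx );
            post( ctx->thrd.sem );
        }
        else if( ring.eager_submits ) {
            // queue the index, then try to take the submit lock and flush the
            // whole ready array with a single __io_uring_enter( ring, n, false )
            queue_ready( ring, idx );
            try_flush( ring );
        }
        else {
            // default: publish just this one entry and submit it directly
            publish( ring, idx );
            __io_uring_enter( ring, 1, false );
        }
    }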
libcfa/src/concurrency/iocall.cfa
- Define __cforall_thread__; add <assert.h> and <errno.h>; replace "kernel_private.hfa" with "kernel.hfa", "kernel/fwd.hfa" and "io/types.hfa"; include <time.hfa> for Duration.
- The extern __submit() declaration now takes an io_context * rather than a ring reference.
- New __get_io_context() helper: picks one of the active cluster's io_context instances at random (__tls_rand() % cltr->io.cnt), asserting that the cluster has at least one.
- New REGULAR_FLAGS, LINK_FLAGS and SPLICE_FLAGS masks are assembled from whichever CFA_HAVE_IOSQE_* / CFA_HAVE_SPLICE_F_FD_IN_FIXED feature macros configure detected.
- __submit_prelude rejects link flags with errno = ENOTSUP, ignores the timeout and cancellation arguments for now, falls back to __get_io_context() when no context is supplied, and stores REGULAR_FLAGS & submit_flags into sqe->flags.
- __submit_wait converts a negative completion result into errno = -result and returns -1, so the wrappers follow the usual POSIX return convention (caller-side sketch below).
- Every cfa_* wrapper (preadv2, pwritev2, fsync, sync_file_range, sendmsg, recvmsg, send, recv, accept4, connect, fallocate, fadvise, madvise, openat, close, statx, read, write, splice, tee) gains the trailing parameters int submit_flags, Duration timeout, io_cancellation *, io_context *.
- cfa_preadv2_fixed() is removed (fixed files are now requested through submit_flags); cfa_statx() stores its flags in sqe->statx_flags instead of sqe->flags; the second cfa_splice() overload is dropped and the remaining one (and cfa_tee()) ORs SPLICE_FLAGS & submit_flags into sqe->splice_flags; has_user_level_blocking() is updated for the single cfa_splice signature.
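For callers the visible consequence is the POSIX-style return convention; a minimal hedged sketch (assumes <stdio.h> and iofwd.hfa are included; the descriptor and buffer are illustrative):

    void check_read( int fd ) {
        char buf[512];
        ssize_t r = cfa_read( fd, buf, sizeof(buf) );   // new trailing parameters defaulted
        if( r < 0 ) {
            // errno now carries the io_uring error code; the old wrappers
            // instead returned the raw -errno value to the caller
            perror( "cfa_read" );
        }
    }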
libcfa/src/concurrency/iofwd.hfa
r6dba8755 r95789be 19 19 extern "C" { 20 20 #include <sys/types.h> 21 #if CFA_HAVE_LINUX_IO_URING_H 22 #include <linux/io_uring.h> 23 #endif 21 24 } 22 25 #include "bits/defs.hfa" 26 #include "time.hfa" 27 28 #if defined(CFA_HAVE_IOSQE_FIXED_FILE) 29 #define CFA_IO_FIXED_FD1 IOSQE_FIXED_FILE 30 #endif 31 #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED) 32 #define CFA_IO_FIXED_FD2 SPLICE_F_FD_IN_FIXED 33 #endif 34 #if defined(CFA_HAVE_IOSQE_IO_DRAIN) 35 #define CFA_IO_DRAIN IOSQE_IO_DRAIN 36 #endif 37 #if defined(CFA_HAVE_IOSQE_ASYNC) 38 #define CFA_IO_ASYNC IOSQE_ASYNC 39 #endif 40 41 struct cluster; 42 struct io_context; 43 struct io_cancellation; 23 44 24 45 struct iovec; … … 27 48 struct statx; 28 49 29 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags );30 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags );31 extern int cfa_fsync(int fd );32 extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags );33 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags );34 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags );35 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags );36 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags );37 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags );38 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen );39 extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len );40 extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice );41 extern int cfa_madvise(void *addr, size_t length, int advice );42 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode );43 extern int cfa_close(int fd );44 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf );45 extern ssize_t cfa_read(int fd, void *buf, size_t count );46 extern ssize_t cfa_write(int fd, void *buf, size_t count );47 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags );48 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags );50 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 51 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 52 extern int cfa_fsync(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 53 extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 54 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 55 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 56 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags = 0, 
Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 57 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 58 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 59 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 60 extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 61 extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 62 extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 63 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 64 extern int cfa_close(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 65 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 66 extern ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 67 extern ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 68 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 69 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 49 70 50 71 //----------------------------------------------------------------------------- 51 72 // Check if a function is blocks a only the user thread 52 73 bool has_user_level_blocking( fptr_t func ); 74 75 //----------------------------------------------------------------------------- 76 void register_fixed_files( io_context & ctx , int * files, unsigned count ); 77 void register_fixed_files( cluster & cltr, int * files, unsigned count ); -
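The forwarding API above keeps the POSIX-style positional arguments and appends four defaulted parameters (submit_flags, timeout, cancellation, context), so existing call sites keep compiling while new ones can opt into io_uring features per call. A minimal sketch of both styles, assuming a CFA build with io_uring support; the descriptor, buffer, one-second timeout, and the use of errno for the failure code are illustrative assumptions, not part of this change:

	#include <kernel.hfa>
	#include <iofwd.hfa>

	void read_example( int fd, char buf[], size_t len ) {
		io_cancellation cancel;
		// old-style call: the trailing defaults mean no submit flags,
		// no timeout, and no cancellation handle
		ssize_t r = cfa_read( fd, buf, len );
		// new-style call: drain behind earlier submissions (CFA_IO_DRAIN is
		// only defined when the kernel offers IOSQE_IO_DRAIN), assume a
		// one-second timeout, and allow cancellation through `cancel`
		r = cfa_read( fd, buf, len, CFA_IO_DRAIN, 1`s, &cancel, 0p );
		if ( r < 0 ) { /* failure; errno assumed to hold the reason */ }
	}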
libcfa/src/concurrency/kernel.cfa
r6dba8755 r95789be 18 18 19 19 //C Includes 20 #include <stddef.h>21 20 #include <errno.h> 22 #include <string.h>23 21 #include <stdio.h> 24 #include <fenv.h>25 22 #include <signal.h> 26 23 #include <unistd.h> 27 #include <limits.h> // PTHREAD_STACK_MIN28 #include <sys/mman.h> // mprotect29 extern "C" {30 #include <sys/resource.h>31 }32 24 33 25 //CFA Includes 34 #include "time.hfa"35 26 #include "kernel_private.hfa" 36 27 #include "preemption.hfa" 37 #include "startup.hfa"38 28 39 29 //Private includes … … 45 35 // Some assembly required 46 36 #if defined( __i386 ) 47 #define CtxGet( ctx ) \48 __asm__ volatile ( \49 "movl %%esp,%0\n"\50 "movl %%ebp,%1\n"\51 : "=rm" (ctx.SP),\52 "=rm" (ctx.FP) \53 )54 55 37 // mxcr : SSE Status and Control bits (control bits are preserved across function calls) 56 38 // fcw : X87 FPU control word (preserved across function calls) … … 74 56 75 57 #elif defined( __x86_64 ) 76 #define CtxGet( ctx ) \77 __asm__ volatile ( \78 "movq %%rsp,%0\n"\79 "movq %%rbp,%1\n"\80 : "=rm" (ctx.SP),\81 "=rm" (ctx.FP) \82 )83 84 58 #define __x87_store \ 85 59 uint32_t __mxcr; \ … … 102 76 103 77 #elif defined( __ARM_ARCH ) 104 #define CtxGet( ctx ) __asm__ ( \105 "mov %0,%%sp\n" \106 "mov %1,%%r11\n" \107 : "=rm" (ctx.SP), "=rm" (ctx.FP) )108 78 #else 109 79 #error unknown hardware architecture 110 80 #endif 111 81 112 //----------------------------------------------------------------------------- 113 //Start and stop routine for the kernel, declared first to make sure they run first 114 static void __kernel_startup (void) __attribute__(( constructor( STARTUP_PRIORITY_KERNEL ) )); 115 static void __kernel_shutdown(void) __attribute__(( destructor ( STARTUP_PRIORITY_KERNEL ) )); 82 extern $thread * mainThread; 83 extern processor * mainProcessor; 116 84 117 85 //----------------------------------------------------------------------------- … … 120 88 static bool __has_next_thread(cluster * this); 121 89 static void __run_thread(processor * this, $thread * dst); 122 static bool __wake_proc(processor *);123 90 static bool __wake_one(struct __processor_id_t * id, cluster * cltr); 124 91 static void __halt(processor * this); 125 126 //----------------------------------------------------------------------------- 127 // Kernel storage 128 KERNEL_STORAGE(cluster, mainCluster); 129 KERNEL_STORAGE(processor, mainProcessor); 130 KERNEL_STORAGE($thread, mainThread); 131 KERNEL_STORAGE(__stack_t, mainThreadCtx); 132 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock); 133 #if !defined(__CFA_NO_STATISTICS__) 134 KERNEL_STORAGE(__stats_t, mainProcStats); 135 #endif 136 137 cluster * mainCluster; 138 processor * mainProcessor; 139 $thread * mainThread; 140 __scheduler_RWLock_t * __scheduler_lock; 141 142 extern "C" { 143 struct { __dllist_t(cluster) list; __spinlock_t lock; } __cfa_dbg_global_clusters; 144 } 145 146 size_t __page_size = 0; 147 148 //----------------------------------------------------------------------------- 149 // Global state 150 thread_local struct KernelThreadData kernelTLS __attribute__ ((tls_model ( "initial-exec" ))) @= { 151 NULL, // cannot use 0p 152 NULL, 153 NULL, 154 { 1, false, false }, 155 }; 156 157 //----------------------------------------------------------------------------- 158 // Struct to steal stack 159 struct current_stack_info_t { 160 __stack_t * storage; // pointer to stack object 161 void * base; // base of stack 162 void * limit; // stack grows towards stack limit 163 void * context; // address of cfa_context_t 164 }; 165 166 void ?{}( 
current_stack_info_t & this ) { 167 __stack_context_t ctx; 168 CtxGet( ctx ); 169 this.base = ctx.FP; 170 171 rlimit r; 172 getrlimit( RLIMIT_STACK, &r); 173 size_t size = r.rlim_cur; 174 175 this.limit = (void *)(((intptr_t)this.base) - size); 176 this.context = &storage_mainThreadCtx; 177 } 178 179 //----------------------------------------------------------------------------- 180 // Main thread construction 181 182 void ?{}( $coroutine & this, current_stack_info_t * info) with( this ) { 183 stack.storage = info->storage; 184 with(*stack.storage) { 185 limit = info->limit; 186 base = info->base; 187 } 188 __attribute__((may_alias)) intptr_t * istorage = (intptr_t*) &stack.storage; 189 *istorage |= 0x1; 190 name = "Main Thread"; 191 state = Start; 192 starter = 0p; 193 last = 0p; 194 cancellation = 0p; 195 } 196 197 void ?{}( $thread & this, current_stack_info_t * info) with( this ) { 198 ticket = 1; 199 state = Start; 200 self_cor{ info }; 201 curr_cor = &self_cor; 202 curr_cluster = mainCluster; 203 self_mon.owner = &this; 204 self_mon.recursion = 1; 205 self_mon_p = &self_mon; 206 link.next = 0p; 207 link.prev = 0p; 208 209 node.next = 0p; 210 node.prev = 0p; 211 doregister(curr_cluster, this); 212 213 monitors{ &self_mon_p, 1, (fptr_t)0 }; 214 } 215 216 //----------------------------------------------------------------------------- 217 // Processor coroutine 218 void ?{}(processorCtx_t & this) { 219 220 } 221 222 // Construct the processor context of non-main processors 223 static void ?{}(processorCtx_t & this, processor * proc, current_stack_info_t * info) { 224 (this.__cor){ info }; 225 this.proc = proc; 226 } 227 228 static void * __invoke_processor(void * arg); 229 230 static init(processor & this, const char name[], cluster & _cltr) with( this ) { 231 this.name = name; 232 this.cltr = &_cltr; 233 id = -1u; 234 destroyer = 0p; 235 do_terminate = false; 236 preemption_alarm = 0p; 237 pending_preemption = false; 238 239 #if !defined(__CFA_NO_STATISTICS__) 240 print_stats = 0; 241 print_halts = false; 242 #endif 243 244 int target = __atomic_add_fetch( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST ); 245 246 id = doregister((__processor_id_t*)&this); 247 248 // Lock the RWlock so no-one pushes/pops while we are changing the queue 249 uint_fast32_t last_size = ready_mutate_lock(); 250 251 // Adjust the ready queue size 252 ready_queue_grow( cltr, target ); 253 254 // Unlock the RWlock 255 ready_mutate_unlock( last_size ); 256 257 __cfadbg_print_safe(runtime_core, "Kernel : core %p created\n", &this); 258 } 259 260 // Not a ctor, it just preps the destruction but should not destroy members 261 void deinit(processor & this) { 262 263 int target = __atomic_sub_fetch( &this.cltr->nprocessors, 1u, __ATOMIC_SEQ_CST ); 264 265 // Lock the RWlock so no-one pushes/pops while we are changing the queue 266 uint_fast32_t last_size = ready_mutate_lock(); 267 268 // Adjust the ready queue size 269 ready_queue_shrink( this.cltr, target ); 270 271 // Make sure we aren't on the idle queue 272 unsafe_remove( this.cltr->idles, &this ); 273 274 // Unlock the RWlock 275 ready_mutate_unlock( last_size ); 276 277 // Finally we don't need the read_lock any more 278 unregister((__processor_id_t*)&this); 279 } 280 281 void ?{}(processor & this, const char name[], cluster & _cltr) { 282 ( this.idle ){}; 283 ( this.terminated ){ 0 }; 284 ( this.runner ){}; 285 init( this, name, _cltr ); 286 287 __cfadbg_print_safe(runtime_core, "Kernel : Starting core %p\n", &this); 288 289 this.stack = __create_pthread( 
&this.kernel_thread, __invoke_processor, (void *)&this ); 290 291 } 292 293 void ^?{}(processor & this) with( this ){ 294 if( ! __atomic_load_n(&do_terminate, __ATOMIC_ACQUIRE) ) { 295 __cfadbg_print_safe(runtime_core, "Kernel : core %p signaling termination\n", &this); 296 297 __atomic_store_n(&do_terminate, true, __ATOMIC_RELAXED); 298 __wake_proc( &this ); 299 300 P( terminated ); 301 verify( kernelTLS.this_processor != &this); 302 } 303 304 int err = pthread_join( kernel_thread, 0p ); 305 if( err != 0 ) abort("KERNEL ERROR: joining processor %p caused error %s\n", &this, strerror(err)); 306 307 free( this.stack ); 308 309 deinit( this ); 310 } 311 312 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) { 313 this.name = name; 314 this.preemption_rate = preemption_rate; 315 this.nprocessors = 0; 316 ready_queue{}; 317 318 #if !defined(__CFA_NO_STATISTICS__) 319 print_stats = 0; 320 stats = alloc(); 321 __init_stats( stats ); 322 #endif 323 324 threads{ __get }; 325 326 doregister(this); 327 328 // Lock the RWlock so no-one pushes/pops while we are changing the queue 329 uint_fast32_t last_size = ready_mutate_lock(); 330 331 // Adjust the ready queue size 332 ready_queue_grow( &this, 0 ); 333 334 // Unlock the RWlock 335 ready_mutate_unlock( last_size ); 336 337 338 __kernel_io_startup( this, io_flags, &this == mainCluster ); 339 } 340 341 void ^?{}(cluster & this) { 342 __kernel_io_shutdown( this, &this == mainCluster ); 343 344 // Lock the RWlock so no-one pushes/pops while we are changing the queue 345 uint_fast32_t last_size = ready_mutate_lock(); 346 347 // Adjust the ready queue size 348 ready_queue_shrink( &this, 0 ); 349 350 // Unlock the RWlock 351 ready_mutate_unlock( last_size ); 352 353 #if !defined(__CFA_NO_STATISTICS__) 354 if( 0 != this.print_stats ) { 355 __print_stats( this.stats, this.print_stats, true, this.name, (void*)&this ); 356 } 357 free( this.stats ); 358 #endif 359 360 unregister(this); 361 } 92 bool __wake_proc(processor *); 362 93 363 94 //============================================================================================= … … 550 281 } 551 282 552 // KERNEL_ONLY553 // Context invoker for processors554 // This is the entry point for processors (kernel threads)555 // It effectively constructs a coroutine by stealing the pthread stack556 static void * __invoke_processor(void * arg) {557 #if !defined( __CFA_NO_STATISTICS__ )558 __stats_t local_stats;559 __init_stats( &local_stats );560 kernelTLS.this_stats = &local_stats;561 #endif562 563 processor * proc = (processor *) arg;564 kernelTLS.this_processor = proc;565 kernelTLS.this_thread = 0p;566 kernelTLS.preemption_state.[enabled, disable_count] = [false, 1];567 // SKULLDUGGERY: We want to create a context for the processor coroutine568 // which is needed for the 2-step context switch. 
However, there is no reason569 // to waste the perfectly valid stack create by pthread.570 current_stack_info_t info;571 __stack_t ctx;572 info.storage = &ctx;573 (proc->runner){ proc, &info };574 575 __cfaabi_dbg_print_safe("Coroutine : created stack %p\n", get_coroutine(proc->runner)->stack.storage);576 577 //Set global state578 kernelTLS.this_thread = 0p;579 580 //We now have a proper context from which to schedule threads581 __cfadbg_print_safe(runtime_core, "Kernel : core %p created (%p, %p)\n", proc, &proc->runner, &ctx);582 583 // SKULLDUGGERY: Since the coroutine doesn't have its own stack, we can't584 // resume it to start it like it normally would, it will just context switch585 // back to here. Instead directly call the main since we already are on the586 // appropriate stack.587 get_coroutine(proc->runner)->state = Active;588 main( proc->runner );589 get_coroutine(proc->runner)->state = Halted;590 591 // Main routine of the core returned, the core is now fully terminated592 __cfadbg_print_safe(runtime_core, "Kernel : core %p main ended (%p)\n", proc, &proc->runner);593 594 #if !defined(__CFA_NO_STATISTICS__)595 __tally_stats(proc->cltr->stats, &local_stats);596 if( 0 != proc->print_stats ) {597 __print_stats( &local_stats, proc->print_stats, true, proc->name, (void*)proc );598 }599 #endif600 601 return 0p;602 }603 604 static void Abort( int ret, const char func[] ) {605 if ( ret ) { // pthread routines return errno values606 abort( "%s : internal error, error(%d) %s.", func, ret, strerror( ret ) );607 } // if608 } // Abort609 610 void * __create_pthread( pthread_t * pthread, void * (*start)(void *), void * arg ) {611 pthread_attr_t attr;612 613 Abort( pthread_attr_init( &attr ), "pthread_attr_init" ); // initialize attribute614 615 size_t stacksize;616 // default stack size, normally defined by shell limit617 Abort( pthread_attr_getstacksize( &attr, &stacksize ), "pthread_attr_getstacksize" );618 assert( stacksize >= PTHREAD_STACK_MIN );619 620 void * stack;621 __cfaabi_dbg_debug_do(622 stack = memalign( __page_size, stacksize + __page_size );623 // pthread has no mechanism to create the guard page in user supplied stack.624 if ( mprotect( stack, __page_size, PROT_NONE ) == -1 ) {625 abort( "mprotect : internal error, mprotect failure, error(%d) %s.", errno, strerror( errno ) );626 } // if627 );628 __cfaabi_dbg_no_debug_do(629 stack = malloc( stacksize );630 );631 632 Abort( pthread_attr_setstack( &attr, stack, stacksize ), "pthread_attr_setstack" );633 634 Abort( pthread_create( pthread, &attr, start, arg ), "pthread_create" );635 return stack;636 }637 638 // KERNEL_ONLY639 static void __kernel_first_resume( processor * this ) {640 $thread * src = mainThread;641 $coroutine * dst = get_coroutine(this->runner);642 643 verify( ! kernelTLS.preemption_state.enabled );644 645 kernelTLS.this_thread->curr_cor = dst;646 __stack_prepare( &dst->stack, 65000 );647 __cfactx_start(main, dst, this->runner, __cfactx_invoke_coroutine);648 649 verify( ! kernelTLS.preemption_state.enabled );650 651 dst->last = &src->self_cor;652 dst->starter = dst->starter ? 
dst->starter : &src->self_cor;653 654 // make sure the current state is still correct655 /* paranoid */ verify(src->state == Ready);656 657 // context switch to specified coroutine658 verify( dst->context.SP );659 __cfactx_switch( &src->context, &dst->context );660 // when __cfactx_switch returns we are back in the src coroutine661 662 mainThread->curr_cor = &mainThread->self_cor;663 664 // make sure the current state has been update665 /* paranoid */ verify(src->state == Active);666 667 verify( ! kernelTLS.preemption_state.enabled );668 }669 670 // KERNEL_ONLY671 static void __kernel_last_resume( processor * this ) {672 $coroutine * src = &mainThread->self_cor;673 $coroutine * dst = get_coroutine(this->runner);674 675 verify( ! kernelTLS.preemption_state.enabled );676 verify( dst->starter == src );677 verify( dst->context.SP );678 679 // SKULLDUGGERY in debug the processors check that the680 // stack is still within the limit of the stack limits after running a thread.681 // that check doesn't make sense if we context switch to the processor using the682 // coroutine semantics. Since this is a special case, use the current context683 // info to populate these fields.684 __cfaabi_dbg_debug_do(685 __stack_context_t ctx;686 CtxGet( ctx );687 mainThread->context.SP = ctx.SP;688 mainThread->context.FP = ctx.FP;689 )690 691 // context switch to the processor692 __cfactx_switch( &src->context, &dst->context );693 }694 695 283 //----------------------------------------------------------------------------- 696 284 // Scheduler routines … … 834 422 835 423 //============================================================================================= 836 // Kernel Setup logic837 //=============================================================================================838 //-----------------------------------------------------------------------------839 // Kernel boot procedures840 static void __kernel_startup(void) {841 verify( ! 
kernelTLS.preemption_state.enabled );842 __cfadbg_print_safe(runtime_core, "Kernel : Starting\n");843 844 __page_size = sysconf( _SC_PAGESIZE );845 846 __cfa_dbg_global_clusters.list{ __get };847 __cfa_dbg_global_clusters.lock{};848 849 // Initialize the global scheduler lock850 __scheduler_lock = (__scheduler_RWLock_t*)&storage___scheduler_lock;851 (*__scheduler_lock){};852 853 // Initialize the main cluster854 mainCluster = (cluster *)&storage_mainCluster;855 (*mainCluster){"Main Cluster"};856 857 __cfadbg_print_safe(runtime_core, "Kernel : Main cluster ready\n");858 859 // Start by initializing the main thread860 // SKULLDUGGERY: the mainThread steals the process main thread861 // which will then be scheduled by the mainProcessor normally862 mainThread = ($thread *)&storage_mainThread;863 current_stack_info_t info;864 info.storage = (__stack_t*)&storage_mainThreadCtx;865 (*mainThread){ &info };866 867 __cfadbg_print_safe(runtime_core, "Kernel : Main thread ready\n");868 869 870 871 // Construct the processor context of the main processor872 void ?{}(processorCtx_t & this, processor * proc) {873 (this.__cor){ "Processor" };874 this.__cor.starter = 0p;875 this.proc = proc;876 }877 878 void ?{}(processor & this) with( this ) {879 ( this.idle ){};880 ( this.terminated ){ 0 };881 ( this.runner ){};882 init( this, "Main Processor", *mainCluster );883 kernel_thread = pthread_self();884 885 runner{ &this };886 __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner);887 }888 889 // Initialize the main processor and the main processor ctx890 // (the coroutine that contains the processing control flow)891 mainProcessor = (processor *)&storage_mainProcessor;892 (*mainProcessor){};893 894 //initialize the global state variables895 kernelTLS.this_processor = mainProcessor;896 kernelTLS.this_thread = mainThread;897 898 #if !defined( __CFA_NO_STATISTICS__ )899 kernelTLS.this_stats = (__stats_t *)& storage_mainProcStats;900 __init_stats( kernelTLS.this_stats );901 #endif902 903 // Enable preemption904 kernel_start_preemption();905 906 // Add the main thread to the ready queue907 // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread908 __schedule_thread((__processor_id_t *)mainProcessor, mainThread);909 910 // SKULLDUGGERY: Force a context switch to the main processor to set the main thread's context to the current UNIX911 // context. Hence, the main thread does not begin through __cfactx_invoke_thread, like all other threads. The trick here is that912 // mainThread is on the ready queue when this call is made.913 __kernel_first_resume( kernelTLS.this_processor );914 915 916 // THE SYSTEM IS NOW COMPLETELY RUNNING917 918 919 // Now that the system is up, finish creating systems that need threading920 __kernel_io_finish_start( *mainCluster );921 922 923 __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");924 925 verify( ! kernelTLS.preemption_state.enabled );926 enable_interrupts( __cfaabi_dbg_ctx );927 verify( TL_GET( preemption_state.enabled ) );928 }929 930 static void __kernel_shutdown(void) {931 //Before we start shutting things down, wait for systems that need threading to shutdown932 __kernel_io_prepare_stop( *mainCluster );933 934 /* paranoid */ verify( TL_GET( preemption_state.enabled ) );935 disable_interrupts();936 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled );937 938 __cfadbg_print_safe(runtime_core, "\n--------------------------------------------------\nKernel : Shutting down\n");939 940 // SKULLDUGGERY: Notify the mainProcessor it needs to terminates.941 // When its coroutine terminates, it return control to the mainThread942 // which is currently here943 __atomic_store_n(&mainProcessor->do_terminate, true, __ATOMIC_RELEASE);944 __kernel_last_resume( kernelTLS.this_processor );945 mainThread->self_cor.state = Halted;946 947 // THE SYSTEM IS NOW COMPLETELY STOPPED948 949 // Disable preemption950 kernel_stop_preemption();951 952 // Destroy the main processor and its context in reverse order of construction953 // These were manually constructed so we need manually destroy them954 void ^?{}(processor & this) with( this ){955 deinit( this );956 957 /* paranoid */ verify( this.do_terminate == true );958 __cfaabi_dbg_print_safe("Kernel : destroyed main processor context %p\n", &runner);959 }960 961 ^(*mainProcessor){};962 963 // Final step, destroy the main thread since it is no longer needed964 965 // Since we provided a stack to this taxk it will not destroy anything966 /* paranoid */ verify(mainThread->self_cor.stack.storage == (__stack_t*)(((uintptr_t)&storage_mainThreadCtx)| 0x1));967 ^(*mainThread){};968 969 ^(*mainCluster){};970 971 ^(*__scheduler_lock){};972 973 ^(__cfa_dbg_global_clusters.list){};974 ^(__cfa_dbg_global_clusters.lock){};975 976 __cfadbg_print_safe(runtime_core, "Kernel : Shutdown complete\n");977 }978 979 //=============================================================================================980 424 // Kernel Idle Sleep 981 425 //============================================================================================= … … 997 441 998 442 // Unconditionnaly wake a thread 999 staticbool __wake_proc(processor * this) {443 bool __wake_proc(processor * this) { 1000 444 __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this); 1001 445 … … 1173 617 1174 618 //----------------------------------------------------------------------------- 1175 // Global Queues1176 void doregister( cluster & cltr ) {1177 lock ( __cfa_dbg_global_clusters.lock __cfaabi_dbg_ctx2);1178 push_front( __cfa_dbg_global_clusters.list, cltr );1179 unlock ( __cfa_dbg_global_clusters.lock );1180 }1181 1182 void unregister( cluster & cltr ) {1183 lock ( __cfa_dbg_global_clusters.lock __cfaabi_dbg_ctx2);1184 remove( __cfa_dbg_global_clusters.list, cltr );1185 unlock( __cfa_dbg_global_clusters.lock );1186 }1187 1188 void doregister( cluster * cltr, $thread & thrd ) {1189 lock (cltr->thread_list_lock __cfaabi_dbg_ctx2);1190 cltr->nthreads += 1;1191 push_front(cltr->threads, thrd);1192 unlock (cltr->thread_list_lock);1193 }1194 1195 void unregister( cluster * cltr, $thread & thrd ) {1196 lock (cltr->thread_list_lock __cfaabi_dbg_ctx2);1197 remove(cltr->threads, thrd );1198 cltr->nthreads -= 1;1199 unlock(cltr->thread_list_lock);1200 }1201 1202 //-----------------------------------------------------------------------------1203 619 // Debug 1204 620 __cfaabi_dbg_debug_do( -
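Most of the boot machinery removed above (the KERNEL_STORAGE objects, __kernel_startup/__kernel_shutdown, the first/last-resume skulduggery, and __create_pthread) leaves this translation unit, while the scheduler, idle-sleep, and debug code stay behind. For readers unfamiliar with the stack trick in the removed __create_pthread, the sketch below isolates the same guard-page idea; it is an illustration only, with posix_memalign standing in for the runtime's allocator and an invented helper name:

	#include <stdlib.h>
	#include <sys/mman.h>
	#include <unistd.h>

	// Allocate a kernel-thread stack with one extra page at the low end, then
	// make that page inaccessible so a stack overflow faults immediately
	// instead of silently corrupting adjacent memory (pthread cannot add a
	// guard page to a user-supplied stack).
	static void * alloc_guarded_stack( size_t stacksize ) {
		size_t page = (size_t)sysconf( _SC_PAGESIZE );
		void * stack = 0p;
		if ( posix_memalign( &stack, page, stacksize + page ) != 0 ) return 0p;
		if ( mprotect( stack, page, PROT_NONE ) == -1 ) {   // guard page at the bottom
			free( stack );                                   // protection unchanged on failure, so free is safe
			return 0p;
		}
		return stack;   // hand to pthread_attr_setstack
	}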
libcfa/src/concurrency/kernel.hfa
r6dba8755 r95789be 16 16 #pragma once 17 17 18 #include <stdbool.h>19 #include <stdint.h>20 21 18 #include "invoke.h" 22 19 #include "time_t.hfa" … … 26 23 27 24 extern "C" { 28 #include <pthread.h> 29 #include <semaphore.h> 25 #include <bits/pthreadtypes.h> 30 26 } 31 27 … … 129 125 struct __io_data; 130 126 131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD (1 << 0) // 0x01 132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02 133 #define CFA_CLUSTER_IO_EAGER_SUBMITS (1 << 2) // 0x04 134 #define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS (1 << 3) // 0x08 135 #define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10 136 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET 16 137 127 // IO poller user-thread 128 // Not using the "thread" keyword because we want to control 129 // more carefully when to start/stop it 130 struct $io_ctx_thread { 131 struct __io_data * ring; 132 single_sem sem; 133 volatile bool done; 134 $thread self; 135 }; 136 137 138 struct io_context { 139 $io_ctx_thread thrd; 140 }; 141 142 struct io_context_params { 143 int num_entries; 144 int num_ready; 145 int submit_aff; 146 bool eager_submits:1; 147 bool poller_submits:1; 148 bool poll_submit:1; 149 bool poll_complete:1; 150 }; 151 152 void ?{}(io_context_params & this); 153 154 void ?{}(io_context & this, struct cluster & cl); 155 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params); 156 void ^?{}(io_context & this); 157 158 struct io_cancellation { 159 uint32_t target; 160 }; 161 162 static inline void ?{}(io_cancellation & this) { this.target = -1u; } 163 static inline void ^?{}(io_cancellation & this) {} 164 bool cancel(io_cancellation & this); 138 165 139 166 //----------------------------------------------------------------------------- … … 206 233 } node; 207 234 208 struct __io_data * io; 235 struct { 236 io_context * ctxs; 237 unsigned cnt; 238 } io; 209 239 210 240 #if !defined(__CFA_NO_STATISTICS__) … … 215 245 extern Duration default_preemption(); 216 246 217 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);247 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params); 218 248 void ^?{}(cluster & this); 219 249 220 static inline void ?{} (cluster & this) { this{"Anonymous Cluster", default_preemption(), 0}; } 221 static inline void ?{} (cluster & this, Duration preemption_rate) { this{"Anonymous Cluster", preemption_rate, 0}; } 222 static inline void ?{} (cluster & this, const char name[]) { this{name, default_preemption(), 0}; } 223 static inline void ?{} (cluster & this, unsigned flags) { this{"Anonymous Cluster", default_preemption(), flags}; } 224 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; } 225 static inline void ?{} (cluster & this, const char name[], unsigned flags) { this{name, default_preemption(), flags}; } 250 static inline void ?{} (cluster & this) { io_context_params default_params; this{"Anonymous Cluster", default_preemption(), 1, default_params}; } 251 static inline void ?{} (cluster & this, Duration preemption_rate) { io_context_params default_params; this{"Anonymous Cluster", preemption_rate, 1, default_params}; } 252 static inline void ?{} (cluster & this, const char name[]) { io_context_params default_params; this{name, default_preemption(), 1, default_params}; } 253 static inline void ?{} (cluster & this, unsigned num_io) { io_context_params default_params; 
this{"Anonymous Cluster", default_preemption(), num_io, default_params}; } 254 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io) { io_context_params default_params; this{"Anonymous Cluster", preemption_rate, num_io, default_params}; } 255 static inline void ?{} (cluster & this, const char name[], unsigned num_io) { io_context_params default_params; this{name, default_preemption(), num_io, default_params}; } 256 static inline void ?{} (cluster & this, const io_context_params & io_params) { this{"Anonymous Cluster", default_preemption(), 1, io_params}; } 257 static inline void ?{} (cluster & this, Duration preemption_rate, const io_context_params & io_params) { this{"Anonymous Cluster", preemption_rate, 1, io_params}; } 258 static inline void ?{} (cluster & this, const char name[], const io_context_params & io_params) { this{name, default_preemption(), 1, io_params}; } 259 static inline void ?{} (cluster & this, unsigned num_io, const io_context_params & io_params) { this{"Anonymous Cluster", default_preemption(), num_io, io_params}; } 260 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io, const io_context_params & io_params) { this{"Anonymous Cluster", preemption_rate, num_io, io_params}; } 261 static inline void ?{} (cluster & this, const char name[], unsigned num_io, const io_context_params & io_params) { this{name, default_preemption(), num_io, io_params}; } 226 262 227 263 static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; } -
libcfa/src/concurrency/kernel_private.hfa
r6dba8755 r95789be 22 22 #include "stats.hfa" 23 23 24 #include "bits/random.hfa"25 26 27 24 //----------------------------------------------------------------------------- 28 25 // Scheduler … … 53 50 54 51 55 struct event_kernel_t {56 alarm_list_t alarms;57 __spinlock_t lock;58 };59 60 extern event_kernel_t * event_kernel;61 62 struct __cfa_kernel_preemption_state_t {63 bool enabled;64 bool in_progress;65 unsigned short disable_count;66 };67 68 extern volatile thread_local __cfa_kernel_preemption_state_t preemption_state __attribute__ ((tls_model ( "initial-exec" )));69 70 52 extern cluster * mainCluster; 71 53 … … 84 66 void __unpark( struct __processor_id_t *, $thread * thrd __cfaabi_dbg_ctx_param2 ); 85 67 86 //----------------------------------------------------------------------------- 87 // I/O 88 void __kernel_io_startup ( cluster &, unsigned, bool ); 89 void __kernel_io_finish_start( cluster & ); 90 void __kernel_io_prepare_stop( cluster & ); 91 void __kernel_io_shutdown ( cluster &, bool ); 68 static inline bool __post(single_sem & this, struct __processor_id_t * id) { 69 for() { 70 struct $thread * expected = this.ptr; 71 if(expected == 1p) return false; 72 if(expected == 0p) { 73 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 74 return false; 75 } 76 } 77 else { 78 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 79 __unpark( id, expected __cfaabi_dbg_ctx2 ); 80 return true; 81 } 82 } 83 } 84 } 92 85 93 86 //----------------------------------------------------------------------------- 94 87 // Utils 95 #define KERNEL_STORAGE(T,X) __attribute((aligned(__alignof__(T)))) static char storage_##X[sizeof(T)]96 97 static inline uint64_t __tls_rand() {98 #if defined(__SIZEOF_INT128__)99 return __lehmer64( kernelTLS.rand_seed );100 #else101 return __xorshift64( kernelTLS.rand_seed );102 #endif103 }104 105 106 void doregister( struct cluster & cltr );107 void unregister( struct cluster & cltr );108 109 88 void doregister( struct cluster * cltr, struct $thread & thrd ); 110 89 void unregister( struct cluster * cltr, struct $thread & thrd ); 90 91 //----------------------------------------------------------------------------- 92 // I/O 93 void ^?{}(io_context & this, bool ); 111 94 112 95 //======================================================================= … … 280 263 void ready_queue_shrink(struct cluster * cltr, int target); 281 264 282 //-----------------------------------------------------------------------283 // IO user data284 struct __io_user_data_t {285 int32_t result;286 $thread * thrd;287 };288 289 //-----------------------------------------------------------------------290 // Statics call at the end of each thread to register statistics291 #if !defined(__CFA_NO_STATISTICS__)292 static inline struct __stats_t * __tls_stats() {293 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );294 /* paranoid */ verify( kernelTLS.this_stats );295 return kernelTLS.this_stats;296 }297 298 #define __STATS__(in_kernel, ...) { \299 if( !(in_kernel) ) disable_interrupts(); \300 with( *__tls_stats() ) { \301 __VA_ARGS__ \302 } \303 if( !(in_kernel) ) enable_interrupts( __cfaabi_dbg_ctx ); \304 }305 #else306 #define __STATS__(in_kernel, ...)307 #endif308 265 309 266 // Local Variables: // -
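The __post routine added above is the wake-up half of the single_sem used by the io_context poller ($io_ctx_thread.sem in kernel.hfa): the pointer holds 0p when the semaphore is empty, 1p when a post is already pending, and a parked $thread otherwise, and only the CAS out of the parked state hands a thread to __unpark. A hypothetical caller, purely to show the intended protocol (the wrapper name and the way the poller is reached are assumptions, not code from this change):

	// Wake the io_context poller thread if it is parked on its semaphore.
	static bool __wake_io_poller( $io_ctx_thread & io_thrd, struct __processor_id_t * id ) {
		// true  : a parked thread was handed to __unpark and is now runnable
		// false : the poller was already awake, or an earlier post is still pending
		return __post( io_thrd.sem, id );
	}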
libcfa/src/concurrency/preemption.cfa
r6dba8755 r95789be
 26  26
 27  27	#include "bits/signal.hfa"
     28	#include "kernel_private.hfa"
 28  29
 29  30	#if !defined(__CFA_DEFAULT_PREEMPTION__)
  …   …
293 294	// Startup routine to activate preemption
294 295	// Called from kernel_startup
295    	void kernel_start_preemption() {
    296	void __kernel_alarm_startup() {
296 297	__cfaabi_dbg_print_safe( "Kernel : Starting preemption\n" );
297 298
  …   …
315 316	// Shutdown routine to deactivate preemption
316 317	// Called from kernel_shutdown
317    	void kernel_stop_preemption() {
    318	void __kernel_alarm_shutdown() {
318 319	__cfaabi_dbg_print_safe( "Kernel : Preemption stopping\n" );
319 320
-
libcfa/src/concurrency/preemption.hfa
r6dba8755 r95789be
16 16	#pragma once
17 17
   18	#include "bits/locks.hfa"
18 19	#include "alarm.hfa"
19   	#include "kernel_private.hfa"
20 20
21   	void kernel_start_preemption();
22   	void kernel_stop_preemption();
   21	struct event_kernel_t {
   22		alarm_list_t alarms;
   23		__spinlock_t lock;
   24	};
   25
   26	extern event_kernel_t * event_kernel;
   27
23 28	void update_preemption( processor * this, Duration duration );
24 29
-
libcfa/src/concurrency/thread.hfa
r6dba8755 r95789be
 84  84
 85  85	//-----------------------------------------------------------------------------
 86    	// Thread getters
 87    	static inline struct $thread * active_thread () { return TL_GET( this_thread ); }
 88
 89    	//-----------------------------------------------------------------------------
 90  86	// Scheduler API
 91  87
  …   …
106 102	bool force_yield( enum __Preemption_Reason );
107 103
108    	static inline void yield() {
109    		force_yield(__MANUAL_PREEMPTION);
110    	}
111
112    	// Yield: yield N times
113    	static inline void yield( unsigned times ) {
114    		for( times ) {
115    			yield();
116    		}
117    	}
118
119 104	//----------
120 105	// sleep: force thread to block and be rescheduled after Duration duration
-
src/cfa.make
r6dba8755 r95789be
1    	CFACOMPILE = $(CFACC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CFAFLAGS) $(CFAFLAGS) $(AM_CFLAGS) $(CFLAGS)
2    	LTCFACOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
    1	AM_T_CFA = $(am__t_CFA_@AM_T@)
    2	am__t_CFA_ =
    3	am__t_CFA_0 =
    4	am__t_CFA_1 = /usr/bin/time --quiet -f "$@ %E" # trailing space is necessary
    5
    6
    7	CFACOMPILE = $(AM_T_CFA)$(CFACC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CFAFLAGS) $(CFAFLAGS) $(AM_CFLAGS) $(CFLAGS)
    8	LTCFACOMPILE = $(AM_T_CFA)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
3   9	$(LIBTOOLFLAGS) --mode=compile $(CFACC) $(DEFS) \
4  10	$(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CFAFLAGS) $(AM_CFLAGS) $(CFAFLAGS) $(CFLAGS)
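The AM_T_CFA hook above lets the build time each CFA compile: when the (assumed) AM_T automake conditional is enabled it selects am__t_CFA_1, otherwise it expands to nothing and the compile line is unchanged. Illustratively, with timing enabled a compile of a hypothetical readv.o becomes roughly:

	/usr/bin/time --quiet -f "readv.o %E" $(CFACC) $(DEFS) ... -c -o readv.o readv.cfa

where the "$@ %E" format prints the target name and its elapsed wall-clock time, and the trailing space after the time command keeps it separated from $(CFACC) in the concatenation $(AM_T_CFA)$(CFACC).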