Changes in / [7df014f:0100882]
- Location:
- libcfa/src/concurrency
- Files:
-
- 4 edited
-
io.cfa (modified) (1 diff)
-
kernel.cfa (modified) (2 diffs)
-
kernel.hfa (modified) (3 diffs)
-
kernel_private.hfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
r7df014f r0100882 1 #include "kernel.hfa"2 3 #if !defined(HAVE_LINUX_IO_URING_H)4 void __kernel_io_startup( cluster & this ) {5 // Nothing to do without io_uring6 }7 8 void __kernel_io_shutdown( cluster & this ) {9 // Nothing to do without io_uring10 }11 12 bool is_async( void (*)() ) {13 return false;14 }15 16 #else17 extern "C" {18 #define _GNU_SOURCE /* See feature_test_macros(7) */19 #include <errno.h>20 #include <stdint.h>21 #include <string.h>22 #include <unistd.h>23 #include <sys/mman.h>24 #include <sys/syscall.h>25 26 #include <linux/io_uring.h>27 }28 29 #include "bits/signal.hfa"30 #include "kernel_private.hfa"31 #include "thread.hfa"32 33 uint32_t entries_per_cluster() {34 return 256;35 }36 37 static void * __io_poller( void * arg );38 39 //=============================================================================================40 // I/O Startup / Shutdown logic41 //=============================================================================================42 void __kernel_io_startup( cluster & this ) {43 // Step 1 : call to setup44 struct io_uring_params params;45 memset(&params, 0, sizeof(params));46 47 int fd = syscall(__NR_io_uring_setup, entries_per_cluster(), &params );48 if(fd < 0) {49 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));50 }51 52 // Step 2 : mmap result53 memset(&this.io, 0, sizeof(struct io_ring));54 struct io_uring_sq * sq = &this.io.submit_q;55 struct io_uring_cq * cq = &this.io.completion_q;56 57 // calculate the right ring size58 sq->ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );59 cq->ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe));60 61 // Requires features62 // // adjust the size according to the parameters63 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {64 // cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);65 // }66 67 // mmap the Submit Queue into existence68 sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | 
MAP_POPULATE, fd, IORING_OFF_SQ_RING);69 if (sq->ring_ptr == (void*)MAP_FAILED) {70 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));71 }72 73 // mmap the Completion Queue into existence (may or may not be needed)74 // Requires features75 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {76 // cq->ring_ptr = sq->ring_ptr;77 // }78 // else {79 // We need multiple call to MMAP80 cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);81 if (cq->ring_ptr == (void*)MAP_FAILED) {82 munmap(sq->ring_ptr, sq->ring_sz);83 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));84 }85 // }86 87 // mmap the submit queue entries88 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);89 sq->sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);90 if (sq->sqes == (struct io_uring_sqe *)MAP_FAILED) {91 munmap(sq->ring_ptr, sq->ring_sz);92 if (cq->ring_ptr != sq->ring_ptr) munmap(cq->ring_ptr, cq->ring_sz);93 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));94 }95 96 // Get the pointers from the kernel to fill the structure97 // submit queue98 sq->head = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.head;99 sq->tail = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.tail;100 sq->mask = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_mask;101 sq->entries = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_entries;102 sq->flags = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.flags;103 sq->dropped = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.dropped;104 sq->array = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.array;105 106 // completion queue107 cq->head = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.head;108 cq->tail = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.tail;109 cq->mask = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_mask;110 
cq->entries = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_entries;111 cq->overflow = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.overflow;112 cq->cqes = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.cqes;113 114 // Update the global ring info115 this.io.flags = params.flags;116 this.io.fd = fd;117 this.io.done = false;118 119 // Create the poller thread120 this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );121 }122 123 void __kernel_io_shutdown( cluster & this ) {124 // Stop the IO Poller125 // Notify the poller thread of the shutdown126 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);127 sigval val = { 1 };128 pthread_sigqueue( this.io.poller, SIGUSR1, val );129 130 // Wait for the poller thread to finish131 pthread_join( this.io.poller, 0p );132 free( this.io.stack );133 134 // Shutdown the io rings135 struct io_uring_sq & sq = this.io.submit_q;136 struct io_uring_cq & cq = this.io.completion_q;137 138 // unmap the submit queue entries139 munmap(sq.sqes, *sq.entries * sizeof(struct io_uring_sqe));140 141 // unmap the Submit Queue ring142 munmap(sq.ring_ptr, sq.ring_sz);143 144 // unmap the Completion Queue ring, if it is different145 if (cq.ring_ptr != sq.ring_ptr) {146 munmap(cq.ring_ptr, cq.ring_sz);147 }148 149 // close the file descriptor150 close(this.io.fd);151 }152 153 //=============================================================================================154 // I/O Polling155 //=============================================================================================156 struct io_user_data {157 int32_t result;158 $thread * thrd;159 };160 161 // Process a single completion message from the io_uring162 // This is NOT thread-safe163 static bool __io_process(struct io_ring & ring) {164 unsigned head = *ring.completion_q.head;165 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);166 167 if (head == tail) return false;168 169 unsigned idx = head & 
(*ring.completion_q.mask);170 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];171 172 /* paranoid */ verify(&cqe);173 174 struct io_user_data * data = (struct io_user_data *)cqe.user_data;175 data->result = cqe.res;176 unpark( data->thrd __cfaabi_dbg_ctx2 );177 178 // Mark to the kernel that the cqe has been seen179 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.180 __atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELEASE );181 182 return true;183 }184 185 static void * __io_poller( void * arg ) {186 cluster * cltr = (cluster *)arg;187 struct io_ring & ring = cltr->io;188 189 sigset_t mask;190 sigfillset(&mask);191 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {192 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );193 }194 195 sigdelset( &mask, SIGUSR1 );196 197 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );198 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );199 200 LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {201 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);202 if( ret < 0 ) {203 switch((int)errno) {204 case EAGAIN:205 case EINTR:206 continue LOOP;207 default:208 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );209 }210 }211 212 // Drain the queue213 while(__io_process(ring)) {}214 }215 216 return 0p;217 }218 219 //=============================================================================================220 // I/O Submissions221 //=============================================================================================222 223 [* struct io_uring_sqe, uint32_t] io_submit_prelude( struct io_ring & ring ) {224 return [0p, 0];225 }226 227 //=============================================================================================228 // I/O Interface229 //=============================================================================================230 231 /*232 extern "C" {233 
#define __USE_GNU234 #include <sys/uio.h>235 }236 237 ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {238 #if !defined(IORING_OP_READV)239 return preadv2(fd, iov, iovcnt, offset, flags);240 #else241 sqe->opcode = IORING_OP_READV;242 sqe->flags = 0;243 sqe->ioprio = 0;244 sqe->fd = fd;245 sqe->off = offset;246 sqe->addr = (unsigned long) iov;247 sqe->len = iovcnt;248 sqe->rw_flags = 0;249 sqe->user_data = 0;250 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;251 #endif252 }253 254 ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {255 #if !defined(IORING_OP_WRITEV)256 return pwritev2(fd, iov, iovcnt, offset, flags);257 #else258 #warning not implemented259 #endif260 }261 262 bool is_async( void (*func)() ) {263 switch((uintptr_t)func) {264 case (uintptr_t)preadv2:265 case (uintptr_t)async_preadv2:266 #if defined(IORING_OP_READV)267 return true;268 #else269 return false;270 #endif271 default:272 return false;273 }274 }275 */276 277 #endif -
libcfa/src/concurrency/kernel.cfa
r7df014f r0100882 262 262 threads{ __get }; 263 263 264 __kernel_io_startup( this );265 266 264 doregister(this); 267 265 } 268 266 269 267 void ^?{}(cluster & this) { 270 __kernel_io_shutdown( this );271 272 268 unregister(this); 273 269 } … … 812 808 ^(*mainThread){}; 813 809 814 ^(*mainCluster){};815 816 810 ^(__cfa_dbg_global_clusters.list){}; 817 811 ^(__cfa_dbg_global_clusters.lock){}; -
libcfa/src/concurrency/kernel.hfa
r7df014f r0100882 17 17 18 18 #include <stdbool.h> 19 #include <stdint.h>20 19 21 20 #include "invoke.h" … … 112 111 113 112 //----------------------------------------------------------------------------- 114 // I/O115 #if defined(HAVE_LINUX_IO_URING_H)116 struct io_uring_sq {117 uint32_t * head;118 uint32_t * tail;119 uint32_t * mask;120 uint32_t * entries;121 uint32_t * flags;122 uint32_t * dropped;123 uint32_t * array;124 struct io_uring_sqe * sqes;125 126 uint32_t sqe_head;127 uint32_t sqe_tail;128 129 size_t ring_sz;130 void * ring_ptr;131 };132 133 struct io_uring_cq {134 volatile uint32_t * head;135 volatile uint32_t * tail;136 uint32_t * mask;137 struct io_uring_cqe * entries;138 uint32_t * overflow;139 struct io_uring_cqe * cqes;140 141 size_t ring_sz;142 void * ring_ptr;143 };144 145 struct io_ring {146 struct io_uring_sq submit_q;147 struct io_uring_cq completion_q;148 uint32_t flags;149 int fd;150 pthread_t poller;151 void * stack;152 volatile bool done;153 };154 #endif155 156 //-----------------------------------------------------------------------------157 113 // Cluster 158 114 struct cluster { … … 185 141 cluster * prev; 186 142 } node; 187 188 #if defined(HAVE_LINUX_IO_URING_H)189 struct io_ring io;190 #endif191 143 }; 192 144 extern Duration default_preemption(); -
libcfa/src/concurrency/kernel_private.hfa
r7df014f r0100882 71 71 72 72 //----------------------------------------------------------------------------- 73 // I/O74 void __kernel_io_startup ( cluster & );75 void __kernel_io_shutdown( cluster & );76 77 //-----------------------------------------------------------------------------78 73 // Utils 79 74 #define KERNEL_STORAGE(T,X) static char storage_##X[sizeof(T)]
Note: See TracChangeset for help on using the changeset viewer.