Changeset 92976d9
- Timestamp: Apr 10, 2020, 11:20:31 AM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 7df014f
- Parents: 72828a8
- Location: libcfa/src/concurrency
- Files: 4 edited
libcfa/src/concurrency/io.cfa
r72828a8 → r92976d9 (all displayed lines added)

#include "kernel.hfa"

#if !defined(HAVE_LINUX_IO_URING_H)
void __kernel_io_startup( cluster & this ) {
	// Nothing to do without io_uring
}

void __kernel_io_shutdown( cluster & this ) {
	// Nothing to do without io_uring
}

bool is_async( void (*)() ) {
	return false;
}

#else
extern "C" {
	#define _GNU_SOURCE         /* See feature_test_macros(7) */
	#include <errno.h>
	#include <stdint.h>
	#include <string.h>
	#include <unistd.h>
	#include <sys/mman.h>
	#include <sys/syscall.h>

	#include <linux/io_uring.h>
}

#include "bits/signal.hfa"
#include "kernel_private.hfa"
#include "thread.hfa"

uint32_t entries_per_cluster() {
	return 256;
}

static void * __io_poller( void * arg );

//=============================================================================================
// I/O Startup / Shutdown logic
//=============================================================================================
void __kernel_io_startup( cluster & this ) {
	// Step 1 : call to setup
	struct io_uring_params params;
	memset(&params, 0, sizeof(params));

	int fd = syscall(__NR_io_uring_setup, entries_per_cluster(), &params );
	if(fd < 0) {
		abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
	}

	// Step 2 : mmap result
	memset(&this.io, 0, sizeof(struct io_ring));
	struct io_uring_sq * sq = &this.io.submit_q;
	struct io_uring_cq * cq = &this.io.completion_q;

	// calculate the right ring size
	sq->ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
	cq->ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

	// Requires features
	// // adjust the size according to the parameters
	// if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
	//	cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
	// }

	// mmap the Submit Queue into existence
	sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
	if (sq->ring_ptr == (void*)MAP_FAILED) {
		abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
	}

	// mmap the Completion Queue into existence (may or may not be needed)
	// Requires features
	// if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
	//	cq->ring_ptr = sq->ring_ptr;
	// }
	// else {
		// We need multiple call to MMAP
		cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
		if (cq->ring_ptr == (void*)MAP_FAILED) {
			munmap(sq->ring_ptr, sq->ring_sz);
			abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
		}
	// }

	// mmap the submit queue entries
	size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
	sq->sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
	if (sq->sqes == (struct io_uring_sqe *)MAP_FAILED) {
		munmap(sq->ring_ptr, sq->ring_sz);
		if (cq->ring_ptr != sq->ring_ptr) munmap(cq->ring_ptr, cq->ring_sz);
		abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
	}

	// Get the pointers from the kernel to fill the structure
	// submit queue
	sq->head    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.head;
	sq->tail    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.tail;
	sq->mask    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_mask;
	sq->entries = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_entries;
	sq->flags   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.flags;
	sq->dropped = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.dropped;
	sq->array   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.array;

	// completion queue
	cq->head     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.head;
	cq->tail     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.tail;
	cq->mask     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_mask;
	cq->entries  = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_entries;
	cq->overflow = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.overflow;
	cq->cqes     = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.cqes;

	// Update the global ring info
	this.io.flags = params.flags;
	this.io.fd    = fd;
	this.io.done  = false;

	// Create the poller thread
	this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
}

void __kernel_io_shutdown( cluster & this ) {
	// Stop the IO Poller
	// Notify the poller thread of the shutdown
	__atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
	sigval val = { 1 };
	pthread_sigqueue( this.io.poller, SIGUSR1, val );

	// Wait for the poller thread to finish
	pthread_join( this.io.poller, 0p );
	free( this.io.stack );

	// Shutdown the io rings
	struct io_uring_sq & sq = this.io.submit_q;
	struct io_uring_cq & cq = this.io.completion_q;

	// unmap the submit queue entries
	munmap(sq.sqes, *sq.entries * sizeof(struct io_uring_sqe));

	// unmap the Submit Queue ring
	munmap(sq.ring_ptr, sq.ring_sz);

	// unmap the Completion Queue ring, if it is different
	if (cq.ring_ptr != sq.ring_ptr) {
		munmap(cq.ring_ptr, cq.ring_sz);
	}

	// close the file descriptor
	close(this.io.fd);
}

//=============================================================================================
// I/O Polling
//=============================================================================================
struct io_user_data {
	int32_t result;
	$thread * thrd;
};

// Process a single completion message from the io_uring
// This is NOT thread-safe
static bool __io_process(struct io_ring & ring) {
	unsigned head = *ring.completion_q.head;
	unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);

	if (head == tail) return false;

	unsigned idx = head & (*ring.completion_q.mask);
	struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

	/* paranoid */ verify(&cqe);

	struct io_user_data * data = (struct io_user_data *)cqe.user_data;
	data->result = cqe.res;
	unpark( data->thrd __cfaabi_dbg_ctx2 );

	// Mark to the kernel that the cqe has been seen
	// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
	__atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELEASE );

	return true;
}

static void * __io_poller( void * arg ) {
	cluster * cltr = (cluster *)arg;
	struct io_ring & ring = cltr->io;

	sigset_t mask;
	sigfillset(&mask);
	if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
		abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
	}

	sigdelset( &mask, SIGUSR1 );

	verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
	verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );

	LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
		int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
		if( ret < 0 ) {
			switch((int)errno) {
			case EAGAIN:
			case EINTR:
				continue LOOP;
			default:
				abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
			}
		}

		// Drain the queue
		while(__io_process(ring)) {}
	}

	return 0p;
}

//=============================================================================================
// I/O Submissions
//=============================================================================================

[* struct io_uring_sqe, uint32_t] io_submit_prelude( struct io_ring & ring ) {
	return [0p, 0];
}

//=============================================================================================
// I/O Interface
//=============================================================================================

/*
extern "C" {
	#define __USE_GNU
	#include <sys/uio.h>
}

ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
	#if !defined(IORING_OP_READV)
		return preadv2(fd, iov, iovcnt, offset, flags);
	#else
		sqe->opcode = IORING_OP_READV;
		sqe->flags = 0;
		sqe->ioprio = 0;
		sqe->fd = fd;
		sqe->off = offset;
		sqe->addr = (unsigned long) iov;
		sqe->len = iovcnt;
		sqe->rw_flags = 0;
		sqe->user_data = 0;
		sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
	#endif
}

ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
	#if !defined(IORING_OP_WRITEV)
		return pwritev2(fd, iov, iovcnt, offset, flags);
	#else
		#warning not implemented
	#endif
}

bool is_async( void (*func)() ) {
	switch((uintptr_t)func) {
	case (uintptr_t)preadv2:
	case (uintptr_t)async_preadv2:
		#if defined(IORING_OP_READV)
			return true;
		#else
			return false;
		#endif
	default:
		return false;
	}
}
*/
#endif
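The submission path is still a stub at this revision: io_submit_prelude returns [0p, 0] and the read/write interface above is commented out. For orientation only, the following sketch (written in the same C/Cforall style but not part of the changeset; submit_one and its arguments are hypothetical) shows how a prepared SQE would typically be published through the mmap'ed submit ring declared in kernel.hfa and handed to the kernel with io_uring_enter:

	// Hypothetical sketch, not in this changeset: publish one prepared SQE and notify the kernel.
	// Assumes the struct io_uring_sq layout added to kernel.hfa below; omits full-ring handling.
	static int submit_one( struct io_uring_sq * sq, int ring_fd, struct io_uring_sqe * prepared ) {
		uint32_t tail = *sq->tail;            // only the submitter writes the tail, so a plain read suffices
		uint32_t idx  = tail & *sq->mask;     // slot in the shared SQE / indirection arrays

		sq->sqes [idx] = *prepared;           // copy the SQE into the shared array
		sq->array[idx] = idx;                 // publish its index in the indirection array

		// Make the SQE contents visible before the kernel can observe the new tail.
		__atomic_store_n( sq->tail, tail + 1, __ATOMIC_RELEASE );

		// Ask the kernel to consume one submission.
		return syscall( __NR_io_uring_enter, ring_fd, 1, 0, 0, 0p, 0 );
	}

In the eventual interface, the SQE's user_data field would carry a pointer to an io_user_data record, so that __io_process can recover the waiting $thread and unpark it once the matching CQE arrives.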
libcfa/src/concurrency/kernel.cfa
r72828a8 → r92976d9

@@ -262,8 +262,12 @@
 	threads{ __get };
 
+	__kernel_io_startup( this );
+
 	doregister(this);
 }
 
 void ^?{}(cluster & this) {
+	__kernel_io_shutdown( this );
+
 	unregister(this);
 }
@@ -808,4 +812,6 @@
 	^(*mainThread){};
 
+	^(*mainCluster){};
+
 	^(__cfa_dbg_global_clusters.list){};
 	^(__cfa_dbg_global_clusters.lock){};
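With these two calls in the cluster constructor and destructor, a cluster's io_uring instance now follows the cluster's lifetime. A minimal usage sketch, assuming a default cluster constructor is available at this revision (the declaration below is illustrative, not taken from the changeset):

	{
		cluster io_cluster;          // ?{} runs __kernel_io_startup( io_cluster ) before doregister
		// ... processors and threads attached to io_cluster share its io_uring instance ...
	}                                    // ^?{} runs __kernel_io_shutdown( io_cluster ) before unregister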
libcfa/src/concurrency/kernel.hfa
r72828a8 → r92976d9

@@ -17,4 +17,5 @@
 
 #include <stdbool.h>
+#include <stdint.h>
 
 #include "invoke.h"
@@ -111,4 +112,47 @@
 
 //-----------------------------------------------------------------------------
+// I/O
+#if defined(HAVE_LINUX_IO_URING_H)
+	struct io_uring_sq {
+		uint32_t * head;
+		uint32_t * tail;
+		uint32_t * mask;
+		uint32_t * entries;
+		uint32_t * flags;
+		uint32_t * dropped;
+		uint32_t * array;
+		struct io_uring_sqe * sqes;
+
+		uint32_t sqe_head;
+		uint32_t sqe_tail;
+
+		size_t ring_sz;
+		void * ring_ptr;
+	};
+
+	struct io_uring_cq {
+		volatile uint32_t * head;
+		volatile uint32_t * tail;
+		uint32_t * mask;
+		struct io_uring_cqe * entries;
+		uint32_t * overflow;
+		struct io_uring_cqe * cqes;
+
+		size_t ring_sz;
+		void * ring_ptr;
+	};
+
+	struct io_ring {
+		struct io_uring_sq submit_q;
+		struct io_uring_cq completion_q;
+		uint32_t flags;
+		int fd;
+		pthread_t poller;
+		void * stack;
+		volatile bool done;
+	};
+#endif
+
+//-----------------------------------------------------------------------------
 // Cluster
 struct cluster {
@@ -141,4 +185,8 @@
 		cluster * prev;
 	} node;
+
+	#if defined(HAVE_LINUX_IO_URING_H)
+		struct io_ring io;
+	#endif
 };
 extern Duration default_preemption();
libcfa/src/concurrency/kernel_private.hfa
r72828a8 → r92976d9

@@ -71,4 +71,9 @@
 
 //-----------------------------------------------------------------------------
+// I/O
+void __kernel_io_startup ( cluster & );
+void __kernel_io_shutdown( cluster & );
+
+//-----------------------------------------------------------------------------
 // Utils
 #define KERNEL_STORAGE(T,X) static char storage_##X[sizeof(T)]