Changeset 3e2b9c9 for libcfa/src/concurrency/io.cfa
- Timestamp:
- Aug 3, 2020, 1:59:13 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 95789be
- Parents:
- e660761
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
re660761 r3e2b9c9 14 14 // 15 15 16 #define __cforall_thread__ 17 16 18 #if defined(__CFA_DEBUG__) 17 19 // #define __CFA_DEBUG_PRINT_IO__ … … 19 21 #endif 20 22 21 #include "kernel_private.hfa" 22 #include "bitmanip.hfa" 23 24 #if !defined(CFA_HAVE_LINUX_IO_URING_H) 25 void __kernel_io_startup() { 26 // Nothing to do without io_uring 27 } 28 29 void __kernel_io_shutdown() { 30 // Nothing to do without io_uring 31 } 32 33 void ?{}(io_context & this, struct cluster & cl) {} 34 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {} 35 36 void ^?{}(io_context & this) {} 37 void ^?{}(io_context & this, bool cluster_context) {} 38 39 #else 23 24 #if defined(CFA_HAVE_LINUX_IO_URING_H) 40 25 #define _GNU_SOURCE /* See feature_test_macros(7) */ 41 26 #include <errno.h> 27 #include <signal.h> 42 28 #include <stdint.h> 43 29 #include <string.h> … … 46 32 extern "C" { 47 33 #include <sys/epoll.h> 48 #include <sys/mman.h>49 34 #include <sys/syscall.h> 50 35 … … 52 37 } 53 38 54 #include "bits/signal.hfa" 55 #include "kernel_private.hfa" 56 #include "thread.hfa" 57 58 void ?{}(io_context_params & this) { 59 this.num_entries = 256; 60 this.num_ready = 256; 61 this.submit_aff = -1; 62 this.eager_submits = false; 63 this.poller_submits = false; 64 this.poll_submit = false; 65 this.poll_complete = false; 66 } 67 68 static void * __io_poller_slow( void * arg ); 69 70 // Weirdly, some systems that do support io_uring don't actually define these 71 #ifdef __alpha__ 72 /* 73 * alpha is the only exception, all other architectures 74 * have common numbers for new system calls. 
75 */ 76 #ifndef __NR_io_uring_setup 77 #define __NR_io_uring_setup 535 78 #endif 79 #ifndef __NR_io_uring_enter 80 #define __NR_io_uring_enter 536 81 #endif 82 #ifndef __NR_io_uring_register 83 #define __NR_io_uring_register 537 84 #endif 85 #else /* !__alpha__ */ 86 #ifndef __NR_io_uring_setup 87 #define __NR_io_uring_setup 425 88 #endif 89 #ifndef __NR_io_uring_enter 90 #define __NR_io_uring_enter 426 91 #endif 92 #ifndef __NR_io_uring_register 93 #define __NR_io_uring_register 427 94 #endif 95 #endif 96 97 struct __submition_data { 98 // Head and tail of the ring (associated with array) 99 volatile uint32_t * head; 100 volatile uint32_t * tail; 101 volatile uint32_t prev_head; 102 103 // The actual kernel ring which uses head/tail 104 // indexes into the sqes arrays 105 uint32_t * array; 106 107 // number of entries and mask to go with it 108 const uint32_t * num; 109 const uint32_t * mask; 110 111 // Submission flags (Not sure what for) 112 uint32_t * flags; 113 114 // number of sqes not submitted (whatever that means) 115 uint32_t * dropped; 116 117 // Like head/tail but not seen by the kernel 118 volatile uint32_t * ready; 119 uint32_t ready_cnt; 120 121 __spinlock_t lock; 122 __spinlock_t release_lock; 123 124 // A buffer of sqes (not the actual ring) 125 struct io_uring_sqe * sqes; 126 127 // The location and size of the mmaped area 128 void * ring_ptr; 129 size_t ring_sz; 130 }; 131 132 struct __completion_data { 133 // Head and tail of the ring 134 volatile uint32_t * head; 135 volatile uint32_t * tail; 136 137 // number of entries and mask to go with it 138 const uint32_t * mask; 139 const uint32_t * num; 140 141 // number of cqes not submitted (whatever that means) 142 uint32_t * overflow; 143 144 // the kernel ring 145 struct io_uring_cqe * cqes; 146 147 // The location and size of the mmaped area 148 void * ring_ptr; 149 size_t ring_sz; 150 }; 151 152 struct __io_data { 153 struct __submition_data submit_q; 154 struct __completion_data completion_q; 
155 uint32_t ring_flags; 156 int fd; 157 bool eager_submits:1; 158 bool poller_submits:1; 159 }; 160 161 //============================================================================================= 162 // I/O Startup / Shutdown logic + Master Poller 163 //============================================================================================= 164 165 // IO Master poller loop forward 166 static void * iopoll_loop( __attribute__((unused)) void * args ); 167 168 static struct { 169 pthread_t thrd; // pthread handle to io poller thread 170 void * stack; // pthread stack for io poller thread 171 int epollfd; // file descriptor to the epoll instance 172 volatile bool run; // Whether or not to continue 173 } iopoll; 174 175 void __kernel_io_startup(void) { 176 __cfaabi_dbg_print_safe( "Kernel : Creating EPOLL instance\n" ); 177 178 iopoll.epollfd = epoll_create1(0); 179 if (iopoll.epollfd == -1) { 180 abort( "internal error, epoll_create1\n"); 181 } 182 183 __cfaabi_dbg_print_safe( "Kernel : Starting io poller thread\n" ); 184 185 iopoll.run = true; 186 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p ); 187 } 188 189 void __kernel_io_shutdown(void) { 190 // Notify the io poller thread of the shutdown 191 iopoll.run = false; 192 sigval val = { 1 }; 193 pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 194 195 // Wait for the io poller thread to finish 196 197 pthread_join( iopoll.thrd, 0p ); 198 free( iopoll.stack ); 199 200 int ret = close(iopoll.epollfd); 201 if (ret == -1) { 202 abort( "internal error, close epoll\n"); 203 } 204 205 // Io polling is now fully stopped 206 207 __cfaabi_dbg_print_safe( "Kernel : IO poller stopped\n" ); 208 } 209 210 static void * iopoll_loop( __attribute__((unused)) void * args ) { 211 __processor_id_t id; 212 id.id = doregister(&id); 213 __cfaabi_dbg_print_safe( "Kernel : IO poller thread starting\n" ); 214 215 // Block signals to control when they arrive 216 sigset_t mask; 217 sigfillset(&mask); 218 if ( 
pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) { 219 abort( "internal error, pthread_sigmask" ); 220 } 221 222 sigdelset( &mask, SIGUSR1 ); 223 224 // Create sufficient events 225 struct epoll_event events[10]; 226 // Main loop 227 while( iopoll.run ) { 228 // Wait for events 229 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 230 231 // Check if an error occured 232 if (nfds == -1) { 233 if( errno == EINTR ) continue; 234 abort( "internal error, pthread_sigmask" ); 235 } 236 237 for(i; nfds) { 238 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 239 /* paranoid */ verify( io_ctx ); 240 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx); 241 #if !defined( __CFA_NO_STATISTICS__ ) 242 kernelTLS.this_stats = io_ctx->self.curr_cluster->stats; 243 #endif 244 __post( io_ctx->sem, &id ); 245 } 246 } 247 248 __cfaabi_dbg_print_safe( "Kernel : IO poller thread stopping\n" ); 249 unregister(&id); 250 return 0p; 251 } 252 253 //============================================================================================= 254 // I/O Context Constrution/Destruction 255 //============================================================================================= 256 257 void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; } 258 void main( $io_ctx_thread & this ); 259 static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; } 260 void ^?{}( $io_ctx_thread & mutex this ) {} 261 262 static void __io_create ( __io_data & this, const io_context_params & params_in ); 263 static void __io_destroy( __io_data & this ); 264 265 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) { 266 (this.thrd){ cl }; 267 this.thrd.ring = malloc(); 268 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this); 269 __io_create( *this.thrd.ring, params ); 270 271 __cfadbg_print_safe(io_core, "Kernel I/O : Starting 
poller thread for io_context %p\n", &this); 272 this.thrd.done = false; 273 __thrd_start( this.thrd, main ); 274 275 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this); 276 } 277 278 void ?{}(io_context & this, struct cluster & cl) { 279 io_context_params params; 280 (this){ cl, params }; 281 } 282 283 void ^?{}(io_context & this, bool cluster_context) { 284 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this); 285 286 // Notify the thread of the shutdown 287 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST); 288 289 // If this is an io_context within a cluster, things get trickier 290 $thread & thrd = this.thrd.self; 291 if( cluster_context ) { 292 cluster & cltr = *thrd.curr_cluster; 293 /* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster ); 294 /* paranoid */ verify( !ready_mutate_islocked() ); 295 296 // We need to adjust the clean-up based on where the thread is 297 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 298 299 ready_schedule_lock( (struct __processor_id_t *)active_processor() ); 300 301 // This is the tricky case 302 // The thread was preempted and now it is on the ready queue 303 // The thread should be the last on the list 304 /* paranoid */ verify( thrd.link.next != 0p ); 305 306 // Remove the thread from the ready queue of this cluster 307 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 308 /* paranoid */ verify( removed ); 309 thrd.link.next = 0p; 310 thrd.link.prev = 0p; 311 __cfaabi_dbg_debug_do( thrd.unpark_stale = true ); 312 313 // Fixup the thread state 314 thrd.state = Blocked; 315 thrd.ticket = 0; 316 thrd.preempted = __NO_PREEMPTION; 317 318 ready_schedule_unlock( (struct __processor_id_t *)active_processor() ); 319 320 // Pretend like the thread was blocked all along 321 } 322 // !!! This is not an else if !!! 
323 if( thrd.state == Blocked ) { 324 325 // This is the "easy case" 326 // The thread is parked and can easily be moved to active cluster 327 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 328 thrd.curr_cluster = active_cluster(); 329 330 // unpark the fast io_poller 331 unpark( &thrd __cfaabi_dbg_ctx2 ); 332 } 333 else { 334 335 // The thread is in a weird state 336 // I don't know what to do here 337 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 338 } 339 } else { 340 unpark( &thrd __cfaabi_dbg_ctx2 ); 341 } 342 343 ^(this.thrd){}; 344 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this); 345 346 __io_destroy( *this.thrd.ring ); 347 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this); 348 349 free(this.thrd.ring); 350 } 351 352 void ^?{}(io_context & this) { 353 ^(this){ false }; 354 } 355 356 static void __io_create( __io_data & this, const io_context_params & params_in ) { 357 // Step 1 : call to setup 358 struct io_uring_params params; 359 memset(&params, 0, sizeof(params)); 360 if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL; 361 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL; 362 363 uint32_t nentries = params_in.num_entries; 364 365 int fd = syscall(__NR_io_uring_setup, nentries, &params ); 366 if(fd < 0) { 367 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno)); 368 } 369 370 // Step 2 : mmap result 371 memset( &this, 0, sizeof(struct __io_data) ); 372 struct __submition_data & sq = this.submit_q; 373 struct __completion_data & cq = this.completion_q; 374 375 // calculate the right ring size 376 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) ); 377 cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe)); 378 379 // Requires features 380 #if defined(IORING_FEAT_SINGLE_MMAP) 381 // adjust the size according to the
parameters 382 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 383 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz); 384 } 385 #endif 386 387 // mmap the Submit Queue into existence 388 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); 389 if (sq.ring_ptr == (void*)MAP_FAILED) { 390 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno)); 391 } 392 393 // Requires features 394 #if defined(IORING_FEAT_SINGLE_MMAP) 395 // mmap the Completion Queue into existence (may or may not be needed) 396 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 397 cq.ring_ptr = sq.ring_ptr; 398 } 399 else 400 #endif 401 { 402 // We need multiple call to MMAP 403 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); 404 if (cq.ring_ptr == (void*)MAP_FAILED) { 405 munmap(sq.ring_ptr, sq.ring_sz); 406 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno)); 407 } 408 } 409 410 // mmap the submit queue entries 411 size_t size = params.sq_entries * sizeof(struct io_uring_sqe); 412 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES); 413 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) { 414 munmap(sq.ring_ptr, sq.ring_sz); 415 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz); 416 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno)); 417 } 418 419 // Get the pointers from the kernel to fill the structure 420 // submit queue 421 sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 422 sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 423 sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 424 sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 425 sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 426 sq.dropped = ( 
uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 427 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 428 sq.prev_head = *sq.head; 429 430 { 431 const uint32_t num = *sq.num; 432 for( i; num ) { 433 sq.sqes[i].user_data = 0ul64; 434 } 435 } 436 437 (sq.lock){}; 438 (sq.release_lock){}; 439 440 if( params_in.poller_submits || params_in.eager_submits ) { 441 /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) ); 442 sq.ready_cnt = max( params_in.num_ready, 8 ); 443 sq.ready = alloc_align( 64, sq.ready_cnt ); 444 for(i; sq.ready_cnt) { 445 sq.ready[i] = -1ul32; 446 } 447 } 448 else { 449 sq.ready_cnt = 0; 450 sq.ready = 0p; 451 } 452 453 // completion queue 454 cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head); 455 cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail); 456 cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask); 457 cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries); 458 cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow); 459 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes); 460 461 // some paranoid checks 462 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask ); 463 /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num ); 464 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head ); 465 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail ); 466 467 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask ); 468 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", 
nentries, *sq.num ); 469 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head ); 470 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail ); 471 472 // Update the global ring info 473 this.ring_flags = params.flags; 474 this.fd = fd; 475 this.eager_submits = params_in.eager_submits; 476 this.poller_submits = params_in.poller_submits; 477 } 478 479 void __io_destroy( __io_data & this ) { 480 // Shutdown the io rings 481 struct __submition_data & sq = this.submit_q; 482 struct __completion_data & cq = this.completion_q; 483 484 // unmap the submit queue entries 485 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe)); 486 487 // unmap the Submit Queue ring 488 munmap(sq.ring_ptr, sq.ring_sz); 489 490 // unmap the Completion Queue ring, if it is different 491 if (cq.ring_ptr != sq.ring_ptr) { 492 munmap(cq.ring_ptr, cq.ring_sz); 493 } 494 495 // close the file descriptor 496 close(this.fd); 497 498 free( this.submit_q.ready ); // Maybe null, doesn't matter 499 } 500 501 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { 39 #include "stats.hfa" 40 #include "kernel.hfa" 41 #include "kernel/fwd.hfa" 42 #include "io/types.hfa" 43 44 //============================================================================================= 45 // I/O Syscall 46 //============================================================================================= 47 static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { 502 48 bool need_sys_to_submit = false; 503 49 bool need_sys_to_complete = false; … … 618 164 void main( $io_ctx_thread & this ) { 619 165 epoll_event ev; 620 ev.events = EPOLLIN | EPOLLONESHOT; 621 ev.data.u64 = (uint64_t)&this; 622 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, this.ring->fd, &ev); 623 if (ret < 0) { 624 abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) ); 625 } 166 __ioctx_register( this, 
ev ); 626 167 627 168 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring); … … 654 195 reset = 0; 655 196 656 // wake up the slow poller 657 ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, this.ring->fd, &ev); 658 if (ret < 0) { 659 abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) ); 660 } 661 662 // park this thread 197 // block this thread 198 __ioctx_prepare_block( this, ev ); 663 199 wait( this.sem ); 664 200 } … … 933 469 return count; 934 470 } 935 936 //=============================================================================================937 // I/O Submissions938 //=============================================================================================939 940 void register_fixed_files( io_context & ctx, int * files, unsigned count ) {941 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );942 if( ret < 0 ) {943 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );944 }945 946 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );947 }948 949 void register_fixed_files( cluster & cltr, int * files, unsigned count ) {950 for(i; cltr.io.cnt) {951 register_fixed_files( cltr.io.ctxs[i], files, count );952 }953 }954 471 #endif
Note: See TracChangeset
for help on using the changeset viewer.