Changeset 426f60c
- Timestamp: Jan 12, 2021, 12:34:08 PM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 58f99b3
- Parents: 77fde9d5
- Location: libcfa/src/concurrency
- Files: 4 edited
libcfa/src/concurrency/io.cfa
(diff from r77fde9d5 to r426f60c)

  extern "C" {
      #include <sys/epoll.h>
+     #include <sys/eventfd.h>
      #include <sys/syscall.h>
…
  #include "kernel/fwd.hfa"
  #include "io/types.hfa"
+
+ static const char * opcodes[] = {
+     "OP_NOP",
+     "OP_READV",
+     "OP_WRITEV",
+     "OP_FSYNC",
+     "OP_READ_FIXED",
+     "OP_WRITE_FIXED",
+     "OP_POLL_ADD",
+     "OP_POLL_REMOVE",
+     "OP_SYNC_FILE_RANGE",
+     "OP_SENDMSG",
+     "OP_RECVMSG",
+     "OP_TIMEOUT",
+     "OP_TIMEOUT_REMOVE",
+     "OP_ACCEPT",
+     "OP_ASYNC_CANCEL",
+     "OP_LINK_TIMEOUT",
+     "OP_CONNECT",
+     "OP_FALLOCATE",
+     "OP_OPENAT",
+     "OP_CLOSE",
+     "OP_FILES_UPDATE",
+     "OP_STATX",
+     "OP_READ",
+     "OP_WRITE",
+     "OP_FADVISE",
+     "OP_MADVISE",
+     "OP_SEND",
+     "OP_RECV",
+     "OP_OPENAT2",
+     "OP_EPOLL_CTL",
+     "OP_SPLICE",
+     "OP_PROVIDE_BUFFERS",
+     "OP_REMOVE_BUFFERS",
+     "OP_TEE",
+     "INVALID_OP"
+ };

  // returns true of acquired as leader or second leader
…
  static __u32 __release_consumed_submission( struct __io_data & ring );

- static inline void process(struct io_uring_cqe & cqe ) {
+ // Process a single completion message from the io_uring
+ // This is NOT thread-safe
+ static inline void process( volatile struct io_uring_cqe & cqe ) {
      struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
      __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
…
  }

- // Process a single completion message from the io_uring
- // This is NOT thread-safe
  static [int, bool] __drain_io( & struct __io_data ring ) {
      /* paranoid */ verify( ! __preemption_enabled() );
…
      }

+     __atomic_thread_fence( __ATOMIC_SEQ_CST );
+
      // Release the consumed SQEs
      __release_consumed_submission( ring );
…
      for(i; count) {
          unsigned idx = (head + i) & mask;
-         struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

          /* paranoid */ verify(&cqe);
…
      // Mark to the kernel that the cqe has been seen
      // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-     __atomic_thread_fence( __ATOMIC_SEQ_CST );
-     __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
+     __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );

      return [count, count > 0 || to_submit > 0];
…
      __ioctx_register( this, ev );

-     __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
+     __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);

      const int reset_cnt = 5;
…
          }

-         // We alread failed to find events a few time.
+         // We alread failed to find completed entries a few time.
          if(reset == 1) {
              // Rearm the context so it can block
…
                  io.complete_q.blocks += 1;
              )
-             __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
+             __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);

              // block this thread
              wait( this.sem );

+             eventfd_t v;
+             eventfd_read(this.ring->efd, &v);
+
              // restore counter
              reset = reset_cnt;
          }

-     __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+     __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
…
  //

- [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
+ // Allocate an submit queue entry.
+ // The kernel cannot see these entries until they are submitted, but other threads must be
+ // able to see which entries can be used and which are already un used by an other thread
+ // for convenience, return both the index and the pointer to the sqe
+ //     sqe == &sqes[idx]
+ [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
      /* paranoid */ verify( data != 0 );
…
      // Look through the list starting at some offset
      for(i; cnt) {
-         __u64 expected = 0;
-         __u32 idx = (i + off) & mask;
-         struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
+         __u64 expected = 3;
+         __u32 idx = (i + off) & mask; // Get an index from a random
+         volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
          volatile __u64 * udata = &sqe->user_data;

+         // Allocate the entry by CASing the user_data field from 0 to the future address
          if( *udata == expected &&
              __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
…
              )

+             // debug log
              __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );

              // Success return the data
+             sqe->opcode = 0;
+             sqe->flags = 0;
+             sqe->ioprio = 0;
+             sqe->fd = 0;
+             sqe->off = 0;
+             sqe->addr = 0;
+             sqe->len = 0;
+             sqe->accept_flags = 0;
+             sqe->__pad2[0] = 0;
+             sqe->__pad2[1] = 0;
+             sqe->__pad2[2] = 0;
              return [sqe, idx];
          }
          verify(expected != data);

+         // This one was used
          len ++;
      }

      block++;
+
+     abort( "Kernel I/O : all submit queue entries used, yielding\n" );
+
      yield();
  }
…

  void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
-     __cfadbg_print_safe( io, "Kernel I/O : submitting %u for %p\n", idx, active_thread() );
-
      __io_data & ring = *ctx->thrd.ring;
+
+     {
+         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
+         __cfadbg_print_safe( io,
+             "Kernel I/O : submitting %u (%p) for %p\n"
+             "    data: %p\n"
+             "    opcode: %s\n"
+             "    fd: %d\n"
+             "    flags: %d\n"
+             "    prio: %d\n"
+             "    off: %p\n"
+             "    addr: %p\n"
+             "    len: %d\n"
+             "    other flags: %d\n"
+             "    splice fd: %d\n"
+             "    pad[0]: %llu\n"
+             "    pad[1]: %llu\n"
+             "    pad[2]: %llu\n",
+             idx, sqe,
+             active_thread(),
+             (void*)sqe->user_data,
+             opcodes[sqe->opcode],
+             sqe->fd,
+             sqe->flags,
+             sqe->ioprio,
+             sqe->off,
+             sqe->addr,
+             sqe->len,
+             sqe->accept_flags,
+             sqe->splice_fd_in,
+             sqe->__pad2[0],
+             sqe->__pad2[1],
+             sqe->__pad2[2]
+         );
+     }
+
      // Get now the data we definetely need
      volatile __u32 * const tail = ring.submit_q.tail;
…
          __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
      }
-     else {
+     else
+     {
          // get mutual exclusion
          #if defined(LEADER_LOCK)
…
          #endif

-         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
+         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
          /* paranoid */     "index %u already reclaimed\n"
          /* paranoid */     "head %u, prev %u, tail %u\n"
…
          }

+         /* paranoid */ verify(ret == 1);
+
          // update statistics
          __STATS__( false,
…
          )

+         {
+             __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
+             __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
+             __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
+
+             __cfadbg_print_safe( io,
+                 "Kernel I/O : last submitted is %u (%p)\n"
+                 "    data: %p\n"
+                 "    opcode: %s\n"
+                 "    fd: %d\n"
+                 "    flags: %d\n"
+                 "    prio: %d\n"
+                 "    off: %p\n"
+                 "    addr: %p\n"
+                 "    len: %d\n"
+                 "    other flags: %d\n"
+                 "    splice fd: %d\n"
+                 "    pad[0]: %llu\n"
+                 "    pad[1]: %llu\n"
+                 "    pad[2]: %llu\n",
+                 last_idx, sqe,
+                 (void*)sqe->user_data,
+                 opcodes[sqe->opcode],
+                 sqe->fd,
+                 sqe->flags,
+                 sqe->ioprio,
+                 sqe->off,
+                 sqe->addr,
+                 sqe->len,
+                 sqe->accept_flags,
+                 sqe->splice_fd_in,
+                 sqe->__pad2[0],
+                 sqe->__pad2[1],
+                 sqe->__pad2[2]
+             );
+         }
+
+         __atomic_thread_fence( __ATOMIC_SEQ_CST );
          // Release the consumed SQEs
          __release_consumed_submission( ring );
+         // ring.submit_q.sqes[idx].user_data = 3ul64;

          #if defined(LEADER_LOCK)
…
          #endif

-         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret);
+         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
      }
  }

  // #define PARTIAL_SUBMIT 32
+
+ // go through the list of submissions in the ready array and moved them into
+ // the ring's submit queue
  static unsigned __collect_submitions( struct __io_data & ring ) {
      /* paranoid */ verify( ring.submit_q.ready != 0p );
…
  }

+ // Go through the ring's submit queue and release everything that has already been consumed
+ // by io_uring
  static __u32 __release_consumed_submission( struct __io_data & ring ) {
      const __u32 smask = *ring.submit_q.mask;

+     // We need to get the lock to copy the old head and new head
      if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
-     __u32 chead = *ring.submit_q.head;
-     __u32 phead = ring.submit_q.prev_head;
-     ring.submit_q.prev_head = chead;
+     __attribute__((unused))
+     __u32 ctail = *ring.submit_q.tail;      // get the current tail of the queue
+     __u32 chead = *ring.submit_q.head;      // get the current head of the queue
+     __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
+     ring.submit_q.prev_head = chead;        // note up to were we processed
      unlock(ring.submit_q.release_lock);

+     // the 3 fields are organized like this diagram
+     // except it's are ring
+     // ---+--------+--------+----
+     // ---+--------+--------+----
+     //    ^        ^        ^
+     // phead    chead    ctail
+
+     // make sure ctail doesn't wrap around and reach phead
+     /* paranoid */ verify(
+            (ctail >= chead && chead >= phead)
+         || (chead >= phead && phead >= ctail)
+         || (phead >= ctail && ctail >= chead)
+     );
+
+     // find the range we need to clear
      __u32 count = chead - phead;
+
+     // We acquired an previous-head/current-head range
+     // go through the range and release the sqes
      for( i; count ) {
          __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
-         ring.submit_q.sqes[ idx ].user_data = 0;
+
+         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
+         ring.submit_q.sqes[ idx ].user_data = 3ul64;
      }
      return count;
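Note on the allocation scheme above: in this version of io.cfa, a free submit-queue entry is claimed by compare-and-swapping its user_data field from a sentinel value to the address of the submitting future, and the changeset moves the sentinel from 0 to 3ul64, presumably so that a zeroed or in-flight entry can never be confused with a free one (a small odd value cannot be the address of an aligned future). The standalone C sketch below illustrates only that claiming pattern; the slots array, SENTINEL_FREE constant, and claim_slot/release_slot helpers are illustrative names, not part of libcfa.

// Sketch only: models the CAS-based slot claiming used by __submit_alloc,
// not the actual libcfa data structures.
#include <stdint.h>
#include <stdio.h>

#define NUM_SLOTS     8u
#define SENTINEL_FREE 3ull   // odd, non-zero: cannot collide with an aligned pointer

static volatile uint64_t slots[NUM_SLOTS];   // stands in for sqe->user_data

// Try to claim a free slot by CASing its user_data from the sentinel to the
// caller's data (the address of a future). Returns the slot index, or -1 if
// every slot is currently in use.
static int claim_slot( uint64_t data, uint32_t offset ) {
    for ( uint32_t i = 0; i < NUM_SLOTS; i++ ) {
        uint32_t idx = (i + offset) & (NUM_SLOTS - 1);
        uint64_t expected = SENTINEL_FREE;
        if ( slots[idx] == expected
          && __atomic_compare_exchange_n( &slots[idx], &expected, data,
                                          true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
            return (int)idx;   // claimed: other threads now see the slot as used
        }
    }
    return -1;
}

// Release a slot by writing the sentinel back, as __release_consumed_submission does.
static void release_slot( uint32_t idx ) {
    __atomic_store_n( &slots[idx], SENTINEL_FREE, __ATOMIC_SEQ_CST );
}

int main( void ) {
    for ( uint32_t i = 0; i < NUM_SLOTS; i++ ) slots[i] = SENTINEL_FREE;
    int dummy_future;
    int idx = claim_slot( (uint64_t)(uintptr_t)&dummy_future, 5 );
    printf( "claimed slot %d\n", idx );
    if ( idx >= 0 ) release_slot( (uint32_t)idx );
    return 0;
}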
libcfa/src/concurrency/io/call.cfa.in
(diff from r77fde9d5 to r426f60c)

      ;

- extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
+ extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
  extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
…

      __u32 idx;
-     struct io_uring_sqe * sqe;
+     volatile struct io_uring_sqe * sqe;
      [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
…
  Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
      'fd': 'sockfd',
+     'ioprio': '0',
      'addr': '(__u64)addr',
      'addr2': '(__u64)addrlen',
…
  Call('READ', 'ssize_t read(int fd, void * buf, size_t count)', {
      'fd': 'fd',
+     'off': '0',
      'addr': '(__u64)buf',
      'len': 'count'
…
  Call('WRITE', 'ssize_t write(int fd, void * buf, size_t count)', {
      'fd': 'fd',
+     'off': '0',
      'addr': '(__u64)buf',
      'len': 'count'
…

      __u32 idx;
-     struct io_uring_sqe * sqe;
+     volatile struct io_uring_sqe * sqe;
      [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
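The generator changes above make each affected call explicitly initialize SQE fields it does not use ('ioprio' for accept4, 'off' for read and write), so a value left over from a previous use of the same submit-queue slot cannot leak into a new request. A minimal sketch of the same idea in plain C against the kernel's struct io_uring_sqe follows; it assumes a uapi header new enough to define IORING_OP_READ, and the prep_read_sqe helper is illustrative, not a libcfa or liburing function.

// Sketch: fill an io_uring read SQE with every field set explicitly,
// so nothing stale survives from a previous use of the same slot.
#include <string.h>
#include <stdint.h>
#include <linux/io_uring.h>

static void prep_read_sqe( struct io_uring_sqe * sqe, int fd,
                           void * buf, unsigned len, uint64_t user_data ) {
    memset( sqe, 0, sizeof(*sqe) );   // clears ioprio, off, flags, pads, ...
    sqe->opcode    = IORING_OP_READ;
    sqe->fd        = fd;
    sqe->off       = 0;               // read from the current file offset
    sqe->addr      = (uint64_t)(uintptr_t)buf;
    sqe->len       = len;
    sqe->user_data = user_data;       // echoed back in the matching CQE
}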
libcfa/src/concurrency/io/setup.cfa
(diff from r77fde9d5 to r426f60c)

  #include <pthread.h>
  #include <sys/epoll.h>
+ #include <sys/eventfd.h>
  #include <sys/mman.h>
  #include <sys/syscall.h>
…
          $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
          /* paranoid */ verify( io_ctx );
-         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %p\n", io_ctx);
+         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
          #if !defined( __CFA_NO_STATISTICS__ )
              __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
…
  }

+ extern void signal_unblock( int sig );
+ extern void signal_block ( int sig );
+
  static void __io_create( __io_data & this, const io_context_params & params_in ) {
      // Step 1 : call to setup
…
          abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
      }
      memset(sq.sqes, 0xde, size);
+
+     verify( 0 != (params.features & IORING_FEAT_NODROP) );
+
+     // Step 3 : Initialize the data structure
      // Get the pointers from the kernel to fill the structure
      // submit queue
…
      const __u32 num = *sq.num;
      for( i; num ) {
-         sq.sqes[i].user_data = 0ul64;
+         sq.sqes[i].opcode = IORING_OP_LAST;
+         sq.sqes[i].user_data = 3ul64;
      }
  }
…
      cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

+     signal_block( SIGUSR1 );
+
+     // Step 4 : eventfd
+     int efd = eventfd(0, 0);
+     if (efd < 0) {
+         abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
+     }
+
+     int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
+     if (ret < 0) {
+         abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
+     }
+
+     signal_unblock( SIGUSR1 );
+
      // some paranoid checks
      /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
…
      this.ring_flags = params.flags;
      this.fd = fd;
+     this.efd = efd;
      this.eager_submits = params_in.eager_submits;
      this.poller_submits = params_in.poller_submits;
…
      // close the file descriptor
      close(this.fd);
+     close(this.efd);

      free( this.submit_q.ready ); // Maybe null, doesn't matter
…

  void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
-     ev.events = EPOLLIN | EPOLLONESHOT;
+     ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
      ev.data.u64 = (__u64)&ctx;
-     int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
+     int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->efd, &ev);
      if (ret < 0) {
          abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
…

  void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
-     __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %p\n", &ctx);
-     int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
+     __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
+     int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->efd, &ev);
      if (ret < 0) {
          abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
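The setup changes above switch the poller from epolling the io_uring file descriptor itself to epolling an eventfd that the kernel signals on completions, attached with IORING_REGISTER_EVENTFD and armed as EPOLLIN | EPOLLET | EPOLLONESHOT. A minimal C sketch of that wiring, assuming an already-created ring (ring_fd) and epoll instance (epoll_fd) and with error handling reduced to early returns; the helper names are illustrative, not libcfa functions.

// Sketch: attach an eventfd to an existing io_uring instance and watch it
// with epoll, as the changeset does for the CFA io poller.
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/io_uring.h>

// Returns the eventfd on success, -1 on failure.
static int attach_completion_eventfd( int ring_fd, int epoll_fd ) {
    int efd = eventfd( 0, 0 );
    if ( efd < 0 ) return -1;

    // Ask the kernel to write to efd whenever a completion is posted.
    if ( syscall( __NR_io_uring_register, ring_fd,
                  IORING_REGISTER_EVENTFD, &efd, 1 ) < 0 ) {
        close( efd );
        return -1;
    }

    // Edge-triggered + one-shot, matching the EPOLLIN | EPOLLET | EPOLLONESHOT
    // flags used in __ioctx_register; the poller re-arms with EPOLL_CTL_MOD.
    struct epoll_event ev = { .events = EPOLLIN | EPOLLET | EPOLLONESHOT,
                              .data   = { .fd = efd } };
    if ( epoll_ctl( epoll_fd, EPOLL_CTL_ADD, efd, &ev ) < 0 ) {
        close( efd );
        return -1;
    }
    return efd;
}

// After epoll reports the eventfd readable, drain the counter before parking
// again, as the io.cfa loop does with eventfd_read after wait().
static void drain_eventfd( int efd ) {
    eventfd_t v;
    eventfd_read( efd, &v );
}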
libcfa/src/concurrency/io/types.hfa
(diff from r77fde9d5 to r426f60c)

      // A buffer of sqes (not the actual ring)
-     struct io_uring_sqe * sqes;
+     volatile struct io_uring_sqe * sqes;

      // The location and size of the mmaped area
…

      // the kernel ring
-     struct io_uring_cqe * cqes;
+     volatile struct io_uring_cqe * cqes;

      // The location and size of the mmaped area
…
      __u32 ring_flags;
      int fd;
+     int efd;
      bool eager_submits:1;
      bool poller_submits:1;