Changeset 6f121b8 for libcfa/src/concurrency/io.cfa
- Timestamp: May 21, 2020, 5:06:14 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 99fea48
- Parents: d47349b
- File: 1 edited
Legend:
- Unmodified lines have no prefix
- Added lines are prefixed with +
- Removed lines are prefixed with -
libcfa/src/concurrency/io.cfa
rd47349b r6f121b8 124 124 125 125 // Like head/tail but not seen by the kernel 126 volatile uint32_t alloc;127 126 volatile uint32_t * ready; 128 127 uint32_t ready_cnt; … … 141 140 struct { 142 141 struct { 143 volatile unsigned long long int val; 142 volatile unsigned long long int rdy; 143 volatile unsigned long long int csm; 144 volatile unsigned long long int avl; 144 145 volatile unsigned long long int cnt; 145 volatile unsigned long long int block;146 146 } submit_avg; 147 147 struct { … … 150 150 volatile unsigned long long int block; 151 151 } look_avg; 152 struct { 153 volatile unsigned long long int val; 154 volatile unsigned long long int cnt; 155 volatile unsigned long long int block; 156 } alloc_avg; 152 157 } stats; 153 158 #endif … … 279 284 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 280 285 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 281 sq.alloc = *sq.tail; 286 287 { 288 const uint32_t num = *sq.num; 289 for( i; num ) { 290 sq.sqes[i].user_data = 0ul64; 291 } 292 } 282 293 283 294 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { … … 322 333 // Initialize statistics 323 334 #if !defined(__CFA_NO_STATISTICS__) 324 this.io->submit_q.stats.submit_avg.val = 0; 325 this.io->submit_q.stats.submit_avg.cnt = 0; 326 this.io->submit_q.stats.submit_avg.block = 0; 335 this.io->submit_q.stats.submit_avg.rdy = 0; 336 this.io->submit_q.stats.submit_avg.csm = 0; 337 this.io->submit_q.stats.submit_avg.avl = 0; 338 this.io->submit_q.stats.submit_avg.cnt = 0; 327 339 this.io->submit_q.stats.look_avg.val = 0; 328 340 this.io->submit_q.stats.look_avg.cnt = 0; 329 341 this.io->submit_q.stats.look_avg.block = 0; 342 this.io->submit_q.stats.alloc_avg.val = 0; 343 this.io->submit_q.stats.alloc_avg.cnt = 0; 344 this.io->submit_q.stats.alloc_avg.block = 0; 330 345 this.io->completion_q.stats.completed_avg.val = 0; 331 346 this.io->completion_q.stats.completed_avg.slow_cnt = 0; … … 383 398 this.ready_queue.head = 1p; 384 399 thrd.next = 0p; 400 __cfaabi_dbg_debug_do( thrd.unpark_stale = true ); 385 401 386 402 // Fixup the thread state … … 425 441 if(this.print_stats) { 426 442 with(this.io->submit_q.stats, this.io->completion_q.stats) { 443 double avgrdy = ((double)submit_avg.rdy) / submit_avg.cnt; 444 double avgcsm = ((double)submit_avg.csm) / submit_avg.cnt; 445 double avgavl = ((double)submit_avg.avl) / submit_avg.cnt; 446 427 447 double lavgv = 0; 428 448 double lavgb = 0; … … 432 452 } 433 453 454 double aavgv = 0; 455 double aavgb = 0; 456 if(alloc_avg.cnt != 0) { 457 aavgv = ((double)alloc_avg.val ) / alloc_avg.cnt; 458 aavgb = ((double)alloc_avg.block) / alloc_avg.cnt; 459 } 460 434 461 __cfaabi_bits_print_safe( STDOUT_FILENO, 435 462 "----- I/O uRing Stats -----\n" 436 463 "- total submit calls : %'15llu\n" 437 "- avg submit : %'18.2lf\n" 438 "- pre-submit block %% : %'18.2lf\n" 464 "- avg ready entries : %'18.2lf\n" 465 "- avg submitted entries : %'18.2lf\n" 466 "- avg available entries : %'18.2lf\n" 439 467 "- total ready search : %'15llu\n" 440 468 "- avg ready search len : %'18.2lf\n" 441 469 "- avg ready search block : %'18.2lf\n" 470 "- total alloc search : %'15llu\n" 471 "- avg alloc search len : %'18.2lf\n" 472 "- avg alloc search block : %'18.2lf\n" 442 473 "- total wait calls : %'15llu (%'llu slow, %'llu fast)\n" 443 474 "- avg completion/wait : %'18.2lf\n", 444 475 submit_avg.cnt, 445 ((double)submit_avg.val) / submit_avg.cnt, 446 (100.0 * submit_avg.block) / submit_avg.cnt, 476 avgrdy, 477 avgcsm, 478 avgavl, 
447 479 look_avg.cnt, 448 480 lavgv, 449 481 lavgb, 482 alloc_avg.cnt, 483 aavgv, 484 aavgb, 450 485 completed_avg.slow_cnt + completed_avg.fast_cnt, 451 486 completed_avg.slow_cnt, completed_avg.fast_cnt, … … 493 528 494 529 // If the poller thread also submits, then we need to aggregate the submissions which are ready 495 uint32_t * tail =ring.submit_q.tail;530 uint32_t tail = *ring.submit_q.tail; 496 531 const uint32_t mask = *ring.submit_q.mask; 497 532 … … 505 540 506 541 // If we got a real submission, append it to the list 507 ring.submit_q.array[ ( (*tail)+ to_submit) & mask ] = idx & mask;542 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 508 543 to_submit++; 509 544 } 510 545 511 546 // Increment the tail based on how many we are ready to submit 512 __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST); 513 514 // update statistics 515 #if !defined(__CFA_NO_STATISTICS__) 516 ring.submit_q.stats.submit_avg.val += to_submit; 517 ring.submit_q.stats.submit_avg.cnt += 1; 518 #endif 519 } 520 547 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 548 } 549 550 const uint32_t smask = *ring.submit_q.mask; 551 uint32_t shead = *ring.submit_q.head; 521 552 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 522 553 if( ret < 0 ) { … … 530 561 } 531 562 563 verify( (shead + ret) == *ring.submit_q.head ); 564 565 // Release the consumed SQEs 566 for( i; ret ) { 567 uint32_t idx = ring.submit_q.array[ (i + shead) & smask ]; 568 ring.submit_q.sqes[ idx ].user_data = 0; 569 } 570 571 uint32_t avail = 0; 572 uint32_t sqe_num = *ring.submit_q.num; 573 for(i; sqe_num) { 574 if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++; 575 } 576 577 // update statistics 578 #if !defined(__CFA_NO_STATISTICS__) 579 ring.submit_q.stats.submit_avg.rdy += to_submit; 580 ring.submit_q.stats.submit_avg.csm += ret; 581 ring.submit_q.stats.submit_avg.avl += avail; 582 ring.submit_q.stats.submit_avg.cnt += 1; 583 #endif 584 532 585 // Drain the queue 533 586 unsigned head = *ring.completion_q.head; 534 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE); 587 unsigned tail = *ring.completion_q.tail; 588 const uint32_t mask = *ring.completion_q.mask; 589 590 // Memory barrier 591 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 535 592 536 593 // Nothing was new return 0 … … 541 598 uint32_t count = tail - head; 542 599 for(i; count) { 543 unsigned idx = (head + i) & (*ring.completion_q.mask);600 unsigned idx = (head + i) & mask; 544 601 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 545 602 … … 555 612 556 613 // Allow new submissions to happen 557 V(ring.submit, count);614 // V(ring.submit, count); 558 615 559 616 // Mark to the kernel that the cqe has been seen 560 617 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 618 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 561 619 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 562 620 … … 709 767 // 710 768 711 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) { 712 // Wait for a spot to be available 713 __attribute__((unused)) bool blocked = P(ring.submit); 714 #if !defined(__CFA_NO_STATISTICS__) 715 __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 
1ul64 : 0ul64, __ATOMIC_RELAXED ); 716 #endif 717 718 // Allocate the sqe 719 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST); 720 721 // Mask the idx now to allow make everything easier to check 722 idx &= *ring.submit_q.mask; 723 724 // Return the sqe 725 return [&ring.submit_q.sqes[ idx ], idx]; 769 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) { 770 verify( data != 0 ); 771 772 // Prepare the data we need 773 __attribute((unused)) int len = 0; 774 __attribute((unused)) int block = 0; 775 uint32_t cnt = *ring.submit_q.num; 776 uint32_t mask = *ring.submit_q.mask; 777 uint32_t off = __tls_rand(); 778 779 // Loop around looking for an available spot 780 LOOKING: for() { 781 // Look through the list starting at some offset 782 for(i; cnt) { 783 uint64_t expected = 0; 784 uint32_t idx = (i + off) & mask; 785 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 786 volatile uint64_t * udata = &sqe->user_data; 787 788 if( *udata == expected && 789 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) 790 { 791 // update statistics 792 #if !defined(__CFA_NO_STATISTICS__) 793 __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.val, len, __ATOMIC_RELAXED ); 794 __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.block, block, __ATOMIC_RELAXED ); 795 __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.cnt, 1, __ATOMIC_RELAXED ); 796 #endif 797 798 // Success return the data 799 return [sqe, idx]; 800 } 801 verify(expected != data); 802 803 len ++; 804 } 805 806 block++; 807 yield(); 808 } 726 809 } 727 810 … … 741 824 __attribute((unused)) int len = 0; 742 825 __attribute((unused)) int block = 0; 743 uint32_t expected = -1ul32;744 826 uint32_t ready_mask = ring.submit_q.ready_cnt - 1; 745 827 uint32_t off = __tls_rand(); … … 747 829 for(i; ring.submit_q.ready_cnt) { 748 830 uint32_t ii = (i + off) & ready_mask; 831 uint32_t expected = -1ul32; 749 832 if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 750 833 break LOOKING; 751 834 } 835 verify(expected != idx); 752 836 753 837 len ++; … … 791 875 // update statistics 792 876 #if !defined(__CFA_NO_STATISTICS__) 793 ring.submit_q.stats.submit_avg. val+= 1;877 ring.submit_q.stats.submit_avg.csm += 1; 794 878 ring.submit_q.stats.submit_avg.cnt += 1; 795 879 #endif … … 830 914 831 915 #define __submit_prelude \ 832 struct __io_data & ring = *active_cluster()->io; \ 916 io_user_data data = { 0, active_thread() }; \ 917 struct __io_data & ring = *data.thrd->curr_cluster->io; \ 833 918 struct io_uring_sqe * sqe; \ 834 919 uint32_t idx; \ 835 [sqe, idx] = __submit_alloc( ring );920 [sqe, idx] = __submit_alloc( ring, (uint64_t)&data ); 836 921 837 922 #define __submit_wait \ 838 io_user_data data = { 0, active_thread() }; \839 923 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \ 840 sqe->user_data = (uint64_t)&data; \924 verify( sqe->user_data == (uint64_t)&data ); \ 841 925 __submit( ring, idx ); \ 842 926 park( __cfaabi_dbg_ctx ); \
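The central change above is that __submit_alloc no longer takes a ticket from a semaphore-guarded counter (P/V on ring.submit plus an alloc index); instead an SQE whose user_data is zero is treated as free, and a caller claims it by compare-and-swapping its own non-zero tag in, scanning from a random per-thread offset to spread contention. Below is a minimal standalone C sketch of that claim-by-CAS idea; all names (slot_t, slot_pool, claim_slot) are hypothetical stand-ins, not part of the changeset.

#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct {
	_Atomic uint64_t user_data;   // 0 means the slot is free
} slot_t;

typedef struct {
	slot_t * slots;
	uint32_t mask;                // size is a power of two, mask = size - 1
} slot_pool;

// Claim a free slot by CAS-ing a non-zero tag into its user_data.
static uint32_t claim_slot( slot_pool * pool, uint64_t tag ) {
	uint32_t off = (uint32_t)rand();   // stand-in for __tls_rand()
	for(;;) {
		for( uint32_t i = 0; i <= pool->mask; i++ ) {
			uint32_t idx = (i + off) & pool->mask;
			uint64_t expected = 0;
			// Cheap load first; the CAS makes the claim atomic even if
			// another thread races us between the load and the exchange.
			if( atomic_load( &pool->slots[idx].user_data ) == 0 &&
			    atomic_compare_exchange_strong( &pool->slots[idx].user_data,
			                                    &expected, tag ) ) {
				return idx;
			}
		}
		// Every slot was taken this round; the changeset yield()s here.
	}
}

The slot is released by storing 0 back into user_data, which is exactly what the new post-submit loop over the consumed SQEs does.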
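A subtle fix in the diff is moving uint32_t expected = -1ul32; inside the ready-array search loop. On a failed compare-exchange, GCC's __atomic builtins (and C11's) write the value actually observed back into expected, so a single expected hoisted out of the loop would compare later slots against a stale observed value rather than the "free" sentinel. A small, self-contained C demonstration of that behavior (values here are illustrative only):

#include <stdatomic.h>
#include <stdint.h>
#include <assert.h>

int main(void) {
	_Atomic uint32_t slot = 7;        // some other thread's index is parked here
	uint32_t expected = UINT32_MAX;   // the "free" sentinel (-1ul32 in CFA)

	// The CAS fails because slot != UINT32_MAX, and it overwrites
	// expected with the value it saw, namely 7.
	_Bool ok = atomic_compare_exchange_strong( &slot, &expected, 42 );
	assert( !ok && expected == 7 );

	// Without re-initializing expected, a retry on the next slot would
	// compare against 7, not the sentinel, so each attempt must reset it.
	expected = UINT32_MAX;
	(void)expected;
	return 0;
}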
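On the completion side, the diff replaces an acquire load of the tail with plain loads bracketed by explicit __atomic_thread_fence( __ATOMIC_SEQ_CST ) calls: one fence before reading the CQEs, and one before publishing the new head so the kernel only observes the head moving after the entries were read. The sketch below shows the same ordering pattern in portable C11, using the weaker acquire/release fences that the pattern minimally requires (the changeset uses SEQ_CST); the cq_ring layout is a simplified stand-in, not io_uring's real ABI.

#include <stdatomic.h>
#include <stdint.h>

typedef struct { uint64_t user_data; int32_t res; } cqe_t;

typedef struct {
	_Atomic uint32_t * head;   // consumer index, written by us
	_Atomic uint32_t * tail;   // producer index, written by the kernel
	uint32_t mask;
	cqe_t * cqes;
} cq_ring;

static unsigned drain( cq_ring * cq, void (*handle)(cqe_t *) ) {
	uint32_t head = atomic_load_explicit( cq->head, memory_order_relaxed );
	uint32_t tail = atomic_load_explicit( cq->tail, memory_order_relaxed );
	atomic_thread_fence( memory_order_acquire );   // pairs with producer's release

	uint32_t count = tail - head;
	for( uint32_t i = 0; i < count; i++ ) {
		handle( &cq->cqes[ (head + i) & cq->mask ] );
	}

	atomic_thread_fence( memory_order_release );   // reads complete before head moves
	atomic_fetch_add_explicit( cq->head, count, memory_order_relaxed );
	return count;
}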
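The statistics rework follows a simple running-average pattern: each submit call atomically accumulates raw sums (entries ready, entries the kernel consumed, entries still available) plus a shared call count, and averages are computed only at print time. A minimal C version of that pattern with hypothetical names; note, as a caveat, that the changeset guards the look_avg/alloc_avg divisions against a zero count but computes the submit_avg averages unguarded.

#include <stdatomic.h>
#include <stdio.h>

struct submit_stats {
	_Atomic unsigned long long rdy, csm, avl, cnt;
};

static void record( struct submit_stats * s,
                    unsigned ready, unsigned consumed, unsigned available ) {
	atomic_fetch_add_explicit( &s->rdy, ready,     memory_order_relaxed );
	atomic_fetch_add_explicit( &s->csm, consumed,  memory_order_relaxed );
	atomic_fetch_add_explicit( &s->avl, available, memory_order_relaxed );
	atomic_fetch_add_explicit( &s->cnt, 1,         memory_order_relaxed );
}

static void print_stats( struct submit_stats * s ) {
	unsigned long long n = s->cnt;   // implicit atomic load
	if( n == 0 ) return;             // guard the division
	printf( "avg ready %.2f, avg submitted %.2f, avg available %.2f\n",
	        (double)s->rdy / n, (double)s->csm / n, (double)s->avl / n );
}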