Changeset 0e4df2e for libcfa/src/concurrency
- Timestamp:
- May 22, 2020, 11:49:29 AM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 95cb63b
- Parents:
- 2802824 (diff), 99fea48 (diff)
 Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
 Use the(diff)links above to see all the changes relative to each parent.
- Location:
- libcfa/src/concurrency
- Files:
- 
      - 2 edited
 
 - 
          
  io.cfa (modified) (18 diffs)
- 
          
  kernel.cfa (modified) (1 diff)
 
Legend:
- Unmodified
- Added
- Removed
- 
      libcfa/src/concurrency/io.cfar2802824 r0e4df2e 124 124 125 125 // Like head/tail but not seen by the kernel 126 volatile uint32_t alloc;127 126 volatile uint32_t * ready; 128 127 uint32_t ready_cnt; … … 141 140 struct { 142 141 struct { 143 volatile unsigned long long int val; 142 volatile unsigned long long int rdy; 143 volatile unsigned long long int csm; 144 volatile unsigned long long int avl; 144 145 volatile unsigned long long int cnt; 145 volatile unsigned long long int block;146 146 } submit_avg; 147 147 struct { … … 150 150 volatile unsigned long long int block; 151 151 } look_avg; 152 struct { 153 volatile unsigned long long int val; 154 volatile unsigned long long int cnt; 155 volatile unsigned long long int block; 156 } alloc_avg; 152 157 } stats; 153 158 #endif … … 279 284 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 280 285 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 281 sq.alloc = *sq.tail; 286 287 { 288 const uint32_t num = *sq.num; 289 for( i; num ) { 290 sq.sqes[i].user_data = 0ul64; 291 } 292 } 282 293 283 294 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { … … 322 333 // Initialize statistics 323 334 #if !defined(__CFA_NO_STATISTICS__) 324 this.io->submit_q.stats.submit_avg.val = 0; 325 this.io->submit_q.stats.submit_avg.cnt = 0; 326 this.io->submit_q.stats.submit_avg.block = 0; 335 this.io->submit_q.stats.submit_avg.rdy = 0; 336 this.io->submit_q.stats.submit_avg.csm = 0; 337 this.io->submit_q.stats.submit_avg.avl = 0; 338 this.io->submit_q.stats.submit_avg.cnt = 0; 327 339 this.io->submit_q.stats.look_avg.val = 0; 328 340 this.io->submit_q.stats.look_avg.cnt = 0; 329 341 this.io->submit_q.stats.look_avg.block = 0; 342 this.io->submit_q.stats.alloc_avg.val = 0; 343 this.io->submit_q.stats.alloc_avg.cnt = 0; 344 this.io->submit_q.stats.alloc_avg.block = 0; 330 345 this.io->completion_q.stats.completed_avg.val = 0; 331 346 this.io->completion_q.stats.completed_avg.slow_cnt = 0; … … 384 399 this.ready_queue.head = 1p; 385 400 thrd.next = 0p; 401 __cfaabi_dbg_debug_do( thrd.unpark_stale = true ); 386 402 387 403 // Fixup the thread state … … 426 442 if(this.print_stats) { 427 443 with(this.io->submit_q.stats, this.io->completion_q.stats) { 444 double avgrdy = ((double)submit_avg.rdy) / submit_avg.cnt; 445 double avgcsm = ((double)submit_avg.csm) / submit_avg.cnt; 446 double avgavl = ((double)submit_avg.avl) / submit_avg.cnt; 447 428 448 double lavgv = 0; 429 449 double lavgb = 0; … … 433 453 } 434 454 455 double aavgv = 0; 456 double aavgb = 0; 457 if(alloc_avg.cnt != 0) { 458 aavgv = ((double)alloc_avg.val ) / alloc_avg.cnt; 459 aavgb = ((double)alloc_avg.block) / alloc_avg.cnt; 460 } 461 435 462 __cfaabi_bits_print_safe( STDOUT_FILENO, 436 463 "----- I/O uRing Stats -----\n" 437 464 "- total submit calls : %'15llu\n" 438 "- avg submit : %'18.2lf\n" 439 "- pre-submit block %% : %'18.2lf\n" 465 "- avg ready entries : %'18.2lf\n" 466 "- avg submitted entries : %'18.2lf\n" 467 "- avg available entries : %'18.2lf\n" 440 468 "- total ready search : %'15llu\n" 441 469 "- avg ready search len : %'18.2lf\n" 442 470 "- avg ready search block : %'18.2lf\n" 471 "- total alloc search : %'15llu\n" 472 "- avg alloc search len : %'18.2lf\n" 473 "- avg alloc search block : %'18.2lf\n" 443 474 "- total wait calls : %'15llu (%'llu slow, %'llu fast)\n" 444 475 "- avg completion/wait : %'18.2lf\n", 445 476 submit_avg.cnt, 446 ((double)submit_avg.val) / submit_avg.cnt, 447 (100.0 * submit_avg.block) / submit_avg.cnt, 477 avgrdy, 478 avgcsm, 479 avgavl, 448 480 look_avg.cnt, 449 481 lavgv, 450 482 lavgb, 483 alloc_avg.cnt, 484 aavgv, 485 aavgb, 451 486 completed_avg.slow_cnt + completed_avg.fast_cnt, 452 487 completed_avg.slow_cnt, completed_avg.fast_cnt, … … 494 529 495 530 // If the poller thread also submits, then we need to aggregate the submissions which are ready 496 uint32_t * tail =ring.submit_q.tail;531 uint32_t tail = *ring.submit_q.tail; 497 532 const uint32_t mask = *ring.submit_q.mask; 498 533 … … 506 541 507 542 // If we got a real submission, append it to the list 508 ring.submit_q.array[ ( (*tail)+ to_submit) & mask ] = idx & mask;543 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 509 544 to_submit++; 510 545 } 511 546 512 547 // Increment the tail based on how many we are ready to submit 513 __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST); 514 515 // update statistics 516 #if !defined(__CFA_NO_STATISTICS__) 517 ring.submit_q.stats.submit_avg.val += to_submit; 518 ring.submit_q.stats.submit_avg.cnt += 1; 519 #endif 520 } 521 548 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 549 } 550 551 const uint32_t smask = *ring.submit_q.mask; 552 uint32_t shead = *ring.submit_q.head; 522 553 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 523 554 if( ret < 0 ) { … … 531 562 } 532 563 564 verify( (shead + ret) == *ring.submit_q.head ); 565 566 // Release the consumed SQEs 567 for( i; ret ) { 568 uint32_t idx = ring.submit_q.array[ (i + shead) & smask ]; 569 ring.submit_q.sqes[ idx ].user_data = 0; 570 } 571 572 uint32_t avail = 0; 573 uint32_t sqe_num = *ring.submit_q.num; 574 for(i; sqe_num) { 575 if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++; 576 } 577 578 // update statistics 579 #if !defined(__CFA_NO_STATISTICS__) 580 ring.submit_q.stats.submit_avg.rdy += to_submit; 581 ring.submit_q.stats.submit_avg.csm += ret; 582 ring.submit_q.stats.submit_avg.avl += avail; 583 ring.submit_q.stats.submit_avg.cnt += 1; 584 #endif 585 533 586 // Drain the queue 534 587 unsigned head = *ring.completion_q.head; 535 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE); 588 unsigned tail = *ring.completion_q.tail; 589 const uint32_t mask = *ring.completion_q.mask; 590 591 // Memory barrier 592 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 536 593 537 594 // Nothing was new return 0 … … 542 599 uint32_t count = tail - head; 543 600 for(i; count) { 544 unsigned idx = (head + i) & (*ring.completion_q.mask);601 unsigned idx = (head + i) & mask; 545 602 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 546 603 … … 556 613 557 614 // Allow new submissions to happen 558 V(ring.submit, count);615 // V(ring.submit, count); 559 616 560 617 // Mark to the kernel that the cqe has been seen 561 618 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 619 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 562 620 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 563 621 … … 710 768 // 711 769 712 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) { 713 // Wait for a spot to be available 714 __attribute__((unused)) bool blocked = P(ring.submit); 715 #if !defined(__CFA_NO_STATISTICS__) 716 __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 1ul64 : 0ul64, __ATOMIC_RELAXED ); 717 #endif 718 719 // Allocate the sqe 720 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST); 721 722 // Mask the idx now to allow make everything easier to check 723 idx &= *ring.submit_q.mask; 724 725 // Return the sqe 726 return [&ring.submit_q.sqes[ idx ], idx]; 770 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) { 771 verify( data != 0 ); 772 773 // Prepare the data we need 774 __attribute((unused)) int len = 0; 775 __attribute((unused)) int block = 0; 776 uint32_t cnt = *ring.submit_q.num; 777 uint32_t mask = *ring.submit_q.mask; 778 uint32_t off = __tls_rand(); 779 780 // Loop around looking for an available spot 781 LOOKING: for() { 782 // Look through the list starting at some offset 783 for(i; cnt) { 784 uint64_t expected = 0; 785 uint32_t idx = (i + off) & mask; 786 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 787 volatile uint64_t * udata = &sqe->user_data; 788 789 if( *udata == expected && 790 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) 791 { 792 // update statistics 793 #if !defined(__CFA_NO_STATISTICS__) 794 __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.val, len, __ATOMIC_RELAXED ); 795 __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.block, block, __ATOMIC_RELAXED ); 796 __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.cnt, 1, __ATOMIC_RELAXED ); 797 #endif 798 799 // Success return the data 800 return [sqe, idx]; 801 } 802 verify(expected != data); 803 804 len ++; 805 } 806 807 block++; 808 yield(); 809 } 727 810 } 728 811 … … 742 825 __attribute((unused)) int len = 0; 743 826 __attribute((unused)) int block = 0; 744 uint32_t expected = -1ul32;745 827 uint32_t ready_mask = ring.submit_q.ready_cnt - 1; 746 828 uint32_t off = __tls_rand(); … … 748 830 for(i; ring.submit_q.ready_cnt) { 749 831 uint32_t ii = (i + off) & ready_mask; 832 uint32_t expected = -1ul32; 750 833 if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 751 834 break LOOKING; 752 835 } 836 verify(expected != idx); 753 837 754 838 len ++; … … 792 876 // update statistics 793 877 #if !defined(__CFA_NO_STATISTICS__) 794 ring.submit_q.stats.submit_avg. val+= 1;878 ring.submit_q.stats.submit_avg.csm += 1; 795 879 ring.submit_q.stats.submit_avg.cnt += 1; 796 880 #endif … … 831 915 832 916 #define __submit_prelude \ 833 struct __io_data & ring = *active_cluster()->io; \ 917 io_user_data data = { 0, active_thread() }; \ 918 struct __io_data & ring = *data.thrd->curr_cluster->io; \ 834 919 struct io_uring_sqe * sqe; \ 835 920 uint32_t idx; \ 836 [sqe, idx] = __submit_alloc( ring );921 [sqe, idx] = __submit_alloc( ring, (uint64_t)&data ); 837 922 838 923 #define __submit_wait \ 839 io_user_data data = { 0, active_thread() }; \840 924 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \ 841 sqe->user_data = (uint64_t)&data; \925 verify( sqe->user_data == (uint64_t)&data ); \ 842 926 __submit( ring, idx ); \ 843 927 park( __cfaabi_dbg_ctx ); \ 
- 
      libcfa/src/concurrency/kernel.cfar2802824 r0e4df2e 648 648 649 649 // record activity 650 __cfaabi_dbg_debug_do( char * old_caller = thrd->unpark_caller; ) 650 651 __cfaabi_dbg_record_thrd( *thrd, false, caller ); 651 652 
  Note:
 See   TracChangeset
 for help on using the changeset viewer.
  