Changeset e46c753
- Timestamp: Jul 2, 2020, 4:17:51 PM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 8bb239d
- Parents: 8e9e9a2
- Location: libcfa/src/concurrency
- Files: 4 edited
Legend: unmodified lines are shown with a leading space, added lines with a leading `+`, and removed lines with a leading `-`.
libcfa/src/concurrency/io.cfa
r8e9e9a2 → re46c753

```diff
@@ -182 +182 @@
 //=============================================================================================
 void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
+	if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
+		abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
+	}
+
 	this.io = malloc();
 
@@ -261 +265 @@
 	}
 
-	if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) {
+	if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
 		/* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
 		sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
 
@@ -423 +427 @@
 // Process a single completion message from the io_uring
 // This is NOT thread-safe
+static unsigned __collect_submitions( struct __io_data & ring );
 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
+	/* paranoid */ verify( !kernelTLS.preemption_state.enabled );
+
 	unsigned to_submit = 0;
 	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
-
 		// If the poller thread also submits, then we need to aggregate the submissions which are ready
-		uint32_t tail = *ring.submit_q.tail;
-		const uint32_t mask = *ring.submit_q.mask;
-
-		// Go through the list of ready submissions
-		for( i; ring.submit_q.ready_cnt ) {
-			// replace any submission with the sentinel, to consume it.
-			uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
-
-			// If it was already the sentinel, then we are done
-			if( idx == -1ul32 ) continue;
-
-			// If we got a real submission, append it to the list
-			ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
-			to_submit++;
-		}
-
-		// Increment the tail based on how many we are ready to submit
-		__atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
+		to_submit = __collect_submitions( ring );
 	}
 
@@ -455 +444 @@
 			case EAGAIN:
 			case EINTR:
-				return -EAGAIN;
+				return [0, true];
 			default:
 				abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
 
@@ -467 +456 @@
 	}
 
-	uint32_t avail = 0;
-	uint32_t sqe_num = *ring.submit_q.num;
-	for(i; sqe_num) {
-		if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++;
-	}
-
 	// update statistics
 	#if !defined(__CFA_NO_STATISTICS__)
-		__tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
-		__tls_stats()->io.submit_q.submit_avg.csm += ret;
-		__tls_stats()->io.submit_q.submit_avg.avl += avail;
-		__tls_stats()->io.submit_q.submit_avg.cnt += 1;
+		if( to_submit > 0 ) {
+			__tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
+			__tls_stats()->io.submit_q.submit_avg.csm += ret;
+			__tls_stats()->io.submit_q.submit_avg.cnt += 1;
+		}
 	#endif
 
@@ -491 +475 @@
 	// Nothing was new return 0
 	if (head == tail) {
-		return 0;
+		return [0, to_submit > 0];
 	}
 
@@ -576 +560 @@
 		int count;
 		bool again;
-		[count, again] = __drain_io( ring, &mask, 0, true );
+		[count, again] = __drain_io( ring, &mask, 1, true );
 
 		// Update statistics
 
@@ -658 +642 @@
 
 // Submition steps :
-// 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
-// entries are available. The semaphore make sure that there is no more operations in
-// progress then the number of entries in the buffer. This probably limits concurrency
-// more than necessary since submitted but not completed operations don't need any
-// entries in user space. However, I don't know what happens if we overflow the buffers
-// because too many requests completed at once. This is a safe approach in all cases.
-// Furthermore, with hundreds of entries, this may be okay.
-//
-// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
+// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
 // listed in sq.array are visible by the kernel. For those not listed, the kernel does not
 // offer any assurance that an entry is not being filled by multiple flags. Therefore, we
 // need to write an allocator that allows allocating concurrently.
 //
-// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
+// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
 //
-// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
+// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
 // needs to arrive to two concensus at the same time:
 // A - The order in which entries are listed in the array: no two threads must pick the
 
@@ -682 +658 @@
 
 [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
-	verify( data != 0 );
-
+	/* paranoid */ verify( data != 0 );
 
 	// Prepare the data we need
 
@@ -762 +737 @@
 	// update statistics
 	#if !defined(__CFA_NO_STATISTICS__)
-	disable_interrupts();
-	__tls_stats()->io.submit_q.look_avg.val += len;
-	__tls_stats()->io.submit_q.look_avg.block += block;
-	__tls_stats()->io.submit_q.look_avg.cnt += 1;
-	enable_interrupts( __cfaabi_dbg_ctx );
+		disable_interrupts();
+		__tls_stats()->io.submit_q.look_avg.val += len;
+		__tls_stats()->io.submit_q.look_avg.block += block;
+		__tls_stats()->io.submit_q.look_avg.cnt += 1;
+		enable_interrupts( __cfaabi_dbg_ctx );
 	#endif
 
@@ -780 +755 @@
 	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
 		// If the poller thread submits, then we just need to add this to the ready array
-
 		__submit_to_ready_array( ring, idx, mask );
 
@@ -786 +760 @@
 
 		__cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
+	}
+	else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
+		uint32_t picked = __submit_to_ready_array( ring, idx, mask );
+
+		for() {
+			yield();
+
+			// If some one else collected our index, we are done
+			if( ring.submit_q.ready[picked] != idx ) {
+				#if !defined(__CFA_NO_STATISTICS__)
+					disable_interrupts();
+					__tls_stats()->io.submit_q.helped += 1;
+					enable_interrupts( __cfaabi_dbg_ctx );
+				#endif
+				return;
+			}
+
+			if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
+				#if !defined(__CFA_NO_STATISTICS__)
+					__tls_stats()->io.submit_q.leader += 1;
+				#endif
+				break;
+			}
+		}
+
+		// We got the lock
+		unsigned to_submit = __collect_submitions( ring );
+		// /* paranoid */ verify( to_submit > 0 );
+		if( to_submit == 0 ) abort();
+
+		const uint32_t smask = *ring.submit_q.mask;
+		uint32_t shead = *ring.submit_q.head;
+		int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
+		if( ret < 0 ) {
+			switch((int)errno) {
+			case EAGAIN:
+			case EINTR:
+				unlock(ring.submit_q.lock);
+				return;
+			default:
+				abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
+			}
+		}
+
+		/* paranoid */ verify( ret > 0 );
+
+		// Release the consumed SQEs
+		for( i; ret ) {
+			uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
+			ring.submit_q.sqes[ idx ].user_data = 0;
+		}
+
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			__tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
+			__tls_stats()->io.submit_q.submit_avg.csm += ret;
+			__tls_stats()->io.submit_q.submit_avg.cnt += 1;
+		#endif
+
+		unlock(ring.submit_q.lock);
 	}
 	else {
 
@@ -809 +843 @@
 		// update statistics
 		#if !defined(__CFA_NO_STATISTICS__)
-			__tls_stats()->io.submit_q.submit_avg.csm += 1;
-			__tls_stats()->io.submit_q.submit_avg.cnt += 1;
+			disable_interrupts();
+			abort();
+			__tls_stats()->io.submit_q.submit_avg.csm += 1;
+			__tls_stats()->io.submit_q.submit_avg.cnt += 1;
+			enable_interrupts( __cfaabi_dbg_ctx );
 		#endif
 
@@ -820 +857 @@
 	}
 }
+
+static unsigned __collect_submitions( struct __io_data & ring ) {
+	/* paranoid */ verify( ring.submit_q.ready != 0p );
+	/* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
+
+	unsigned to_submit = 0;
+	uint32_t tail = *ring.submit_q.tail;
+	const uint32_t mask = *ring.submit_q.mask;
+
+	// Go through the list of ready submissions
+	for( i; ring.submit_q.ready_cnt ) {
+		// replace any submission with the sentinel, to consume it.
+		uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+
+		// If it was already the sentinel, then we are done
+		if( idx == -1ul32 ) continue;
+
+		// If we got a real submission, append it to the list
+		ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
+		to_submit++;
+	}
+
+	// Increment the tail based on how many we are ready to submit
+	__atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
+
+	return to_submit;
+}
 #endif
```
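The heart of this changeset is the new eager-submission path: a thread publishes its filled SQE index into the shared `ready` array, then repeatedly yields until either another thread has consumed its slot (counted in the new `helped` statistic) or it wins `try_lock` on the submit lock, at which point it becomes the leader (counted in `leader`), drains every published index via `__collect_submitions`, and issues a single `io_uring_enter` call for the whole batch. The plain-C sketch below only illustrates that leader/helper batching idea; the names (`eager_ring`, `publish`, `collect`, `eager_submit`) and the pthread/`sched_yield` machinery are hypothetical stand-ins, not the CFA runtime API.

```c
#include <stdint.h>
#include <pthread.h>
#include <sched.h>

#define READY_CNT 8
#define EMPTY     UINT32_MAX            /* sentinel marking a free slot */

struct eager_ring {
	uint32_t        ready[READY_CNT];   /* indices of filled SQEs waiting to be submitted */
	pthread_mutex_t lock;               /* serializes the "leader" path */
};

static void eager_ring_init(struct eager_ring * r) {
	for (unsigned i = 0; i < READY_CNT; i++) r->ready[i] = EMPTY;
	pthread_mutex_init(&r->lock, NULL);
}

/* Publish an index into a free slot; returns the slot used (spins until one frees up). */
static unsigned publish(struct eager_ring * r, uint32_t idx) {
	for (;;) {
		for (unsigned i = 0; i < READY_CNT; i++) {
			uint32_t expected = EMPTY;
			if (__atomic_compare_exchange_n(&r->ready[i], &expected, idx,
			                                0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
				return i;
		}
		sched_yield();
	}
}

/* Swap the sentinel into every slot, collecting whatever indices were published. */
static unsigned collect(struct eager_ring * r, uint32_t * out) {
	unsigned n = 0;
	for (unsigned i = 0; i < READY_CNT; i++) {
		uint32_t idx = __atomic_exchange_n(&r->ready[i], EMPTY, __ATOMIC_RELAXED);
		if (idx != EMPTY) out[n++] = idx;
	}
	return n;
}

/* What each submitting thread does after filling its SQE. */
static void eager_submit(struct eager_ring * r, uint32_t idx) {
	unsigned slot = publish(r, idx);
	for (;;) {
		sched_yield();
		/* Helped: a leader already consumed our slot, nothing left to do. */
		if (__atomic_load_n(&r->ready[slot], __ATOMIC_RELAXED) != idx) return;
		/* Otherwise try to become the leader. */
		if (pthread_mutex_trylock(&r->lock) == 0) break;
	}
	uint32_t batch[READY_CNT];
	unsigned n = collect(r, batch);
	/* ... append batch[0..n) to the SQ array and make one io_uring_enter(fd, n, ...) call ... */
	pthread_mutex_unlock(&r->lock);
}
```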
libcfa/src/concurrency/kernel.hfa
r8e9e9a2 → re46c753

```diff
@@ -131 +131 @@
 #define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
-// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
+#define CFA_CLUSTER_IO_EAGER_SUBMITS         1 << 2 // 0x4
 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
```
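The new flag is an ordinary bit in the same mask word as the existing options, with the ready-array length packed in the bits above `CFA_CLUSTER_IO_BUFFLEN_OFFSET`. Below is a minimal, hypothetical sketch of composing such a flag word; the cluster-startup call that consumes it is not part of this diff, and the defines are repeated only to keep the snippet self-contained.

```c
#define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) // 0x1
#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x2
#define CFA_CLUSTER_IO_EAGER_SUBMITS         (1 << 2) // 0x4
#define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16

// Eager submissions with a 32-entry ready array: the length sits in the upper
// bits; __kernel_io_startup expects a power of two and rounds values below 8 up to 8.
unsigned io_flags = CFA_CLUSTER_IO_EAGER_SUBMITS
                  | (32u << CFA_CLUSTER_IO_BUFFLEN_OFFSET);

// Invalid since this changeset: mixing the two submission schemes aborts at startup.
// unsigned bad_flags = CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS;
```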
libcfa/src/concurrency/stats.cfa
r8e9e9a2 → re46c753

```diff
@@ -27 +27 @@
 	stats->io.submit_q.submit_avg.rdy = 0;
 	stats->io.submit_q.submit_avg.csm = 0;
-	stats->io.submit_q.submit_avg.avl = 0;
 	stats->io.submit_q.submit_avg.cnt = 0;
 	stats->io.submit_q.look_avg.val = 0;
 
@@ -35 +34 @@
 	stats->io.submit_q.alloc_avg.cnt = 0;
 	stats->io.submit_q.alloc_avg.block = 0;
+	stats->io.submit_q.helped = 0;
+	stats->io.submit_q.leader = 0;
 	stats->io.complete_q.completed_avg.val = 0;
 	stats->io.complete_q.completed_avg.slow_cnt = 0;
 
@@ -68 +69 @@
 	__atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt , proc->io.submit_q.alloc_avg.cnt , __ATOMIC_SEQ_CST );
 	__atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block , proc->io.submit_q.alloc_avg.block , __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &cltr->io.submit_q.helped , proc->io.submit_q.helped , __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &cltr->io.submit_q.leader , proc->io.submit_q.leader , __ATOMIC_SEQ_CST );
 	__atomic_fetch_add( &cltr->io.complete_q.completed_avg.val , proc->io.complete_q.completed_avg.val , __ATOMIC_SEQ_CST );
 	__atomic_fetch_add( &cltr->io.complete_q.completed_avg.slow_cnt, proc->io.complete_q.completed_avg.slow_cnt, __ATOMIC_SEQ_CST );
 
@@ -120 +123 @@
 	double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt;
 	double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt;
-	double avgavl = ((double)io.submit_q.submit_avg.avl) / io.submit_q.submit_avg.cnt;
 
 	double lavgv = 0;
 
@@ -141 +143 @@
 		"- avg ready entries : %'18.2lf\n"
 		"- avg submitted entries : %'18.2lf\n"
-		"- avg available entries : %'18.2lf\n"
+		"- total helped entries : %'15" PRIu64 "\n"
+		"- total leader entries : %'15" PRIu64 "\n"
 		"- total ready search : %'15" PRIu64 "\n"
 		"- avg ready search len : %'18.2lf\n"
 
@@ -153 +156 @@
 		, cluster ? "Cluster" : "Processor", name, id
 		, io.submit_q.submit_avg.cnt
-		, avgrdy, avgcsm, avgavl
+		, avgrdy, avgcsm
+		, io.submit_q.helped, io.submit_q.leader
 		, io.submit_q.look_avg.cnt
 		, lavgv, lavgb
```
libcfa/src/concurrency/stats.hfa
r8e9e9a2 → re46c753

```diff
@@ -83 +83 @@
 			volatile uint64_t block;
 		} alloc_avg;
+		volatile uint64_t helped;
+		volatile uint64_t leader;
 	} submit_q;
 	struct {
```