Changeset d34575b for libcfa/src/concurrency
- Timestamp:
- Jul 11, 2020, 6:41:48 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- a3d3efc
- Parents:
- fc9bb79 (diff), 7922158 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - Location:
- libcfa/src/concurrency
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
rfc9bb79 rd34575b 14 14 // 15 15 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 16 #if defined(__CFA_DEBUG__) 17 // #define __CFA_DEBUG_PRINT_IO__ 18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 #endif 18 20 19 21 #include "kernel.hfa" … … 109 111 volatile uint32_t * head; 110 112 volatile uint32_t * tail; 113 volatile uint32_t prev_head; 111 114 112 115 // The actual kernel ring which uses head/tail … … 129 132 130 133 __spinlock_t lock; 134 __spinlock_t release_lock; 131 135 132 136 // A buffer of sqes (not the actual ring) … … 182 186 //============================================================================================= 183 187 void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) { 188 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) { 189 abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n"); 190 } 191 184 192 this.io = malloc(); 185 193 … … 187 195 struct io_uring_params params; 188 196 memset(¶ms, 0, sizeof(params)); 197 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS ) params.flags |= IORING_SETUP_SQPOLL; 198 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL; 189 199 190 200 uint32_t nentries = entries_per_cluster(); … … 208 218 // adjust the size according to the parameters 209 219 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 210 cq ->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);220 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz); 211 221 } 212 222 #endif … … 222 232 // mmap the Completion Queue into existence (may or may not be needed) 223 233 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 224 cq ->ring_ptr = sq->ring_ptr;234 cq.ring_ptr = sq.ring_ptr; 225 235 } 226 236 else … … 253 263 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 254 264 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 265 sq.prev_head = *sq.head; 255 266 256 267 { … … 261 272 } 262 273 263 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { 274 (sq.lock){}; 275 (sq.release_lock){}; 276 277 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) { 264 278 /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) ); 265 279 sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8); … … 313 327 314 328 // Create the poller thread 315 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for clu ter %p\n", &this);329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this); 316 330 this.io->poller.slow.blocked = false; 317 331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); … … 418 432 } 419 433 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) { 435 bool need_sys_to_submit = false; 436 bool need_sys_to_complete = false; 437 unsigned min_complete = 0; 438 unsigned flags = 0; 439 440 441 TO_SUBMIT: 442 if( to_submit > 0 ) { 443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 444 need_sys_to_submit = true; 445 break TO_SUBMIT; 446 } 447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 448 need_sys_to_submit = true; 449 flags |= IORING_ENTER_SQ_WAKEUP; 450 } 451 } 452 453 TO_COMPLETE: 454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 455 flags |= IORING_ENTER_GETEVENTS; 456 if( mask ) { 457 need_sys_to_complete = true; 458 min_complete = 1; 459 break TO_COMPLETE; 460 } 461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 462 need_sys_to_complete = true; 463 } 464 } 465 466 int ret = 0; 467 if( need_sys_to_submit || need_sys_to_complete ) { 468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8); 469 if( ret < 0 ) { 470 switch((int)errno) { 471 case EAGAIN: 472 case EINTR: 473 ret = -1; 474 break; 475 default: 476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 477 } 478 } 479 } 480 481 // Memory barrier 482 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 483 return ret; 484 } 485 420 486 //============================================================================================= 421 487 // I/O Polling 422 488 //============================================================================================= 489 static unsigned __collect_submitions( struct __io_data & ring ); 490 static uint32_t __release_consumed_submission( struct __io_data & ring ); 491 423 492 // Process a single completion message from the io_uring 424 493 // This is NOT thread-safe 425 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) { 494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) { 495 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 496 426 497 unsigned to_submit = 0; 427 498 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { 428 429 499 // If the poller thread also submits, then we need to aggregate the submissions which are ready 430 uint32_t tail = *ring.submit_q.tail; 431 const uint32_t mask = *ring.submit_q.mask; 432 433 // Go through the list of ready submissions 434 for( i; ring.submit_q.ready_cnt ) { 435 // replace any submission with the sentinel, to consume it. 436 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 437 438 // If it was already the sentinel, then we are done 439 if( idx == -1ul32 ) continue; 440 441 // If we got a real submission, append it to the list 442 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 443 to_submit++; 444 } 445 446 // Increment the tail based on how many we are ready to submit 447 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 448 } 449 450 const uint32_t smask = *ring.submit_q.mask; 451 uint32_t shead = *ring.submit_q.head; 452 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 500 to_submit = __collect_submitions( ring ); 501 } 502 503 int ret = __io_uring_enter(ring, to_submit, true, mask); 453 504 if( ret < 0 ) { 454 switch((int)errno) { 455 case EAGAIN: 456 case EINTR: 457 return -EAGAIN; 458 default: 459 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 460 } 505 return [0, true]; 506 } 507 508 // update statistics 509 if (to_submit > 0) { 510 __STATS__( true, 511 if( to_submit > 0 ) { 512 io.submit_q.submit_avg.rdy += to_submit; 513 io.submit_q.submit_avg.csm += ret; 514 io.submit_q.submit_avg.cnt += 1; 515 } 516 ) 461 517 } 462 518 463 519 // Release the consumed SQEs 464 for( i; ret ) { 465 uint32_t idx = ring.submit_q.array[ (i + shead) & smask ]; 466 ring.submit_q.sqes[ idx ].user_data = 0; 467 } 468 469 uint32_t avail = 0; 470 uint32_t sqe_num = *ring.submit_q.num; 471 for(i; sqe_num) { 472 if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++; 473 } 474 475 // update statistics 476 #if !defined(__CFA_NO_STATISTICS__) 477 __tls_stats()->io.submit_q.submit_avg.rdy += to_submit; 478 __tls_stats()->io.submit_q.submit_avg.csm += ret; 479 __tls_stats()->io.submit_q.submit_avg.avl += avail; 480 __tls_stats()->io.submit_q.submit_avg.cnt += 1; 481 #endif 520 __release_consumed_submission( ring ); 482 521 483 522 // Drain the queue … … 486 525 const uint32_t mask = *ring.completion_q.mask; 487 526 488 // Memory barrier489 __atomic_thread_fence( __ATOMIC_SEQ_CST );490 491 527 // Nothing was new return 0 492 528 if (head == tail) { 493 return 0;529 return [0, to_submit > 0]; 494 530 } 495 531 496 532 uint32_t count = tail - head; 533 /* paranoid */ verify( count != 0 ); 497 534 for(i; count) { 498 535 unsigned idx = (head + i) & mask; … … 505 542 506 543 data->result = cqe.res; 507 if(! in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }508 else 544 if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 545 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); } 509 546 } 510 547 … … 554 591 int count; 555 592 bool again; 556 [count, again] = __drain_io( ring, &mask , 1, true);593 [count, again] = __drain_io( ring, &mask ); 557 594 558 595 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); 559 596 560 597 // Update statistics 561 #if !defined(__CFA_NO_STATISTICS__)562 __tls_stats()->io.complete_q.completed_avg.val += count;563 __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;564 #endif598 __STATS__( true, 599 io.complete_q.completed_avg.val += count; 600 io.complete_q.completed_avg.slow_cnt += 1; 601 ) 565 602 566 603 if(again) { … … 576 613 int count; 577 614 bool again; 578 [count, again] = __drain_io( ring, &mask , 0, true);615 [count, again] = __drain_io( ring, &mask ); 579 616 580 617 // Update statistics 581 #if !defined(__CFA_NO_STATISTICS__)582 __tls_stats()->io.complete_q.completed_avg.val += count;583 __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;584 #endif618 __STATS__( true, 619 io.complete_q.completed_avg.val += count; 620 io.complete_q.completed_avg.slow_cnt += 1; 621 ) 585 622 } 586 623 } … … 614 651 bool again; 615 652 disable_interrupts(); 616 [count, again] = __drain_io( *this.ring, 0p , 0, false);653 [count, again] = __drain_io( *this.ring, 0p ); 617 654 618 655 if(!again) reset++; 619 656 620 657 // Update statistics 621 #if !defined(__CFA_NO_STATISTICS__)622 __tls_stats()->io.complete_q.completed_avg.val += count;623 __tls_stats()->io.complete_q.completed_avg.fast_cnt += 1;624 #endif658 __STATS__( true, 659 io.complete_q.completed_avg.val += count; 660 io.complete_q.completed_avg.fast_cnt += 1; 661 ) 625 662 enable_interrupts( __cfaabi_dbg_ctx ); 626 663 … … 658 695 659 696 // Submition steps : 660 // 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure 661 // entries are available. The semaphore make sure that there is no more operations in 662 // progress then the number of entries in the buffer. This probably limits concurrency 663 // more than necessary since submitted but not completed operations don't need any 664 // entries in user space. However, I don't know what happens if we overflow the buffers 665 // because too many requests completed at once. This is a safe approach in all cases. 666 // Furthermore, with hundreds of entries, this may be okay. 667 // 668 // 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones 697 // 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones 669 698 // listed in sq.array are visible by the kernel. For those not listed, the kernel does not 670 699 // offer any assurance that an entry is not being filled by multiple flags. Therefore, we 671 700 // need to write an allocator that allows allocating concurrently. 672 701 // 673 // 3- Actually fill the submit entry, this is the only simple and straightforward step.702 // 2 - Actually fill the submit entry, this is the only simple and straightforward step. 674 703 // 675 // 4- Append the entry index to the array and adjust the tail accordingly. This operation704 // 3 - Append the entry index to the array and adjust the tail accordingly. This operation 676 705 // needs to arrive to two concensus at the same time: 677 706 // A - The order in which entries are listed in the array: no two threads must pick the … … 682 711 683 712 [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) { 684 verify( data != 0 ); 685 713 /* paranoid */ verify( data != 0 ); 686 714 687 715 // Prepare the data we need … … 708 736 { 709 737 // update statistics 710 #if !defined(__CFA_NO_STATISTICS__) 711 disable_interrupts(); 712 __tls_stats()->io.submit_q.alloc_avg.val += len; 713 __tls_stats()->io.submit_q.alloc_avg.block += block; 714 __tls_stats()->io.submit_q.alloc_avg.cnt += 1; 715 enable_interrupts( __cfaabi_dbg_ctx ); 716 #endif 738 __STATS__( false, 739 io.submit_q.alloc_avg.val += len; 740 io.submit_q.alloc_avg.block += block; 741 io.submit_q.alloc_avg.cnt += 1; 742 ) 717 743 718 744 … … 757 783 758 784 block++; 759 yield(); 785 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) { 786 __release_consumed_submission( ring ); 787 unlock( ring.submit_q.lock ); 788 } 789 else { 790 yield(); 791 } 760 792 } 761 793 762 794 // update statistics 763 #if !defined(__CFA_NO_STATISTICS__) 764 disable_interrupts(); 765 __tls_stats()->io.submit_q.look_avg.val += len; 766 __tls_stats()->io.submit_q.look_avg.block += block; 767 __tls_stats()->io.submit_q.look_avg.cnt += 1; 768 enable_interrupts( __cfaabi_dbg_ctx ); 769 #endif 795 __STATS__( false, 796 io.submit_q.look_avg.val += len; 797 io.submit_q.look_avg.block += block; 798 io.submit_q.look_avg.cnt += 1; 799 ) 770 800 771 801 return picked; … … 780 810 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { 781 811 // If the poller thread submits, then we just need to add this to the ready array 782 783 812 __submit_to_ready_array( ring, idx, mask ); 784 813 … … 786 815 787 816 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 817 } 818 else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) { 819 uint32_t picked = __submit_to_ready_array( ring, idx, mask ); 820 821 for() { 822 yield(); 823 824 // If some one else collected our index, we are done 825 #warning ABA problem 826 if( ring.submit_q.ready[picked] != idx ) { 827 __STATS__( false, 828 io.submit_q.helped += 1; 829 ) 830 return; 831 } 832 833 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) { 834 __STATS__( false, 835 io.submit_q.leader += 1; 836 ) 837 break; 838 } 839 840 __STATS__( false, 841 io.submit_q.busy += 1; 842 ) 843 } 844 845 // We got the lock 846 unsigned to_submit = __collect_submitions( ring ); 847 int ret = __io_uring_enter( ring, to_submit, false, 0p ); 848 if( ret < 0 ) { 849 unlock(ring.submit_q.lock); 850 return; 851 } 852 853 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 854 855 // Release the consumed SQEs 856 __release_consumed_submission( ring ); 857 858 // update statistics 859 __STATS__( true, 860 io.submit_q.submit_avg.rdy += to_submit; 861 io.submit_q.submit_avg.csm += ret; 862 io.submit_q.submit_avg.cnt += 1; 863 ) 864 865 unlock(ring.submit_q.lock); 788 866 } 789 867 else { … … 791 869 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 792 870 871 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0, 872 /* paranoid */ "index %u already reclaimed\n" 873 /* paranoid */ "head %u, prev %u, tail %u\n" 874 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 875 /* paranoid */ idx, 876 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 877 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 878 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 881 /* paranoid */ ); 882 793 883 // Append to the list of ready entries 794 884 795 885 /* paranoid */ verify( idx <= mask ); 796 797 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 886 ring.submit_q.array[ (*tail) & mask ] = idx; 798 887 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 799 888 800 889 // Submit however, many entries need to be submitted 801 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);890 int ret = __io_uring_enter( ring, 1, false, 0p ); 802 891 if( ret < 0 ) { 803 892 switch((int)errno) { … … 808 897 809 898 // update statistics 810 #if !defined(__CFA_NO_STATISTICS__) 811 __tls_stats()->io.submit_q.submit_avg.csm += 1; 812 __tls_stats()->io.submit_q.submit_avg.cnt += 1; 813 #endif 814 815 ring.submit_q.sqes[ idx & mask ].user_data = 0; 899 __STATS__( false, 900 io.submit_q.submit_avg.csm += 1; 901 io.submit_q.submit_avg.cnt += 1; 902 ) 903 904 // Release the consumed SQEs 905 __release_consumed_submission( ring ); 816 906 817 907 unlock(ring.submit_q.lock); … … 820 910 } 821 911 } 912 913 static unsigned __collect_submitions( struct __io_data & ring ) { 914 /* paranoid */ verify( ring.submit_q.ready != 0p ); 915 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 ); 916 917 unsigned to_submit = 0; 918 uint32_t tail = *ring.submit_q.tail; 919 const uint32_t mask = *ring.submit_q.mask; 920 921 // Go through the list of ready submissions 922 for( i; ring.submit_q.ready_cnt ) { 923 // replace any submission with the sentinel, to consume it. 924 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 925 926 // If it was already the sentinel, then we are done 927 if( idx == -1ul32 ) continue; 928 929 // If we got a real submission, append it to the list 930 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 931 to_submit++; 932 } 933 934 // Increment the tail based on how many we are ready to submit 935 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 936 937 return to_submit; 938 } 939 940 static uint32_t __release_consumed_submission( struct __io_data & ring ) { 941 const uint32_t smask = *ring.submit_q.mask; 942 943 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 944 uint32_t chead = *ring.submit_q.head; 945 uint32_t phead = ring.submit_q.prev_head; 946 ring.submit_q.prev_head = chead; 947 unlock(ring.submit_q.release_lock); 948 949 uint32_t count = chead - phead; 950 for( i; count ) { 951 uint32_t idx = ring.submit_q.array[ (phead + i) & smask ]; 952 ring.submit_q.sqes[ idx ].user_data = 0; 953 } 954 return count; 955 } 956 957 //============================================================================================= 958 // I/O Submissions 959 //============================================================================================= 960 961 void register_fixed_files( cluster & cl, int * files, unsigned count ) { 962 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count ); 963 if( ret < 0 ) { 964 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 965 } 966 967 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 968 } 822 969 #endif -
libcfa/src/concurrency/iocall.cfa
rfc9bb79 rd34575b 108 108 109 109 extern ssize_t read (int fd, void *buf, size_t count); 110 111 extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags); 112 extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags); 110 113 } 111 114 … … 128 131 #endif 129 132 } 133 134 ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 135 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV) 136 return preadv2(fd, iov, iovcnt, offset, flags); 137 #else 138 __submit_prelude 139 140 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 141 sqe->flags |= IOSQE_FIXED_FILE; 142 143 __submit_wait 144 #endif 145 } 130 146 #endif 131 147 … … 329 345 } 330 346 331 332 347 ssize_t cfa_read(int fd, void *buf, size_t count) { 333 348 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ) … … 349 364 350 365 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 }; 366 367 __submit_wait 368 #endif 369 } 370 371 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) { 372 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SPLICE) 373 return splice( fd_in, off_in, fd_out, off_out, len, flags ); 374 #else 375 __submit_prelude 376 377 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out }; 378 sqe->splice_fd_in = fd_in; 379 sqe->splice_off_in = off_in; 380 sqe->splice_flags = flags; 381 382 __submit_wait 383 #endif 384 } 385 386 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) { 387 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_TEE) 388 return tee( fd_in, fd_out, len, flags ); 389 #else 390 __submit_prelude 391 392 (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 }; 393 sqe->splice_fd_in = fd_in; 394 sqe->splice_flags = flags; 351 395 352 396 __submit_wait … … 453 497 #define _CFA_IO_FEATURE_IORING_OP_WRITE , 454 498 return IS_DEFINED(IORING_OP_WRITE); 499 500 if( /*func == (fptr_t)splice || */ 501 func == (fptr_t)cfa_splice ) 502 #define _CFA_IO_FEATURE_IORING_OP_SPLICE , 503 return IS_DEFINED(IORING_OP_SPLICE); 504 505 if( /*func == (fptr_t)tee || */ 506 func == (fptr_t)cfa_tee ) 507 #define _CFA_IO_FEATURE_IORING_OP_TEE , 508 return IS_DEFINED(IORING_OP_TEE); 455 509 #endif 456 510 -
libcfa/src/concurrency/kernel.hfa
rfc9bb79 rd34575b 129 129 struct __io_data; 130 130 131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0 // 0x1 132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2 133 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4 131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD (1 << 0) // 0x01 132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02 133 #define CFA_CLUSTER_IO_EAGER_SUBMITS (1 << 2) // 0x04 134 #define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS (1 << 3) // 0x08 135 #define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10 134 136 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET 16 135 137 -
libcfa/src/concurrency/kernel_private.hfa
rfc9bb79 rd34575b 286 286 // Statics call at the end of each thread to register statistics 287 287 #if !defined(__CFA_NO_STATISTICS__) 288 static inline struct __stats_t * __tls_stats() { 289 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 290 /* paranoid */ verify( kernelTLS.this_stats ); 291 return kernelTLS.this_stats; 292 } 288 static inline struct __stats_t * __tls_stats() { 289 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 290 /* paranoid */ verify( kernelTLS.this_stats ); 291 return kernelTLS.this_stats; 292 } 293 294 #define __STATS__(in_kernel, ...) { \ 295 if( !(in_kernel) ) disable_interrupts(); \ 296 with( *__tls_stats() ) { \ 297 __VA_ARGS__ \ 298 } \ 299 if( !(in_kernel) ) enable_interrupts( __cfaabi_dbg_ctx ); \ 300 } 301 #else 302 #define __STATS__(in_kernel, ...) 293 303 #endif 294 304 -
libcfa/src/concurrency/preemption.cfa
rfc9bb79 rd34575b 186 186 void enable_interrupts( __cfaabi_dbg_ctx_param ) { 187 187 processor * proc = kernelTLS.this_processor; // Cache the processor now since interrupts can start happening after the atomic store 188 /* paranoid */ verify( proc ); 188 189 189 190 with( kernelTLS.preemption_state ){ -
libcfa/src/concurrency/stats.cfa
rfc9bb79 rd34575b 27 27 stats->io.submit_q.submit_avg.rdy = 0; 28 28 stats->io.submit_q.submit_avg.csm = 0; 29 stats->io.submit_q.submit_avg.avl = 0;30 29 stats->io.submit_q.submit_avg.cnt = 0; 31 30 stats->io.submit_q.look_avg.val = 0; … … 35 34 stats->io.submit_q.alloc_avg.cnt = 0; 36 35 stats->io.submit_q.alloc_avg.block = 0; 36 stats->io.submit_q.helped = 0; 37 stats->io.submit_q.leader = 0; 38 stats->io.submit_q.busy = 0; 37 39 stats->io.complete_q.completed_avg.val = 0; 38 40 stats->io.complete_q.completed_avg.slow_cnt = 0; … … 68 70 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt , proc->io.submit_q.alloc_avg.cnt , __ATOMIC_SEQ_CST ); 69 71 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block , proc->io.submit_q.alloc_avg.block , __ATOMIC_SEQ_CST ); 72 __atomic_fetch_add( &cltr->io.submit_q.helped , proc->io.submit_q.helped , __ATOMIC_SEQ_CST ); 73 __atomic_fetch_add( &cltr->io.submit_q.leader , proc->io.submit_q.leader , __ATOMIC_SEQ_CST ); 74 __atomic_fetch_add( &cltr->io.submit_q.busy , proc->io.submit_q.busy , __ATOMIC_SEQ_CST ); 70 75 __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val , proc->io.complete_q.completed_avg.val , __ATOMIC_SEQ_CST ); 71 76 __atomic_fetch_add( &cltr->io.complete_q.completed_avg.slow_cnt, proc->io.complete_q.completed_avg.slow_cnt, __ATOMIC_SEQ_CST ); … … 120 125 double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt; 121 126 double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt; 122 double avgavl = ((double)io.submit_q.submit_avg.avl) / io.submit_q.submit_avg.cnt;123 127 124 128 double lavgv = 0; … … 141 145 "- avg ready entries : %'18.2lf\n" 142 146 "- avg submitted entries : %'18.2lf\n" 143 "- avg available entries : %'18.2lf\n" 147 "- total helped entries : %'15" PRIu64 "\n" 148 "- total leader entries : %'15" PRIu64 "\n" 149 "- total busy submit : %'15" PRIu64 "\n" 144 150 "- total ready search : %'15" PRIu64 "\n" 145 151 "- avg ready search len : %'18.2lf\n" … … 153 159 , cluster ? "Cluster" : "Processor", name, id 154 160 , io.submit_q.submit_avg.cnt 155 , avgrdy, avgcsm, avgavl 161 , avgrdy, avgcsm 162 , io.submit_q.helped, io.submit_q.leader, io.submit_q.busy 156 163 , io.submit_q.look_avg.cnt 157 164 , lavgv, lavgb -
libcfa/src/concurrency/stats.hfa
rfc9bb79 rd34575b 83 83 volatile uint64_t block; 84 84 } alloc_avg; 85 volatile uint64_t helped; 86 volatile uint64_t leader; 87 volatile uint64_t busy; 85 88 } submit_q; 86 89 struct {
Note: See TracChangeset
for help on using the changeset viewer.