Changes in libcfa/src/concurrency/io.cfa [fb98462:20ab637]
- File:
-
- 1 edited
-
libcfa/src/concurrency/io.cfa (modified) (15 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
rfb98462 r20ab637 14 14 // 15 15 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 16 #if defined(__CFA_DEBUG__) 17 // #define __CFA_DEBUG_PRINT_IO__ 18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 #endif 18 20 19 21 #include "kernel.hfa" … … 216 218 // adjust the size according to the parameters 217 219 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 218 cq .ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);220 cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz); 219 221 } 220 222 #endif … … 230 232 // mmap the Completion Queue into existence (may or may not be needed) 231 233 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 232 cq .ring_ptr = sq.ring_ptr;234 cq->ring_ptr = sq->ring_ptr; 233 235 } 234 236 else … … 325 327 326 328 // Create the poller thread 327 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for clu ter %p\n", &this);329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this); 328 330 this.io->poller.slow.blocked = false; 329 331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); … … 430 432 } 431 433 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) { 435 bool need_sys_to_submit = false; 436 bool need_sys_to_complete = false; 437 unsigned min_complete = 0; 438 unsigned flags = 0; 439 440 441 TO_SUBMIT: 442 if( to_submit > 0 ) { 443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 444 need_sys_to_submit = true; 445 break TO_SUBMIT; 446 } 447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 448 need_sys_to_submit = true; 449 flags |= IORING_ENTER_SQ_WAKEUP; 450 } 451 } 452 453 TO_COMPLETE: 454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 455 flags |= IORING_ENTER_GETEVENTS; 456 if( mask ) { 457 need_sys_to_complete = true; 458 min_complete = 1; 459 break TO_COMPLETE; 460 } 461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 462 need_sys_to_complete = true; 463 } 464 } 465 466 int ret = 0; 467 if( need_sys_to_submit || need_sys_to_complete ) { 468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8); 469 if( ret < 0 ) { 470 switch((int)errno) { 471 case EAGAIN: 472 case EINTR: 473 ret = -1; 474 break; 475 default: 476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 477 } 478 } 479 } 480 481 // Memory barrier 482 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 483 return ret; 484 } 485 432 486 //============================================================================================= 433 487 // I/O Polling … … 438 492 // Process a single completion message from the io_uring 439 493 // This is NOT thread-safe 440 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask , int waitcnt, bool in_kernel) {494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) { 441 495 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 496 const uint32_t smask = *ring.submit_q.mask; 442 497 443 498 unsigned to_submit = 0; … … 447 502 } 448 503 449 if (to_submit > 0 || waitcnt > 0) { 450 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 451 if( ret < 0 ) { 452 switch((int)errno) { 453 case EAGAIN: 454 case EINTR: 455 return [0, true]; 456 default: 457 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 458 } 459 } 460 461 // Release the consumed SQEs 462 __release_consumed_submission( ring ); 463 464 // update statistics 504 int ret = __io_uring_enter(ring, to_submit, true, mask); 505 if( ret < 0 ) { 506 return [0, true]; 507 } 508 509 // update statistics 510 if (to_submit > 0) { 465 511 __STATS__( true, 466 512 if( to_submit > 0 ) { … … 472 518 } 473 519 474 // Memory barrier475 __ atomic_thread_fence( __ATOMIC_SEQ_CST);520 // Release the consumed SQEs 521 __release_consumed_submission( ring ); 476 522 477 523 // Drain the queue … … 497 543 498 544 data->result = cqe.res; 499 if(! in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }500 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }545 if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 546 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); } 501 547 } 502 548 … … 546 592 int count; 547 593 bool again; 548 [count, again] = __drain_io( ring, &mask , 1, true);594 [count, again] = __drain_io( ring, &mask ); 549 595 550 596 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); … … 568 614 int count; 569 615 bool again; 570 [count, again] = __drain_io( ring, &mask , 1, true);616 [count, again] = __drain_io( ring, &mask ); 571 617 572 618 // Update statistics … … 606 652 bool again; 607 653 disable_interrupts(); 608 [count, again] = __drain_io( *this.ring, 0p , 0, false);654 [count, again] = __drain_io( *this.ring, 0p ); 609 655 610 656 if(!again) reset++; … … 800 846 // We got the lock 801 847 unsigned to_submit = __collect_submitions( ring ); 802 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);848 int ret = __io_uring_enter( ring, to_submit, false, 0p ); 803 849 if( ret < 0 ) { 804 switch((int)errno) { 805 case EAGAIN: 806 case EINTR: 807 unlock(ring.submit_q.lock); 808 return; 809 default: 810 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 811 } 812 } 813 814 /* paranoid */ verify( ret > 0 ); 850 unlock(ring.submit_q.lock); 851 return; 852 } 853 854 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 815 855 816 856 // Release the consumed SQEs … … 830 870 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 831 871 872 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0, 873 /* paranoid */ "index %u already reclaimed\n" 874 /* paranoid */ "head %u, prev %u, tail %u\n" 875 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 876 /* paranoid */ idx, 877 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 878 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 881 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 882 /* paranoid */ ); 883 832 884 // Append to the list of ready entries 833 885 834 886 /* paranoid */ verify( idx <= mask ); 835 836 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 887 ring.submit_q.array[ (*tail) & mask ] = idx; 837 888 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 838 889 839 /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );840 841 890 // Submit however, many entries need to be submitted 842 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);891 int ret = __io_uring_enter( ring, 1, false, 0p ); 843 892 if( ret < 0 ) { 844 893 switch((int)errno) { … … 906 955 return count; 907 956 } 957 958 //============================================================================================= 959 // I/O Submissions 960 //============================================================================================= 961 962 void register_fixed_files( cluster & cl, int * files, unsigned count ) { 963 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count ); 964 if( ret < 0 ) { 965 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 966 } 967 968 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 969 } 908 970 #endif
Note:
See TracChangeset
for help on using the changeset viewer.