Changes in libcfa/src/concurrency/io.cfa [20ab637:fb98462]
- File:
-
- 1 edited
-
libcfa/src/concurrency/io.cfa (modified) (15 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
r20ab637 rfb98462 14 14 // 15 15 16 #if defined(__CFA_DEBUG__) 17 // #define __CFA_DEBUG_PRINT_IO__ 18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 #endif 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 20 18 21 19 #include "kernel.hfa" … … 218 216 // adjust the size according to the parameters 219 217 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 220 cq ->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);218 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz); 221 219 } 222 220 #endif … … 232 230 // mmap the Completion Queue into existence (may or may not be needed) 233 231 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 234 cq ->ring_ptr = sq->ring_ptr;232 cq.ring_ptr = sq.ring_ptr; 235 233 } 236 234 else … … 327 325 328 326 // Create the poller thread 329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for clu ster %p\n", &this);327 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this); 330 328 this.io->poller.slow.blocked = false; 331 329 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); … … 432 430 } 433 431 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {435 bool need_sys_to_submit = false;436 bool need_sys_to_complete = false;437 unsigned min_complete = 0;438 unsigned flags = 0;439 440 441 TO_SUBMIT:442 if( to_submit > 0 ) {443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {444 need_sys_to_submit = true;445 break TO_SUBMIT;446 }447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {448 need_sys_to_submit = true;449 flags |= IORING_ENTER_SQ_WAKEUP;450 }451 }452 453 TO_COMPLETE:454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {455 flags |= IORING_ENTER_GETEVENTS;456 if( mask ) {457 need_sys_to_complete = true;458 min_complete = 1;459 break TO_COMPLETE;460 }461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {462 need_sys_to_complete = true;463 }464 }465 466 int ret = 0;467 if( need_sys_to_submit || need_sys_to_complete ) {468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);469 if( ret < 0 ) {470 switch((int)errno) {471 case EAGAIN:472 case EINTR:473 ret = -1;474 break;475 default:476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );477 }478 }479 }480 481 // Memory barrier482 __atomic_thread_fence( __ATOMIC_SEQ_CST );483 return ret;484 }485 486 432 //============================================================================================= 487 433 // I/O Polling … … 492 438 // Process a single completion message from the io_uring 493 439 // This is NOT thread-safe 494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {440 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) { 495 441 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 496 const uint32_t smask = *ring.submit_q.mask;497 442 498 443 unsigned to_submit = 0; … … 502 447 } 503 448 504 int ret = __io_uring_enter(ring, to_submit, true, mask); 505 if( ret < 0 ) { 506 return [0, true]; 507 } 508 509 // update statistics 510 if (to_submit > 0) { 449 if (to_submit > 0 || waitcnt > 0) { 450 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 451 if( ret < 0 ) { 452 switch((int)errno) { 453 case EAGAIN: 454 case EINTR: 455 return [0, true]; 456 default: 457 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 458 } 459 } 460 461 // Release the consumed SQEs 462 __release_consumed_submission( ring ); 463 464 // update statistics 511 465 __STATS__( true, 512 466 if( to_submit > 0 ) { … … 518 472 } 519 473 520 // Release the consumed SQEs521 __ release_consumed_submission( ring);474 // Memory barrier 475 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 522 476 523 477 // Drain the queue … … 543 497 544 498 data->result = cqe.res; 545 if(! mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }546 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }499 if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 500 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); } 547 501 } 548 502 … … 592 546 int count; 593 547 bool again; 594 [count, again] = __drain_io( ring, &mask );548 [count, again] = __drain_io( ring, &mask, 1, true ); 595 549 596 550 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); … … 614 568 int count; 615 569 bool again; 616 [count, again] = __drain_io( ring, &mask );570 [count, again] = __drain_io( ring, &mask, 1, true ); 617 571 618 572 // Update statistics … … 652 606 bool again; 653 607 disable_interrupts(); 654 [count, again] = __drain_io( *this.ring, 0p );608 [count, again] = __drain_io( *this.ring, 0p, 0, false ); 655 609 656 610 if(!again) reset++; … … 846 800 // We got the lock 847 801 unsigned to_submit = __collect_submitions( ring ); 848 int ret = __io_uring_enter( ring, to_submit, false, 0p);802 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8); 849 803 if( ret < 0 ) { 850 unlock(ring.submit_q.lock); 851 return; 852 } 853 854 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 804 switch((int)errno) { 805 case EAGAIN: 806 case EINTR: 807 unlock(ring.submit_q.lock); 808 return; 809 default: 810 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 811 } 812 } 813 814 /* paranoid */ verify( ret > 0 ); 855 815 856 816 // Release the consumed SQEs … … 870 830 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 871 831 872 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,873 /* paranoid */ "index %u already reclaimed\n"874 /* paranoid */ "head %u, prev %u, tail %u\n"875 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n",876 /* paranoid */ idx,877 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail878 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]881 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]882 /* paranoid */ );883 884 832 // Append to the list of ready entries 885 833 886 834 /* paranoid */ verify( idx <= mask ); 887 ring.submit_q.array[ (*tail) & mask ] = idx; 835 836 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 888 837 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 889 838 839 /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 ); 840 890 841 // Submit however, many entries need to be submitted 891 int ret = __io_uring_enter( ring, 1, false, 0p);842 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0); 892 843 if( ret < 0 ) { 893 844 switch((int)errno) { … … 955 906 return count; 956 907 } 957 958 //=============================================================================================959 // I/O Submissions960 //=============================================================================================961 962 void register_fixed_files( cluster & cl, int * files, unsigned count ) {963 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );964 if( ret < 0 ) {965 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );966 }967 968 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );969 }970 908 #endif
Note:
See TracChangeset
for help on using the changeset viewer.