Changeset 20ab637 for libcfa/src/concurrency/io.cfa
- Timestamp:
- Jul 10, 2020, 2:17:49 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 59f74a2
- Parents:
- 3a32b3a
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
r3a32b3a r20ab637 14 14 // 15 15 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 16 #if defined(__CFA_DEBUG__) 17 // #define __CFA_DEBUG_PRINT_IO__ 18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 #endif 18 20 19 21 #include "kernel.hfa" … … 325 327 326 328 // Create the poller thread 327 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for clu ter %p\n", &this);329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this); 328 330 this.io->poller.slow.blocked = false; 329 331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); … … 430 432 } 431 433 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) { 435 bool need_sys_to_submit = false; 436 bool need_sys_to_complete = false; 437 unsigned min_complete = 0; 438 unsigned flags = 0; 439 440 441 TO_SUBMIT: 442 if( to_submit > 0 ) { 443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 444 need_sys_to_submit = true; 445 break TO_SUBMIT; 446 } 447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 448 need_sys_to_submit = true; 449 flags |= IORING_ENTER_SQ_WAKEUP; 450 } 451 } 452 453 TO_COMPLETE: 454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 455 flags |= IORING_ENTER_GETEVENTS; 456 if( mask ) { 457 need_sys_to_complete = true; 458 min_complete = 1; 459 break TO_COMPLETE; 460 } 461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 462 need_sys_to_complete = true; 463 } 464 } 465 466 int ret = 0; 467 if( need_sys_to_submit || need_sys_to_complete ) { 468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8); 469 if( ret < 0 ) { 470 switch((int)errno) { 471 case EAGAIN: 472 case EINTR: 473 ret = -1; 474 break; 475 default: 476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 477 } 478 } 479 } 480 481 // Memory barrier 482 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 483 return ret; 484 } 485 432 486 //============================================================================================= 433 487 // I/O Polling … … 438 492 // Process a single completion message from the io_uring 439 493 // This is NOT thread-safe 440 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask , int waitcnt, bool in_kernel) {494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) { 441 495 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 442 496 const uint32_t smask = *ring.submit_q.mask; … … 448 502 } 449 503 450 if (to_submit > 0 || waitcnt > 0) { 451 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 452 if( ret < 0 ) { 453 switch((int)errno) { 454 case EAGAIN: 455 case EINTR: 456 return [0, true]; 457 default: 458 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 459 } 460 } 461 462 // Release the consumed SQEs 463 __release_consumed_submission( ring ); 464 465 // update statistics 504 int ret = __io_uring_enter(ring, to_submit, true, mask); 505 if( ret < 0 ) { 506 return [0, true]; 507 } 508 509 // update statistics 510 if (to_submit > 0) { 466 511 __STATS__( true, 467 512 if( to_submit > 0 ) { … … 473 518 } 474 519 475 // Memory barrier476 __ atomic_thread_fence( __ATOMIC_SEQ_CST);520 // Release the consumed SQEs 521 __release_consumed_submission( ring ); 477 522 478 523 // Drain the queue … … 498 543 499 544 data->result = cqe.res; 500 if(! in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }501 else 545 if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 546 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); } 502 547 } 503 548 … … 547 592 int count; 548 593 bool again; 549 [count, again] = __drain_io( ring, &mask , 1, true);594 [count, again] = __drain_io( ring, &mask ); 550 595 551 596 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); … … 569 614 int count; 570 615 bool again; 571 [count, again] = __drain_io( ring, &mask , 1, true);616 [count, again] = __drain_io( ring, &mask ); 572 617 573 618 // Update statistics … … 607 652 bool again; 608 653 disable_interrupts(); 609 [count, again] = __drain_io( *this.ring, 0p , 0, false);654 [count, again] = __drain_io( *this.ring, 0p ); 610 655 611 656 if(!again) reset++; … … 801 846 // We got the lock 802 847 unsigned to_submit = __collect_submitions( ring ); 803 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);848 int ret = __io_uring_enter( ring, to_submit, false, 0p ); 804 849 if( ret < 0 ) { 805 switch((int)errno) { 806 case EAGAIN: 807 case EINTR: 808 unlock(ring.submit_q.lock); 809 return; 810 default: 811 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 812 } 813 } 814 815 /* paranoid */ verify( ret > 0 ); 850 unlock(ring.submit_q.lock); 851 return; 852 } 853 854 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 816 855 817 856 // Release the consumed SQEs … … 831 870 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 832 871 872 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0, 873 /* paranoid */ "index %u already reclaimed\n" 874 /* paranoid */ "head %u, prev %u, tail %u\n" 875 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 876 /* paranoid */ idx, 877 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 878 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 881 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 882 /* paranoid */ ); 883 833 884 // Append to the list of ready entries 834 885 835 886 /* paranoid */ verify( idx <= mask ); 836 837 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 887 ring.submit_q.array[ (*tail) & mask ] = idx; 838 888 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 839 889 840 /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );841 842 890 // Submit however, many entries need to be submitted 843 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);891 int ret = __io_uring_enter( ring, 1, false, 0p ); 844 892 if( ret < 0 ) { 845 893 switch((int)errno) { … … 907 955 return count; 908 956 } 957 958 //============================================================================================= 959 // I/O Submissions 960 //============================================================================================= 961 962 void register_fixed_files( cluster & cl, int * files, unsigned count ) { 963 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count ); 964 if( ret < 0 ) { 965 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 966 } 967 968 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 969 } 909 970 #endif
Note: See TracChangeset
for help on using the changeset viewer.