Changeset 365e423 for libcfa/src/concurrency/io.cfa
- Timestamp:
- Jul 10, 2020, 2:22:17 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 7922158, e1801fc
- Parents:
- 0a92c78 (diff), 59f74a2 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff)
links above to see all the changes relative to each parent.
- Files:
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
r0a92c78 r365e423 14 14 // 15 15 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 16 #if defined(__CFA_DEBUG__) 17 // #define __CFA_DEBUG_PRINT_IO__ 18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 #endif 18 20 19 21 #include "kernel.hfa" … … 325 327 326 328 // Create the poller thread 327 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for clu ter %p\n", &this);329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this); 328 330 this.io->poller.slow.blocked = false; 329 331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); … … 430 432 } 431 433 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) { 435 bool need_sys_to_submit = false; 436 bool need_sys_to_complete = false; 437 unsigned min_complete = 0; 438 unsigned flags = 0; 439 440 441 TO_SUBMIT: 442 if( to_submit > 0 ) { 443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 444 need_sys_to_submit = true; 445 break TO_SUBMIT; 446 } 447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 448 need_sys_to_submit = true; 449 flags |= IORING_ENTER_SQ_WAKEUP; 450 } 451 } 452 453 TO_COMPLETE: 454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 455 flags |= IORING_ENTER_GETEVENTS; 456 if( mask ) { 457 need_sys_to_complete = true; 458 min_complete = 1; 459 break TO_COMPLETE; 460 } 461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 462 need_sys_to_complete = true; 463 } 464 } 465 466 int ret = 0; 467 if( need_sys_to_submit || need_sys_to_complete ) { 468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8); 469 if( ret < 0 ) { 470 switch((int)errno) { 471 case EAGAIN: 472 case EINTR: 473 ret = -1; 474 break; 475 default: 476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 477 } 478 } 479 } 480 481 // Memory barrier 482 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 483 
return ret; 484 } 485 432 486 //============================================================================================= 433 487 // I/O Polling … … 438 492 // Process a single completion message from the io_uring 439 493 // This is NOT thread-safe 440 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask , int waitcnt, bool in_kernel) {494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) { 441 495 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 442 496 … … 447 501 } 448 502 449 if (to_submit > 0 || waitcnt > 0) { 450 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 451 if( ret < 0 ) { 452 switch((int)errno) { 453 case EAGAIN: 454 case EINTR: 455 return [0, true]; 456 default: 457 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 458 } 459 } 460 461 // Release the consumed SQEs 462 __release_consumed_submission( ring ); 463 464 // update statistics 503 int ret = __io_uring_enter(ring, to_submit, true, mask); 504 if( ret < 0 ) { 505 return [0, true]; 506 } 507 508 // update statistics 509 if (to_submit > 0) { 465 510 __STATS__( true, 466 511 if( to_submit > 0 ) { … … 472 517 } 473 518 474 // Memory barrier475 __ atomic_thread_fence( __ATOMIC_SEQ_CST);519 // Release the consumed SQEs 520 __release_consumed_submission( ring ); 476 521 477 522 // Drain the queue … … 497 542 498 543 data->result = cqe.res; 499 if(! 
in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }500 else 544 if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 545 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); } 501 546 } 502 547 … … 546 591 int count; 547 592 bool again; 548 [count, again] = __drain_io( ring, &mask , 1, true);593 [count, again] = __drain_io( ring, &mask ); 549 594 550 595 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); … … 568 613 int count; 569 614 bool again; 570 [count, again] = __drain_io( ring, &mask , 1, true);615 [count, again] = __drain_io( ring, &mask ); 571 616 572 617 // Update statistics … … 606 651 bool again; 607 652 disable_interrupts(); 608 [count, again] = __drain_io( *this.ring, 0p , 0, false);653 [count, again] = __drain_io( *this.ring, 0p ); 609 654 610 655 if(!again) reset++; … … 800 845 // We got the lock 801 846 unsigned to_submit = __collect_submitions( ring ); 802 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);847 int ret = __io_uring_enter( ring, to_submit, false, 0p ); 803 848 if( ret < 0 ) { 804 switch((int)errno) { 805 case EAGAIN: 806 case EINTR: 807 unlock(ring.submit_q.lock); 808 return; 809 default: 810 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 811 } 812 } 813 814 /* paranoid */ verify( ret > 0 ); 849 unlock(ring.submit_q.lock); 850 return; 851 } 852 853 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 815 854 816 855 // Release the consumed SQEs … … 830 869 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 831 870 871 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0, 872 /* paranoid */ "index %u already reclaimed\n" 873 /* paranoid */ "head %u, prev %u, tail %u\n" 874 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 875 /* paranoid */ idx, 876 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 877 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 878 /* 
paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 881 /* paranoid */ ); 882 832 883 // Append to the list of ready entries 833 884 834 885 /* paranoid */ verify( idx <= mask ); 835 836 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 886 ring.submit_q.array[ (*tail) & mask ] = idx; 837 887 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 838 888 839 /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );840 841 889 // Submit however, many entries need to be submitted 842 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);890 int ret = __io_uring_enter( ring, 1, false, 0p ); 843 891 if( ret < 0 ) { 844 892 switch((int)errno) { … … 906 954 return count; 907 955 } 956 957 //============================================================================================= 958 // I/O Submissions 959 //============================================================================================= 960 961 void register_fixed_files( cluster & cl, int * files, unsigned count ) { 962 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count ); 963 if( ret < 0 ) { 964 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 965 } 966 967 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 968 } 908 969 #endif
Note: See TracChangeset
for help on using the changeset viewer.