Changeset 2f1cb37 for libcfa/src/concurrency/io.cfa
- Timestamp:
  May 12, 2020, 4:58:53 PM
- Branches:
  ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
  2802824
- Parents:
  1b143de (diff), 068a202 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Files:
  1 edited
Legend:
- Unmodified
- Added
- Removed
libcfa/src/concurrency/io.cfa
--- libcfa/src/concurrency/io.cfa (r1b143de)
+++ libcfa/src/concurrency/io.cfa (r2f1cb37)

@@ -18 +18 @@
 
 #include "kernel.hfa"
+#include "bitmanip.hfa"
 
 #if !defined(HAVE_LINUX_IO_URING_H)
-	void __kernel_io_startup( cluster &, int, bool ) {
+	void __kernel_io_startup( cluster &, unsigned, bool ) {
 		// Nothing to do without io_uring
 	}

@@ -91 +92 @@
 struct __io_poller_fast {
 	struct __io_data * ring;
-	bool waiting;
 	$thread thrd;
 };

@@ -97 +97 @@
 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
 	this.ring = cltr.io;
-	this.waiting = true;
 	(this.thrd){ "Fast I/O Poller", cltr };
 }

@@ -126 +125 @@
 	// Like head/tail but not seen by the kernel
 	volatile uint32_t alloc;
-	volatile uint32_t ready;
+	volatile uint32_t * ready;
+	uint32_t ready_cnt;
 
 	__spinlock_t lock;

@@ -145 +145 @@
 		volatile unsigned long long int block;
 	} submit_avg;
+	struct {
+		volatile unsigned long long int val;
+		volatile unsigned long long int cnt;
+		volatile unsigned long long int block;
+	} look_avg;
 } stats;
 #endif

@@ -192 +197 @@
 		void * stack;
 		pthread_t kthrd;
+		volatile bool blocked;
 	} slow;
 	__io_poller_fast fast;

@@ -201 +207 @@
 // I/O Startup / Shutdown logic
 //=============================================================================================
-void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
+void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
 	this.io = malloc();
 

@@ -274 +280 @@
 	sq.array = (uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
 	sq.alloc = *sq.tail;
-	sq.ready = *sq.tail;
+
+	if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+		/* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
+		sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
+		sq.ready = alloc_align( 64, sq.ready_cnt );
+		for(i; sq.ready_cnt) {
+			sq.ready[i] = -1ul32;
+		}
+	}
+	else {
+		sq.ready_cnt = 0;
+		sq.ready = 0p;
+	}
 
 	// completion queue

@@ -307 +325 @@
 	this.io->submit_q.stats.submit_avg.cnt = 0;
 	this.io->submit_q.stats.submit_avg.block = 0;
+	this.io->submit_q.stats.look_avg.val = 0;
+	this.io->submit_q.stats.look_avg.cnt = 0;
+	this.io->submit_q.stats.look_avg.block = 0;
 	this.io->completion_q.stats.completed_avg.val = 0;
 	this.io->completion_q.stats.completed_avg.slow_cnt = 0;

@@ -326 +347 @@
 	// Create the poller thread
 	__cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
+	this.io->poller.slow.blocked = false;
 	this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
 }

@@ -347 +369 @@
 	if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
 		with( this.io->poller.fast ) {
-			/* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
 			/* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
 			/* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
 
 			// We need to adjust the clean-up based on where the thread is
-			if( thrd.preempted != __NO_PREEMPTION ) {
+			if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
 
 				// This is the tricky case
 				// The thread was preempted and now it is on the ready queue
-				/* paranoid */ verify( thrd.state == Active ); // The thread better be in this state
+
 				/* paranoid */ verify( thrd.next != 0p );                // The thread should be the last on the list
 				/* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list

@@ -405 +426 @@
 	if(this.print_stats) {
 		with(this.io->submit_q.stats, this.io->completion_q.stats) {
-			__cfaabi_bits_print_safe( STDERR_FILENO,
+			double lavgv = 0;
+			double lavgb = 0;
+			if(look_avg.cnt != 0) {
+				lavgv = ((double)look_avg.val  ) / look_avg.cnt;
+				lavgb = ((double)look_avg.block) / look_avg.cnt;
+			}
+
+			__cfaabi_bits_print_safe( STDOUT_FILENO,
 				"----- I/O uRing Stats -----\n"
-				"- total submit calls   : %'15llu\n"
-				"- avg submit           : %'18.2lf\n"
-				"- pre-submit block %%   : %'18.2lf\n"
-				"- total wait calls     : %'15llu (%'llu slow, %'llu fast)\n"
-				"- avg completion/wait  : %'18.2lf\n",
+				"- total submit calls      : %'15llu\n"
+				"- avg submit              : %'18.2lf\n"
+				"- pre-submit block %%      : %'18.2lf\n"
+				"- total ready search      : %'15llu\n"
+				"- avg ready search len    : %'18.2lf\n"
+				"- avg ready search block  : %'18.2lf\n"
+				"- total wait calls        : %'15llu (%'llu slow, %'llu fast)\n"
+				"- avg completion/wait     : %'18.2lf\n",
 				submit_avg.cnt,
 				((double)submit_avg.val) / submit_avg.cnt,
 				(100.0 * submit_avg.block) / submit_avg.cnt,
+				look_avg.cnt,
+				lavgv,
+				lavgb,
 				completed_avg.slow_cnt + completed_avg.fast_cnt,
 				completed_avg.slow_cnt, completed_avg.fast_cnt,

@@ -441 +475 @@
 	close(this.io->fd);
 
+	free( this.io->submit_q.ready ); // Maybe null, doesn't matter
 	free( this.io );
 }

@@ -454 +489 @@
 // Process a single completion message from the io_uring
 // This is NOT thread-safe
-static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
-	int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
+static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
+	unsigned to_submit = 0;
+	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+
+		// If the poller thread also submits, then we need to aggregate the submissions which are ready
+		uint32_t * tail = ring.submit_q.tail;
+		const uint32_t mask = *ring.submit_q.mask;
+
+		// Go through the list of ready submissions
+		for( i; ring.submit_q.ready_cnt ) {
+			// replace any submission with the sentinel, to consume it.
+			uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+
+			// If it was already the sentinel, then we are done
+			if( idx == -1ul32 ) continue;
+
+			// If we got a real submission, append it to the list
+			ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
+			to_submit++;
+		}
+
+		// Increment the tail based on how many we are ready to submit
+		__atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
+
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			ring.submit_q.stats.submit_avg.val += to_submit;
+			ring.submit_q.stats.submit_avg.cnt += 1;
+		#endif
+	}
+
+	int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
 	if( ret < 0 ) {
 		switch((int)errno) {

@@ -497 +562 @@
 	__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
 
-	return count;
+	return [count, count > 0 || to_submit > 0];
 }
 

@@ -519 +584 @@
 	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
 		while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
+
+			__atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
+
 			// In the user-thread approach drain and if anything was drained,
 			// batton pass to the user-thread
-			int count = __drain_io( ring, &mask, 1, true );
+			int count;
+			bool again;
+			[count, again] = __drain_io( ring, &mask, 1, true );
+
+			__atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
 
 			// Update statistics

@@ -529 +601 @@
 			#endif
 
-			if( count > 0) {
+			if(again) {
 				__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
 				__unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );

@@ -539 +611 @@
 		while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
 			//In the naive approach, just poll the io completion queue directly
-			int count = __drain_io( ring, &mask, 1, true );
+			int count;
+			bool again;
+			[count, again] = __drain_io( ring, &mask, 1, true );
 
 			// Update statistics

@@ -566 +640 @@
 	// Then loop until we need to start
 	while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
+
 		// Drain the io
-		this.waiting = false;
-		int count = __drain_io( *this.ring, 0p, 0, false );
-		reset += count > 0 ? 1 : 0;
+		int count;
+		bool again;
+		[count, again] = __drain_io( *this.ring, 0p, 0, false );
+
+		if(!again) reset++;
 
 		// Update statistics

@@ -577 +654 @@
 		#endif
 
-		this.waiting = true;
+		// If we got something, just yield and check again
 		if(reset < 5) {
-			// If we got something, just yield and check again
 			yield();
 		}
+		// We didn't get anything baton pass to the slow poller
 		else {
-			// We didn't get anything baton pass to the slow poller
 			__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
+			reset = 0;
+
+			// wake up the slow poller
 			post( this.ring->poller.sem );
+
+			// park this thread
 			park( __cfaabi_dbg_ctx );
-			reset = 0;
 		}
 	}
 
 	__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+}
+
+static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
+static inline void __wake_poller( struct __io_data & ring ) {
+	if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
+
+	sigval val = { 1 };
+	pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
 }

@@ -632 +720 @@
 	uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
 
-	// Validate that we didn't overflow anything
-	// Check that nothing overflowed
-	/* paranoid */ verify( true );
-
-	// Check that it goes head -> tail -> alloc and never head -> alloc -> tail
-	/* paranoid */ verify( true );
+	// Mask the idx now to allow make everything easier to check
+	idx &= *ring.submit_q.mask;
 
 	// Return the sqe
-	return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
+	return [&ring.submit_q.sqes[ idx ], idx];
 }
 
 static inline void __submit( struct __io_data & ring, uint32_t idx ) {
-	// get mutual exclusion
-	lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
-
-	// Append to the list of ready entries
-	uint32_t * tail = ring.submit_q.tail;
+	// Get now the data we definetely need
+	uint32_t * const tail = ring.submit_q.tail;
 	const uint32_t mask = *ring.submit_q.mask;
 
-	ring.submit_q.array[ (*tail) & mask ] = idx & mask;
-	__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
-
-	// Submit however, many entries need to be submitted
-	int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
-	if( ret < 0 ) {
-		switch((int)errno) {
-			default:
-				abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
-		}
-	}
-
-	// update statistics
-	#if !defined(__CFA_NO_STATISTICS__)
-		ring.submit_q.stats.submit_avg.val += 1;
-		ring.submit_q.stats.submit_avg.cnt += 1;
-	#endif
-
-	unlock(ring.submit_q.lock);
-	// Make sure that idx was submitted
-	// Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
-	__cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
+	// There are 2 submission schemes, check which one we are using
+	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+		// If the poller thread submits, then we just need to add this to the ready array
+
+		/* paranoid */ verify( idx <= mask );
+		/* paranoid */ verify( idx != -1ul32 );
+
+		// We need to find a spot in the ready array
+		__attribute((unused)) int len   = 0;
+		__attribute((unused)) int block = 0;
+		uint32_t expected = -1ul32;
+		uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
+		uint32_t off = __tls_rand();
+		LOOKING: for() {
+			for(i; ring.submit_q.ready_cnt) {
+				uint32_t ii = (i + off) & ready_mask;
+				if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
+					break LOOKING;
+				}
+
+				len ++;
+			}
+
+			block++;
+			yield();
+		}
+
+		__wake_poller( ring );
+
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			__atomic_fetch_add( &ring.submit_q.stats.look_avg.val,   len,   __ATOMIC_RELAXED );
+			__atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
+			__atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt,   1,     __ATOMIC_RELAXED );
+		#endif
+
+		__cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
+	}
+	else {
+		// get mutual exclusion
+		lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
+
+		// Append to the list of ready entries
+
+		/* paranoid */ verify( idx <= mask );
+
+		ring.submit_q.array[ (*tail) & mask ] = idx & mask;
+		__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
+
+		// Submit however, many entries need to be submitted
+		int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
+		if( ret < 0 ) {
+			switch((int)errno) {
+				default:
+					abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
+			}
+		}
+
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			ring.submit_q.stats.submit_avg.val += 1;
+			ring.submit_q.stats.submit_avg.cnt += 1;
+		#endif
+
+		unlock(ring.submit_q.lock);
+
+		__cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
+	}
 }
 
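The core of this changeset is the new lock-free hand-off between submitting threads and the poller: a submitter claims a slot in submit_q.ready by compare-and-swapping the -1 sentinel to its SQE index (yielding and retrying if the array is full), and __drain_io later swaps each slot back to the sentinel and appends whatever real indices it finds to the kernel-visible submission ring. The stand-alone C sketch below illustrates that claim/drain protocol in isolation; it is an illustration only, not code from this repository, and the names READY_CNT, publish() and drain_ready(), as well as the use of rand() and sched_yield(), are stand-ins for submit_q.ready, __submit(), __drain_io(), __tls_rand() and the CFA scheduler's yield().

/* Stand-alone sketch of the ready-array hand-off (illustrative names, not from io.cfa). */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sched.h>

#define SENTINEL  ((uint32_t)-1)   /* same role as -1ul32 in the changeset */
#define READY_CNT 8u               /* must be a power of two, as verified at startup */

static uint32_t ready[READY_CNT];  /* slots shared by submitters and the poller */

/* Submitter side: claim any free slot by CAS-ing the sentinel to our sqe index. */
static void publish(uint32_t idx) {
	uint32_t off = (uint32_t)rand();                   /* random start spreads contention */
	for (;;) {
		for (uint32_t i = 0; i < READY_CNT; i++) {
			uint32_t ii = (i + off) & (READY_CNT - 1);
			uint32_t expected = SENTINEL;
			if (__atomic_compare_exchange_n(&ready[ii], &expected, idx,
			                                true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED))
				return;                    /* slot claimed, the poller will pick it up */
		}
		sched_yield();                             /* array full: back off and retry */
	}
}

/* Poller side: swap every slot back to the sentinel and count what was there. */
static unsigned drain_ready(void) {
	unsigned to_submit = 0;
	for (uint32_t i = 0; i < READY_CNT; i++) {
		uint32_t idx = __atomic_exchange_n(&ready[i], SENTINEL, __ATOMIC_RELAXED);
		if (idx == SENTINEL) continue;             /* slot was empty */
		printf("would append sqe %u to the submission ring\n", idx);
		to_submit++;                               /* later passed as io_uring_enter's to_submit */
	}
	return to_submit;
}

int main(void) {
	for (uint32_t i = 0; i < READY_CNT; i++) ready[i] = SENTINEL;
	publish(3);
	publish(7);
	printf("drained %u entries\n", drain_ready());
	return 0;
}

Randomising the starting slot keeps concurrent submitters from all contending on the first entry, and because a slot only ever moves sentinel-to-index (submitter) and index-to-sentinel (poller), this path needs no lock; the submit_q lock and the per-call io_uring_enter syscall remain only in the fall-back scheme, when CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS is not set.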
Note: See TracChangeset for help on using the changeset viewer.