Changeset 5dadc9b
- Timestamp: May 7, 2020, 2:01:21 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 0335620
- Parents: 87e0b015
- Location: libcfa/src/concurrency
- Files: 2 edited
Legend:
- Unmodified: leading space
- Added: leading '+'
- Removed: leading '-'
libcfa/src/concurrency/io.cfa
--- libcfa/src/concurrency/io.cfa (r87e0b015)
+++ libcfa/src/concurrency/io.cfa (r5dadc9b)

@@ -91,5 +91,4 @@
 struct __io_poller_fast {
 	struct __io_data * ring;
-	bool waiting;
 	$thread thrd;
 };

@@ -97,5 +96,4 @@
 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
 	this.ring = cltr.io;
-	this.waiting = true;
 	(this.thrd){ "Fast I/O Poller", cltr };
 }

@@ -126,5 +124,6 @@
 	// Like head/tail but not seen by the kernel
 	volatile uint32_t alloc;
-	volatile uint32_t ready;
+	volatile uint32_t * ready;
+	uint32_t ready_cnt;

 	__spinlock_t lock;

@@ -145,4 +144,9 @@
 			volatile unsigned long long int block;
 		} submit_avg;
+		struct {
+			volatile unsigned long long int val;
+			volatile unsigned long long int cnt;
+			volatile unsigned long long int block;
+		} look_avg;
 	} stats;
 #endif

@@ -274,5 +278,16 @@
 	sq.array = (uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
 	sq.alloc = *sq.tail;
-	sq.ready = *sq.tail;
+
+	if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+		sq.ready_cnt = 10;
+		sq.ready = alloc( sq.ready_cnt );
+		for(i; sq.ready_cnt) {
+			sq.ready[i] = -1ul32;
+		}
+	}
+	else {
+		sq.ready_cnt = 0;
+		sq.ready = 0p;
+	}

 	// completion queue

@@ -307,4 +322,7 @@
 		this.io->submit_q.stats.submit_avg.cnt = 0;
 		this.io->submit_q.stats.submit_avg.block = 0;
+		this.io->submit_q.stats.look_avg.val = 0;
+		this.io->submit_q.stats.look_avg.cnt = 0;
+		this.io->submit_q.stats.look_avg.block = 0;
 		this.io->completion_q.stats.completed_avg.val = 0;
 		this.io->completion_q.stats.completed_avg.slow_cnt = 0;
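The allocation above is the heart of the new scheme: `ready` is no longer a single counter but a small array of slots, each holding either a pending SQE index or the sentinel `-1ul32` meaning empty. A minimal C11 sketch of the same layout and initialization (the `ready_array` type and names are illustrative, not the CFA API):

#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

#define READY_SENTINEL UINT32_MAX /* same value as -1ul32 in the diff */

struct ready_array {
	_Atomic uint32_t * slots; /* each slot: an SQE index, or READY_SENTINEL */
	uint32_t cnt;             /* number of slots; 10 in this changeset */
};

static void ready_array_init( struct ready_array * ra, uint32_t cnt ) {
	ra->cnt = cnt;
	ra->slots = malloc( cnt * sizeof(ra->slots[0]) );
	for( uint32_t i = 0; i < cnt; i++ ) {
		atomic_init( &ra->slots[i], READY_SENTINEL );
	}
}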
@@ -347,14 +365,12 @@
 	if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
 		with( this.io->poller.fast ) {
-			/* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
 			/* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
 			/* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );

 			// We need to adjust the clean-up based on where the thread is
-			if( thrd.preempted != __NO_PREEMPTION ) {
+			if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {

 				// This is the tricky case
 				// The thread was preempted and now it is on the ready queue
-				/* paranoid */ verify( thrd.state == Active ); // The thread better be in this state
 				/* paranoid */ verify( thrd.next == 1p ); // The thread should be the last on the list
 				/* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list

@@ -405,14 +421,27 @@
 	if(this.print_stats) {
 		with(this.io->submit_q.stats, this.io->completion_q.stats) {
+			double lavgv = 0;
+			double lavgb = 0;
+			if(look_avg.cnt != 0) {
+				lavgv = ((double)look_avg.val  ) / look_avg.cnt;
+				lavgb = ((double)look_avg.block) / look_avg.cnt;
+			}
+
 			__cfaabi_bits_print_safe( STDERR_FILENO,
 				"----- I/O uRing Stats -----\n"
-				"- total submit calls     : %'15llu\n"
-				"- avg submit             : %'18.2lf\n"
-				"- pre-submit block %%     : %'18.2lf\n"
-				"- total wait calls       : %'15llu   (%'llu slow, %'llu fast)\n"
-				"- avg completion/wait    : %'18.2lf\n",
+				"- total submit calls      : %'15llu\n"
+				"- avg submit              : %'18.2lf\n"
+				"- pre-submit block %%      : %'18.2lf\n"
+				"- total ready search      : %'15llu\n"
+				"- avg ready search len    : %'18.2lf\n"
+				"- avg ready search block  : %'18.2lf\n"
+				"- total wait calls        : %'15llu   (%'llu slow, %'llu fast)\n"
+				"- avg completion/wait     : %'18.2lf\n",
 				submit_avg.cnt,
 				((double)submit_avg.val) / submit_avg.cnt,
 				(100.0 * submit_avg.block) / submit_avg.cnt,
+				look_avg.cnt,
+				lavgv,
+				lavgb,
 				completed_avg.slow_cnt + completed_avg.fast_cnt,
 				completed_avg.slow_cnt, completed_avg.fast_cnt,

@@ -441,4 +470,5 @@
 	close(this.io->fd);

+	free( this.io->submit_q.ready ); // Maybe null, doesn't matter
 	free( this.io );
 }
@@ -454,6 +484,36 @@
 // Process a single completion message from the io_uring
 // This is NOT thread-safe
-static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
-	int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
+static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
+	unsigned to_submit = 0;
+	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+
+		// If the poller thread also submits, then we need to aggregate the submissions which are ready
+		uint32_t * tail = ring.submit_q.tail;
+		const uint32_t mask = *ring.submit_q.mask;
+
+		// Go through the list of ready submissions
+		for( i; ring.submit_q.ready_cnt ) {
+			// replace any submission with the sentinel, to consume it.
+			uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+
+			// If it was already the sentinel, then we are done
+			if( idx == -1ul32 ) continue;
+
+			// If we got a real submission, append it to the list
+			ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
+			to_submit++;
+		}
+
+		// Increment the tail based on how many we are ready to submit
+		__atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
+
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			ring.submit_q.stats.submit_avg.val += to_submit;
+			ring.submit_q.stats.submit_avg.cnt += 1;
+		#endif
+	}
+
+	int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
 	if( ret < 0 ) {
 		switch((int)errno) {

@@ -497,4 +557,4 @@
 	__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );

-	return count;
+	return [count, count > 0 || to_submit > 0];
 }

@@ -519,7 +579,10 @@
 	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
 		while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
+
 			// In the user-thread approach drain and if anything was drained,
 			// batton pass to the user-thread
-			int count = __drain_io( ring, &mask, 1, true );
+			int count;
+			bool again;
+			[count, again] = __drain_io( ring, &mask, 1, true );

 			// Update statistics

@@ -529,5 +592,5 @@
 			#endif

-			if( count > 0 ) {
+			if(again) {
 				__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
 				__unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );

@@ -539,5 +602,7 @@
 		while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
 			//In the naive approach, just poll the io completion queue directly
-			int count = __drain_io( ring, &mask, 1, true );
+			int count;
+			bool again;
+			[count, again] = __drain_io( ring, &mask, 1, true );

 			// Update statistics
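In the new `__drain_io`, the poller consumes the ready array with an atomic exchange: swapping the sentinel into a slot atomically claims whatever index was there, so an index is appended to the kernel-visible submission array exactly once even while producers keep publishing. Continuing the C11 sketch above (the `sq_array`/`sq_tail`/`sq_mask` parameters stand in for the real io_uring submission-queue fields):

static unsigned drain_ready( struct ready_array * ra, uint32_t * sq_array,
                             _Atomic uint32_t * sq_tail, uint32_t sq_mask ) {
	unsigned to_submit = 0;
	uint32_t tail = atomic_load_explicit( sq_tail, memory_order_relaxed );
	for( uint32_t i = 0; i < ra->cnt; i++ ) {
		/* Swap the sentinel in; whatever comes out is ours alone. */
		uint32_t idx = atomic_exchange_explicit( &ra->slots[i], READY_SENTINEL,
		                                         memory_order_relaxed );
		if( idx == READY_SENTINEL ) continue; /* slot was empty */

		/* Append the claimed index behind the current kernel-visible tail. */
		sq_array[ (tail + to_submit) & sq_mask ] = idx;
		to_submit++;
	}
	/* Publish all appended entries at once by bumping the shared tail. */
	atomic_fetch_add_explicit( sq_tail, to_submit, memory_order_seq_cst );
	return to_submit; /* passed as the to_submit argument of io_uring_enter */
}

Batching the tail update is what lets a burst of submissions share a single `io_uring_enter` call, which is exactly what the new `to_submit` argument to the syscall in the diff carries.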
@@ -566,8 +631,11 @@
 	// Then loop until we need to start
 	while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
+
 		// Drain the io
-		this.waiting = false;
-		int count = __drain_io( *this.ring, 0p, 0, false );
-		reset += count > 0 ? 1 : 0;
+		int count;
+		bool again;
+		[count, again] = __drain_io( *this.ring, 0p, 0, false );
+
+		if(!again) reset++;

 		// Update statistics

@@ -577,18 +645,26 @@
 		#endif

-		this.waiting = true;
+		// If we got something, just yield and check again
 		if(reset < 5) {
-			// If we got something, just yield and check again
 			yield();
 		}
+		// We didn't get anything baton pass to the slow poller
 		else {
-			// We didn't get anything baton pass to the slow poller
 			__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
+			reset = 0;
+
+			// wake up the slow poller
 			post( this.ring->poller.sem );
+
+			// park this thread
 			park( __cfaabi_dbg_ctx );
-			reset = 0;
 		}
 	}

 	__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+}
+
+static inline void __wake_poller( struct __io_data & ring ) {
+	sigval val = { 1 };
+	pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
 }
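The new `__wake_poller` is how a submitting thread nudges the slow poller, which may be blocked inside `io_uring_enter`: glibc's `pthread_sigqueue` queues a signal directly at that kernel thread, knocking the blocking syscall out with EINTR (the sigmask the slow poller passes to `io_uring_enter` presumably leaves SIGUSR1 deliverable during the wait). A standalone C illustration of the mechanism; SIGUSR1 matches the diff, and the handler is shown only to make the example self-contained:

#define _GNU_SOURCE
#include <pthread.h>
#include <signal.h>

static void on_wake( int sig, siginfo_t * info, void * ctx ) {
	(void)sig; (void)info; (void)ctx; /* nothing to do: EINTR does the work */
}

static void install_wake_handler( void ) {
	struct sigaction sa = { 0 };
	sa.sa_sigaction = on_wake;
	sa.sa_flags = SA_SIGINFO; /* deliberately no SA_RESTART: we want EINTR */
	sigaction( SIGUSR1, &sa, 0 );
}

static void wake_poller( pthread_t poller_kthrd ) {
	union sigval val = { .sival_int = 1 };
	pthread_sigqueue( poller_kthrd, SIGUSR1, val );
}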
@@ -632,12 +708,8 @@
 	uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);

-	// Validate that we didn't overflow anything
-	// Check that nothing overflowed
-	/* paranoid */ verify( true );
-
-	// Check that it goes head -> tail -> alloc and never head -> alloc -> tail
-	/* paranoid */ verify( true );
+	// Mask the idx now to allow make everything easier to check
+	idx &= *ring.submit_q.mask;

 	// Return the sqe
-	return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
+	return [&ring.submit_q.sqes[ idx ], idx];
 }

@@ -645,29 +717,68 @@
 static inline void __submit( struct __io_data & ring, uint32_t idx ) {
-	// get mutual exclusion
-	lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
-
-	// Append to the list of ready entries
-	uint32_t * tail = ring.submit_q.tail;
+	// Get now the data we definetely need
+	uint32_t * const tail = ring.submit_q.tail;
 	const uint32_t mask = *ring.submit_q.mask;

-	ring.submit_q.array[ (*tail) & mask ] = idx & mask;
-	__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
-
-	// Submit however, many entries need to be submitted
-	int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
-	if( ret < 0 ) {
-		switch((int)errno) {
-			default:
-				abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
-		}
-	}
-
-	// update statistics
-	#if !defined(__CFA_NO_STATISTICS__)
-		ring.submit_q.stats.submit_avg.val += 1;
-		ring.submit_q.stats.submit_avg.cnt += 1;
-	#endif
-
-	unlock(ring.submit_q.lock);
+	// There are 2 submission schemes, check which one we are using
+	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+		// If the poller thread submits, then we just need to add this to the ready array
+
+		/* paranoid */ verify( idx <= mask );
+		/* paranoid */ verify( idx != -1ul32 );
+
+		// We need to find a spot in the ready array
+		__attribute((unused)) int len = 0;
+		__attribute((unused)) int block = 0;
+		uint32_t expected = -1ul32;
+		LOOKING: for(;;) {
+			for(i; ring.submit_q.ready_cnt) {
+				if( __atomic_compare_exchange_n( &ring.submit_q.ready[i], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
+					break LOOKING;
+				}

+				len ++;
+			}
+
+			block++;
+			yield();
+		}
+
+		__wake_poller( ring );
+
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			__atomic_fetch_add( &ring.submit_q.stats.look_avg.val, len, __ATOMIC_RELAXED );
+			__atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
+			__atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt, 1, __ATOMIC_RELAXED );
+		#endif
+	}
+	else {
+		// get mutual exclusion
+		lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
+
+		// Append to the list of ready entries
+
+		/* paranoid */ verify( idx <= mask );
+
+		ring.submit_q.array[ (*tail) & mask ] = idx & mask;
+		__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
+
+		// Submit however, many entries need to be submitted
+		int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
+		if( ret < 0 ) {
+			switch((int)errno) {
+				default:
+					abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
+			}
+		}
+
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			ring.submit_q.stats.submit_avg.val += 1;
+			ring.submit_q.stats.submit_avg.cnt += 1;
+		#endif
+
+		unlock(ring.submit_q.lock);
+	}
 	// Make sure that idx was submitted
 	// Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
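On the producer side, the rewritten `__submit` publishes an index lock-free: it compare-and-swaps the index into any slot that still holds the sentinel, sweeping the array repeatedly and yielding between sweeps until a CAS succeeds. The same idea, continuing the C11 sketch (note that `expected` is re-set before every CAS here, since a failed compare-exchange overwrites it with the slot's current occupant):

#include <sched.h> /* sched_yield stands in for the CFA runtime's yield() */

static void publish_ready( struct ready_array * ra, uint32_t idx ) {
	for(;;) {
		for( uint32_t i = 0; i < ra->cnt; i++ ) {
			uint32_t expected = READY_SENTINEL;
			if( atomic_compare_exchange_strong_explicit(
					&ra->slots[i], &expected, idx,
					memory_order_seq_cst, memory_order_relaxed ) ) {
				return; /* slot claimed; the poller thread will consume it */
			}
		}
		sched_yield(); /* array currently full: let the poller drain first */
	}
}

Keeping submission entirely in user space is the point of the scheme: the common path is one successful CAS plus a wake-up for the poller, instead of a per-submission `io_uring_enter` under a lock. The cost is the fixed-size array (10 slots in this changeset), which becomes a back-pressure point when producers outrun the poller.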
libcfa/src/concurrency/kernel.hfa
--- libcfa/src/concurrency/kernel.hfa (r87e0b015)
+++ libcfa/src/concurrency/kernel.hfa (r5dadc9b)

@@ -116,6 +116,7 @@
 struct __io_data;

-#define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0
-// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1
+#define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
+#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
+// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4

 //-----------------------------------------------------------------------------
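The header now exposes the two polling options as independent bit flags in the cluster's I/O flag word. A small, hypothetical usage sketch in C (the constants mirror the header; the sketch adds parentheses around the shifts as defensive style, though `<<` binding tighter than `&` and `|` means the unparenthesized macros also behave correctly):

#include <stdio.h>

#define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) /* 0x1 */
#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) /* 0x2 */

int main(void) {
	/* Opt in to the user-thread fast poller and poller-side submission. */
	unsigned io_flags = CFA_CLUSTER_IO_POLLER_USER_THREAD
	                  | CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;

	if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
		puts( "submissions are batched through the poller thread" );
	} else {
		puts( "each submission calls io_uring_enter directly" );
	}
	return 0;
}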