Changes in / [6c12fd28:0dedf027]
- Files: 5 edited
  - benchmark/io/readv.cfa (modified) (3 diffs)
  - libcfa/src/concurrency/io.cfa (modified) (19 diffs)
  - libcfa/src/concurrency/kernel.cfa (modified) (1 diff)
  - libcfa/src/concurrency/kernel.hfa (modified) (2 diffs)
  - libcfa/src/concurrency/kernel_private.hfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/readv.cfa
r6c12fd28 r0dedf027 59 59 unsigned long int nthreads = 2; 60 60 unsigned long int nprocs = 1; 61 unsigned flags = 0; 62 unsigned sublen = 16; 61 int flags = 0; 63 62 64 63 arg_loop: 65 64 for(;;) { 66 65 static struct option options[] = { 67 {"duration", required_argument, 0, 'd'}, 68 {"nthreads", required_argument, 0, 't'}, 69 {"nprocs", required_argument, 0, 'p'}, 70 {"bufsize", required_argument, 0, 'b'}, 71 {"userthread", no_argument , 0, 'u'}, 72 {"submitthread", no_argument , 0, 's'}, 73 {"submitlength", required_argument, 0, 'l'}, 66 {"duration", required_argument, 0, 'd'}, 67 {"nthreads", required_argument, 0, 't'}, 68 {"nprocs", required_argument, 0, 'p'}, 69 {"bufsize", required_argument, 0, 'b'}, 70 {"userthread", no_argument , 0, 'u'}, 74 71 {0, 0, 0, 0} 75 72 }; 76 73 77 74 int idx = 0; 78 int opt = getopt_long(argc, argv, "d:t:p:b:u sl:", options, &idx);75 int opt = getopt_long(argc, argv, "d:t:p:b:u", options, &idx); 79 76 80 77 const char * arg = optarg ? optarg : ""; … … 116 113 flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD; 117 114 break; 118 case 's':119 flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;120 break;121 case 'l':122 sublen = strtoul(arg, &end, 10);123 if(*end != '\0' && sublen < 16) {124 fprintf(stderr, "Submit length must be at least 16, was %s\n", arg);125 goto usage;126 }127 flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);128 break;129 115 // Other cases 130 116 default: /* ? */ … … 137 123 fprintf(stderr, " -p, --nprocs=NPROCS Number of kernel threads\n"); 138 124 fprintf(stderr, " -b, --buflen=SIZE Number of bytes to read per request\n"); 139 fprintf(stderr, " -u, --userthread If set, cluster uses user-thread to poll I/O\n");140 fprintf(stderr, " -s, --submitthread If set, cluster uses polling thread to submit I/O\n");141 125 exit(EXIT_FAILURE); 142 126 } -
libcfa/src/concurrency/io.cfa
r6c12fd28 r0dedf027 20 20 21 21 #if !defined(HAVE_LINUX_IO_URING_H) 22 void __kernel_io_startup( cluster &, unsigned, bool ) {22 void __kernel_io_startup( cluster &, int, bool ) { 23 23 // Nothing to do without io_uring 24 24 } … … 91 91 struct __io_poller_fast { 92 92 struct __io_data * ring; 93 bool waiting; 93 94 $thread thrd; 94 95 }; … … 96 97 void ?{}( __io_poller_fast & this, struct cluster & cltr ) { 97 98 this.ring = cltr.io; 99 this.waiting = true; 98 100 (this.thrd){ "Fast I/O Poller", cltr }; 99 101 } … … 124 126 // Like head/tail but not seen by the kernel 125 127 volatile uint32_t alloc; 126 volatile uint32_t * ready; 127 uint32_t ready_cnt; 128 volatile uint32_t ready; 128 129 129 130 __spinlock_t lock; … … 144 145 volatile unsigned long long int block; 145 146 } submit_avg; 146 struct {147 volatile unsigned long long int val;148 volatile unsigned long long int cnt;149 volatile unsigned long long int block;150 } look_avg;151 147 } stats; 152 148 #endif … … 205 201 // I/O Startup / Shutdown logic 206 202 //============================================================================================= 207 void __kernel_io_startup( cluster & this, unsignedio_flags, bool main_cluster ) {203 void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) { 208 204 this.io = malloc(); 209 205 … … 278 274 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 279 275 sq.alloc = *sq.tail; 280 281 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { 282 sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8); 283 sq.ready = alloc_align( 64, sq.ready_cnt ); 284 for(i; sq.ready_cnt) { 285 sq.ready[i] = -1ul32; 286 } 287 } 288 else { 289 sq.ready_cnt = 0; 290 sq.ready = 0p; 291 } 276 sq.ready = *sq.tail; 292 277 293 278 // completion queue … … 322 307 this.io->submit_q.stats.submit_avg.cnt = 0; 323 308 this.io->submit_q.stats.submit_avg.block = 0; 324 this.io->submit_q.stats.look_avg.val = 0;325 
this.io->submit_q.stats.look_avg.cnt = 0;326 this.io->submit_q.stats.look_avg.block = 0;327 309 this.io->completion_q.stats.completed_avg.val = 0; 328 310 this.io->completion_q.stats.completed_avg.slow_cnt = 0; … … 365 347 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 366 348 with( this.io->poller.fast ) { 349 /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call 367 350 /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster ); 368 351 /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster ); 369 352 370 353 // We need to adjust the clean-up based on where the thread is 371 if( thrd. state == Ready || thrd.preempted != __NO_PREEMPTION ) {354 if( thrd.preempted != __NO_PREEMPTION ) { 372 355 373 356 // This is the tricky case 374 357 // The thread was preempted and now it is on the ready queue 358 /* paranoid */ verify( thrd.state == Active ); // The thread better be in this state 375 359 /* paranoid */ verify( thrd.next == 1p ); // The thread should be the last on the list 376 360 /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list … … 421 405 if(this.print_stats) { 422 406 with(this.io->submit_q.stats, this.io->completion_q.stats) { 423 double lavgv = 0;424 double lavgb = 0;425 if(look_avg.cnt != 0) {426 lavgv = ((double)look_avg.val ) / look_avg.cnt;427 lavgb = ((double)look_avg.block) / look_avg.cnt;428 }429 430 407 __cfaabi_bits_print_safe( STDERR_FILENO, 431 408 "----- I/O uRing Stats -----\n" 432 "- total submit calls : %'15llu\n" 433 "- avg submit : %'18.2lf\n" 434 "- pre-submit block %% : %'18.2lf\n" 435 "- total ready search : %'15llu\n" 436 "- avg ready search len : %'18.2lf\n" 437 "- avg ready search block : %'18.2lf\n" 438 "- total wait calls : %'15llu (%'llu slow, %'llu fast)\n" 439 "- avg completion/wait : %'18.2lf\n", 409 "- total submit calls : %'15llu\n" 410 "- avg submit : %'18.2lf\n" 411 "- pre-submit block %% : 
%'18.2lf\n" 412 "- total wait calls : %'15llu (%'llu slow, %'llu fast)\n" 413 "- avg completion/wait : %'18.2lf\n", 440 414 submit_avg.cnt, 441 415 ((double)submit_avg.val) / submit_avg.cnt, 442 416 (100.0 * submit_avg.block) / submit_avg.cnt, 443 look_avg.cnt,444 lavgv,445 lavgb,446 417 completed_avg.slow_cnt + completed_avg.fast_cnt, 447 418 completed_avg.slow_cnt, completed_avg.fast_cnt, … … 470 441 close(this.io->fd); 471 442 472 free( this.io->submit_q.ready ); // Maybe null, doesn't matter473 443 free( this.io ); 474 444 } … … 484 454 // Process a single completion message from the io_uring 485 455 // This is NOT thread-safe 486 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) { 487 unsigned to_submit = 0; 488 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { 489 490 // If the poller thread also submits, then we need to aggregate the submissions which are ready 491 uint32_t * tail = ring.submit_q.tail; 492 const uint32_t mask = *ring.submit_q.mask; 493 494 // Go through the list of ready submissions 495 for( i; ring.submit_q.ready_cnt ) { 496 // replace any submission with the sentinel, to consume it. 
497 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 498 499 // If it was already the sentinel, then we are done 500 if( idx == -1ul32 ) continue; 501 502 // If we got a real submission, append it to the list 503 ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask; 504 to_submit++; 505 } 506 507 // Increment the tail based on how many we are ready to submit 508 __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST); 509 510 // update statistics 511 #if !defined(__CFA_NO_STATISTICS__) 512 ring.submit_q.stats.submit_avg.val += to_submit; 513 ring.submit_q.stats.submit_avg.cnt += 1; 514 #endif 515 } 516 517 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 456 static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) { 457 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 518 458 if( ret < 0 ) { 519 459 switch((int)errno) { … … 557 497 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 558 498 559 return [count, count > 0 || to_submit > 0];499 return count; 560 500 } 561 501 … … 579 519 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 580 520 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 581 582 521 // In the user-thread approach drain and if anything was drained, 583 522 // batton pass to the user-thread 584 int count; 585 bool again; 586 [count, again] = __drain_io( ring, &mask, 0, true ); 523 int count = __drain_io( ring, &mask, 1, true ); 587 524 588 525 // Update statistics … … 592 529 #endif 593 530 594 if( again) {531 if(count > 0) { 595 532 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring); 596 533 __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 ); … … 602 539 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 603 540 //In the naive approach, just poll the io completion queue 
directly 604 int count; 605 bool again; 606 [count, again] = __drain_io( ring, &mask, 1, true ); 541 int count = __drain_io( ring, &mask, 1, true ); 607 542 608 543 // Update statistics … … 631 566 // Then loop until we need to start 632 567 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) { 633 634 568 // Drain the io 635 int count; 636 bool again; 637 [count, again] = __drain_io( *this.ring, 0p, 0, false ); 638 639 if(!again) reset++; 569 this.waiting = false; 570 int count = __drain_io( *this.ring, 0p, 0, false ); 571 reset += count > 0 ? 1 : 0; 640 572 641 573 // Update statistics … … 645 577 #endif 646 578 647 // If we got something, just yield and check again579 this.waiting = true; 648 580 if(reset < 5) { 581 // If we got something, just yield and check again 649 582 yield(); 650 583 } 651 // We didn't get anything baton pass to the slow poller652 584 else { 585 // We didn't get anything baton pass to the slow poller 653 586 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring); 587 post( this.ring->poller.sem ); 588 park( __cfaabi_dbg_ctx ); 654 589 reset = 0; 655 656 // wake up the slow poller657 post( this.ring->poller.sem );658 659 // park this thread660 park( __cfaabi_dbg_ctx );661 590 } 662 591 } 663 592 664 593 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring); 665 }666 667 static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));668 static inline void __wake_poller( struct __io_data & ring ) {669 // sigval val = { 1 };670 // pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );671 594 } 672 595 … … 709 632 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST); 710 633 711 // Mask the idx now to allow make everything easier to check 712 idx &= *ring.submit_q.mask; 634 // Validate that we didn't overflow anything 635 // Check that nothing overflowed 636 /* paranoid */ verify( true ); 637 638 // Check 
that it goes head -> tail -> alloc and never head -> alloc -> tail 639 /* paranoid */ verify( true ); 713 640 714 641 // Return the sqe 715 return [&ring.submit_q.sqes[ idx ], idx];642 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx]; 716 643 } 717 644 718 645 static inline void __submit( struct __io_data & ring, uint32_t idx ) { 719 // Get now the data we definetely need 720 uint32_t * const tail = ring.submit_q.tail; 646 // get mutual exclusion 647 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 648 649 // Append to the list of ready entries 650 uint32_t * tail = ring.submit_q.tail; 721 651 const uint32_t mask = *ring.submit_q.mask; 722 652 723 // There are 2 submission schemes, check which one we are using 724 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) { 725 // If the poller thread submits, then we just need to add this to the ready array 726 727 /* paranoid */ verify( idx <= mask ); 728 /* paranoid */ verify( idx != -1ul32 ); 729 730 // We need to find a spot in the ready array 731 __attribute((unused)) int len = 0; 732 __attribute((unused)) int block = 0; 733 uint32_t expected = -1ul32; 734 LOOKING: for(;;) { 735 for(i; ring.submit_q.ready_cnt) { 736 if( __atomic_compare_exchange_n( &ring.submit_q.ready[i], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 737 break LOOKING; 738 } 739 740 len ++; 741 } 742 743 block++; 744 yield(); 653 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 654 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 655 656 // Submit however, many entries need to be submitted 657 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0); 658 if( ret < 0 ) { 659 switch((int)errno) { 660 default: 661 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 745 662 } 746 747 __wake_poller( ring ); 748 749 // update statistics 750 #if !defined(__CFA_NO_STATISTICS__) 751 __atomic_fetch_add( &ring.submit_q.stats.look_avg.val, len, __ATOMIC_RELAXED ); 752 __atomic_fetch_add( 
&ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED ); 753 __atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt, 1, __ATOMIC_RELAXED ); 754 #endif 755 756 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 757 } 758 else { 759 // get mutual exclusion 760 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 761 762 // Append to the list of ready entries 763 764 /* paranoid */ verify( idx <= mask ); 765 766 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 767 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 768 769 // Submit however, many entries need to be submitted 770 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0); 771 if( ret < 0 ) { 772 switch((int)errno) { 773 default: 774 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 775 } 776 } 777 778 // update statistics 779 #if !defined(__CFA_NO_STATISTICS__) 780 ring.submit_q.stats.submit_avg.val += 1; 781 ring.submit_q.stats.submit_avg.cnt += 1; 782 #endif 783 784 unlock(ring.submit_q.lock); 785 786 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret ); 787 } 663 } 664 665 // update statistics 666 #if !defined(__CFA_NO_STATISTICS__) 667 ring.submit_q.stats.submit_avg.val += 1; 668 ring.submit_q.stats.submit_avg.cnt += 1; 669 #endif 670 671 unlock(ring.submit_q.lock); 672 // Make sure that idx was submitted 673 // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us 674 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret ); 788 675 } 789 676 -
libcfa/src/concurrency/kernel.cfa
r6c12fd28 r0dedf027 256 256 } 257 257 258 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsignedio_flags) with( this ) {258 void ?{}(cluster & this, const char name[], Duration preemption_rate, int io_flags) with( this ) { 259 259 this.name = name; 260 260 this.preemption_rate = preemption_rate; -
libcfa/src/concurrency/kernel.hfa
r6c12fd28 r0dedf027 116 116 struct __io_data; 117 117 118 #define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0 // 0x1 119 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2 120 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4 121 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET 16 118 #define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0 119 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1 122 120 123 121 //----------------------------------------------------------------------------- … … 161 159 extern Duration default_preemption(); 162 160 163 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsignedflags);161 void ?{} (cluster & this, const char name[], Duration preemption_rate, int flags); 164 162 void ^?{}(cluster & this); 165 163 166 static inline void ?{} (cluster & this) { this{"Anonymous Cluster", default_preemption(), 0}; }167 static inline void ?{} (cluster & this, Duration preemption_rate) { this{"Anonymous Cluster", preemption_rate, 0}; }168 static inline void ?{} (cluster & this, const char name[]) { this{name, default_preemption(), 0}; }169 static inline void ?{} (cluster & this, unsignedflags) { this{"Anonymous Cluster", default_preemption(), flags}; }170 static inline void ?{} (cluster & this, Duration preemption_rate, unsignedflags) { this{"Anonymous Cluster", preemption_rate, flags}; }171 static inline void ?{} (cluster & this, const char name[], unsignedflags) { this{name, default_preemption(), flags}; }164 static inline void ?{} (cluster & this) { this{"Anonymous Cluster", default_preemption(), 0}; } 165 static inline void ?{} (cluster & this, Duration preemption_rate) { this{"Anonymous Cluster", preemption_rate, 0}; } 166 static inline void ?{} (cluster & this, const char name[]) { this{name, default_preemption(), 0}; } 167 static inline void ?{} (cluster & this, int flags) { this{"Anonymous Cluster", default_preemption(), flags}; } 168 static inline void ?{} (cluster & this, Duration preemption_rate, int flags) { 
this{"Anonymous Cluster", preemption_rate, flags}; } 169 static inline void ?{} (cluster & this, const char name[], int flags) { this{name, default_preemption(), flags}; } 172 170 173 171 static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; } -
libcfa/src/concurrency/kernel_private.hfa
r6c12fd28 r0dedf027 77 77 //----------------------------------------------------------------------------- 78 78 // I/O 79 void __kernel_io_startup ( cluster &, unsigned, bool );79 void __kernel_io_startup ( cluster &, int, bool ); 80 80 void __kernel_io_finish_start( cluster & ); 81 81 void __kernel_io_prepare_stop( cluster & );
Note: See TracChangeset for help on using the changeset viewer.