Changes in / [49cad912:3a32b3a]
Files:
- 1 added
- 8 edited

Legend for the per-file hunks below: lines prefixed with + were added, lines prefixed with - were removed, and unprefixed indented lines are unchanged context.
benchmark/io/batch-readv.c
r49cad912 → r3a32b3a

The benchmark's hand-rolled io_uring glue is factored out into a shared io_uring.h header. The extern "C" include block loses the _GNU_SOURCE guard and the <errno.h>, <stdio.h>, <stdint.h>, <stdlib.h>, <string.h>, <unistd.h>, <sys/mman.h>, <sys/syscall.h>, <sys/uio.h>, <fcntl.h> and <linux/io_uring.h> includes, keeping only <getopt.h>, <locale.h>, <time.h> (timespec) and <sys/time.h> (timeval). The fallback syscall numbers (__NR_io_uring_setup 425, __NR_io_uring_enter 426, __NR_io_uring_register 427), the local struct io_uring_sq, struct io_uring_cq, struct io_ring and struct fred definitions, and the global `fred self;` are all deleted in favour of:

+	#include "io_uring.h"

In main, the entire manual ring setup (the io_uring_setup syscall, the mmap of the submit and completion rings at IORING_OFF_SQ_RING and IORING_OFF_CQ_RING, the mmap of the sqe array at IORING_OFF_SQES, and the wiring of the head/tail/mask/num/flags/dropped/array pointers) is replaced by a single call:

+	init_uring(2048);

Finally, two per-operation timing lines are added to the report (the existing lines are only re-indented):

 	printf("Took %'ld ms\n", to_miliseconds(end - start));
 	printf("Submitted %'llu\n", submits);
 	printf("Completed %'llu\n", completes);
 	printf("Submitted / sec %'.f\n", submits / to_fseconds(end - start));
 	printf("Completed / sec %'.f\n", completes / to_fseconds(end - start));
+	printf("ns per Submitted %'.f\n", 1000000000.0 * to_fseconds(end - start) / (submits / batch) );
+	printf("ns per Completed %'.f\n", 1000000000.0 * to_fseconds(end - start) / (completes / batch) );
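The contents of io_uring.h are not shown in this changeset view (it is presumably the one added file), so the following is only a sketch of what a helper like init_uring(nentries) would have to do, reconstructed from the setup code removed above; the global names and the exact signature are assumptions, not the real header.

	// Sketch only: a plausible init_uring(), based on the removed setup code.
	#define _GNU_SOURCE
	#include <errno.h>
	#include <stdio.h>
	#include <stdlib.h>
	#include <string.h>
	#include <sys/mman.h>
	#include <sys/syscall.h>
	#include <unistd.h>
	#include <linux/io_uring.h>

	#ifndef __NR_io_uring_setup
	#define __NR_io_uring_setup 425
	#endif

	static int ring_fd;                      // hypothetical globals holding the ring state
	static void * sq_ptr, * cq_ptr;
	static struct io_uring_sqe * sqes;

	static void init_uring( unsigned nentries ) {
		struct io_uring_params params;
		memset( &params, 0, sizeof(params) );

		// 1 - create the ring
		ring_fd = syscall( __NR_io_uring_setup, nentries, &params );
		if( ring_fd < 0 ) { fprintf( stderr, "io_uring_setup: %s\n", strerror(errno) ); abort(); }

		// 2 - map the submit and completion rings
		size_t sq_sz = params.sq_off.array + params.sq_entries * sizeof(unsigned);
		size_t cq_sz = params.cq_off.cqes  + params.cq_entries * sizeof(struct io_uring_cqe);
		sq_ptr = mmap( 0, sq_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING );
		cq_ptr = mmap( 0, cq_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_CQ_RING );
		if( sq_ptr == MAP_FAILED || cq_ptr == MAP_FAILED ) { fprintf( stderr, "mmap ring: %s\n", strerror(errno) ); abort(); }

		// 3 - map the sqe array itself
		sqes = (struct io_uring_sqe *)mmap( 0, params.sq_entries * sizeof(struct io_uring_sqe),
		                                    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQES );
		if( sqes == MAP_FAILED ) { fprintf( stderr, "mmap sqes: %s\n", strerror(errno) ); abort(); }
	}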
benchmark/io/readv.cfa
r49cad912 → r3a32b3a

The option table gains three new flags and is re-aligned:

 	{"bufsize",       required_argument, 0, 'b'},
 	{"userthread",    no_argument      , 0, 'u'},
 	{"submitthread",  no_argument      , 0, 's'},
+	{"eagersubmit",   no_argument      , 0, 'e'},
+	{"kpollsubmit",   no_argument      , 0, 'k'},
+	{"kpollcomplete", no_argument      , 0, 'i'},
 	{"submitlength",  required_argument, 0, 'l'},
 	{0, 0, 0, 0}

The short-option string passed to getopt_long gains the matching "e", "k" and "i" characters, and the option switch gains the corresponding cases:

+	case 'e':
+		flags |= CFA_CLUSTER_IO_EAGER_SUBMITS;
+		break;
+	case 'k':
+		flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
+		break;
+	case 'i':
+		flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
+		break;

The usage message is extended accordingly:

+	fprintf( stderr, "  -e, --eagersubmit      If set, cluster submits I/O eagerly but still aggregates submits\n" );
+	fprintf( stderr, "  -k, --kpollsubmit      If set, cluster uses IORING_SETUP_SQPOLL\n" );
+	fprintf( stderr, "  -i, --kpollcomplete    If set, cluster uses IORING_SETUP_IOPOLL\n" );
+	fprintf( stderr, "  -l, --submitlength=LEN Max number of submitions that can be submitted together\n" );
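With these options, a run that exercises kernel-side submission polling with 32-entry submit batches would be launched along the lines of `readv --kpollsubmit --submitlength=32`; combining `--submitthread` with `--eagersubmit` is rejected at cluster startup (see io.cfa below), since those two modes cannot be mixed. The executable name and argument values here are illustrative, not taken from the changeset.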
libcfa/src/concurrency/io.cfa
r49cad912 → r3a32b3a

Ring bookkeeping. The submit queue gains two fields: a `volatile uint32_t prev_head` remembering how far the kernel's head had advanced the last time consumed SQEs were released, and a second spinlock, `release_lock`, guarding that release. Both are initialised in __kernel_io_startup (`sq.prev_head = *sq.head;` and `(sq.lock){}; (sq.release_lock){};`).

Startup. __kernel_io_startup now rejects mixing the poller-thread and eager submission modes:

+	if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
+		abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
+	}

and maps the two new kernel-polling flags onto io_uring setup flags:

+	if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
+	if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;

The ready array used to aggregate submissions is now allocated when either CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS or CFA_CLUSTER_IO_EAGER_SUBMITS is set, not just the former.

Polling (__drain_io). A `verify( !kernelTLS.preemption_state.enabled )` assertion is added on entry. The inline loop that aggregated ready submissions is factored into a new helper, __collect_submitions, and the per-entry release of consumed SQEs (together with the scan that fed the now-dropped `avl` statistic) into __release_consumed_submission; both are forward-declared at the top of the polling section. The io_uring_enter syscall is skipped entirely when there is nothing to submit and nothing to wait for (`if (to_submit > 0 || waitcnt > 0)`), EAGAIN/EINTR return `[0, true]` instead of -EAGAIN, statistics are recorded through the new __STATS__ macro, and the memory barrier is issued before draining the completion queue. The "nothing new" case returns `[0, to_submit > 0]`, a `verify( count != 0 )` assertion is added, and the slow poller now calls `__drain_io( ring, &mask, 1, true )` (waitcnt 1 instead of 0).

Submission. The comment describing the submission steps drops its first step (bounding in-flight operations with a semaphore) and renumbers the remaining allocate / fill / append steps 1 to 3. __submit_alloc and the ready-array search switch their statistics to __STATS__; while searching for a ready slot, a blocked submitter now tries to take the submit lock and release consumed SQEs before yielding. __submit gains an eager-submission branch: the entry is published to the ready array, then the thread yields until either some other thread has collected its index (counted as `helped`, flagged in the source with `#warning ABA problem`) or it acquires the submit lock and becomes the `leader`, in which case it collects all ready submissions, calls io_uring_enter itself, releases the consumed SQEs and updates the statistics; failed lock attempts are counted as `busy`. The lock-protected slow path adds a `verify( ring.submit_q.sqes[ idx ].user_data != 0 )` assertion and releases consumed SQEs through the new helper instead of clearing user_data in place.

The two new helpers:

+	static unsigned __collect_submitions( struct __io_data & ring ) {
+		/* paranoid */ verify( ring.submit_q.ready != 0p );
+		/* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
+
+		unsigned to_submit = 0;
+		uint32_t tail = *ring.submit_q.tail;
+		const uint32_t mask = *ring.submit_q.mask;
+
+		// Go through the list of ready submissions
+		for( i; ring.submit_q.ready_cnt ) {
+			// replace any submission with the sentinel, to consume it.
+			uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+
+			// If it was already the sentinel, then we are done
+			if( idx == -1ul32 ) continue;
+
+			// If we got a real submission, append it to the list
+			ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
+			to_submit++;
+		}
+
+		// Increment the tail based on how many we are ready to submit
+		__atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
+
+		return to_submit;
+	}
+
+	static uint32_t __release_consumed_submission( struct __io_data & ring ) {
+		const uint32_t smask = *ring.submit_q.mask;
+
+		if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
+		uint32_t chead = *ring.submit_q.head;
+		uint32_t phead = ring.submit_q.prev_head;
+		ring.submit_q.prev_head = chead;
+		unlock(ring.submit_q.release_lock);
+
+		uint32_t count = chead - phead;
+		for( i; count ) {
+			uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
+			ring.submit_q.sqes[ idx ].user_data = 0;
+		}
+		return count;
+	}
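The eager-submission path is essentially a small leader-election protocol: a submitter publishes its sqe index in the shared ready array, then either finds that a leader has already consumed it (helped) or takes the submit lock and drains the array itself (leader), counting failed lock attempts as busy. The stand-alone C sketch below illustrates only that pattern; it is not the CFA implementation, and the array size, sentinel handling and atomic_flag lock are illustrative choices.

	#include <stdatomic.h>
	#include <stdint.h>
	#include <stdio.h>
	#include <sched.h>

	#define READY_CNT 8
	#define EMPTY     UINT32_MAX                 // sentinel for an empty ready slot

	static _Atomic uint32_t ready[READY_CNT];
	static atomic_flag      submit_lock = ATOMIC_FLAG_INIT;

	// Call once before any eager_submit().
	static void init_ready( void ) {
		for( unsigned i = 0; i < READY_CNT; i++ ) atomic_init( &ready[i], EMPTY );
	}

	// Stand-in for the batched io_uring_enter call made by the leader.
	static void submit_batch( uint32_t * batch, unsigned n ) {
		(void)batch;                             // a real implementation would hand these to the kernel
		printf( "leader submits %u entries\n", n );
	}

	// Publish one sqe index, then either get helped or become the leader.
	static void eager_submit( uint32_t idx ) {
		// 1 - publish: claim an empty slot in the ready array
		unsigned slot;
		for( ;; ) {
			for( slot = 0; slot < READY_CNT; slot++ ) {
				uint32_t expected = EMPTY;
				if( atomic_compare_exchange_strong( &ready[slot], &expected, idx ) ) goto published;
			}
			sched_yield();                       // array full: let a leader drain it first
		}
	published:

		// 2 - wait: either a leader consumes our slot, or we take the lock ourselves
		//     (like the source, this check has an ABA problem if a slot is reused with the same index)
		for( ;; ) {
			sched_yield();
			if( atomic_load( &ready[slot] ) != idx ) return;          // helped
			if( !atomic_flag_test_and_set( &submit_lock ) ) break;    // we are the leader
		}

		// 3 - leader: collect every published index and submit them in one batch
		uint32_t batch[READY_CNT];
		unsigned n = 0;
		for( unsigned i = 0; i < READY_CNT; i++ ) {
			uint32_t got = atomic_exchange( &ready[i], EMPTY );
			if( got != EMPTY ) batch[n++] = got;
		}
		submit_batch( batch, n );
		atomic_flag_clear( &submit_lock );
	}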
libcfa/src/concurrency/kernel.hfa
r49cad912 → r3a32b3a

 	struct __io_data;

-	#define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0  // 0x1
-	#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1  // 0x2
-	// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2  // 0x4
+	#define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) // 0x01
+	#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02
+	#define CFA_CLUSTER_IO_EAGER_SUBMITS         (1 << 2) // 0x04
+	#define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   (1 << 3) // 0x08
+	#define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10
 	#define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
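The mode bits occupy the low bits of the flag word and, as the startup code in io.cfa reads them, the ready-buffer length is packed into the bits above CFA_CLUSTER_IO_BUFFLEN_OFFSET. The small snippet below only illustrates that packing; the variable names and the chosen length are assumptions, not taken from the changeset.

	#include <stdio.h>

	#define CFA_CLUSTER_IO_EAGER_SUBMITS  (1 << 2)
	#define CFA_CLUSTER_IO_BUFFLEN_OFFSET 16

	int main(void) {
		// eager submission, with a 32-entry ready buffer packed into the upper bits
		unsigned io_flags = CFA_CLUSTER_IO_EAGER_SUBMITS | (32u << CFA_CLUSTER_IO_BUFFLEN_OFFSET);

		unsigned mode   = io_flags & ((1u << CFA_CLUSTER_IO_BUFFLEN_OFFSET) - 1);  // low bits: mode flags
		unsigned buflen = io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET;               // high bits: buffer length

		printf( "mode=0x%x buflen=%u\n", mode, buflen );
		return 0;
	}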
libcfa/src/concurrency/kernel_private.hfa
r49cad912 → r3a32b3a

The __tls_stats() accessor is re-indented and a new __STATS__ macro is added (with an empty definition when statistics are compiled out):

 	// Statics call at the end of each thread to register statistics
 	#if !defined(__CFA_NO_STATISTICS__)
 		static inline struct __stats_t * __tls_stats() {
 			/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
 			/* paranoid */ verify( kernelTLS.this_stats );
 			return kernelTLS.this_stats;
 		}
+
+		#define __STATS__(in_kernel, ...) { \
+			if( !(in_kernel) ) disable_interrupts(); \
+			with( *__tls_stats() ) { \
+				__VA_ARGS__ \
+			} \
+			if( !(in_kernel) ) enable_interrupts( __cfaabi_dbg_ctx ); \
+		}
+	#else
+		#define __STATS__(in_kernel, ...)
 	#endif
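The first argument states whether the caller already runs with interrupts disabled: in-kernel paths pass true and skip the disable/enable pair, regular threads pass false. Usage as it appears in io.cfa in this same changeset:

	// from a path that already runs with interrupts disabled (the poller):
	__STATS__( true,
		io.submit_q.submit_avg.rdy += to_submit;
		io.submit_q.submit_avg.csm += ret;
		io.submit_q.submit_avg.cnt += 1;
	)

	// from a regular thread (the macro brackets the update with disable/enable_interrupts):
	__STATS__( false,
		io.submit_q.alloc_avg.val   += len;
		io.submit_q.alloc_avg.block += block;
		io.submit_q.alloc_avg.cnt   += 1;
	)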
libcfa/src/concurrency/preemption.cfa
r49cad912 → r3a32b3a

 	void enable_interrupts( __cfaabi_dbg_ctx_param ) {
 		processor * proc = kernelTLS.this_processor; // Cache the processor now since interrupts can start happening after the atomic store
+		/* paranoid */ verify( proc );

 		with( kernelTLS.preemption_state ){
libcfa/src/concurrency/stats.cfa
r49cad912 → r3a32b3a

The `avl` (average available entries) submit-queue statistic is removed and three new counters take its place.

Initialisation:

-	stats->io.submit_q.submit_avg.avl = 0;
+	stats->io.submit_q.helped = 0;
+	stats->io.submit_q.leader = 0;
+	stats->io.submit_q.busy   = 0;

Aggregation into the cluster totals:

+	__atomic_fetch_add( &cltr->io.submit_q.helped, proc->io.submit_q.helped, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &cltr->io.submit_q.leader, proc->io.submit_q.leader, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &cltr->io.submit_q.busy  , proc->io.submit_q.busy  , __ATOMIC_SEQ_CST );

Printing: the `avgavl` average and its "avg available entries" line are dropped, and the report gains

+	"- total helped entries : %'15" PRIu64 "\n"
+	"- total leader entries : %'15" PRIu64 "\n"
+	"- total busy submit    : %'15" PRIu64 "\n"

with `io.submit_q.helped, io.submit_q.leader, io.submit_q.busy` added to the corresponding argument list.
libcfa/src/concurrency/stats.hfa
r49cad912 → r3a32b3a

 			volatile uint64_t block;
 		} alloc_avg;
+		volatile uint64_t helped;
+		volatile uint64_t leader;
+		volatile uint64_t busy;
 	} submit_q;
 	struct {