Changes in / [59f74a2:0376da9]
- Files:
  - 3 edited
Legend:
- Unmodified
- Added
- Removed

benchmark/io/readv.cfa
r59f74a2 r0376da9 1 #define _GNU_SOURCE2 3 1 #include <stdlib.h> 4 2 #include <stdio.h> … … 24 22 extern bool traceHeapOn(); 25 23 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags); 26 extern ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);27 extern void register_fixed_files( cluster &, int *, unsigned count );28 24 29 25 int fd; … … 32 28 33 29 unsigned long int buflen = 50; 34 bool fixed_file = false;35 30 36 31 thread __attribute__((aligned(128))) Reader {}; 37 32 void ?{}( Reader & this ) { 38 33 ((thread&)this){ "Reader Thread", *the_benchmark_cluster }; 39 }40 41 int do_read(int fd, struct iovec * iov) {42 if(fixed_file) {43 return cfa_preadv2_fixed(fd, iov, 1, 0, 0);44 }45 else {46 return cfa_preadv2(fd, iov, 1, 0, 0);47 }48 34 } 49 35 … … 56 42 57 43 while(__atomic_load_n(&run, __ATOMIC_RELAXED)) { 58 int r = do_read(fd, &iov);44 int r = cfa_preadv2(fd, &iov, 1, 0, 0); 59 45 if(r < 0) abort("%s\n", strerror(-r)); 60 46 … … 66 52 BENCH_DECL 67 53 unsigned flags = 0; 68 int file_flags = 0;69 54 unsigned sublen = 16; 70 55 … … 111 96 case 'k': 112 97 flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS; 113 fixed_file = true;114 98 break; 115 99 case 'i': 116 100 flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES; 117 file_flags |= O_DIRECT;118 101 break; 119 102 case 'l': … … 140 123 } 141 124 142 int lfd = open(__FILE__, file_flags);143 if( lfd < 0) {125 fd = open(__FILE__, 0); 126 if(fd < 0) { 144 127 fprintf(stderr, "Could not open source file\n"); 145 128 exit(EXIT_FAILURE); … … 151 134 Time start, end; 152 135 BenchCluster cl = { flags, CFA_STATS_READY_Q | CFA_STATS_IO }; 153 154 if(fixed_file) {155 fd = 0;156 register_fixed_files( cl.self, &lfd, 1 );157 }158 else {159 fd = lfd;160 }161 162 136 { 163 137 BenchProc procs[nprocs]; … … 187 161 } 188 162 189 close( lfd);163 close(fd); 190 164 } -
libcfa/src/concurrency/io.cfa
r59f74a2 r0376da9 14 14 // 15 15 16 #if defined(__CFA_DEBUG__) 17 // #define __CFA_DEBUG_PRINT_IO__ 18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 #endif 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 20 18 21 19 #include "kernel.hfa" … … 327 325 328 326 // Create the poller thread 329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for clu ster %p\n", &this);327 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this); 330 328 this.io->poller.slow.blocked = false; 331 329 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); … … 432 430 } 433 431 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {435 bool need_sys_to_submit = false;436 bool need_sys_to_complete = false;437 unsigned min_complete = 0;438 unsigned flags = 0;439 440 441 TO_SUBMIT:442 if( to_submit > 0 ) {443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {444 need_sys_to_submit = true;445 break TO_SUBMIT;446 }447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {448 need_sys_to_submit = true;449 flags |= IORING_ENTER_SQ_WAKEUP;450 }451 }452 453 TO_COMPLETE:454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {455 flags |= IORING_ENTER_GETEVENTS;456 if( mask ) {457 need_sys_to_complete = true;458 min_complete = 1;459 break TO_COMPLETE;460 }461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {462 need_sys_to_complete = true;463 }464 }465 466 int ret = 0;467 if( need_sys_to_submit || need_sys_to_complete ) {468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);469 if( ret < 0 ) {470 switch((int)errno) {471 case EAGAIN:472 case EINTR:473 ret = -1;474 break;475 default:476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );477 }478 }479 }480 481 // Memory barrier482 __atomic_thread_fence( __ATOMIC_SEQ_CST );483 return ret;484 }485 486 432 
//============================================================================================= 487 433 // I/O Polling … … 492 438 // Process a single completion message from the io_uring 493 439 // This is NOT thread-safe 494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {440 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) { 495 441 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 496 442 … … 501 447 } 502 448 503 int ret = __io_uring_enter(ring, to_submit, true, mask); 504 if( ret < 0 ) { 505 return [0, true]; 506 } 507 508 // update statistics 509 if (to_submit > 0) { 449 if (to_submit > 0 || waitcnt > 0) { 450 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 451 if( ret < 0 ) { 452 switch((int)errno) { 453 case EAGAIN: 454 case EINTR: 455 return [0, true]; 456 default: 457 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 458 } 459 } 460 461 // Release the consumed SQEs 462 __release_consumed_submission( ring ); 463 464 // update statistics 510 465 __STATS__( true, 511 466 if( to_submit > 0 ) { … … 517 472 } 518 473 519 // Release the consumed SQEs520 __ release_consumed_submission( ring);474 // Memory barrier 475 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 521 476 522 477 // Drain the queue … … 542 497 543 498 data->result = cqe.res; 544 if(! 
mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }545 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }499 if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 500 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); } 546 501 } 547 502 … … 591 546 int count; 592 547 bool again; 593 [count, again] = __drain_io( ring, &mask );548 [count, again] = __drain_io( ring, &mask, 1, true ); 594 549 595 550 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); … … 613 568 int count; 614 569 bool again; 615 [count, again] = __drain_io( ring, &mask );570 [count, again] = __drain_io( ring, &mask, 1, true ); 616 571 617 572 // Update statistics … … 651 606 bool again; 652 607 disable_interrupts(); 653 [count, again] = __drain_io( *this.ring, 0p );608 [count, again] = __drain_io( *this.ring, 0p, 0, false ); 654 609 655 610 if(!again) reset++; … … 845 800 // We got the lock 846 801 unsigned to_submit = __collect_submitions( ring ); 847 int ret = __io_uring_enter( ring, to_submit, false, 0p);802 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8); 848 803 if( ret < 0 ) { 849 unlock(ring.submit_q.lock); 850 return; 851 } 852 853 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 804 switch((int)errno) { 805 case EAGAIN: 806 case EINTR: 807 unlock(ring.submit_q.lock); 808 return; 809 default: 810 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 811 } 812 } 813 814 /* paranoid */ verify( ret > 0 ); 854 815 855 816 // Release the consumed SQEs … … 869 830 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 870 831 871 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,872 /* paranoid */ "index %u already reclaimed\n"873 /* paranoid */ "head %u, prev %u, tail %u\n"874 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n",875 /* paranoid */ idx,876 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail877 /* paranoid */ ,ring.submit_q.array[ 
((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]878 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]881 /* paranoid */ );882 883 832 // Append to the list of ready entries 884 833 885 834 /* paranoid */ verify( idx <= mask ); 886 ring.submit_q.array[ (*tail) & mask ] = idx; 835 836 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 887 837 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 888 838 839 /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 ); 840 889 841 // Submit however, many entries need to be submitted 890 int ret = __io_uring_enter( ring, 1, false, 0p);842 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0); 891 843 if( ret < 0 ) { 892 844 switch((int)errno) { … … 954 906 return count; 955 907 } 956 957 //=============================================================================================958 // I/O Submissions959 //=============================================================================================960 961 void register_fixed_files( cluster & cl, int * files, unsigned count ) {962 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );963 if( ret < 0 ) {964 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );965 }966 967 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );968 }969 908 #endif -
libcfa/src/concurrency/iocall.cfa
r59f74a2 r0376da9 128 128 #endif 129 129 } 130 131 ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {132 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)133 return preadv2(fd, iov, iovcnt, offset, flags);134 #else135 __submit_prelude136 137 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };138 sqe->flags |= IOSQE_FIXED_FILE;139 140 __submit_wait141 #endif142 }143 130 #endif 144 131
Note: See TracChangeset for help on using the changeset viewer.