Changeset 20ab637

- Timestamp: Jul 10, 2020, 2:17:49 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 59f74a2
- Parents: 3a32b3a
- Files: 3 edited
benchmark/io/readv.cfa
--- benchmark/io/readv.cfa (r3a32b3a)
+++ benchmark/io/readv.cfa (r20ab637)

+#define _GNU_SOURCE
+
 #include <stdlib.h>
 #include <stdio.h>
…
 extern bool traceHeapOn();
 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
+extern ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
+extern void register_fixed_files( cluster &, int *, unsigned count );
 
 int fd;
…
 
 unsigned long int buflen = 50;
+bool fixed_file = false;
 
 thread __attribute__((aligned(128))) Reader {};
 void ?{}( Reader & this ) {
 	((thread&)this){ "Reader Thread", *the_benchmark_cluster };
+}
+
+int do_read(int fd, struct iovec * iov) {
+	if(fixed_file) {
+		return cfa_preadv2_fixed(fd, iov, 1, 0, 0);
+	}
+	else {
+		return cfa_preadv2(fd, iov, 1, 0, 0);
+	}
 }
…
 
 	while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
-		int r = cfa_preadv2(fd, &iov, 1, 0, 0);
+		int r = do_read(fd, &iov);
 		if(r < 0) abort("%s\n", strerror(-r));
 
…
 	BENCH_DECL
 	unsigned flags = 0;
+	int file_flags = 0;
 	unsigned sublen = 16;
…
 			case 'k':
 				flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
+				fixed_file = true;
 				break;
 			case 'i':
 				flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
+				file_flags |= O_DIRECT;
 				break;
 			case 'l':
…
 	}
 
-	fd = open(__FILE__, 0);
-	if( fd < 0) {
+	int lfd = open(__FILE__, file_flags);
+	if(lfd < 0) {
 		fprintf(stderr, "Could not open source file\n");
 		exit(EXIT_FAILURE);
…
 	Time start, end;
 	BenchCluster cl = { flags, CFA_STATS_READY_Q | CFA_STATS_IO };
+
+	if(fixed_file) {
+		fd = 0;
+		register_fixed_files( cl.self, &lfd, 1 );
+	}
+	else {
+		fd = lfd;
+	}
+
 	{
 		BenchProc procs[nprocs];
…
 	}
 
-	close( fd);
+	close(lfd);
 }
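The benchmark change above is driven by the new fixed-file path: the file is opened into a local lfd, registered with the cluster's ring via register_fixed_files, and subsequent reads refer to it by its index in the registration table (index 0), which is why fd is set to 0. The sketch below shows the same register-then-read-by-index pattern in plain C using liburing; liburing is an assumption here, not part of the changeset, which goes through the CFA runtime and raw syscalls instead.

/* standalone sketch of the fixed-file read pattern, using liburing */
#include <fcntl.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/uio.h>
#include <unistd.h>

int main(void) {
	struct io_uring ring;
	if (io_uring_queue_init(64, &ring, 0) < 0) exit(EXIT_FAILURE);

	int lfd = open(__FILE__, O_RDONLY);
	if (lfd < 0) exit(EXIT_FAILURE);

	/* register the descriptor; later sqes refer to it by index 0,
	   mirroring the benchmark's fd = 0 after register_fixed_files */
	if (io_uring_register_files(&ring, &lfd, 1) < 0) exit(EXIT_FAILURE);

	char buf[50];
	struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };

	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
	io_uring_prep_readv(sqe, 0 /* registered index, not an fd */, &iov, 1, 0);
	sqe->flags |= IOSQE_FIXED_FILE;

	io_uring_submit(&ring);

	struct io_uring_cqe *cqe;
	io_uring_wait_cqe(&ring, &cqe);
	printf("read %d bytes\n", cqe->res);
	io_uring_cqe_seen(&ring, cqe);

	io_uring_queue_exit(&ring);
	close(lfd);
	return 0;
}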
libcfa/src/concurrency/io.cfa
--- libcfa/src/concurrency/io.cfa (r3a32b3a)
+++ libcfa/src/concurrency/io.cfa (r20ab637)

 //
 
-// #define __CFA_DEBUG_PRINT_IO__
-// #define __CFA_DEBUG_PRINT_IO_CORE__
+#if defined(__CFA_DEBUG__)
+	// #define __CFA_DEBUG_PRINT_IO__
+	#define __CFA_DEBUG_PRINT_IO_CORE__
+#endif
 
 #include "kernel.hfa"
…
 
 	// Create the poller thread
-	__cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
+	__cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
 	this.io->poller.slow.blocked = false;
 	this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
…
 }
 
+int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
+	bool need_sys_to_submit = false;
+	bool need_sys_to_complete = false;
+	unsigned min_complete = 0;
+	unsigned flags = 0;
+
+	TO_SUBMIT:
+	if( to_submit > 0 ) {
+		if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
+			need_sys_to_submit = true;
+			break TO_SUBMIT;
+		}
+		if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
+			need_sys_to_submit = true;
+			flags |= IORING_ENTER_SQ_WAKEUP;
+		}
+	}
+
+	TO_COMPLETE:
+	if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
+		flags |= IORING_ENTER_GETEVENTS;
+		if( mask ) {
+			need_sys_to_complete = true;
+			min_complete = 1;
+			break TO_COMPLETE;
+		}
+		if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
+			need_sys_to_complete = true;
+		}
+	}
+
+	int ret = 0;
+	if( need_sys_to_submit || need_sys_to_complete ) {
+		ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
+		if( ret < 0 ) {
+			switch((int)errno) {
+				case EAGAIN:
+				case EINTR:
+					ret = -1;
+					break;
+				default:
+					abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
+			}
+		}
+	}
+
+	// Memory barrier
+	__atomic_thread_fence( __ATOMIC_SEQ_CST );
+	return ret;
+}
+
 //=============================================================================================
 // I/O Polling
…
 	// Process a single completion message from the io_uring
 	// This is NOT thread-safe
-static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
+static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
 	/* paranoid */ verify( !kernelTLS.preemption_state.enabled );
 	const uint32_t smask = *ring.submit_q.mask;
…
 	}
 
-	if (to_submit > 0 || waitcnt > 0) {
-		int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
-		if( ret < 0 ) {
-			switch((int)errno) {
-				case EAGAIN:
-				case EINTR:
-					return [0, true];
-				default:
-					abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
-			}
-		}
-
-		// Release the consumed SQEs
-		__release_consumed_submission( ring );
-
-		// update statistics
+	int ret = __io_uring_enter(ring, to_submit, true, mask);
+	if( ret < 0 ) {
+		return [0, true];
+	}
+
+	// update statistics
+	if (to_submit > 0) {
 		__STATS__( true,
 			if( to_submit > 0 ) {
…
 	}
 
-	// Memory barrier
-	__atomic_thread_fence( __ATOMIC_SEQ_CST );
+	// Release the consumed SQEs
+	__release_consumed_submission( ring );
 
 	// Drain the queue
…
 
 		data->result = cqe.res;
-		if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
-		else          { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
+		if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
+		else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
 
…
 			int count;
 			bool again;
-			[count, again] = __drain_io( ring, &mask, 1, true );
+			[count, again] = __drain_io( ring, &mask );
 
 			__atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
…
 			int count;
 			bool again;
-			[count, again] = __drain_io( ring, &mask, 1, true );
+			[count, again] = __drain_io( ring, &mask );
 
 			// Update statistics
…
 		bool again;
 		disable_interrupts();
-			[count, again] = __drain_io( *this.ring, 0p, 0, false );
+			[count, again] = __drain_io( *this.ring, 0p );
 
 			if(!again) reset++;
…
 		// We got the lock
 		unsigned to_submit = __collect_submitions( ring );
-		int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
+		int ret = __io_uring_enter( ring, to_submit, false, 0p );
 		if( ret < 0 ) {
-			switch((int)errno) {
-				case EAGAIN:
-				case EINTR:
-					unlock(ring.submit_q.lock);
-					return;
-				default:
-					abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
-			}
-		}
-
-		/* paranoid */ verify( ret > 0 );
+			unlock(ring.submit_q.lock);
+			return;
+		}
+
+		/* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
 
 		// Release the consumed SQEs
…
 		lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
 
+		/* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
+		/* paranoid */         "index %u already reclaimed\n"
+		/* paranoid */         "head %u, prev %u, tail %u\n"
+		/* paranoid */         "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
+		/* paranoid */         idx,
+		/* paranoid */         *ring.submit_q.head, ring.submit_q.prev_head, *tail
+		/* paranoid */        ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
+		/* paranoid */        ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
+		/* paranoid */        ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
+		/* paranoid */        ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
+		/* paranoid */ );
+
 		// Append to the list of ready entries
 
 		/* paranoid */ verify( idx <= mask );
-
-		ring.submit_q.array[ (*tail) & mask ] = idx & mask;
+		ring.submit_q.array[ (*tail) & mask ] = idx;
 		__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
 
-		/* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );
-
 		// Submit however, many entries need to be submitted
-		int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
+		int ret = __io_uring_enter( ring, 1, false, 0p );
 		if( ret < 0 ) {
 			switch((int)errno) {
…
 	return count;
 }
+
+//=============================================================================================
+// I/O Submissions
+//=============================================================================================
+
+void register_fixed_files( cluster & cl, int * files, unsigned count ) {
+	int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
+	if( ret < 0 ) {
+		abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
+	}
+
+	__cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
+}
 #endif
libcfa/src/concurrency/iocall.cfa
--- libcfa/src/concurrency/iocall.cfa (r3a32b3a)
+++ libcfa/src/concurrency/iocall.cfa (r20ab637)

 	#endif
 }
+
+ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
+		return preadv2(fd, iov, iovcnt, offset, flags);
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
+		sqe->flags |= IOSQE_FIXED_FILE;
+
+		__submit_wait
+	#endif
+}
 
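cfa_preadv2_fixed mirrors cfa_preadv2 but sets IOSQE_FIXED_FILE, which makes the kernel interpret the sqe's fd field as an index into the table registered with IORING_REGISTER_FILES rather than as a file descriptor; on builds without io_uring it simply falls back to preadv2, losing the fixed-file optimization. For reference, a raw-C equivalent of the sqe setup performed by the Cforall constructor call above; prep_readv_fixed is a hypothetical helper, and the field names follow struct io_uring_sqe from <linux/io_uring.h>.

/* raw-C equivalent of: (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
                        sqe->flags |= IOSQE_FIXED_FILE;                    */
#include <linux/io_uring.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>

static void prep_readv_fixed(struct io_uring_sqe *sqe, int file_index,
                             const struct iovec *iov, int iovcnt, off_t offset) {
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_READV;
	sqe->fd     = file_index;       /* index into the registered file table, not an fd */
	sqe->off    = (unsigned long long)offset;
	sqe->addr   = (unsigned long long)(uintptr_t)iov;
	sqe->len    = (unsigned)iovcnt;
	sqe->flags |= IOSQE_FIXED_FILE; /* kernel resolves fd through the registered table */
}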