Changeset 7922158
- Timestamp:
- Jul 10, 2020, 4:07:23 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- d34575b
- Parents:
- ab44413 (diff), 365e423 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/readv.cfa
rab44413 r7922158 1 #define _GNU_SOURCE 2 1 3 #include <stdlib.h> 2 4 #include <stdio.h> … … 22 24 extern bool traceHeapOn(); 23 25 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags); 26 extern ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags); 27 extern void register_fixed_files( cluster &, int *, unsigned count ); 24 28 25 29 int fd; … … 28 32 29 33 unsigned long int buflen = 50; 34 bool fixed_file = false; 30 35 31 36 thread __attribute__((aligned(128))) Reader {}; 32 37 void ?{}( Reader & this ) { 33 38 ((thread&)this){ "Reader Thread", *the_benchmark_cluster }; 39 } 40 41 int do_read(int fd, struct iovec * iov) { 42 if(fixed_file) { 43 return cfa_preadv2_fixed(fd, iov, 1, 0, 0); 44 } 45 else { 46 return cfa_preadv2(fd, iov, 1, 0, 0); 47 } 34 48 } 35 49 … … 42 56 43 57 while(__atomic_load_n(&run, __ATOMIC_RELAXED)) { 44 int r = cfa_preadv2(fd, &iov, 1, 0, 0);58 int r = do_read(fd, &iov); 45 59 if(r < 0) abort("%s\n", strerror(-r)); 46 60 … … 52 66 BENCH_DECL 53 67 unsigned flags = 0; 68 int file_flags = 0; 54 69 unsigned sublen = 16; 55 70 … … 96 111 case 'k': 97 112 flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS; 113 fixed_file = true; 98 114 break; 99 115 case 'i': 100 116 flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES; 117 file_flags |= O_DIRECT; 101 118 break; 102 119 case 'l': … … 123 140 } 124 141 125 fd = open(__FILE__, 0);126 if( fd < 0) {142 int lfd = open(__FILE__, file_flags); 143 if(lfd < 0) { 127 144 fprintf(stderr, "Could not open source file\n"); 128 145 exit(EXIT_FAILURE); … … 134 151 Time start, end; 135 152 BenchCluster cl = { flags, CFA_STATS_READY_Q | CFA_STATS_IO }; 153 154 if(fixed_file) { 155 fd = 0; 156 register_fixed_files( cl.self, &lfd, 1 ); 157 } 158 else { 159 fd = lfd; 160 } 161 136 162 { 137 163 BenchProc procs[nprocs]; … … 161 187 } 162 188 163 close( fd);189 close(lfd); 164 190 } -
libcfa/src/concurrency/io.cfa
rab44413 r7922158 14 14 // 15 15 16 // #define __CFA_DEBUG_PRINT_IO__ 17 // #define __CFA_DEBUG_PRINT_IO_CORE__ 16 #if defined(__CFA_DEBUG__) 17 // #define __CFA_DEBUG_PRINT_IO__ 18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 #endif 18 20 19 21 #include "kernel.hfa" … … 325 327 326 328 // Create the poller thread 327 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for clu ter %p\n", &this);329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this); 328 330 this.io->poller.slow.blocked = false; 329 331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); … … 430 432 } 431 433 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) { 435 bool need_sys_to_submit = false; 436 bool need_sys_to_complete = false; 437 unsigned min_complete = 0; 438 unsigned flags = 0; 439 440 441 TO_SUBMIT: 442 if( to_submit > 0 ) { 443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 444 need_sys_to_submit = true; 445 break TO_SUBMIT; 446 } 447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 448 need_sys_to_submit = true; 449 flags |= IORING_ENTER_SQ_WAKEUP; 450 } 451 } 452 453 TO_COMPLETE: 454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 455 flags |= IORING_ENTER_GETEVENTS; 456 if( mask ) { 457 need_sys_to_complete = true; 458 min_complete = 1; 459 break TO_COMPLETE; 460 } 461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 462 need_sys_to_complete = true; 463 } 464 } 465 466 int ret = 0; 467 if( need_sys_to_submit || need_sys_to_complete ) { 468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8); 469 if( ret < 0 ) { 470 switch((int)errno) { 471 case EAGAIN: 472 case EINTR: 473 ret = -1; 474 break; 475 default: 476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 477 } 478 } 479 } 480 481 // Memory barrier 482 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 483 return ret; 484 } 485 432 486 //============================================================================================= 433 487 // I/O Polling … … 438 492 // Process a single completion message from the io_uring 439 493 // This is NOT thread-safe 440 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask , int waitcnt, bool in_kernel) {494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) { 441 495 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 442 496 … … 447 501 } 448 502 449 if (to_submit > 0 || waitcnt > 0) { 450 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 451 if( ret < 0 ) { 452 switch((int)errno) { 453 case EAGAIN: 454 case EINTR: 455 return [0, true]; 456 default: 457 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 458 } 459 } 460 461 // Release the consumed SQEs 462 __release_consumed_submission( ring ); 463 464 // update statistics 503 int ret = __io_uring_enter(ring, to_submit, true, mask); 504 if( ret < 0 ) { 505 return [0, true]; 506 } 507 508 // update statistics 509 if (to_submit > 0) { 465 510 __STATS__( true, 466 511 if( to_submit > 0 ) { … … 472 517 } 473 518 474 // Memory barrier475 __ atomic_thread_fence( __ATOMIC_SEQ_CST);519 // Release the consumed SQEs 520 __release_consumed_submission( ring ); 476 521 477 522 // Drain the queue … … 497 542 498 543 data->result = cqe.res; 499 if(! in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }500 else 544 if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 545 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); } 501 546 } 502 547 … … 546 591 int count; 547 592 bool again; 548 [count, again] = __drain_io( ring, &mask , 1, true);593 [count, again] = __drain_io( ring, &mask ); 549 594 550 595 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); … … 568 613 int count; 569 614 bool again; 570 [count, again] = __drain_io( ring, &mask , 1, true);615 [count, again] = __drain_io( ring, &mask ); 571 616 572 617 // Update statistics … … 606 651 bool again; 607 652 disable_interrupts(); 608 [count, again] = __drain_io( *this.ring, 0p , 0, false);653 [count, again] = __drain_io( *this.ring, 0p ); 609 654 610 655 if(!again) reset++; … … 800 845 // We got the lock 801 846 unsigned to_submit = __collect_submitions( ring ); 802 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);847 int ret = __io_uring_enter( ring, to_submit, false, 0p ); 803 848 if( ret < 0 ) { 804 switch((int)errno) { 805 case EAGAIN: 806 case EINTR: 807 unlock(ring.submit_q.lock); 808 return; 809 default: 810 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 811 } 812 } 813 814 /* paranoid */ verify( ret > 0 ); 849 unlock(ring.submit_q.lock); 850 return; 851 } 852 853 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) ); 815 854 816 855 // Release the consumed SQEs … … 830 869 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); 831 870 871 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0, 872 /* paranoid */ "index %u already reclaimed\n" 873 /* paranoid */ "head %u, prev %u, tail %u\n" 874 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 875 /* paranoid */ idx, 876 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 877 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 878 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 881 /* paranoid */ ); 882 832 883 // Append to the list of ready entries 833 884 834 885 /* paranoid */ verify( idx <= mask ); 835 836 ring.submit_q.array[ (*tail) & mask ] = idx & mask; 886 ring.submit_q.array[ (*tail) & mask ] = idx; 837 887 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 838 888 839 /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );840 841 889 // Submit however, many entries need to be submitted 842 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);890 int ret = __io_uring_enter( ring, 1, false, 0p ); 843 891 if( ret < 0 ) { 844 892 switch((int)errno) { … … 906 954 return count; 907 955 } 956 957 //============================================================================================= 958 // I/O Submissions 959 //============================================================================================= 960 961 void register_fixed_files( cluster & cl, int * files, unsigned count ) { 962 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count ); 963 if( ret < 0 ) { 964 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 965 } 966 967 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 968 } 908 969 #endif -
libcfa/src/concurrency/iocall.cfa
rab44413 r7922158 108 108 109 109 extern ssize_t read (int fd, void *buf, size_t count); 110 111 extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags); 112 extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags); 110 113 } 111 114 … … 128 131 #endif 129 132 } 133 134 ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 135 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV) 136 return preadv2(fd, iov, iovcnt, offset, flags); 137 #else 138 __submit_prelude 139 140 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 141 sqe->flags |= IOSQE_FIXED_FILE; 142 143 __submit_wait 144 #endif 145 } 130 146 #endif 131 147 … … 329 345 } 330 346 331 332 347 ssize_t cfa_read(int fd, void *buf, size_t count) { 333 348 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ) … … 349 364 350 365 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 }; 366 367 __submit_wait 368 #endif 369 } 370 371 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) { 372 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SPLICE) 373 return splice( fd_in, off_in, fd_out, off_out, len, flags ); 374 #else 375 __submit_prelude 376 377 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out }; 378 sqe->splice_fd_in = fd_in; 379 sqe->splice_off_in = off_in; 380 sqe->splice_flags = flags; 381 382 __submit_wait 383 #endif 384 } 385 386 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) { 387 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_TEE) 388 return tee( fd_in, fd_out, len, flags ); 389 #else 390 __submit_prelude 391 392 (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 }; 393 sqe->splice_fd_in = fd_in; 394 sqe->splice_flags = flags; 351 395 352 396 __submit_wait … … 453 497 #define _CFA_IO_FEATURE_IORING_OP_WRITE , 454 498 return IS_DEFINED(IORING_OP_WRITE); 499 500 if( /*func == (fptr_t)splice || */ 501 func == (fptr_t)cfa_splice ) 502 #define _CFA_IO_FEATURE_IORING_OP_SPLICE , 503 return IS_DEFINED(IORING_OP_SPLICE); 504 505 if( /*func == (fptr_t)tee || */ 506 func == (fptr_t)cfa_tee ) 507 #define _CFA_IO_FEATURE_IORING_OP_TEE , 508 return IS_DEFINED(IORING_OP_TEE); 455 509 #endif 456 510
Note: See TracChangeset
for help on using the changeset viewer.