//
// Cforall readv benchmark.
//
// A set of Reader threads repeatedly issue cfa_preadv2() on this very source
// file (always at offset 0) while the `run` flag is set. The program main
// starts them, lets them run for the configured duration, then prints the
// number of reads and the resulting throughput.
//

#define _GNU_SOURCE

// NOTE(review): the header names were missing from the copy under review
// (only bare `#include` directives remained). They are reconstructed from the
// names this file uses and the original directive count - confirm before merging.
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

extern "C" {
	#include <locale.h>
	#include <getopt.h>
	#include <fcntl.h>
	#include <sys/uio.h>
}

#include <errno.h>
#include <unistd.h>

#include <clock.hfa>
#include <iofwd.hfa>
#include <kernel.hfa>
#include <thread.hfa>
#include <time.hfa>
#include <stats.hfa>

#include "../benchcltr.hfa"

extern bool traceHeapOn();

// Descriptor handed to do_read(): the opened file, or index 0 when the file
// was registered through register_fixed_files().
int fd;

// Set by the program main around the measured interval; polled by every reader.
volatile bool run = false;

// Number of completed reads, summed over all readers.
volatile size_t count = 0;

// Bytes requested per read (option -b).
unsigned long int buflen = 512;

// When true, reads are submitted with CFA_IO_FIXED_FD1 (option -f, or implied by -k).
bool fixed_file = false;

// Benchmark worker thread; carries no state of its own.
thread __attribute__((aligned(128))) Reader {};

// Names the thread and places it on the benchmark cluster.
void ?{}( Reader & this ) {
	((thread&)this){ "Reader Thread", *the_benchmark_cluster };
}

// Issue a single one-element vectored read at offset 0.
//
//   fd  : descriptor (or fixed-file index when `fixed_file` is set)
//   iov : the single iovec describing the destination buffer
//
// Returns whatever cfa_preadv2() returns, converted to int.
int do_read(int fd, struct iovec * iov) {
	// extern ssize_t cfa_preadv2(int, const struct iovec *, int, off_t, int, int = 0, Duration = -1`s, io_cancellation * = 0p, io_context * = 0p);
	int sflags = 0
	#if defined(CFA_HAVE_IOSQE_ASYNC)
		| CFA_IO_ASYNC
	#else
	#warning no CFA_IO_ASYNC support
	#endif
	;

	if(fixed_file) {
		sflags |= CFA_IO_FIXED_FD1;
	}

	return cfa_preadv2(fd, iov, 1, 0, 0, sflags, -1`s, 0p, 0p);
}

// Reader thread body: park first (the program main unparks each reader after
// setting `run`), then read in a loop until `run` is cleared. Any failed read
// aborts the whole program with the errno description.
void main( Reader & ) {
	park();
	/* paranoid */ assert( true == __atomic_load_n(&run, __ATOMIC_RELAXED) );

	// 512-byte alignment matches the 512-byte length multiple enforced for O_DIRECT.
	__attribute__((aligned(512))) char data[buflen];
	struct iovec iov = { data, buflen };

	while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
		int r = do_read(fd, &iov);
		if(r < 0) abort("%s\n", strerror(errno));

		__atomic_fetch_add( &count, 1, __ATOMIC_SEQ_CST );
	}
}

// Program entry point: parse options, configure the io contexts, open this
// source file, run the readers for `duration`, then print the statistics.
int main(int argc, char * argv[]) {
	int file_flags = 0;
	unsigned num_io = 1;
	unsigned sublen = 16;
	unsigned nentries = 0;

	bool subthrd = false;
	bool subeagr = false;
	bool odirect = false;
	bool kpollsb = false;
	bool kpollcp = false;

	cfa_option opt[] = {
		BENCH_OPT_CFA
		{'b', "bufsize", "Number of bytes to read per request", buflen},
		{'s', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue},
		{'e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", subeagr, parse_settrue},
		{'f', "fixed-files", "Pre-register files with the io_contexts", fixed_file, parse_settrue},
		{'o', "open-direct", "Open files with O_DIRECT flag, bypassing the file cache", odirect, parse_settrue},
		{'k', "kpollsubmit", "If set, cluster uses an in kernel thread to poll submission, implies -f, requires elevated permissions", kpollsb, parse_settrue},
		{'i', "kpollcomplete", "If set, cluster polls fds for completions instead of relying on interrupts to get notifications, implies -o", kpollcp, parse_settrue},
		{'l', "submitlength", "Size of the buffer that stores ready submissions", sublen},
		{'r', "numentries", "Number of entries each of the io_context have", nentries},
		{'n', "numcontexts", "Number of io_contexts to the cluster", num_io},
	};
	int opt_cnt = sizeof(opt) / sizeof(cfa_option);

	char **left;
	parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", left );

	// A zero length would make the readers declare a zero-sized variable
	// length array, which is undefined; reject it the same way as the
	// O_DIRECT length check below.
	if( buflen == 0 ) {
		fprintf(stderr, "Buffer length must be greater than 0\n\n");
		print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", true);
	}

	// -i implies O_DIRECT (see below), so both options need the length check.
	if(kpollcp || odirect) {
		if( (buflen % 512) != 0 ) {
			fprintf(stderr, "Buffer length must be a multiple of 512 when using O_DIRECT, was %lu\n\n", buflen);
			print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", true);
		}
	}

	io_context_params params;

	if( subthrd ) params.poller_submits = true;
	if( subeagr ) params.eager_submits  = true;
	if( kpollsb ) params.poll_submit    = true;
	if( kpollcp ) params.poll_complete  = true;

	// Implied options, as advertised in the help text of -k and -i.
	if(params.poll_submit  ) fixed_file = true;
	if(params.poll_complete) odirect    = true;

	params.num_ready = sublen;
	params.num_entries = nentries;

	if(odirect) file_flags |= O_DIRECT;

	// The benchmark reads its own source file.
	int lfd = open(__FILE__, file_flags);
	if(lfd < 0) {
		// Include the cause so a missing file can be told apart from, e.g.,
		// a filesystem that refuses O_DIRECT.
		fprintf(stderr, "Could not open source file: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	printf("Running %d threads, reading %lu bytes each, over %d processors for %f seconds\n", nthreads, buflen, nprocs, duration);

	{
		Time start, end;
		BenchCluster cl = { num_io, params, CFA_STATS_READY_Q | CFA_STATS_IO };

		if(fixed_file) {
			// Readers address the file by its index in the registered set.
			fd = 0;
			register_fixed_files( cl.self, &lfd, 1 );
		}
		else {
			fd = lfd;
		}

		{
			BenchProc procs[nprocs];
			{
				Reader threads[nthreads];

				printf("Starting\n");
				bool is_tty = isatty(STDOUT_FILENO);
				start = timeHiRes();

				// Readers load `run` atomically, so store it atomically too.
				__atomic_store_n( &run, true, __ATOMIC_SEQ_CST );

				for(i; nthreads) {
					unpark( threads[i] );
				}
				wait(duration, start, end, is_tty);

				__atomic_store_n( &run, false, __ATOMIC_SEQ_CST );
				end = timeHiRes();
				printf("\nDone\n");
			}
			printf("Readers closed\n");
		}

		// Snapshot the counter once so every derived figure uses the same value.
		size_t total = __atomic_load_n( &count, __ATOMIC_SEQ_CST );

		printf("Took %'ld ms\n", (end - start)`ms);
		printf("Total reads : %'15zu\n", total);
		printf("Reads per second : %'18.2lf\n", ((double)total) / (end - start)`s);
		printf("Total read size : %'15zu\n", buflen * total);
		printf("Bytes per second : %'18.2lf\n", ((double)total * buflen) / (end - start)`s);
	}

	close(lfd);
}