Changeset 20ab637


Ignore:
Timestamp:
Jul 10, 2020, 2:17:49 PM (15 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast, new-ast-unique-expr
Children:
59f74a2
Parents:
3a32b3a
Message:

Added quick and dirty support for fixed files reads.
Added support for kernel side polling.

Files:
3 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/readv.cfa

    r3a32b3a r20ab637  
     1#define _GNU_SOURCE
     2
    13#include <stdlib.h>
    24#include <stdio.h>
     
    2224extern bool traceHeapOn();
    2325extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     26extern ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     27extern void register_fixed_files( cluster &, int *, unsigned count );
    2428
    2529int fd;
     
    2832
    2933unsigned long int buflen = 50;
     34bool fixed_file = false;
    3035
    3136thread __attribute__((aligned(128))) Reader {};
    3237void ?{}( Reader & this ) {
    3338        ((thread&)this){ "Reader Thread", *the_benchmark_cluster };
     39}
     40
     41int do_read(int fd, struct iovec * iov) {
     42        if(fixed_file) {
     43                return cfa_preadv2_fixed(fd, iov, 1, 0, 0);
     44        }
     45        else {
     46                return cfa_preadv2(fd, iov, 1, 0, 0);
     47        }
    3448}
    3549
     
    4256
    4357        while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
    44                 int r = cfa_preadv2(fd, &iov, 1, 0, 0);
     58                int r = do_read(fd, &iov);
    4559                if(r < 0) abort("%s\n", strerror(-r));
    4660
     
    5266        BENCH_DECL
    5367        unsigned flags = 0;
     68        int file_flags = 0;
    5469        unsigned sublen = 16;
    5570
     
    96111                        case 'k':
    97112                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
     113                                fixed_file = true;
    98114                                break;
    99115                        case 'i':
    100116                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
     117                                file_flags |= O_DIRECT;
    101118                                break;
    102119                        case 'l':
     
    123140        }
    124141
    125         fd = open(__FILE__, 0);
    126         if(fd < 0) {
     142        int lfd = open(__FILE__, file_flags);
     143        if(lfd < 0) {
    127144                fprintf(stderr, "Could not open source file\n");
    128145                exit(EXIT_FAILURE);
     
    134151                Time start, end;
    135152                BenchCluster cl = { flags, CFA_STATS_READY_Q | CFA_STATS_IO };
     153
     154                if(fixed_file) {
     155                        fd = 0;
     156                        register_fixed_files( cl.self, &lfd, 1 );
     157                }
     158                else {
     159                        fd = lfd;
     160                }
     161
    136162                {
    137163                        BenchProc procs[nprocs];
     
    161187        }
    162188
    163         close(fd);
     189        close(lfd);
    164190}
  • libcfa/src/concurrency/io.cfa

    r3a32b3a r20ab637  
    1414//
    1515
    16 // #define __CFA_DEBUG_PRINT_IO__
    17 // #define __CFA_DEBUG_PRINT_IO_CORE__
     16#if defined(__CFA_DEBUG__)
     17        // #define __CFA_DEBUG_PRINT_IO__
     18        #define __CFA_DEBUG_PRINT_IO_CORE__
     19#endif
    1820
    1921#include "kernel.hfa"
     
    325327
    326328                // Create the poller thread
    327                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
     329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
    328330                this.io->poller.slow.blocked = false;
    329331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
     
    430432        }
    431433
     434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
     435                bool need_sys_to_submit = false;
     436                bool need_sys_to_complete = false;
     437                unsigned min_complete = 0;
     438                unsigned flags = 0;
     439
     440
     441                TO_SUBMIT:
     442                if( to_submit > 0 ) {
     443                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     444                                need_sys_to_submit = true;
     445                                break TO_SUBMIT;
     446                        }
     447                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
     448                                need_sys_to_submit = true;
     449                                flags |= IORING_ENTER_SQ_WAKEUP;
     450                        }
     451                }
     452
     453                TO_COMPLETE:
     454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     455                        flags |= IORING_ENTER_GETEVENTS;
     456                        if( mask ) {
     457                                need_sys_to_complete = true;
     458                                min_complete = 1;
     459                                break TO_COMPLETE;
     460                        }
     461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
     462                                need_sys_to_complete = true;
     463                        }
     464                }
     465
     466                int ret = 0;
     467                if( need_sys_to_submit || need_sys_to_complete ) {
     468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
     469                        if( ret < 0 ) {
     470                                switch((int)errno) {
     471                                case EAGAIN:
     472                                case EINTR:
     473                                        ret = -1;
     474                                        break;
     475                                default:
     476                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     477                                }
     478                        }
     479                }
     480
     481                // Memory barrier
     482                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     483                return ret;
     484        }
     485
    432486//=============================================================================================
    433487// I/O Polling
     
    438492        // Process a single completion message from the io_uring
    439493        // This is NOT thread-safe
    440         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
     494        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
    441495                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
    442496                const uint32_t smask = *ring.submit_q.mask;
     
    448502                }
    449503
    450                 if (to_submit > 0 || waitcnt > 0) {
    451                         int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    452                         if( ret < 0 ) {
    453                                 switch((int)errno) {
    454                                 case EAGAIN:
    455                                 case EINTR:
    456                                         return [0, true];
    457                                 default:
    458                                         abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    459                                 }
    460                         }
    461 
    462                         // Release the consumed SQEs
    463                         __release_consumed_submission( ring );
    464 
    465                         // update statistics
     504                int ret = __io_uring_enter(ring, to_submit, true, mask);
     505                if( ret < 0 ) {
     506                        return [0, true];
     507                }
     508
     509                // update statistics
     510                if (to_submit > 0) {
    466511                        __STATS__( true,
    467512                                if( to_submit > 0 ) {
     
    473518                }
    474519
    475                 // Memory barrier
    476                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
     520                // Release the consumed SQEs
     521                __release_consumed_submission( ring );
    477522
    478523                // Drain the queue
     
    498543
    499544                        data->result = cqe.res;
    500                         if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    501                         else         { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
     545                        if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
     546                        else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
    502547                }
    503548
     
    547592                                int count;
    548593                                bool again;
    549                                 [count, again] = __drain_io( ring, &mask, 1, true );
     594                                [count, again] = __drain_io( ring, &mask );
    550595
    551596                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
     
    569614                                int count;
    570615                                bool again;
    571                                 [count, again] = __drain_io( ring, &mask, 1, true );
     616                                [count, again] = __drain_io( ring, &mask );
    572617
    573618                                // Update statistics
     
    607652                        bool again;
    608653                        disable_interrupts();
    609                                 [count, again] = __drain_io( *this.ring, 0p, 0, false );
     654                                [count, again] = __drain_io( *this.ring, 0p );
    610655
    611656                                if(!again) reset++;
     
    801846                        // We got the lock
    802847                        unsigned to_submit = __collect_submitions( ring );
    803                         int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
     848                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
    804849                        if( ret < 0 ) {
    805                                 switch((int)errno) {
    806                                 case EAGAIN:
    807                                 case EINTR:
    808                                         unlock(ring.submit_q.lock);
    809                                         return;
    810                                 default:
    811                                         abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    812                                 }
    813                         }
    814 
    815                         /* paranoid */ verify( ret > 0 );
     850                                unlock(ring.submit_q.lock);
     851                                return;
     852                        }
     853
     854                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
    816855
    817856                        // Release the consumed SQEs
     
    831870                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    832871
     872                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     873                        /* paranoid */  "index %u already reclaimed\n"
     874                        /* paranoid */  "head %u, prev %u, tail %u\n"
     875                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
     876                        /* paranoid */  idx,
     877                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
     878                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
     879                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
     880                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
     881                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
     882                        /* paranoid */ );
     883
    833884                        // Append to the list of ready entries
    834885
    835886                        /* paranoid */ verify( idx <= mask );
    836 
    837                         ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     887                        ring.submit_q.array[ (*tail) & mask ] = idx;
    838888                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    839889
    840                         /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );
    841 
    842890                        // Submit however, many entries need to be submitted
    843                         int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     891                        int ret = __io_uring_enter( ring, 1, false, 0p );
    844892                        if( ret < 0 ) {
    845893                                switch((int)errno) {
     
    907955                return count;
    908956        }
     957
     958//=============================================================================================
     959// I/O Submissions
     960//=============================================================================================
     961
     962        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
     963                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
     964                if( ret < 0 ) {
     965                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     966                }
     967
     968                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
     969        }
    909970#endif
  • libcfa/src/concurrency/iocall.cfa

    r3a32b3a r20ab637  
    128128                #endif
    129129        }
     130
     131        ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     132                #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
     133                        return preadv2(fd, iov, iovcnt, offset, flags);
     134                #else
     135                        __submit_prelude
     136
     137                        (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
     138                        sqe->flags |= IOSQE_FIXED_FILE;
     139
     140                        __submit_wait
     141                #endif
     142        }
    130143#endif
    131144
Note: See TracChangeset for help on using the changeset viewer.