Changes in / [59f74a2:0376da9]


Ignore:
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/readv.cfa

    r59f74a2 r0376da9  
    1 #define _GNU_SOURCE
    2 
    31#include <stdlib.h>
    42#include <stdio.h>
     
    2422extern bool traceHeapOn();
    2523extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    26 extern ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    27 extern void register_fixed_files( cluster &, int *, unsigned count );
    2824
    2925int fd;
     
    3228
    3329unsigned long int buflen = 50;
    34 bool fixed_file = false;
    3530
    3631thread __attribute__((aligned(128))) Reader {};
    3732void ?{}( Reader & this ) {
    3833        ((thread&)this){ "Reader Thread", *the_benchmark_cluster };
    39 }
    40 
    41 int do_read(int fd, struct iovec * iov) {
    42         if(fixed_file) {
    43                 return cfa_preadv2_fixed(fd, iov, 1, 0, 0);
    44         }
    45         else {
    46                 return cfa_preadv2(fd, iov, 1, 0, 0);
    47         }
    4834}
    4935
     
    5642
    5743        while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
    58                 int r = do_read(fd, &iov);
     44                int r = cfa_preadv2(fd, &iov, 1, 0, 0);
    5945                if(r < 0) abort("%s\n", strerror(-r));
    6046
     
    6652        BENCH_DECL
    6753        unsigned flags = 0;
    68         int file_flags = 0;
    6954        unsigned sublen = 16;
    7055
     
    11196                        case 'k':
    11297                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
    113                                 fixed_file = true;
    11498                                break;
    11599                        case 'i':
    116100                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
    117                                 file_flags |= O_DIRECT;
    118101                                break;
    119102                        case 'l':
     
    140123        }
    141124
    142         int lfd = open(__FILE__, file_flags);
    143         if(lfd < 0) {
     125        fd = open(__FILE__, 0);
     126        if(fd < 0) {
    144127                fprintf(stderr, "Could not open source file\n");
    145128                exit(EXIT_FAILURE);
     
    151134                Time start, end;
    152135                BenchCluster cl = { flags, CFA_STATS_READY_Q | CFA_STATS_IO };
    153 
    154                 if(fixed_file) {
    155                         fd = 0;
    156                         register_fixed_files( cl.self, &lfd, 1 );
    157                 }
    158                 else {
    159                         fd = lfd;
    160                 }
    161 
    162136                {
    163137                        BenchProc procs[nprocs];
     
    187161        }
    188162
    189         close(lfd);
     163        close(fd);
    190164}
  • libcfa/src/concurrency/io.cfa

    r59f74a2 r0376da9  
    1414//
    1515
    16 #if defined(__CFA_DEBUG__)
    17         // #define __CFA_DEBUG_PRINT_IO__
    18         #define __CFA_DEBUG_PRINT_IO_CORE__
    19 #endif
     16// #define __CFA_DEBUG_PRINT_IO__
     17// #define __CFA_DEBUG_PRINT_IO_CORE__
    2018
    2119#include "kernel.hfa"
     
    327325
    328326                // Create the poller thread
    329                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
     327                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
    330328                this.io->poller.slow.blocked = false;
    331329                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
     
    432430        }
    433431
    434         int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
    435                 bool need_sys_to_submit = false;
    436                 bool need_sys_to_complete = false;
    437                 unsigned min_complete = 0;
    438                 unsigned flags = 0;
    439 
    440 
    441                 TO_SUBMIT:
    442                 if( to_submit > 0 ) {
    443                         if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    444                                 need_sys_to_submit = true;
    445                                 break TO_SUBMIT;
    446                         }
    447                         if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
    448                                 need_sys_to_submit = true;
    449                                 flags |= IORING_ENTER_SQ_WAKEUP;
    450                         }
    451                 }
    452 
    453                 TO_COMPLETE:
    454                 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    455                         flags |= IORING_ENTER_GETEVENTS;
    456                         if( mask ) {
    457                                 need_sys_to_complete = true;
    458                                 min_complete = 1;
    459                                 break TO_COMPLETE;
    460                         }
    461                         if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    462                                 need_sys_to_complete = true;
    463                         }
    464                 }
    465 
    466                 int ret = 0;
    467                 if( need_sys_to_submit || need_sys_to_complete ) {
    468                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
    469                         if( ret < 0 ) {
    470                                 switch((int)errno) {
    471                                 case EAGAIN:
    472                                 case EINTR:
    473                                         ret = -1;
    474                                         break;
    475                                 default:
    476                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    477                                 }
    478                         }
    479                 }
    480 
    481                 // Memory barrier
    482                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    483                 return ret;
    484         }
    485 
    486432//=============================================================================================
    487433// I/O Polling
     
    492438        // Process a single completion message from the io_uring
    493439        // This is NOT thread-safe
    494         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
     440        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
    495441                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
    496442
     
    501447                }
    502448
    503                 int ret = __io_uring_enter(ring, to_submit, true, mask);
    504                 if( ret < 0 ) {
    505                         return [0, true];
    506                 }
    507 
    508                 // update statistics
    509                 if (to_submit > 0) {
     449                if (to_submit > 0 || waitcnt > 0) {
     450                        int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     451                        if( ret < 0 ) {
     452                                switch((int)errno) {
     453                                case EAGAIN:
     454                                case EINTR:
     455                                        return [0, true];
     456                                default:
     457                                        abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
     458                                }
     459                        }
     460
     461                        // Release the consumed SQEs
     462                        __release_consumed_submission( ring );
     463
     464                        // update statistics
    510465                        __STATS__( true,
    511466                                if( to_submit > 0 ) {
     
    517472                }
    518473
    519                 // Release the consumed SQEs
    520                 __release_consumed_submission( ring );
     474                // Memory barrier
     475                __atomic_thread_fence( __ATOMIC_SEQ_CST );
    521476
    522477                // Drain the queue
     
    542497
    543498                        data->result = cqe.res;
    544                         if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    545                         else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
     499                        if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
     500                        else         { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
    546501                }
    547502
     
    591546                                int count;
    592547                                bool again;
    593                                 [count, again] = __drain_io( ring, &mask );
     548                                [count, again] = __drain_io( ring, &mask, 1, true );
    594549
    595550                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
     
    613568                                int count;
    614569                                bool again;
    615                                 [count, again] = __drain_io( ring, &mask );
     570                                [count, again] = __drain_io( ring, &mask, 1, true );
    616571
    617572                                // Update statistics
     
    651606                        bool again;
    652607                        disable_interrupts();
    653                                 [count, again] = __drain_io( *this.ring, 0p );
     608                                [count, again] = __drain_io( *this.ring, 0p, 0, false );
    654609
    655610                                if(!again) reset++;
     
    845800                        // We got the lock
    846801                        unsigned to_submit = __collect_submitions( ring );
    847                         int ret = __io_uring_enter( ring, to_submit, false, 0p );
     802                        int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
    848803                        if( ret < 0 ) {
    849                                 unlock(ring.submit_q.lock);
    850                                 return;
    851                         }
    852 
    853                         /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
     804                                switch((int)errno) {
     805                                case EAGAIN:
     806                                case EINTR:
     807                                        unlock(ring.submit_q.lock);
     808                                        return;
     809                                default:
     810                                        abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
     811                                }
     812                        }
     813
     814                        /* paranoid */ verify( ret > 0 );
    854815
    855816                        // Release the consumed SQEs
     
    869830                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    870831
    871                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
    872                         /* paranoid */  "index %u already reclaimed\n"
    873                         /* paranoid */  "head %u, prev %u, tail %u\n"
    874                         /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
    875                         /* paranoid */  idx,
    876                         /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
    877                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
    878                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
    879                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
    880                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
    881                         /* paranoid */ );
    882 
    883832                        // Append to the list of ready entries
    884833
    885834                        /* paranoid */ verify( idx <= mask );
    886                         ring.submit_q.array[ (*tail) & mask ] = idx;
     835
     836                        ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    887837                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    888838
     839                        /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );
     840
    889841                        // Submit however, many entries need to be submitted
    890                         int ret = __io_uring_enter( ring, 1, false, 0p );
     842                        int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
    891843                        if( ret < 0 ) {
    892844                                switch((int)errno) {
     
    954906                return count;
    955907        }
    956 
    957 //=============================================================================================
    958 // I/O Submissions
    959 //=============================================================================================
    960 
    961         void register_fixed_files( cluster & cl, int * files, unsigned count ) {
    962                 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
    963                 if( ret < 0 ) {
    964                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    965                 }
    966 
    967                 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
    968         }
    969908#endif
  • libcfa/src/concurrency/iocall.cfa

    r59f74a2 r0376da9  
    128128                #endif
    129129        }
    130 
    131         ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    132                 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
    133                         return preadv2(fd, iov, iovcnt, offset, flags);
    134                 #else
    135                         __submit_prelude
    136 
    137                         (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
    138                         sqe->flags |= IOSQE_FIXED_FILE;
    139 
    140                         __submit_wait
    141                 #endif
    142         }
    143130#endif
    144131
Note: See TracChangeset for help on using the changeset viewer.