Changes in / [0dedf027:6c12fd28]


Ignore:
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/readv.cfa

    r0dedf027 r6c12fd28  
    5959        unsigned long int nthreads = 2;
    6060        unsigned long int nprocs   = 1;
    61         int flags = 0;
     61        unsigned flags = 0;
     62        unsigned sublen = 16;
    6263
    6364        arg_loop:
    6465        for(;;) {
    6566                static struct option options[] = {
    66                         {"duration",   required_argument, 0, 'd'},
    67                         {"nthreads",   required_argument, 0, 't'},
    68                         {"nprocs",     required_argument, 0, 'p'},
    69                         {"bufsize",    required_argument, 0, 'b'},
    70                         {"userthread", no_argument      , 0, 'u'},
     67                        {"duration",     required_argument, 0, 'd'},
     68                        {"nthreads",     required_argument, 0, 't'},
     69                        {"nprocs",       required_argument, 0, 'p'},
     70                        {"bufsize",      required_argument, 0, 'b'},
     71                        {"userthread",   no_argument      , 0, 'u'},
     72                        {"submitthread", no_argument      , 0, 's'},
     73                        {"submitlength", required_argument, 0, 'l'},
    7174                        {0, 0, 0, 0}
    7275                };
    7376
    7477                int idx = 0;
    75                 int opt = getopt_long(argc, argv, "d:t:p:b:u", options, &idx);
     78                int opt = getopt_long(argc, argv, "d:t:p:b:usl:", options, &idx);
    7679
    7780                const char * arg = optarg ? optarg : "";
     
    113116                                flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;
    114117                                break;
     118                        case 's':
     119                                flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
     120                                break;
     121                        case 'l':
     122                                sublen = strtoul(arg, &end, 10);
     123                                if(*end != '\0' && sublen < 16) {
     124                                        fprintf(stderr, "Submit length must be at least 16, was %s\n", arg);
     125                                        goto usage;
     126                                }
     127                                flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);
     128                                break;
    115129                        // Other cases
    116130                        default: /* ? */
     
    123137                                fprintf(stderr, "  -p, --nprocs=NPROCS      Number of kernel threads\n");
    124138                                fprintf(stderr, "  -b, --buflen=SIZE        Number of bytes to read per request\n");
     139                                fprintf(stderr, "  -u, --userthread         If set, cluster uses user-thread to poll I/O\n");
     140                                fprintf(stderr, "  -s, --submitthread       If set, cluster uses polling thread to submit I/O\n");
    125141                                exit(EXIT_FAILURE);
    126142                }
  • libcfa/src/concurrency/io.cfa

    r0dedf027 r6c12fd28  
    2020
    2121#if !defined(HAVE_LINUX_IO_URING_H)
    22         void __kernel_io_startup( cluster &, int, bool ) {
     22        void __kernel_io_startup( cluster &, unsigned, bool ) {
    2323                // Nothing to do without io_uring
    2424        }
     
    9191        struct __io_poller_fast {
    9292                struct __io_data * ring;
    93                 bool waiting;
    9493                $thread thrd;
    9594        };
     
    9796        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    9897                this.ring = cltr.io;
    99                 this.waiting = true;
    10098                (this.thrd){ "Fast I/O Poller", cltr };
    10199        }
     
    126124                // Like head/tail but not seen by the kernel
    127125                volatile uint32_t alloc;
    128                 volatile uint32_t ready;
     126                volatile uint32_t * ready;
     127                uint32_t ready_cnt;
    129128
    130129                __spinlock_t lock;
     
    145144                                        volatile unsigned long long int block;
    146145                                } submit_avg;
     146                                struct {
     147                                        volatile unsigned long long int val;
     148                                        volatile unsigned long long int cnt;
     149                                        volatile unsigned long long int block;
     150                                } look_avg;
    147151                        } stats;
    148152                #endif
     
    201205// I/O Startup / Shutdown logic
    202206//=============================================================================================
    203         void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
     207        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
    204208                this.io = malloc();
    205209
     
    274278                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    275279                sq.alloc = *sq.tail;
    276                 sq.ready = *sq.tail;
     280
     281                if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     282                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
     283                        sq.ready = alloc_align( 64, sq.ready_cnt );
     284                        for(i; sq.ready_cnt) {
     285                                sq.ready[i] = -1ul32;
     286                        }
     287                }
     288                else {
     289                        sq.ready_cnt = 0;
     290                        sq.ready = 0p;
     291                }
    277292
    278293                // completion queue
     
    307322                        this.io->submit_q.stats.submit_avg.cnt   = 0;
    308323                        this.io->submit_q.stats.submit_avg.block = 0;
     324                        this.io->submit_q.stats.look_avg.val   = 0;
     325                        this.io->submit_q.stats.look_avg.cnt   = 0;
     326                        this.io->submit_q.stats.look_avg.block = 0;
    309327                        this.io->completion_q.stats.completed_avg.val = 0;
    310328                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     
    347365                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    348366                        with( this.io->poller.fast ) {
    349                                 /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
    350367                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
    351368                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
    352369
    353370                                // We need to adjust the clean-up based on where the thread is
    354                                 if( thrd.preempted != __NO_PREEMPTION ) {
     371                                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    355372
    356373                                        // This is the tricky case
    357374                                        // The thread was preempted and now it is on the ready queue
    358                                         /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
    359375                                        /* paranoid */ verify( thrd.next == 1p );                // The thread should be the last on the list
    360376                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
     
    405421                        if(this.print_stats) {
    406422                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
     423                                        double lavgv = 0;
     424                                        double lavgb = 0;
     425                                        if(look_avg.cnt != 0) {
     426                                                lavgv = ((double)look_avg.val  ) / look_avg.cnt;
     427                                                lavgb = ((double)look_avg.block) / look_avg.cnt;
     428                                        }
     429
    407430                                        __cfaabi_bits_print_safe( STDERR_FILENO,
    408431                                                "----- I/O uRing Stats -----\n"
    409                                                 "- total submit calls  : %'15llu\n"
    410                                                 "- avg submit          : %'18.2lf\n"
    411                                                 "- pre-submit block %%  : %'18.2lf\n"
    412                                                 "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
    413                                                 "- avg completion/wait : %'18.2lf\n",
     432                                                "- total submit calls     : %'15llu\n"
     433                                                "- avg submit             : %'18.2lf\n"
     434                                                "- pre-submit block %%     : %'18.2lf\n"
     435                                                "- total ready search     : %'15llu\n"
     436                                                "- avg ready search len   : %'18.2lf\n"
     437                                                "- avg ready search block : %'18.2lf\n"
     438                                                "- total wait calls       : %'15llu   (%'llu slow, %'llu fast)\n"
     439                                                "- avg completion/wait    : %'18.2lf\n",
    414440                                                submit_avg.cnt,
    415441                                                ((double)submit_avg.val) / submit_avg.cnt,
    416442                                                (100.0 * submit_avg.block) / submit_avg.cnt,
     443                                                look_avg.cnt,
     444                                                lavgv,
     445                                                lavgb,
    417446                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
    418447                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     
    441470                close(this.io->fd);
    442471
     472                free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    443473                free( this.io );
    444474        }
     
    454484        // Process a single completion message from the io_uring
    455485        // This is NOT thread-safe
    456         static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
    457                 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     486        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
     487                unsigned to_submit = 0;
     488                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     489
     490                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
     491                        uint32_t * tail = ring.submit_q.tail;
     492                        const uint32_t mask = *ring.submit_q.mask;
     493
     494                        // Go through the list of ready submissions
     495                        for( i; ring.submit_q.ready_cnt ) {
     496                                // replace any submission with the sentinel, to consume it.
     497                                uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
     498
     499                                // If it was already the sentinel, then we are done
     500                                if( idx == -1ul32 ) continue;
     501
     502                                // If we got a real submission, append it to the list
     503                                ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
     504                                to_submit++;
     505                        }
     506
     507                        // Increment the tail based on how many we are ready to submit
     508                        __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
     509
     510                        // update statistics
     511                        #if !defined(__CFA_NO_STATISTICS__)
     512                                ring.submit_q.stats.submit_avg.val += to_submit;
     513                                ring.submit_q.stats.submit_avg.cnt += 1;
     514                        #endif
     515                }
     516
     517                int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    458518                if( ret < 0 ) {
    459519                        switch((int)errno) {
     
    497557                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    498558
    499                 return count;
     559                return [count, count > 0 || to_submit > 0];
    500560        }
    501561
     
    519579                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    520580                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
     581
    521582                                // In the user-thread approach drain and if anything was drained,
    522583                                // batton pass to the user-thread
    523                                 int count = __drain_io( ring, &mask, 1, true );
     584                                int count;
     585                                bool again;
     586                                [count, again] = __drain_io( ring, &mask, 0, true );
    524587
    525588                                // Update statistics
     
    529592                                #endif
    530593
    531                                 if(count > 0) {
     594                                if(again) {
    532595                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    533596                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     
    539602                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    540603                                //In the naive approach, just poll the io completion queue directly
    541                                 int count = __drain_io( ring, &mask, 1, true );
     604                                int count;
     605                                bool again;
     606                                [count, again] = __drain_io( ring, &mask, 1, true );
    542607
    543608                                // Update statistics
     
    566631                // Then loop until we need to start
    567632                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     633
    568634                        // Drain the io
    569                         this.waiting = false;
    570                         int count = __drain_io( *this.ring, 0p, 0, false );
    571                         reset += count > 0 ? 1 : 0;
     635                        int count;
     636                        bool again;
     637                        [count, again] = __drain_io( *this.ring, 0p, 0, false );
     638
     639                        if(!again) reset++;
    572640
    573641                        // Update statistics
     
    577645                        #endif
    578646
    579                         this.waiting = true;
     647                        // If we got something, just yield and check again
    580648                        if(reset < 5) {
    581                                 // If we got something, just yield and check again
    582649                                yield();
    583650                        }
     651                        // We didn't get anything baton pass to the slow poller
    584652                        else {
    585                                 // We didn't get anything baton pass to the slow poller
    586653                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     654                                reset = 0;
     655
     656                                // wake up the slow poller
    587657                                post( this.ring->poller.sem );
     658
     659                                // park this thread
    588660                                park( __cfaabi_dbg_ctx );
    589                                 reset = 0;
    590661                        }
    591662                }
    592663
    593664                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     665        }
     666
     667        static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
     668        static inline void __wake_poller( struct __io_data & ring ) {
     669                // sigval val = { 1 };
     670                // pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    594671        }
    595672
     
    632709                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
    633710
    634                 // Validate that we didn't overflow anything
    635                 // Check that nothing overflowed
    636                 /* paranoid */ verify( true );
    637 
    638                 // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
    639                 /* paranoid */ verify( true );
     711                // Mask the idx now to allow make everything easier to check
     712                idx &= *ring.submit_q.mask;
    640713
    641714                // Return the sqe
    642                 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
     715                return [&ring.submit_q.sqes[ idx ], idx];
    643716        }
    644717
    645718        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    646                 // get mutual exclusion
    647                 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    648 
    649                 // Append to the list of ready entries
    650                 uint32_t * tail = ring.submit_q.tail;
     719                // Get now the data we definetely need
     720                uint32_t * const tail = ring.submit_q.tail;
    651721                const uint32_t mask = *ring.submit_q.mask;
    652722
    653                 ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    654                 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    655 
    656                 // Submit however, many entries need to be submitted
    657                 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
    658                 if( ret < 0 ) {
    659                         switch((int)errno) {
    660                         default:
    661                                 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    662                         }
    663                 }
    664 
    665                 // update statistics
    666                 #if !defined(__CFA_NO_STATISTICS__)
    667                         ring.submit_q.stats.submit_avg.val += 1;
    668                         ring.submit_q.stats.submit_avg.cnt += 1;
    669                 #endif
    670 
    671                 unlock(ring.submit_q.lock);
    672                 // Make sure that idx was submitted
    673                 // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
    674                 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     723                // There are 2 submission schemes, check which one we are using
     724                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     725                        // If the poller thread submits, then we just need to add this to the ready array
     726
     727                        /* paranoid */ verify( idx <= mask   );
     728                        /* paranoid */ verify( idx != -1ul32 );
     729
     730                        // We need to find a spot in the ready array
     731                        __attribute((unused)) int len   = 0;
     732                        __attribute((unused)) int block = 0;
     733                        uint32_t expected = -1ul32;
     734                        LOOKING: for(;;) {
     735                                for(i; ring.submit_q.ready_cnt) {
     736                                        if( __atomic_compare_exchange_n( &ring.submit_q.ready[i], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
     737                                                break LOOKING;
     738                                        }
     739
     740                                        len ++;
     741                                }
     742
     743                                block++;
     744                                yield();
     745                        }
     746
     747                        __wake_poller( ring );
     748
     749                        // update statistics
     750                        #if !defined(__CFA_NO_STATISTICS__)
     751                                __atomic_fetch_add( &ring.submit_q.stats.look_avg.val,   len,   __ATOMIC_RELAXED );
     752                                __atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
     753                                __atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt,   1,     __ATOMIC_RELAXED );
     754                        #endif
     755
     756                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
     757                }
     758                else {
     759                        // get mutual exclusion
     760                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     761
     762                        // Append to the list of ready entries
     763
     764                        /* paranoid */ verify( idx <= mask );
     765
     766                        ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     767                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
     768
     769                        // Submit however, many entries need to be submitted
     770                        int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     771                        if( ret < 0 ) {
     772                                switch((int)errno) {
     773                                default:
     774                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
     775                                }
     776                        }
     777
     778                        // update statistics
     779                        #if !defined(__CFA_NO_STATISTICS__)
     780                                ring.submit_q.stats.submit_avg.val += 1;
     781                                ring.submit_q.stats.submit_avg.cnt += 1;
     782                        #endif
     783
     784                        unlock(ring.submit_q.lock);
     785
     786                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     787                }
    675788        }
    676789
  • libcfa/src/concurrency/kernel.cfa

    r0dedf027 r6c12fd28  
    256256}
    257257
    258 void ?{}(cluster & this, const char name[], Duration preemption_rate, int io_flags) with( this ) {
     258void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {
    259259        this.name = name;
    260260        this.preemption_rate = preemption_rate;
  • libcfa/src/concurrency/kernel.hfa

    r0dedf027 r6c12fd28  
    116116struct __io_data;
    117117
    118 #define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0
    119 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1
     118#define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
     119#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
     120// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
     121#define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
    120122
    121123//-----------------------------------------------------------------------------
     
    159161extern Duration default_preemption();
    160162
    161 void ?{} (cluster & this, const char name[], Duration preemption_rate, int flags);
     163void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);
    162164void ^?{}(cluster & this);
    163165
    164 static inline void ?{} (cluster & this)                                      { this{"Anonymous Cluster", default_preemption(), 0}; }
    165 static inline void ?{} (cluster & this, Duration preemption_rate)            { this{"Anonymous Cluster", preemption_rate, 0}; }
    166 static inline void ?{} (cluster & this, const char name[])                   { this{name, default_preemption(), 0}; }
    167 static inline void ?{} (cluster & this, int flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
    168 static inline void ?{} (cluster & this, Duration preemption_rate, int flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
    169 static inline void ?{} (cluster & this, const char name[], int flags)        { this{name, default_preemption(), flags}; }
     166static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
     167static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
     168static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
     169static inline void ?{} (cluster & this, unsigned flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
     170static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
     171static inline void ?{} (cluster & this, const char name[], unsigned flags)        { this{name, default_preemption(), flags}; }
    170172
    171173static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
  • libcfa/src/concurrency/kernel_private.hfa

    r0dedf027 r6c12fd28  
    7777//-----------------------------------------------------------------------------
    7878// I/O
    79 void __kernel_io_startup     ( cluster &, int, bool );
     79void __kernel_io_startup     ( cluster &, unsigned, bool );
    8080void __kernel_io_finish_start( cluster & );
    8181void __kernel_io_prepare_stop( cluster & );
Note: See TracChangeset for help on using the changeset viewer.