Changes in / [6c12fd28:0dedf027]


Ignore:
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/readv.cfa

    r6c12fd28 r0dedf027  
    5959        unsigned long int nthreads = 2;
    6060        unsigned long int nprocs   = 1;
    61         unsigned flags = 0;
    62         unsigned sublen = 16;
     61        int flags = 0;
    6362
    6463        arg_loop:
    6564        for(;;) {
    6665                static struct option options[] = {
    67                         {"duration",     required_argument, 0, 'd'},
    68                         {"nthreads",     required_argument, 0, 't'},
    69                         {"nprocs",       required_argument, 0, 'p'},
    70                         {"bufsize",      required_argument, 0, 'b'},
    71                         {"userthread",   no_argument      , 0, 'u'},
    72                         {"submitthread", no_argument      , 0, 's'},
    73                         {"submitlength", required_argument, 0, 'l'},
     66                        {"duration",   required_argument, 0, 'd'},
     67                        {"nthreads",   required_argument, 0, 't'},
     68                        {"nprocs",     required_argument, 0, 'p'},
     69                        {"bufsize",    required_argument, 0, 'b'},
     70                        {"userthread", no_argument      , 0, 'u'},
    7471                        {0, 0, 0, 0}
    7572                };
    7673
    7774                int idx = 0;
    78                 int opt = getopt_long(argc, argv, "d:t:p:b:usl:", options, &idx);
     75                int opt = getopt_long(argc, argv, "d:t:p:b:u", options, &idx);
    7976
    8077                const char * arg = optarg ? optarg : "";
     
    116113                                flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;
    117114                                break;
    118                         case 's':
    119                                 flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
    120                                 break;
    121                         case 'l':
    122                                 sublen = strtoul(arg, &end, 10);
    123                                 if(*end != '\0' && sublen < 16) {
    124                                         fprintf(stderr, "Submit length must be at least 16, was %s\n", arg);
    125                                         goto usage;
    126                                 }
    127                                 flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);
    128                                 break;
    129115                        // Other cases
    130116                        default: /* ? */
     
    137123                                fprintf(stderr, "  -p, --nprocs=NPROCS      Number of kernel threads\n");
    138124                                fprintf(stderr, "  -b, --buflen=SIZE        Number of bytes to read per request\n");
    139                                 fprintf(stderr, "  -u, --userthread         If set, cluster uses user-thread to poll I/O\n");
    140                                 fprintf(stderr, "  -s, --submitthread       If set, cluster uses polling thread to submit I/O\n");
    141125                                exit(EXIT_FAILURE);
    142126                }
  • libcfa/src/concurrency/io.cfa

    r6c12fd28 r0dedf027  
    2020
    2121#if !defined(HAVE_LINUX_IO_URING_H)
    22         void __kernel_io_startup( cluster &, unsigned, bool ) {
     22        void __kernel_io_startup( cluster &, int, bool ) {
    2323                // Nothing to do without io_uring
    2424        }
     
    9191        struct __io_poller_fast {
    9292                struct __io_data * ring;
     93                bool waiting;
    9394                $thread thrd;
    9495        };
     
    9697        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    9798                this.ring = cltr.io;
     99                this.waiting = true;
    98100                (this.thrd){ "Fast I/O Poller", cltr };
    99101        }
     
    124126                // Like head/tail but not seen by the kernel
    125127                volatile uint32_t alloc;
    126                 volatile uint32_t * ready;
    127                 uint32_t ready_cnt;
     128                volatile uint32_t ready;
    128129
    129130                __spinlock_t lock;
     
    144145                                        volatile unsigned long long int block;
    145146                                } submit_avg;
    146                                 struct {
    147                                         volatile unsigned long long int val;
    148                                         volatile unsigned long long int cnt;
    149                                         volatile unsigned long long int block;
    150                                 } look_avg;
    151147                        } stats;
    152148                #endif
     
    205201// I/O Startup / Shutdown logic
    206202//=============================================================================================
    207         void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
     203        void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
    208204                this.io = malloc();
    209205
     
    278274                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    279275                sq.alloc = *sq.tail;
    280 
    281                 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    282                         sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
    283                         sq.ready = alloc_align( 64, sq.ready_cnt );
    284                         for(i; sq.ready_cnt) {
    285                                 sq.ready[i] = -1ul32;
    286                         }
    287                 }
    288                 else {
    289                         sq.ready_cnt = 0;
    290                         sq.ready = 0p;
    291                 }
     276                sq.ready = *sq.tail;
    292277
    293278                // completion queue
     
    322307                        this.io->submit_q.stats.submit_avg.cnt   = 0;
    323308                        this.io->submit_q.stats.submit_avg.block = 0;
    324                         this.io->submit_q.stats.look_avg.val   = 0;
    325                         this.io->submit_q.stats.look_avg.cnt   = 0;
    326                         this.io->submit_q.stats.look_avg.block = 0;
    327309                        this.io->completion_q.stats.completed_avg.val = 0;
    328310                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     
    365347                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    366348                        with( this.io->poller.fast ) {
     349                                /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
    367350                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
    368351                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
    369352
    370353                                // We need to adjust the clean-up based on where the thread is
    371                                 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     354                                if( thrd.preempted != __NO_PREEMPTION ) {
    372355
    373356                                        // This is the tricky case
    374357                                        // The thread was preempted and now it is on the ready queue
     358                                        /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
    375359                                        /* paranoid */ verify( thrd.next == 1p );                // The thread should be the last on the list
    376360                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
     
    421405                        if(this.print_stats) {
    422406                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
    423                                         double lavgv = 0;
    424                                         double lavgb = 0;
    425                                         if(look_avg.cnt != 0) {
    426                                                 lavgv = ((double)look_avg.val  ) / look_avg.cnt;
    427                                                 lavgb = ((double)look_avg.block) / look_avg.cnt;
    428                                         }
    429 
    430407                                        __cfaabi_bits_print_safe( STDERR_FILENO,
    431408                                                "----- I/O uRing Stats -----\n"
    432                                                 "- total submit calls     : %'15llu\n"
    433                                                 "- avg submit             : %'18.2lf\n"
    434                                                 "- pre-submit block %%     : %'18.2lf\n"
    435                                                 "- total ready search     : %'15llu\n"
    436                                                 "- avg ready search len   : %'18.2lf\n"
    437                                                 "- avg ready search block : %'18.2lf\n"
    438                                                 "- total wait calls       : %'15llu   (%'llu slow, %'llu fast)\n"
    439                                                 "- avg completion/wait    : %'18.2lf\n",
     409                                                "- total submit calls  : %'15llu\n"
     410                                                "- avg submit          : %'18.2lf\n"
     411                                                "- pre-submit block %%  : %'18.2lf\n"
     412                                                "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
     413                                                "- avg completion/wait : %'18.2lf\n",
    440414                                                submit_avg.cnt,
    441415                                                ((double)submit_avg.val) / submit_avg.cnt,
    442416                                                (100.0 * submit_avg.block) / submit_avg.cnt,
    443                                                 look_avg.cnt,
    444                                                 lavgv,
    445                                                 lavgb,
    446417                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
    447418                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     
    470441                close(this.io->fd);
    471442
    472                 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    473443                free( this.io );
    474444        }
     
    484454        // Process a single completion message from the io_uring
    485455        // This is NOT thread-safe
    486         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
    487                 unsigned to_submit = 0;
    488                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    489 
    490                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    491                         uint32_t * tail = ring.submit_q.tail;
    492                         const uint32_t mask = *ring.submit_q.mask;
    493 
    494                         // Go through the list of ready submissions
    495                         for( i; ring.submit_q.ready_cnt ) {
    496                                 // replace any submission with the sentinel, to consume it.
    497                                 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    498 
    499                                 // If it was already the sentinel, then we are done
    500                                 if( idx == -1ul32 ) continue;
    501 
    502                                 // If we got a real submission, append it to the list
    503                                 ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
    504                                 to_submit++;
    505                         }
    506 
    507                         // Increment the tail based on how many we are ready to submit
    508                         __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
    509 
    510                         // update statistics
    511                         #if !defined(__CFA_NO_STATISTICS__)
    512                                 ring.submit_q.stats.submit_avg.val += to_submit;
    513                                 ring.submit_q.stats.submit_avg.cnt += 1;
    514                         #endif
    515                 }
    516 
    517                 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     456        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     457                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    518458                if( ret < 0 ) {
    519459                        switch((int)errno) {
     
    557497                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    558498
    559                 return [count, count > 0 || to_submit > 0];
     499                return count;
    560500        }
    561501
     
    579519                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    580520                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    581 
    582521                                // In the user-thread approach drain and if anything was drained,
    583522                                // batton pass to the user-thread
    584                                 int count;
    585                                 bool again;
    586                                 [count, again] = __drain_io( ring, &mask, 0, true );
     523                                int count = __drain_io( ring, &mask, 1, true );
    587524
    588525                                // Update statistics
     
    592529                                #endif
    593530
    594                                 if(again) {
     531                                if(count > 0) {
    595532                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    596533                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     
    602539                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    603540                                //In the naive approach, just poll the io completion queue directly
    604                                 int count;
    605                                 bool again;
    606                                 [count, again] = __drain_io( ring, &mask, 1, true );
     541                                int count = __drain_io( ring, &mask, 1, true );
    607542
    608543                                // Update statistics
     
    631566                // Then loop until we need to start
    632567                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    633 
    634568                        // Drain the io
    635                         int count;
    636                         bool again;
    637                         [count, again] = __drain_io( *this.ring, 0p, 0, false );
    638 
    639                         if(!again) reset++;
     569                        this.waiting = false;
     570                        int count = __drain_io( *this.ring, 0p, 0, false );
     571                        reset += count > 0 ? 1 : 0;
    640572
    641573                        // Update statistics
     
    645577                        #endif
    646578
    647                         // If we got something, just yield and check again
     579                        this.waiting = true;
    648580                        if(reset < 5) {
     581                                // If we got something, just yield and check again
    649582                                yield();
    650583                        }
    651                         // We didn't get anything baton pass to the slow poller
    652584                        else {
     585                                // We didn't get anything baton pass to the slow poller
    653586                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     587                                post( this.ring->poller.sem );
     588                                park( __cfaabi_dbg_ctx );
    654589                                reset = 0;
    655 
    656                                 // wake up the slow poller
    657                                 post( this.ring->poller.sem );
    658 
    659                                 // park this thread
    660                                 park( __cfaabi_dbg_ctx );
    661590                        }
    662591                }
    663592
    664593                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    665         }
    666 
    667         static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
    668         static inline void __wake_poller( struct __io_data & ring ) {
    669                 // sigval val = { 1 };
    670                 // pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    671594        }
    672595
     
    709632                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
    710633
    711                 // Mask the idx now to allow make everything easier to check
    712                 idx &= *ring.submit_q.mask;
     634                // Validate that we didn't overflow anything
     635                // Check that nothing overflowed
     636                /* paranoid */ verify( true );
     637
     638                // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
     639                /* paranoid */ verify( true );
    713640
    714641                // Return the sqe
    715                 return [&ring.submit_q.sqes[ idx ], idx];
     642                return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
    716643        }
    717644
    718645        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    719                 // Get now the data we definetely need
    720                 uint32_t * const tail = ring.submit_q.tail;
     646                // get mutual exclusion
     647                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     648
     649                // Append to the list of ready entries
     650                uint32_t * tail = ring.submit_q.tail;
    721651                const uint32_t mask = *ring.submit_q.mask;
    722652
    723                 // There are 2 submission schemes, check which one we are using
    724                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    725                         // If the poller thread submits, then we just need to add this to the ready array
    726 
    727                         /* paranoid */ verify( idx <= mask   );
    728                         /* paranoid */ verify( idx != -1ul32 );
    729 
    730                         // We need to find a spot in the ready array
    731                         __attribute((unused)) int len   = 0;
    732                         __attribute((unused)) int block = 0;
    733                         uint32_t expected = -1ul32;
    734                         LOOKING: for(;;) {
    735                                 for(i; ring.submit_q.ready_cnt) {
    736                                         if( __atomic_compare_exchange_n( &ring.submit_q.ready[i], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    737                                                 break LOOKING;
    738                                         }
    739 
    740                                         len ++;
    741                                 }
    742 
    743                                 block++;
    744                                 yield();
     653                ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     654                __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
     655
     656                // Submit however, many entries need to be submitted
     657                int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     658                if( ret < 0 ) {
     659                        switch((int)errno) {
     660                        default:
     661                                abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    745662                        }
    746 
    747                         __wake_poller( ring );
    748 
    749                         // update statistics
    750                         #if !defined(__CFA_NO_STATISTICS__)
    751                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.val,   len,   __ATOMIC_RELAXED );
    752                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
    753                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt,   1,     __ATOMIC_RELAXED );
    754                         #endif
    755 
    756                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    757                 }
    758                 else {
    759                         // get mutual exclusion
    760                         lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    761 
    762                         // Append to the list of ready entries
    763 
    764                         /* paranoid */ verify( idx <= mask );
    765 
    766                         ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    767                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    768 
    769                         // Submit however, many entries need to be submitted
    770                         int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
    771                         if( ret < 0 ) {
    772                                 switch((int)errno) {
    773                                 default:
    774                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    775                                 }
    776                         }
    777 
    778                         // update statistics
    779                         #if !defined(__CFA_NO_STATISTICS__)
    780                                 ring.submit_q.stats.submit_avg.val += 1;
    781                                 ring.submit_q.stats.submit_avg.cnt += 1;
    782                         #endif
    783 
    784                         unlock(ring.submit_q.lock);
    785 
    786                         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    787                 }
     663                }
     664
     665                // update statistics
     666                #if !defined(__CFA_NO_STATISTICS__)
     667                        ring.submit_q.stats.submit_avg.val += 1;
     668                        ring.submit_q.stats.submit_avg.cnt += 1;
     669                #endif
     670
     671                unlock(ring.submit_q.lock);
     672                // Make sure that idx was submitted
     673                // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
     674                __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    788675        }
    789676
  • libcfa/src/concurrency/kernel.cfa

    r6c12fd28 r0dedf027  
    256256}
    257257
    258 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {
     258void ?{}(cluster & this, const char name[], Duration preemption_rate, int io_flags) with( this ) {
    259259        this.name = name;
    260260        this.preemption_rate = preemption_rate;
  • libcfa/src/concurrency/kernel.hfa

    r6c12fd28 r0dedf027  
    116116struct __io_data;
    117117
    118 #define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
    119 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
    120 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
    121 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
     118#define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0
     119// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1
    122120
    123121//-----------------------------------------------------------------------------
     
    161159extern Duration default_preemption();
    162160
    163 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);
     161void ?{} (cluster & this, const char name[], Duration preemption_rate, int flags);
    164162void ^?{}(cluster & this);
    165163
    166 static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
    167 static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
    168 static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
    169 static inline void ?{} (cluster & this, unsigned flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
    170 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
    171 static inline void ?{} (cluster & this, const char name[], unsigned flags)        { this{name, default_preemption(), flags}; }
     164static inline void ?{} (cluster & this)                                      { this{"Anonymous Cluster", default_preemption(), 0}; }
     165static inline void ?{} (cluster & this, Duration preemption_rate)            { this{"Anonymous Cluster", preemption_rate, 0}; }
     166static inline void ?{} (cluster & this, const char name[])                   { this{name, default_preemption(), 0}; }
     167static inline void ?{} (cluster & this, int flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
     168static inline void ?{} (cluster & this, Duration preemption_rate, int flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
     169static inline void ?{} (cluster & this, const char name[], int flags)        { this{name, default_preemption(), flags}; }
    172170
    173171static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
  • libcfa/src/concurrency/kernel_private.hfa

    r6c12fd28 r0dedf027  
    7777//-----------------------------------------------------------------------------
    7878// I/O
    79 void __kernel_io_startup     ( cluster &, unsigned, bool );
     79void __kernel_io_startup     ( cluster &, int, bool );
    8080void __kernel_io_finish_start( cluster & );
    8181void __kernel_io_prepare_stop( cluster & );
Note: See TracChangeset for help on using the changeset viewer.