Changes in / [2802824:33e62f1b]


Ignore:
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/readv.cfa

    r2802824 r33e62f1b  
    5959        unsigned long int nthreads = 2;
    6060        unsigned long int nprocs   = 1;
    61         unsigned flags = 0;
    62         unsigned sublen = 16;
     61        int flags = 0;
    6362
    6463        arg_loop:
    6564        for(;;) {
    6665                static struct option options[] = {
    67                         {"duration",     required_argument, 0, 'd'},
    68                         {"nthreads",     required_argument, 0, 't'},
    69                         {"nprocs",       required_argument, 0, 'p'},
    70                         {"bufsize",      required_argument, 0, 'b'},
    71                         {"userthread",   no_argument      , 0, 'u'},
    72                         {"submitthread", no_argument      , 0, 's'},
    73                         {"submitlength", required_argument, 0, 'l'},
     66                        {"duration",   required_argument, 0, 'd'},
     67                        {"nthreads",   required_argument, 0, 't'},
     68                        {"nprocs",     required_argument, 0, 'p'},
     69                        {"bufsize",    required_argument, 0, 'b'},
     70                        {"userthread", no_argument      , 0, 'u'},
    7471                        {0, 0, 0, 0}
    7572                };
    7673
    7774                int idx = 0;
    78                 int opt = getopt_long(argc, argv, "d:t:p:b:usl:", options, &idx);
     75                int opt = getopt_long(argc, argv, "d:t:p:b:u", options, &idx);
    7976
    8077                const char * arg = optarg ? optarg : "";
     
    116113                                flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;
    117114                                break;
    118                         case 's':
    119                                 flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
    120                                 break;
    121                         case 'l':
    122                                 sublen = strtoul(arg, &end, 10);
    123                                 if(*end != '\0' && sublen < 16) {
    124                                         fprintf(stderr, "Submit length must be at least 16, was %s\n", arg);
    125                                         goto usage;
    126                                 }
    127                                 flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);
    128                                 break;
    129115                        // Other cases
    130116                        default: /* ? */
     
    137123                                fprintf(stderr, "  -p, --nprocs=NPROCS      Number of kernel threads\n");
    138124                                fprintf(stderr, "  -b, --buflen=SIZE        Number of bytes to read per request\n");
    139                                 fprintf(stderr, "  -u, --userthread         If set, cluster uses user-thread to poll I/O\n");
    140                                 fprintf(stderr, "  -s, --submitthread       If set, cluster uses polling thread to submit I/O\n");
    141125                                exit(EXIT_FAILURE);
    142126                }
  • libcfa/src/concurrency/io.cfa

    r2802824 r33e62f1b  
    1818
    1919#include "kernel.hfa"
    20 #include "bitmanip.hfa"
    2120
    2221#if !defined(HAVE_LINUX_IO_URING_H)
    23         void __kernel_io_startup( cluster &, unsigned, bool ) {
     22        void __kernel_io_startup( cluster &, int, bool ) {
    2423                // Nothing to do without io_uring
    2524        }
     
    9291        struct __io_poller_fast {
    9392                struct __io_data * ring;
     93                bool waiting;
    9494                $thread thrd;
    9595        };
     
    9797        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    9898                this.ring = cltr.io;
     99                this.waiting = true;
    99100                (this.thrd){ "Fast I/O Poller", cltr };
    100101        }
     
    125126                // Like head/tail but not seen by the kernel
    126127                volatile uint32_t alloc;
    127                 volatile uint32_t * ready;
    128                 uint32_t ready_cnt;
     128                volatile uint32_t ready;
    129129
    130130                __spinlock_t lock;
     
    145145                                        volatile unsigned long long int block;
    146146                                } submit_avg;
    147                                 struct {
    148                                         volatile unsigned long long int val;
    149                                         volatile unsigned long long int cnt;
    150                                         volatile unsigned long long int block;
    151                                 } look_avg;
    152147                        } stats;
    153148                #endif
     
    197192                                void * stack;
    198193                                pthread_t kthrd;
    199                                 volatile bool blocked;
    200194                        } slow;
    201195                        __io_poller_fast fast;
     
    207201// I/O Startup / Shutdown logic
    208202//=============================================================================================
    209         void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
     203        void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
    210204                this.io = malloc();
    211205
     
    280274                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    281275                sq.alloc = *sq.tail;
    282 
    283                 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    284                         /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
    285                         sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
    286                         sq.ready = alloc_align( 64, sq.ready_cnt );
    287                         for(i; sq.ready_cnt) {
    288                                 sq.ready[i] = -1ul32;
    289                         }
    290                 }
    291                 else {
    292                         sq.ready_cnt = 0;
    293                         sq.ready = 0p;
    294                 }
     276                sq.ready = *sq.tail;
    295277
    296278                // completion queue
     
    325307                        this.io->submit_q.stats.submit_avg.cnt   = 0;
    326308                        this.io->submit_q.stats.submit_avg.block = 0;
    327                         this.io->submit_q.stats.look_avg.val   = 0;
    328                         this.io->submit_q.stats.look_avg.cnt   = 0;
    329                         this.io->submit_q.stats.look_avg.block = 0;
    330309                        this.io->completion_q.stats.completed_avg.val = 0;
    331310                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     
    347326                // Create the poller thread
    348327                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
    349                 this.io->poller.slow.blocked = false;
    350328                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    351329        }
     
    369347                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    370348                        with( this.io->poller.fast ) {
     349                                /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
    371350                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
    372351                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
    373352
    374353                                // We need to adjust the clean-up based on where the thread is
    375                                 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     354                                if( thrd.preempted != __NO_PREEMPTION ) {
    376355
    377356                                        // This is the tricky case
    378357                                        // The thread was preempted and now it is on the ready queue
    379 
     358                                        /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
    380359                                        /* paranoid */ verify( thrd.next != 0p );                // The thread should be the last on the list
    381360                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
     
    426405                        if(this.print_stats) {
    427406                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
    428                                         double lavgv = 0;
    429                                         double lavgb = 0;
    430                                         if(look_avg.cnt != 0) {
    431                                                 lavgv = ((double)look_avg.val  ) / look_avg.cnt;
    432                                                 lavgb = ((double)look_avg.block) / look_avg.cnt;
    433                                         }
    434 
    435                                         __cfaabi_bits_print_safe( STDOUT_FILENO,
     407                                        __cfaabi_bits_print_safe( STDERR_FILENO,
    436408                                                "----- I/O uRing Stats -----\n"
    437                                                 "- total submit calls     : %'15llu\n"
    438                                                 "- avg submit             : %'18.2lf\n"
    439                                                 "- pre-submit block %%     : %'18.2lf\n"
    440                                                 "- total ready search     : %'15llu\n"
    441                                                 "- avg ready search len   : %'18.2lf\n"
    442                                                 "- avg ready search block : %'18.2lf\n"
    443                                                 "- total wait calls       : %'15llu   (%'llu slow, %'llu fast)\n"
    444                                                 "- avg completion/wait    : %'18.2lf\n",
     409                                                "- total submit calls  : %'15llu\n"
     410                                                "- avg submit          : %'18.2lf\n"
     411                                                "- pre-submit block %%  : %'18.2lf\n"
     412                                                "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
     413                                                "- avg completion/wait : %'18.2lf\n",
    445414                                                submit_avg.cnt,
    446415                                                ((double)submit_avg.val) / submit_avg.cnt,
    447416                                                (100.0 * submit_avg.block) / submit_avg.cnt,
    448                                                 look_avg.cnt,
    449                                                 lavgv,
    450                                                 lavgb,
    451417                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
    452418                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     
    475441                close(this.io->fd);
    476442
    477                 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    478443                free( this.io );
    479444        }
     
    489454        // Process a single completion message from the io_uring
    490455        // This is NOT thread-safe
    491         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
    492                 unsigned to_submit = 0;
    493                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    494 
    495                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    496                         uint32_t * tail = ring.submit_q.tail;
    497                         const uint32_t mask = *ring.submit_q.mask;
    498 
    499                         // Go through the list of ready submissions
    500                         for( i; ring.submit_q.ready_cnt ) {
    501                                 // replace any submission with the sentinel, to consume it.
    502                                 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    503 
    504                                 // If it was already the sentinel, then we are done
    505                                 if( idx == -1ul32 ) continue;
    506 
    507                                 // If we got a real submission, append it to the list
    508                                 ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
    509                                 to_submit++;
    510                         }
    511 
    512                         // Increment the tail based on how many we are ready to submit
    513                         __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
    514 
    515                         // update statistics
    516                         #if !defined(__CFA_NO_STATISTICS__)
    517                                 ring.submit_q.stats.submit_avg.val += to_submit;
    518                                 ring.submit_q.stats.submit_avg.cnt += 1;
    519                         #endif
    520                 }
    521 
    522                 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     456        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     457                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    523458                if( ret < 0 ) {
    524459                        switch((int)errno) {
     
    562497                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    563498
    564                 return [count, count > 0 || to_submit > 0];
     499                return count;
    565500        }
    566501
     
    584519                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    585520                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    586 
    587                                 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
    588 
    589521                                // In the user-thread approach drain and if anything was drained,
    590522                                // batton pass to the user-thread
    591                                 int count;
    592                                 bool again;
    593                                 [count, again] = __drain_io( ring, &mask, 1, true );
    594 
    595                                 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
     523                                int count = __drain_io( ring, &mask, 1, true );
    596524
    597525                                // Update statistics
     
    601529                                #endif
    602530
    603                                 if(again) {
     531                                if(count > 0) {
    604532                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    605533                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     
    611539                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    612540                                //In the naive approach, just poll the io completion queue directly
    613                                 int count;
    614                                 bool again;
    615                                 [count, again] = __drain_io( ring, &mask, 1, true );
     541                                int count = __drain_io( ring, &mask, 1, true );
    616542
    617543                                // Update statistics
     
    640566                // Then loop until we need to start
    641567                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    642 
    643568                        // Drain the io
    644                         int count;
    645                         bool again;
    646                         [count, again] = __drain_io( *this.ring, 0p, 0, false );
    647 
    648                         if(!again) reset++;
     569                        this.waiting = false;
     570                        int count = __drain_io( *this.ring, 0p, 0, false );
     571                        reset += count > 0 ? 1 : 0;
    649572
    650573                        // Update statistics
     
    654577                        #endif
    655578
    656                         // If we got something, just yield and check again
     579                        this.waiting = true;
    657580                        if(reset < 5) {
     581                                // If we got something, just yield and check again
    658582                                yield();
    659583                        }
    660                         // We didn't get anything baton pass to the slow poller
    661584                        else {
     585                                // We didn't get anything baton pass to the slow poller
    662586                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     587                                post( this.ring->poller.sem );
     588                                park( __cfaabi_dbg_ctx );
    663589                                reset = 0;
    664 
    665                                 // wake up the slow poller
    666                                 post( this.ring->poller.sem );
    667 
    668                                 // park this thread
    669                                 park( __cfaabi_dbg_ctx );
    670590                        }
    671591                }
    672592
    673593                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    674         }
    675 
    676         static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
    677         static inline void __wake_poller( struct __io_data & ring ) {
    678                 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
    679 
    680                 sigval val = { 1 };
    681                 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    682594        }
    683595
     
    720632                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
    721633
    722                 // Mask the idx now to allow make everything easier to check
    723                 idx &= *ring.submit_q.mask;
     634                // Validate that we didn't overflow anything
     635                // Check that nothing overflowed
     636                /* paranoid */ verify( true );
     637
     638                // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
     639                /* paranoid */ verify( true );
    724640
    725641                // Return the sqe
    726                 return [&ring.submit_q.sqes[ idx ], idx];
     642                return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
    727643        }
    728644
    729645        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    730                 // Get now the data we definetely need
    731                 uint32_t * const tail = ring.submit_q.tail;
     646                // get mutual exclusion
     647                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     648
     649                // Append to the list of ready entries
     650                uint32_t * tail = ring.submit_q.tail;
    732651                const uint32_t mask = *ring.submit_q.mask;
    733652
    734                 // There are 2 submission schemes, check which one we are using
    735                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    736                         // If the poller thread submits, then we just need to add this to the ready array
    737 
    738                         /* paranoid */ verify( idx <= mask   );
    739                         /* paranoid */ verify( idx != -1ul32 );
    740 
    741                         // We need to find a spot in the ready array
    742                         __attribute((unused)) int len   = 0;
    743                         __attribute((unused)) int block = 0;
    744                         uint32_t expected = -1ul32;
    745                         uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
    746                         uint32_t off = __tls_rand();
    747                         LOOKING: for() {
    748                                 for(i; ring.submit_q.ready_cnt) {
    749                                         uint32_t ii = (i + off) & ready_mask;
    750                                         if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    751                                                 break LOOKING;
    752                                         }
    753 
    754                                         len ++;
    755                                 }
    756 
    757                                 block++;
    758                                 yield();
     653                ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     654                __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
     655
     656                // Submit however, many entries need to be submitted
     657                int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     658                if( ret < 0 ) {
     659                        switch((int)errno) {
     660                        default:
     661                                abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    759662                        }
    760 
    761                         __wake_poller( ring );
    762 
    763                         // update statistics
    764                         #if !defined(__CFA_NO_STATISTICS__)
    765                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.val,   len,   __ATOMIC_RELAXED );
    766                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
    767                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt,   1,     __ATOMIC_RELAXED );
    768                         #endif
    769 
    770                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    771                 }
    772                 else {
    773                         // get mutual exclusion
    774                         lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    775 
    776                         // Append to the list of ready entries
    777 
    778                         /* paranoid */ verify( idx <= mask );
    779 
    780                         ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    781                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    782 
    783                         // Submit however, many entries need to be submitted
    784                         int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
    785                         if( ret < 0 ) {
    786                                 switch((int)errno) {
    787                                 default:
    788                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    789                                 }
    790                         }
    791 
    792                         // update statistics
    793                         #if !defined(__CFA_NO_STATISTICS__)
    794                                 ring.submit_q.stats.submit_avg.val += 1;
    795                                 ring.submit_q.stats.submit_avg.cnt += 1;
    796                         #endif
    797 
    798                         unlock(ring.submit_q.lock);
    799 
    800                         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    801                 }
     663                }
     664
     665                // update statistics
     666                #if !defined(__CFA_NO_STATISTICS__)
     667                        ring.submit_q.stats.submit_avg.val += 1;
     668                        ring.submit_q.stats.submit_avg.cnt += 1;
     669                #endif
     670
     671                unlock(ring.submit_q.lock);
     672                // Make sure that idx was submitted
     673                // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
     674                __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    802675        }
    803676
  • libcfa/src/concurrency/kernel.cfa

    r2802824 r33e62f1b  
    258258}
    259259
    260 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {
     260void ?{}(cluster & this, const char name[], Duration preemption_rate, int io_flags) with( this ) {
    261261        this.name = name;
    262262        this.preemption_rate = preemption_rate;
  • libcfa/src/concurrency/kernel.hfa

    r2802824 r33e62f1b  
    117117struct __io_data;
    118118
    119 #define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
    120 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
    121 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
    122 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
     119#define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0
     120// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1
    123121
    124122
     
    312310extern Duration default_preemption();
    313311
    314 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);
     312void ?{} (cluster & this, const char name[], Duration preemption_rate, int flags);
    315313void ^?{}(cluster & this);
    316314
    317 static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
    318 static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
    319 static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
    320 static inline void ?{} (cluster & this, unsigned flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
    321 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
    322 static inline void ?{} (cluster & this, const char name[], unsigned flags)        { this{name, default_preemption(), flags}; }
     315static inline void ?{} (cluster & this)                                      { this{"Anonymous Cluster", default_preemption(), 0}; }
     316static inline void ?{} (cluster & this, Duration preemption_rate)            { this{"Anonymous Cluster", preemption_rate, 0}; }
     317static inline void ?{} (cluster & this, const char name[])                   { this{name, default_preemption(), 0}; }
     318static inline void ?{} (cluster & this, int flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
     319static inline void ?{} (cluster & this, Duration preemption_rate, int flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
     320static inline void ?{} (cluster & this, const char name[], int flags)        { this{name, default_preemption(), flags}; }
    323321
    324322static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
  • libcfa/src/concurrency/kernel_private.hfa

    r2802824 r33e62f1b  
    7777//-----------------------------------------------------------------------------
    7878// I/O
    79 void __kernel_io_startup     ( cluster &, unsigned, bool );
     79void __kernel_io_startup     ( cluster &, int, bool );
    8080void __kernel_io_finish_start( cluster & );
    8181void __kernel_io_prepare_stop( cluster & );
Note: See TracChangeset for help on using the changeset viewer.