Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r068a202 r1b143de  
    1818
    1919#include "kernel.hfa"
    20 #include "bitmanip.hfa"
    2120
    2221#if !defined(HAVE_LINUX_IO_URING_H)
    23         void __kernel_io_startup( cluster &, unsigned, bool ) {
     22        void __kernel_io_startup( cluster &, int, bool ) {
    2423                // Nothing to do without io_uring
    2524        }
     
    9291        struct __io_poller_fast {
    9392                struct __io_data * ring;
     93                bool waiting;
    9494                $thread thrd;
    9595        };
     
    9797        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    9898                this.ring = cltr.io;
     99                this.waiting = true;
    99100                (this.thrd){ "Fast I/O Poller", cltr };
    100101        }
     
    125126                // Like head/tail but not seen by the kernel
    126127                volatile uint32_t alloc;
    127                 volatile uint32_t * ready;
    128                 uint32_t ready_cnt;
     128                volatile uint32_t ready;
    129129
    130130                __spinlock_t lock;
     
    145145                                        volatile unsigned long long int block;
    146146                                } submit_avg;
    147                                 struct {
    148                                         volatile unsigned long long int val;
    149                                         volatile unsigned long long int cnt;
    150                                         volatile unsigned long long int block;
    151                                 } look_avg;
    152147                        } stats;
    153148                #endif
     
    197192                                void * stack;
    198193                                pthread_t kthrd;
    199                                 volatile bool blocked;
    200194                        } slow;
    201195                        __io_poller_fast fast;
     
    207201// I/O Startup / Shutdown logic
    208202//=============================================================================================
    209         void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
     203        void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
    210204                this.io = malloc();
    211205
     
    280274                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    281275                sq.alloc = *sq.tail;
    282 
    283                 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    284                         /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
    285                         sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
    286                         sq.ready = alloc_align( 64, sq.ready_cnt );
    287                         for(i; sq.ready_cnt) {
    288                                 sq.ready[i] = -1ul32;
    289                         }
    290                 }
    291                 else {
    292                         sq.ready_cnt = 0;
    293                         sq.ready = 0p;
    294                 }
     276                sq.ready = *sq.tail;
    295277
    296278                // completion queue
     
    325307                        this.io->submit_q.stats.submit_avg.cnt   = 0;
    326308                        this.io->submit_q.stats.submit_avg.block = 0;
    327                         this.io->submit_q.stats.look_avg.val   = 0;
    328                         this.io->submit_q.stats.look_avg.cnt   = 0;
    329                         this.io->submit_q.stats.look_avg.block = 0;
    330309                        this.io->completion_q.stats.completed_avg.val = 0;
    331310                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     
    347326                // Create the poller thread
    348327                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
    349                 this.io->poller.slow.blocked = false;
    350328                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    351329        }
     
    369347                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    370348                        with( this.io->poller.fast ) {
     349                                /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
    371350                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
    372351                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
    373352
    374353                                // We need to adjust the clean-up based on where the thread is
    375                                 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     354                                if( thrd.preempted != __NO_PREEMPTION ) {
    376355
    377356                                        // This is the tricky case
    378357                                        // The thread was preempted and now it is on the ready queue
    379                                         /* paranoid */ verify( thrd.next == 1p );                // The thread should be the last on the list
     358                                        /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
     359                                        /* paranoid */ verify( thrd.next != 0p );                // The thread should be the last on the list
    380360                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
    381361
     
    425405                        if(this.print_stats) {
    426406                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
    427                                         double lavgv = 0;
    428                                         double lavgb = 0;
    429                                         if(look_avg.cnt != 0) {
    430                                                 lavgv = ((double)look_avg.val  ) / look_avg.cnt;
    431                                                 lavgb = ((double)look_avg.block) / look_avg.cnt;
    432                                         }
    433 
    434                                         __cfaabi_bits_print_safe( STDOUT_FILENO,
     407                                        __cfaabi_bits_print_safe( STDERR_FILENO,
    435408                                                "----- I/O uRing Stats -----\n"
    436                                                 "- total submit calls     : %'15llu\n"
    437                                                 "- avg submit             : %'18.2lf\n"
    438                                                 "- pre-submit block %%     : %'18.2lf\n"
    439                                                 "- total ready search     : %'15llu\n"
    440                                                 "- avg ready search len   : %'18.2lf\n"
    441                                                 "- avg ready search block : %'18.2lf\n"
    442                                                 "- total wait calls       : %'15llu   (%'llu slow, %'llu fast)\n"
    443                                                 "- avg completion/wait    : %'18.2lf\n",
     409                                                "- total submit calls  : %'15llu\n"
     410                                                "- avg submit          : %'18.2lf\n"
     411                                                "- pre-submit block %%  : %'18.2lf\n"
     412                                                "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
     413                                                "- avg completion/wait : %'18.2lf\n",
    444414                                                submit_avg.cnt,
    445415                                                ((double)submit_avg.val) / submit_avg.cnt,
    446416                                                (100.0 * submit_avg.block) / submit_avg.cnt,
    447                                                 look_avg.cnt,
    448                                                 lavgv,
    449                                                 lavgb,
    450417                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
    451418                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     
    474441                close(this.io->fd);
    475442
    476                 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    477443                free( this.io );
    478444        }
     
    488454        // Process a single completion message from the io_uring
    489455        // This is NOT thread-safe
    490         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
    491                 unsigned to_submit = 0;
    492                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    493 
    494                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    495                         uint32_t * tail = ring.submit_q.tail;
    496                         const uint32_t mask = *ring.submit_q.mask;
    497 
    498                         // Go through the list of ready submissions
    499                         for( i; ring.submit_q.ready_cnt ) {
    500                                 // replace any submission with the sentinel, to consume it.
    501                                 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    502 
    503                                 // If it was already the sentinel, then we are done
    504                                 if( idx == -1ul32 ) continue;
    505 
    506                                 // If we got a real submission, append it to the list
    507                                 ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
    508                                 to_submit++;
    509                         }
    510 
    511                         // Increment the tail based on how many we are ready to submit
    512                         __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
    513 
    514                         // update statistics
    515                         #if !defined(__CFA_NO_STATISTICS__)
    516                                 ring.submit_q.stats.submit_avg.val += to_submit;
    517                                 ring.submit_q.stats.submit_avg.cnt += 1;
    518                         #endif
    519                 }
    520 
    521                 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     456        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     457                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    522458                if( ret < 0 ) {
    523459                        switch((int)errno) {
     
    561497                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    562498
    563                 return [count, count > 0 || to_submit > 0];
     499                return count;
    564500        }
    565501
     
    583519                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    584520                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    585 
    586                                 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
    587 
    588521                                // In the user-thread approach drain and if anything was drained,
    589522                                // batton pass to the user-thread
    590                                 int count;
    591                                 bool again;
    592                                 [count, again] = __drain_io( ring, &mask, 1, true );
    593 
    594                                 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
     523                                int count = __drain_io( ring, &mask, 1, true );
    595524
    596525                                // Update statistics
     
    600529                                #endif
    601530
    602                                 if(again) {
     531                                if(count > 0) {
    603532                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    604533                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     
    610539                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    611540                                //In the naive approach, just poll the io completion queue directly
    612                                 int count;
    613                                 bool again;
    614                                 [count, again] = __drain_io( ring, &mask, 1, true );
     541                                int count = __drain_io( ring, &mask, 1, true );
    615542
    616543                                // Update statistics
     
    639566                // Then loop until we need to start
    640567                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    641 
    642568                        // Drain the io
    643                         int count;
    644                         bool again;
    645                         [count, again] = __drain_io( *this.ring, 0p, 0, false );
    646 
    647                         if(!again) reset++;
     569                        this.waiting = false;
     570                        int count = __drain_io( *this.ring, 0p, 0, false );
     571                        reset += count > 0 ? 1 : 0;
    648572
    649573                        // Update statistics
     
    653577                        #endif
    654578
    655                         // If we got something, just yield and check again
     579                        this.waiting = true;
    656580                        if(reset < 5) {
     581                                // If we got something, just yield and check again
    657582                                yield();
    658583                        }
    659                         // We didn't get anything baton pass to the slow poller
    660584                        else {
     585                                // We didn't get anything baton pass to the slow poller
    661586                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     587                                post( this.ring->poller.sem );
     588                                park( __cfaabi_dbg_ctx );
    662589                                reset = 0;
    663 
    664                                 // wake up the slow poller
    665                                 post( this.ring->poller.sem );
    666 
    667                                 // park this thread
    668                                 park( __cfaabi_dbg_ctx );
    669590                        }
    670591                }
    671592
    672593                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    673         }
    674 
    675         static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
    676         static inline void __wake_poller( struct __io_data & ring ) {
    677                 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
    678 
    679                 sigval val = { 1 };
    680                 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    681594        }
    682595
     
    719632                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
    720633
    721                 // Mask the idx now to allow make everything easier to check
    722                 idx &= *ring.submit_q.mask;
     634                // Validate that we didn't overflow anything
     635                // Check that nothing overflowed
     636                /* paranoid */ verify( true );
     637
     638                // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
     639                /* paranoid */ verify( true );
    723640
    724641                // Return the sqe
    725                 return [&ring.submit_q.sqes[ idx ], idx];
     642                return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
    726643        }
    727644
    728645        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    729                 // Get now the data we definetely need
    730                 uint32_t * const tail = ring.submit_q.tail;
     646                // get mutual exclusion
     647                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     648
     649                // Append to the list of ready entries
     650                uint32_t * tail = ring.submit_q.tail;
    731651                const uint32_t mask = *ring.submit_q.mask;
    732652
    733                 // There are 2 submission schemes, check which one we are using
    734                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    735                         // If the poller thread submits, then we just need to add this to the ready array
    736 
    737                         /* paranoid */ verify( idx <= mask   );
    738                         /* paranoid */ verify( idx != -1ul32 );
    739 
    740                         // We need to find a spot in the ready array
    741                         __attribute((unused)) int len   = 0;
    742                         __attribute((unused)) int block = 0;
    743                         uint32_t expected = -1ul32;
    744                         uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
    745                         uint32_t off = __tls_rand();
    746                         LOOKING: for() {
    747                                 for(i; ring.submit_q.ready_cnt) {
    748                                         uint32_t ii = (i + off) & ready_mask;
    749                                         if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    750                                                 break LOOKING;
    751                                         }
    752 
    753                                         len ++;
    754                                 }
    755 
    756                                 block++;
    757                                 yield();
     653                ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     654                __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
     655
     656                // Submit however, many entries need to be submitted
     657                int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     658                if( ret < 0 ) {
     659                        switch((int)errno) {
     660                        default:
     661                                abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    758662                        }
    759 
    760                         __wake_poller( ring );
    761 
    762                         // update statistics
    763                         #if !defined(__CFA_NO_STATISTICS__)
    764                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.val,   len,   __ATOMIC_RELAXED );
    765                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
    766                                 __atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt,   1,     __ATOMIC_RELAXED );
    767                         #endif
    768 
    769                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    770                 }
    771                 else {
    772                         // get mutual exclusion
    773                         lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    774 
    775                         // Append to the list of ready entries
    776 
    777                         /* paranoid */ verify( idx <= mask );
    778 
    779                         ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    780                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    781 
    782                         // Submit however, many entries need to be submitted
    783                         int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
    784                         if( ret < 0 ) {
    785                                 switch((int)errno) {
    786                                 default:
    787                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    788                                 }
    789                         }
    790 
    791                         // update statistics
    792                         #if !defined(__CFA_NO_STATISTICS__)
    793                                 ring.submit_q.stats.submit_avg.val += 1;
    794                                 ring.submit_q.stats.submit_avg.cnt += 1;
    795                         #endif
    796 
    797                         unlock(ring.submit_q.lock);
    798 
    799                         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    800                 }
     663                }
     664
     665                // update statistics
     666                #if !defined(__CFA_NO_STATISTICS__)
     667                        ring.submit_q.stats.submit_avg.val += 1;
     668                        ring.submit_q.stats.submit_avg.cnt += 1;
     669                #endif
     670
     671                unlock(ring.submit_q.lock);
     672                // Make sure that idx was submitted
     673                // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
     674                __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    801675        }
    802676
Note: See TracChangeset for help on using the changeset viewer.