Ignore:
Timestamp:
May 12, 2020, 4:58:53 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
2802824
Parents:
1b143de (diff), 068a202 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' into relaxed_ready

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r1b143de r2f1cb37  
    1818
    1919#include "kernel.hfa"
     20#include "bitmanip.hfa"
    2021
    2122#if !defined(HAVE_LINUX_IO_URING_H)
    22         void __kernel_io_startup( cluster &, int, bool ) {
     23        void __kernel_io_startup( cluster &, unsigned, bool ) {
    2324                // Nothing to do without io_uring
    2425        }
     
    9192        struct __io_poller_fast {
    9293                struct __io_data * ring;
    93                 bool waiting;
    9494                $thread thrd;
    9595        };
     
    9797        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    9898                this.ring = cltr.io;
    99                 this.waiting = true;
    10099                (this.thrd){ "Fast I/O Poller", cltr };
    101100        }
     
    126125                // Like head/tail but not seen by the kernel
    127126                volatile uint32_t alloc;
    128                 volatile uint32_t ready;
     127                volatile uint32_t * ready;
     128                uint32_t ready_cnt;
    129129
    130130                __spinlock_t lock;
     
    145145                                        volatile unsigned long long int block;
    146146                                } submit_avg;
     147                                struct {
     148                                        volatile unsigned long long int val;
     149                                        volatile unsigned long long int cnt;
     150                                        volatile unsigned long long int block;
     151                                } look_avg;
    147152                        } stats;
    148153                #endif
     
    192197                                void * stack;
    193198                                pthread_t kthrd;
     199                                volatile bool blocked;
    194200                        } slow;
    195201                        __io_poller_fast fast;
     
    201207// I/O Startup / Shutdown logic
    202208//=============================================================================================
    203         void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
     209        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
    204210                this.io = malloc();
    205211
     
    274280                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    275281                sq.alloc = *sq.tail;
    276                 sq.ready = *sq.tail;
     282
     283                if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     284                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
     285                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
     286                        sq.ready = alloc_align( 64, sq.ready_cnt );
     287                        for(i; sq.ready_cnt) {
     288                                sq.ready[i] = -1ul32;
     289                        }
     290                }
     291                else {
     292                        sq.ready_cnt = 0;
     293                        sq.ready = 0p;
     294                }
    277295
    278296                // completion queue
     
    307325                        this.io->submit_q.stats.submit_avg.cnt   = 0;
    308326                        this.io->submit_q.stats.submit_avg.block = 0;
     327                        this.io->submit_q.stats.look_avg.val   = 0;
     328                        this.io->submit_q.stats.look_avg.cnt   = 0;
     329                        this.io->submit_q.stats.look_avg.block = 0;
    309330                        this.io->completion_q.stats.completed_avg.val = 0;
    310331                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     
    326347                // Create the poller thread
    327348                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
     349                this.io->poller.slow.blocked = false;
    328350                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    329351        }
     
    347369                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    348370                        with( this.io->poller.fast ) {
    349                                 /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
    350371                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
    351372                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
    352373
    353374                                // We need to adjust the clean-up based on where the thread is
    354                                 if( thrd.preempted != __NO_PREEMPTION ) {
     375                                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    355376
    356377                                        // This is the tricky case
    357378                                        // The thread was preempted and now it is on the ready queue
    358                                         /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
     379
    359380                                        /* paranoid */ verify( thrd.next != 0p );                // The thread should be the last on the list
    360381                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
     
    405426                        if(this.print_stats) {
    406427                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
    407                                         __cfaabi_bits_print_safe( STDERR_FILENO,
     428                                        double lavgv = 0;
     429                                        double lavgb = 0;
     430                                        if(look_avg.cnt != 0) {
     431                                                lavgv = ((double)look_avg.val  ) / look_avg.cnt;
     432                                                lavgb = ((double)look_avg.block) / look_avg.cnt;
     433                                        }
     434
     435                                        __cfaabi_bits_print_safe( STDOUT_FILENO,
    408436                                                "----- I/O uRing Stats -----\n"
    409                                                 "- total submit calls  : %'15llu\n"
    410                                                 "- avg submit          : %'18.2lf\n"
    411                                                 "- pre-submit block %%  : %'18.2lf\n"
    412                                                 "- total wait calls    : %'15llu   (%'llu slow, %'llu fast)\n"
    413                                                 "- avg completion/wait : %'18.2lf\n",
     437                                                "- total submit calls     : %'15llu\n"
     438                                                "- avg submit             : %'18.2lf\n"
     439                                                "- pre-submit block %%     : %'18.2lf\n"
     440                                                "- total ready search     : %'15llu\n"
     441                                                "- avg ready search len   : %'18.2lf\n"
     442                                                "- avg ready search block : %'18.2lf\n"
     443                                                "- total wait calls       : %'15llu   (%'llu slow, %'llu fast)\n"
     444                                                "- avg completion/wait    : %'18.2lf\n",
    414445                                                submit_avg.cnt,
    415446                                                ((double)submit_avg.val) / submit_avg.cnt,
    416447                                                (100.0 * submit_avg.block) / submit_avg.cnt,
     448                                                look_avg.cnt,
     449                                                lavgv,
     450                                                lavgb,
    417451                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
    418452                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     
    441475                close(this.io->fd);
    442476
     477                free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    443478                free( this.io );
    444479        }
     
    454489        // Process a single completion message from the io_uring
    455490        // This is NOT thread-safe
    456         static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
    457                 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     491        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
     492                unsigned to_submit = 0;
     493                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     494
     495                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
     496                        uint32_t * tail = ring.submit_q.tail;
     497                        const uint32_t mask = *ring.submit_q.mask;
     498
     499                        // Go through the list of ready submissions
     500                        for( i; ring.submit_q.ready_cnt ) {
     501                                // replace any submission with the sentinel, to consume it.
     502                                uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
     503
     504                                // If it was already the sentinel, then we are done
     505                                if( idx == -1ul32 ) continue;
     506
     507                                // If we got a real submission, append it to the list
     508                                ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
     509                                to_submit++;
     510                        }
     511
     512                        // Increment the tail based on how many we are ready to submit
     513                        __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
     514
     515                        // update statistics
     516                        #if !defined(__CFA_NO_STATISTICS__)
     517                                ring.submit_q.stats.submit_avg.val += to_submit;
     518                                ring.submit_q.stats.submit_avg.cnt += 1;
     519                        #endif
     520                }
     521
     522                int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    458523                if( ret < 0 ) {
    459524                        switch((int)errno) {
     
    497562                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    498563
    499                 return count;
     564                return [count, count > 0 || to_submit > 0];
    500565        }
    501566
     
    519584                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    520585                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
     586
     587                                __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
     588
    521589                                // In the user-thread approach drain and if anything was drained,
    522590                                // batton pass to the user-thread
    523                                 int count = __drain_io( ring, &mask, 1, true );
     591                                int count;
     592                                bool again;
     593                                [count, again] = __drain_io( ring, &mask, 1, true );
     594
     595                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
    524596
    525597                                // Update statistics
     
    529601                                #endif
    530602
    531                                 if(count > 0) {
     603                                if(again) {
    532604                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    533605                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     
    539611                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    540612                                //In the naive approach, just poll the io completion queue directly
    541                                 int count = __drain_io( ring, &mask, 1, true );
     613                                int count;
     614                                bool again;
     615                                [count, again] = __drain_io( ring, &mask, 1, true );
    542616
    543617                                // Update statistics
     
    566640                // Then loop until we need to start
    567641                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     642
    568643                        // Drain the io
    569                         this.waiting = false;
    570                         int count = __drain_io( *this.ring, 0p, 0, false );
    571                         reset += count > 0 ? 1 : 0;
     644                        int count;
     645                        bool again;
     646                        [count, again] = __drain_io( *this.ring, 0p, 0, false );
     647
     648                        if(!again) reset++;
    572649
    573650                        // Update statistics
     
    577654                        #endif
    578655
    579                         this.waiting = true;
     656                        // If we got something, just yield and check again
    580657                        if(reset < 5) {
    581                                 // If we got something, just yield and check again
    582658                                yield();
    583659                        }
     660                        // We didn't get anything baton pass to the slow poller
    584661                        else {
    585                                 // We didn't get anything baton pass to the slow poller
    586662                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     663                                reset = 0;
     664
     665                                // wake up the slow poller
    587666                                post( this.ring->poller.sem );
     667
     668                                // park this thread
    588669                                park( __cfaabi_dbg_ctx );
    589                                 reset = 0;
    590670                        }
    591671                }
    592672
    593673                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     674        }
     675
     676        static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
     677        static inline void __wake_poller( struct __io_data & ring ) {
     678                if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
     679
     680                sigval val = { 1 };
     681                pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    594682        }
    595683
     
    632720                uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
    633721
    634                 // Validate that we didn't overflow anything
    635                 // Check that nothing overflowed
    636                 /* paranoid */ verify( true );
    637 
    638                 // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
    639                 /* paranoid */ verify( true );
     722                // Mask the idx now to allow make everything easier to check
     723                idx &= *ring.submit_q.mask;
    640724
    641725                // Return the sqe
    642                 return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
     726                return [&ring.submit_q.sqes[ idx ], idx];
    643727        }
    644728
    645729        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    646                 // get mutual exclusion
    647                 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    648 
    649                 // Append to the list of ready entries
    650                 uint32_t * tail = ring.submit_q.tail;
     730                // Get now the data we definetely need
     731                uint32_t * const tail = ring.submit_q.tail;
    651732                const uint32_t mask = *ring.submit_q.mask;
    652733
    653                 ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    654                 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    655 
    656                 // Submit however, many entries need to be submitted
    657                 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
    658                 if( ret < 0 ) {
    659                         switch((int)errno) {
    660                         default:
    661                                 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    662                         }
    663                 }
    664 
    665                 // update statistics
    666                 #if !defined(__CFA_NO_STATISTICS__)
    667                         ring.submit_q.stats.submit_avg.val += 1;
    668                         ring.submit_q.stats.submit_avg.cnt += 1;
    669                 #endif
    670 
    671                 unlock(ring.submit_q.lock);
    672                 // Make sure that idx was submitted
    673                 // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
    674                 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     734                // There are 2 submission schemes, check which one we are using
     735                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     736                        // If the poller thread submits, then we just need to add this to the ready array
     737
     738                        /* paranoid */ verify( idx <= mask   );
     739                        /* paranoid */ verify( idx != -1ul32 );
     740
     741                        // We need to find a spot in the ready array
     742                        __attribute((unused)) int len   = 0;
     743                        __attribute((unused)) int block = 0;
     744                        uint32_t expected = -1ul32;
     745                        uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
     746                        uint32_t off = __tls_rand();
     747                        LOOKING: for() {
     748                                for(i; ring.submit_q.ready_cnt) {
     749                                        uint32_t ii = (i + off) & ready_mask;
     750                                        if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
     751                                                break LOOKING;
     752                                        }
     753
     754                                        len ++;
     755                                }
     756
     757                                block++;
     758                                yield();
     759                        }
     760
     761                        __wake_poller( ring );
     762
     763                        // update statistics
     764                        #if !defined(__CFA_NO_STATISTICS__)
     765                                __atomic_fetch_add( &ring.submit_q.stats.look_avg.val,   len,   __ATOMIC_RELAXED );
     766                                __atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
     767                                __atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt,   1,     __ATOMIC_RELAXED );
     768                        #endif
     769
     770                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
     771                }
     772                else {
     773                        // get mutual exclusion
     774                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     775
     776                        // Append to the list of ready entries
     777
     778                        /* paranoid */ verify( idx <= mask );
     779
     780                        ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     781                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
     782
     783                        // Submit however, many entries need to be submitted
     784                        int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     785                        if( ret < 0 ) {
     786                                switch((int)errno) {
     787                                default:
     788                                        abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
     789                                }
     790                        }
     791
     792                        // update statistics
     793                        #if !defined(__CFA_NO_STATISTICS__)
     794                                ring.submit_q.stats.submit_avg.val += 1;
     795                                ring.submit_q.stats.submit_avg.cnt += 1;
     796                        #endif
     797
     798                        unlock(ring.submit_q.lock);
     799
     800                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     801                }
    675802        }
    676803
Note: See TracChangeset for help on using the changeset viewer.