Ignore:
Timestamp:
Apr 30, 2020, 3:27:11 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
9987d79
Parents:
c59a346
Message:

Added new implementation of io_uring that uses user-thread

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rc59a346 rf6660520  
    1717
    1818#if !defined(HAVE_LINUX_IO_URING_H)
    19         void __kernel_io_startup( cluster & this ) {
     19        void __kernel_io_startup( cluster & ) {
    2020                // Nothing to do without io_uring
    2121        }
    2222
    23         void __kernel_io_shutdown( cluster & this ) {
     23        void __kernel_io_start_thrd( cluster & ) {
     24                // Nothing to do without io_uring
     25        }
     26
     27        void __kernel_io_stop_thrd ( cluster & ) {
     28                // Nothing to do without io_uring
     29        }
     30
     31        void __kernel_io_shutdown( cluster & ) {
    2432                // Nothing to do without io_uring
    2533        }
     
    4654        }
    4755
    48         static void * __io_poller( void * arg );
    49 
    50        // Weirdly, some systems that do support io_uring don't actually define these
    51        #ifdef __alpha__
    52        /*
    53        * alpha is the only exception, all other architectures
    54        * have common numbers for new system calls.
    55        */
    56        # ifndef __NR_io_uring_setup
    57        #  define __NR_io_uring_setup           535
    58        # endif
    59        # ifndef __NR_io_uring_enter
    60        #  define __NR_io_uring_enter           536
    61        # endif
    62        # ifndef __NR_io_uring_register
    63        #  define __NR_io_uring_register        537
    64        # endif
    65        #else /* !__alpha__ */
    66        # ifndef __NR_io_uring_setup
    67        #  define __NR_io_uring_setup           425
    68        # endif
    69        # ifndef __NR_io_uring_enter
    70        #  define __NR_io_uring_enter           426
    71        # endif
    72        # ifndef __NR_io_uring_register
    73        #  define __NR_io_uring_register        427
    74        # endif
    75        #endif
     56        static void * __io_poller_slow( void * arg );
     57
     58        // Weirdly, some systems that do support io_uring don't actually define these
     59        #ifdef __alpha__
     60                /*
     61                * alpha is the only exception, all other architectures
     62                * have common numbers for new system calls.
     63                */
     64                #ifndef __NR_io_uring_setup
     65                        #define __NR_io_uring_setup           535
     66                #endif
     67                #ifndef __NR_io_uring_enter
     68                        #define __NR_io_uring_enter           536
     69                #endif
     70                #ifndef __NR_io_uring_register
     71                        #define __NR_io_uring_register        537
     72                #endif
     73        #else /* !__alpha__ */
     74                #ifndef __NR_io_uring_setup
     75                        #define __NR_io_uring_setup           425
     76                #endif
     77                #ifndef __NR_io_uring_enter
     78                        #define __NR_io_uring_enter           426
     79                #endif
     80                #ifndef __NR_io_uring_register
     81                        #define __NR_io_uring_register        427
     82                #endif
     83        #endif
     84
     85        #if defined(__CFA_IO_POLLING_USER__)
     86                void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
     87                        this.ring = &cltr.io;
     88                        (this.thrd){ "I/O Poller", cltr };
     89                }
     90                void ^?{}( __io_poller_fast & mutex this );
     91        void main( __io_poller_fast & this );
     92        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
     93                void ^?{}( __io_poller_fast & mutex this ) {}
     94        #endif
    7695
    7796//=============================================================================================
    7897// I/O Startup / Shutdown logic
    7998//=============================================================================================
    80         void __kernel_io_startup( cluster & this ) {
     99        void __kernel_io_startup( cluster & this, bool main_cluster ) {
    81100                // Step 1 : call to setup
    82101                struct io_uring_params params;
     
    184203                #endif
    185204
     205                if(!main_cluster) {
     206                        __kernel_io_finish_start( this );
     207                }
     208        }
     209
     210        void __kernel_io_finish_start( cluster & this ) {
     211                #if defined(__CFA_IO_POLLING_USER__)
     212                        (this.io.poller.fast){ this };
     213                        __thrd_start( this.io.poller.fast, main );
     214                #endif
     215
    186216                // Create the poller thread
    187                 this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
    188         }
    189 
    190         void __kernel_io_shutdown( cluster & this ) {
    191                 // Stop the IO Poller
    192                 #if __CFA_IO_POLLING__ == __CFA_IO_POLLING_NAIVE__
     217                this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
     218        }
     219
     220        void __kernel_io_prepare_stop( cluster & this ) {
    193221                // Notify the poller thread of the shutdown
    194222                __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
     223
     224                // Stop the IO Poller
    195225                sigval val = { 1 };
    196                 pthread_sigqueue( this.io.poller, SIGUSR1, val );
     226                pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
     227                #if defined(__CFA_IO_POLLING_USER__)
     228                        post( this.io.poller.sem );
     229                #endif
    197230
    198231                // Wait for the poller thread to finish
    199                 pthread_join( this.io.poller, 0p );
    200                 free( this.io.stack );
     232                pthread_join( this.io.poller.slow.kthrd, 0p );
     233                free( this.io.poller.slow.stack );
     234
     235                #if defined(__CFA_IO_POLLING_USER__)
     236                        // unpark the fast io_poller
     237                        unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
     238
     239                        ^(this.io.poller.fast){};
     240                #endif
     241        }
     242
     243        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
     244                if(!main_cluster) {
     245                        __kernel_io_prepare_stop( this );
     246                }
    201247
    202248                // print statistics
     
    246292        // Process a single completion message from the io_uring
    247293        // This is NOT thread-safe
    248         static int __drain_io( struct io_ring & ring, sigset_t & mask, int waitcnt ) {
    249                 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
     294        static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     295                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    250296                if( ret < 0 ) {
    251297                        switch((int)errno) {
     
    281327
    282328                        data->result = cqe.res;
    283                         __unpark( data->thrd __cfaabi_dbg_ctx2 );
     329                        if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
     330                        else         { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    284331                }
    285332
     
    300347        }
    301348
    302         static void * __io_poller( void * arg ) {
     349        static void * __io_poller_slow( void * arg ) {
    303350                cluster * cltr = (cluster *)arg;
    304351                struct io_ring & ring = cltr->io;
     
    316363
    317364                while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    318                         __drain_io( ring, mask, 1 );
     365                        #if defined(__CFA_IO_POLLING_USER__)
     366
     367                                // In the user-thread approach drain and if anything was drained,
     368                                // batton pass to the user-thread
     369                                int count = __drain_io( ring, &mask, 1, true );
     370                                if(count > 0) {
     371                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     372                                        wait( ring.poller.sem );
     373                                }
     374
     375                        #else
     376
     377                                //In the naive approach, just poll the io completion queue directly
     378                                __drain_io( ring, &mask, 1, true );
     379
     380                        #endif
    319381                }
    320382
    321383                return 0p;
    322384        }
     385
     386        #if defined(__CFA_IO_POLLING_USER__)
     387                void main( __io_poller_fast & this ) {
     388                        // Start parked
     389                        park( __cfaabi_dbg_ctx );
     390
     391                        // Then loop until we need to start
     392                        while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     393                                // Drain the io
     394                                if(0 > __drain_io( *this.ring, 0p, 0, false )) {
     395                                        // If we got something, just yield and check again
     396                                        yield();
     397                                }
     398                                else {
     399                                        // We didn't get anything baton pass to the slow poller
     400                                        post( this.ring->poller.sem );
     401                                        park( __cfaabi_dbg_ctx );
     402                                }
     403                        }
     404                }
     405        #endif
    323406
    324407//=============================================================================================
     
    422505                this.len = len;
    423506        }
    424 #endif
     507
    425508
    426509//=============================================================================================
    427510// I/O Interface
    428511//=============================================================================================
    429 #if defined(HAVE_LINUX_IO_URING_H)
     512
    430513        #define __submit_prelude \
    431514                struct io_ring & ring = active_cluster()->io; \
Note: See TracChangeset for help on using the changeset viewer.