Changeset f6660520 for libcfa


Ignore:
Timestamp:
Apr 30, 2020, 3:27:11 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
9987d79
Parents:
c59a346
Message:

Added new implementation of io_uring that uses user-thread

Location:
libcfa
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • libcfa/prelude/defines.hfa.in

    rc59a346 rf6660520  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// defines.hfa.in --
     8//
     9// Author           : Thierry Delisle
     10// Created On       : Thu Apr 30 15:23:00 2020
     11// Last Modified By :
     12// Last Modified On :
     13// Update Count     :
     14//
    115
    216#undef HAVE_LINUX_IO_URING_H
     17
     18#define __CFA_IO_POLLING_USER__
     19// #define __CFA_IO_POLLING_KERNEL__
  • libcfa/src/concurrency/io.cfa

    rc59a346 rf6660520  
    1717
    1818#if !defined(HAVE_LINUX_IO_URING_H)
    19         void __kernel_io_startup( cluster & this ) {
     19        void __kernel_io_startup( cluster & ) {
    2020                // Nothing to do without io_uring
    2121        }
    2222
    23         void __kernel_io_shutdown( cluster & this ) {
     23        void __kernel_io_start_thrd( cluster & ) {
     24                // Nothing to do without io_uring
     25        }
     26
     27        void __kernel_io_stop_thrd ( cluster & ) {
     28                // Nothing to do without io_uring
     29        }
     30
     31        void __kernel_io_shutdown( cluster & ) {
    2432                // Nothing to do without io_uring
    2533        }
     
    4654        }
    4755
    48         static void * __io_poller( void * arg );
    49 
    50        // Weirdly, some systems that do support io_uring don't actually define these
    51        #ifdef __alpha__
    52        /*
    53        * alpha is the only exception, all other architectures
    54        * have common numbers for new system calls.
    55        */
    56        # ifndef __NR_io_uring_setup
    57        #  define __NR_io_uring_setup           535
    58        # endif
    59        # ifndef __NR_io_uring_enter
    60        #  define __NR_io_uring_enter           536
    61        # endif
    62        # ifndef __NR_io_uring_register
    63        #  define __NR_io_uring_register        537
    64        # endif
    65        #else /* !__alpha__ */
    66        # ifndef __NR_io_uring_setup
    67        #  define __NR_io_uring_setup           425
    68        # endif
    69        # ifndef __NR_io_uring_enter
    70        #  define __NR_io_uring_enter           426
    71        # endif
    72        # ifndef __NR_io_uring_register
    73        #  define __NR_io_uring_register        427
    74        # endif
    75        #endif
     56        static void * __io_poller_slow( void * arg );
     57
     58        // Weirdly, some systems that do support io_uring don't actually define these
     59        #ifdef __alpha__
     60                /*
     61                * alpha is the only exception, all other architectures
     62                * have common numbers for new system calls.
     63                */
     64                #ifndef __NR_io_uring_setup
     65                        #define __NR_io_uring_setup           535
     66                #endif
     67                #ifndef __NR_io_uring_enter
     68                        #define __NR_io_uring_enter           536
     69                #endif
     70                #ifndef __NR_io_uring_register
     71                        #define __NR_io_uring_register        537
     72                #endif
     73        #else /* !__alpha__ */
     74                #ifndef __NR_io_uring_setup
     75                        #define __NR_io_uring_setup           425
     76                #endif
     77                #ifndef __NR_io_uring_enter
     78                        #define __NR_io_uring_enter           426
     79                #endif
     80                #ifndef __NR_io_uring_register
     81                        #define __NR_io_uring_register        427
     82                #endif
     83        #endif
     84
     85        #if defined(__CFA_IO_POLLING_USER__)
     86                void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
     87                        this.ring = &cltr.io;
     88                        (this.thrd){ "I/O Poller", cltr };
     89                }
     90                void ^?{}( __io_poller_fast & mutex this );
     91        void main( __io_poller_fast & this );
     92        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
     93                void ^?{}( __io_poller_fast & mutex this ) {}
     94        #endif
    7695
    7796//=============================================================================================
    7897// I/O Startup / Shutdown logic
    7998//=============================================================================================
    80         void __kernel_io_startup( cluster & this ) {
     99        void __kernel_io_startup( cluster & this, bool main_cluster ) {
    81100                // Step 1 : call to setup
    82101                struct io_uring_params params;
     
    184203                #endif
    185204
     205                if(!main_cluster) {
     206                        __kernel_io_finish_start( this );
     207                }
     208        }
     209
     210        void __kernel_io_finish_start( cluster & this ) {
     211                #if defined(__CFA_IO_POLLING_USER__)
     212                        (this.io.poller.fast){ this };
     213                        __thrd_start( this.io.poller.fast, main );
     214                #endif
     215
    186216                // Create the poller thread
    187                 this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
    188         }
    189 
    190         void __kernel_io_shutdown( cluster & this ) {
    191                 // Stop the IO Poller
    192                 #if __CFA_IO_POLLING__ == __CFA_IO_POLLING_NAIVE__
     217                this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
     218        }
     219
     220        void __kernel_io_prepare_stop( cluster & this ) {
    193221                // Notify the poller thread of the shutdown
    194222                __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
     223
     224                // Stop the IO Poller
    195225                sigval val = { 1 };
    196                 pthread_sigqueue( this.io.poller, SIGUSR1, val );
     226                pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
     227                #if defined(__CFA_IO_POLLING_USER__)
     228                        post( this.io.poller.sem );
     229                #endif
    197230
    198231                // Wait for the poller thread to finish
    199                 pthread_join( this.io.poller, 0p );
    200                 free( this.io.stack );
     232                pthread_join( this.io.poller.slow.kthrd, 0p );
     233                free( this.io.poller.slow.stack );
     234
     235                #if defined(__CFA_IO_POLLING_USER__)
     236                        // unpark the fast io_poller
     237                        unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
     238
     239                        ^(this.io.poller.fast){};
     240                #endif
     241        }
     242
     243        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
     244                if(!main_cluster) {
     245                        __kernel_io_prepare_stop( this );
     246                }
    201247
    202248                // print statistics
     
    246292        // Process a single completion message from the io_uring
    247293        // This is NOT thread-safe
    248         static int __drain_io( struct io_ring & ring, sigset_t & mask, int waitcnt ) {
    249                 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
     294        static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     295                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    250296                if( ret < 0 ) {
    251297                        switch((int)errno) {
     
    281327
    282328                        data->result = cqe.res;
    283                         __unpark( data->thrd __cfaabi_dbg_ctx2 );
     329                        if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
     330                        else         { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    284331                }
    285332
     
    300347        }
    301348
    302         static void * __io_poller( void * arg ) {
     349        static void * __io_poller_slow( void * arg ) {
    303350                cluster * cltr = (cluster *)arg;
    304351                struct io_ring & ring = cltr->io;
     
    316363
    317364                while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    318                         __drain_io( ring, mask, 1 );
     365                        #if defined(__CFA_IO_POLLING_USER__)
     366
     367                                // In the user-thread approach drain and if anything was drained,
     368                                // batton pass to the user-thread
     369                                int count = __drain_io( ring, &mask, 1, true );
     370                                if(count > 0) {
     371                                        __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     372                                        wait( ring.poller.sem );
     373                                }
     374
     375                        #else
     376
     377                                //In the naive approach, just poll the io completion queue directly
     378                                __drain_io( ring, &mask, 1, true );
     379
     380                        #endif
    319381                }
    320382
    321383                return 0p;
    322384        }
     385
     386        #if defined(__CFA_IO_POLLING_USER__)
     387                void main( __io_poller_fast & this ) {
     388                        // Start parked
     389                        park( __cfaabi_dbg_ctx );
     390
     391                        // Then loop until we need to start
     392                        while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     393                                // Drain the io
     394                                if(0 > __drain_io( *this.ring, 0p, 0, false )) {
     395                                        // If we got something, just yield and check again
     396                                        yield();
     397                                }
     398                                else {
     399                                        // We didn't get anything baton pass to the slow poller
     400                                        post( this.ring->poller.sem );
     401                                        park( __cfaabi_dbg_ctx );
     402                                }
     403                        }
     404                }
     405        #endif
    323406
    324407//=============================================================================================
     
    422505                this.len = len;
    423506        }
    424 #endif
     507
    425508
    426509//=============================================================================================
    427510// I/O Interface
    428511//=============================================================================================
    429 #if defined(HAVE_LINUX_IO_URING_H)
     512
    430513        #define __submit_prelude \
    431514                struct io_ring & ring = active_cluster()->io; \
  • libcfa/src/concurrency/kernel.cfa

    rc59a346 rf6660520  
    266266        threads{ __get };
    267267
    268         __kernel_io_startup( this );
     268        __kernel_io_startup( this, &this == mainCluster );
    269269
    270270        doregister(this);
     
    272272
    273273void ^?{}(cluster & this) {
    274         __kernel_io_shutdown( this );
     274        __kernel_io_shutdown( this, &this == mainCluster );
    275275
    276276        unregister(this);
     
    784784
    785785
    786 
    787786        // THE SYSTEM IS NOW COMPLETELY RUNNING
    788         __cfaabi_dbg_print_safe("Kernel : Started\n--------------------------------------------------\n\n");
     787
     788
     789        // Now that the system is up, finish creating systems that need threading
     790        __kernel_io_finish_start( *mainCluster );
     791
     792
     793        __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
    789794
    790795        verify( ! kernelTLS.preemption_state.enabled );
     
    794799
    795800static void __kernel_shutdown(void) {
    796         __cfaabi_dbg_print_safe("\n--------------------------------------------------\nKernel : Shutting down\n");
     801        //Before we start shutting things down, wait for systems that need threading to shutdown
     802        __kernel_io_prepare_stop( *mainCluster );
    797803
    798804        /* paranoid */ verify( TL_GET( preemption_state.enabled ) );
    799805        disable_interrupts();
    800806        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     807
     808        __cfadbg_print_safe(runtime_core, "\n--------------------------------------------------\nKernel : Shutting down\n");
    801809
    802810        // SKULLDUGGERY: Notify the mainProcessor it needs to terminates.
  • libcfa/src/concurrency/kernel.hfa

    rc59a346 rf6660520  
    136136        // Like head/tail but not seen by the kernel
    137137        volatile uint32_t alloc;
     138        volatile uint32_t ready;
    138139
    139140        __spinlock_t lock;
     
    187188};
    188189
     190#if defined(__CFA_IO_POLLING_USER__)
     191        struct __io_poller_fast {
     192                struct io_ring * ring;
     193                $thread thrd;
     194        };
     195#endif
     196
    189197struct io_ring {
    190198        struct io_uring_sq submit_q;
     
    192200        uint32_t flags;
    193201        int fd;
    194         pthread_t poller;
    195         void * stack;
     202        semaphore submit;
    196203        volatile bool done;
    197         semaphore submit;
     204        struct {
     205                struct {
     206                        void * stack;
     207                        pthread_t kthrd;
     208                } slow;
     209                #if defined(__CFA_IO_POLLING_USER__)
     210                        __io_poller_fast fast;
     211                        __bin_sem_t sem;
     212                #endif
     213        } poller;
    198214};
    199215#endif
  • libcfa/src/concurrency/kernel_private.hfa

    rc59a346 rf6660520  
    7575//-----------------------------------------------------------------------------
    7676// I/O
    77 void __kernel_io_startup ( cluster & );
    78 void __kernel_io_shutdown( cluster & );
     77void __kernel_io_startup     ( cluster &, bool );
     78void __kernel_io_finish_start( cluster & );
     79void __kernel_io_prepare_stop( cluster & );
     80void __kernel_io_shutdown    ( cluster &, bool );
    7981
    8082//-----------------------------------------------------------------------------
Note: See TracChangeset for help on using the changeset viewer.