Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r3e2b9c9 r5751a56  
    1414//
    1515
    16 #define __cforall_thread__
    17 
    1816#if defined(__CFA_DEBUG__)
    1917        // #define __CFA_DEBUG_PRINT_IO__
     
    2119#endif
    2220
    23 
    24 #if defined(CFA_HAVE_LINUX_IO_URING_H)
     21#include "kernel.hfa"
     22#include "bitmanip.hfa"
     23
     24#if !defined(CFA_HAVE_LINUX_IO_URING_H)
     25        void __kernel_io_startup( cluster &, unsigned, bool ) {
     26                // Nothing to do without io_uring
     27        }
     28
     29        void __kernel_io_finish_start( cluster & ) {
     30                // Nothing to do without io_uring
     31        }
     32
     33        void __kernel_io_prepare_stop( cluster & ) {
     34                // Nothing to do without io_uring
     35        }
     36
     37        void __kernel_io_shutdown( cluster &, bool ) {
     38                // Nothing to do without io_uring
     39        }
     40
     41#else
    2542        #define _GNU_SOURCE         /* See feature_test_macros(7) */
    2643        #include <errno.h>
    27         #include <signal.h>
    2844        #include <stdint.h>
    2945        #include <string.h>
    3046        #include <unistd.h>
     47        #include <sys/mman.h>
    3148
    3249        extern "C" {
    33                 #include <sys/epoll.h>
    3450                #include <sys/syscall.h>
    3551
     
    3753        }
    3854
    39         #include "stats.hfa"
    40         #include "kernel.hfa"
    41         #include "kernel/fwd.hfa"
    42         #include "io/types.hfa"
     55        #include "bits/signal.hfa"
     56        #include "kernel_private.hfa"
     57        #include "thread.hfa"
     58
     59        uint32_t entries_per_cluster() {
     60                return 256;
     61        }
     62
     63        static void * __io_poller_slow( void * arg );
     64
     65        // Weirdly, some systems that do support io_uring don't actually define these
     66        #ifdef __alpha__
     67                /*
     68                * alpha is the only exception, all other architectures
     69                * have common numbers for new system calls.
     70                */
     71                #ifndef __NR_io_uring_setup
     72                        #define __NR_io_uring_setup           535
     73                #endif
     74                #ifndef __NR_io_uring_enter
     75                        #define __NR_io_uring_enter           536
     76                #endif
     77                #ifndef __NR_io_uring_register
     78                        #define __NR_io_uring_register        537
     79                #endif
     80        #else /* !__alpha__ */
     81                #ifndef __NR_io_uring_setup
     82                        #define __NR_io_uring_setup           425
     83                #endif
     84                #ifndef __NR_io_uring_enter
     85                        #define __NR_io_uring_enter           426
     86                #endif
     87                #ifndef __NR_io_uring_register
     88                        #define __NR_io_uring_register        427
     89                #endif
     90        #endif
     91
     92        // Fast poller user-thread
     93        // Not using the "thread" keyword because we want to control
     94        // more carefully when to start/stop it
     95        struct __io_poller_fast {
     96                struct __io_data * ring;
     97                $thread thrd;
     98        };
     99
     100        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
     101                this.ring = cltr.io;
     102                (this.thrd){ "Fast I/O Poller", cltr };
     103        }
     104        void ^?{}( __io_poller_fast & mutex this );
     105        void main( __io_poller_fast & this );
     106        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
     107        void ^?{}( __io_poller_fast & mutex this ) {}
     108
     109        struct __submition_data {
     110                // Head and tail of the ring (associated with array)
     111                volatile uint32_t * head;
     112                volatile uint32_t * tail;
     113                volatile uint32_t prev_head;
     114
     115                // The actual kernel ring which uses head/tail
     116                // indexes into the sqes arrays
     117                uint32_t * array;
     118
     119                // number of entries and mask to go with it
     120                const uint32_t * num;
     121                const uint32_t * mask;
     122
     123                // Submission flags (Not sure what for)
     124                uint32_t * flags;
     125
     126                // number of sqes not submitted (whatever that means)
     127                uint32_t * dropped;
     128
     129                // Like head/tail but not seen by the kernel
     130                volatile uint32_t * ready;
     131                uint32_t ready_cnt;
     132
     133                __spinlock_t lock;
     134                __spinlock_t release_lock;
     135
     136                // A buffer of sqes (not the actual ring)
     137                struct io_uring_sqe * sqes;
     138
     139                // The location and size of the mmaped area
     140                void * ring_ptr;
     141                size_t ring_sz;
     142        };
     143
     144        struct __completion_data {
     145                // Head and tail of the ring
     146                volatile uint32_t * head;
     147                volatile uint32_t * tail;
     148
     149                // number of entries and mask to go with it
     150                const uint32_t * mask;
     151                const uint32_t * num;
     152
     153                // number of cqes not submitted (whatever that means)
     154                uint32_t * overflow;
     155
     156                // the kernel ring
     157                struct io_uring_cqe * cqes;
     158
     159                // The location and size of the mmaped area
     160                void * ring_ptr;
     161                size_t ring_sz;
     162        };
     163
     164        struct __io_data {
     165                struct __submition_data submit_q;
     166                struct __completion_data completion_q;
     167                uint32_t ring_flags;
     168                int cltr_flags;
     169                int fd;
     170                semaphore submit;
     171                volatile bool done;
     172                struct {
     173                        struct {
     174                                __processor_id_t id;
     175                                void * stack;
     176                                pthread_t kthrd;
     177                                volatile bool blocked;
     178                        } slow;
     179                        __io_poller_fast fast;
     180                        __bin_sem_t sem;
     181                } poller;
     182        };
    43183
    44184//=============================================================================================
    45 // I/O Syscall
     185// I/O Startup / Shutdown logic
    46186//=============================================================================================
    47         static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
     187        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
     188                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
     189                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
     190                }
     191
     192                this.io = malloc();
     193
     194                // Step 1 : call to setup
     195                struct io_uring_params params;
     196                memset(&params, 0, sizeof(params));
     197                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
     198                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
     199
     200                uint32_t nentries = entries_per_cluster();
     201
     202                int fd = syscall(__NR_io_uring_setup, nentries, &params );
     203                if(fd < 0) {
     204                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
     205                }
     206
     207                // Step 2 : mmap result
     208                memset( this.io, 0, sizeof(struct __io_data) );
     209                struct __submition_data  & sq = this.io->submit_q;
     210                struct __completion_data & cq = this.io->completion_q;
     211
     212                // calculate the right ring size
     213                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
     214                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
     215
     216                // Requires features
     217                #if defined(IORING_FEAT_SINGLE_MMAP)
     218                        // adjust the size according to the parameters
     219                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
     220                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
     221                        }
     222                #endif
     223
     224                // mmap the Submit Queue into existence
     225                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
     226                if (sq.ring_ptr == (void*)MAP_FAILED) {
     227                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
     228                }
     229
     230                // Requires features
     231                #if defined(IORING_FEAT_SINGLE_MMAP)
     232                        // mmap the Completion Queue into existence (may or may not be needed)
     233                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
     234                                cq.ring_ptr = sq.ring_ptr;
     235                        }
     236                        else
     237                #endif
     238                {
     239                        // We need multiple call to MMAP
     240                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
     241                        if (cq.ring_ptr == (void*)MAP_FAILED) {
     242                                munmap(sq.ring_ptr, sq.ring_sz);
     243                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
     244                        }
     245                }
     246
     247                // mmap the submit queue entries
     248                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
     249                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
     250                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
     251                        munmap(sq.ring_ptr, sq.ring_sz);
     252                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
     253                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
     254                }
     255
     256                // Get the pointers from the kernel to fill the structure
     257                // submit queue
     258                sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
     259                sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
     260                sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
     261                sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
     262                sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
     263                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
     264                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
     265                sq.prev_head = *sq.head;
     266
     267                {
     268                        const uint32_t num = *sq.num;
     269                        for( i; num ) {
     270                                sq.sqes[i].user_data = 0ul64;
     271                        }
     272                }
     273
     274                (sq.lock){};
     275                (sq.release_lock){};
     276
     277                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
     278                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
     279                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
     280                        sq.ready = alloc_align( 64, sq.ready_cnt );
     281                        for(i; sq.ready_cnt) {
     282                                sq.ready[i] = -1ul32;
     283                        }
     284                }
     285                else {
     286                        sq.ready_cnt = 0;
     287                        sq.ready = 0p;
     288                }
     289
     290                // completion queue
     291                cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
     292                cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
     293                cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
     294                cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
     295                cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
     296                cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
     297
     298                // some paranoid checks
     299                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
     300                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
     301                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
     302                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
     303
     304                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
     305                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
     306                /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
     307                /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
     308
     309                // Update the global ring info
     310                this.io->ring_flags = params.flags;
     311                this.io->cltr_flags = io_flags;
     312                this.io->fd         = fd;
     313                this.io->done       = false;
     314                (this.io->submit){ min(*sq.num, *cq.num) };
     315
     316                if(!main_cluster) {
     317                        __kernel_io_finish_start( this );
     318                }
     319        }
     320
     321        void __kernel_io_finish_start( cluster & this ) {
     322                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
     323                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
     324                        (this.io->poller.fast){ this };
     325                        __thrd_start( this.io->poller.fast, main );
     326                }
     327
     328                // Create the poller thread
     329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
     330                this.io->poller.slow.blocked = false;
     331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
     332        }
     333
     334        void __kernel_io_prepare_stop( cluster & this ) {
     335                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
     336                // Notify the poller thread of the shutdown
     337                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
     338
     339                // Stop the IO Poller
     340                sigval val = { 1 };
     341                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
     342                post( this.io->poller.sem );
     343
     344                // Wait for the poller thread to finish
     345                pthread_join( this.io->poller.slow.kthrd, 0p );
     346                free( this.io->poller.slow.stack );
     347
     348                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
     349
     350                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
     351                        with( this.io->poller.fast ) {
     352                                /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
     353                                /* paranoid */ verify( !ready_mutate_islocked() );
     354
     355                                // We need to adjust the clean-up based on where the thread is
     356                                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     357
     358                                        ready_schedule_lock( (struct __processor_id_t *)active_processor() );
     359
     360                                                // This is the tricky case
     361                                                // The thread was preempted and now it is on the ready queue
     362                                                // The thread should be the last on the list
     363                                                /* paranoid */ verify( thrd.link.next != 0p );
     364
     365                                                // Remove the thread from the ready queue of this cluster
     366                                                __attribute__((unused)) bool removed = remove_head( &this, &thrd );
     367                                                /* paranoid */ verify( removed );
     368                                                thrd.link.next = 0p;
     369                                                thrd.link.prev = 0p;
     370                                                __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
     371
     372                                                // Fixup the thread state
     373                                                thrd.state = Blocked;
     374                                                thrd.ticket = 0;
     375                                                thrd.preempted = __NO_PREEMPTION;
     376
     377                                        ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
     378
     379                                        // Pretend like the thread was blocked all along
     380                                }
     381                                // !!! This is not an else if !!!
     382                                if( thrd.state == Blocked ) {
     383
     384                                        // This is the "easy case"
     385                                        // The thread is parked and can easily be moved to active cluster
     386                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
     387                                        thrd.curr_cluster = active_cluster();
     388
     389                                        // unpark the fast io_poller
     390                                        unpark( &thrd __cfaabi_dbg_ctx2 );
     391                                }
     392                                else {
     393
     394                                        // The thread is in a weird state
     395                                        // I don't know what to do here
     396                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
     397                                }
     398
     399                        }
     400
     401                        ^(this.io->poller.fast){};
     402
     403                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
     404                }
     405        }
     406
     407        void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
     408                if(!main_cluster) {
     409                        __kernel_io_prepare_stop( this );
     410                }
     411
     412                // Shutdown the io rings
     413                struct __submition_data  & sq = this.io->submit_q;
     414                struct __completion_data & cq = this.io->completion_q;
     415
     416                // unmap the submit queue entries
     417                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
     418
     419                // unmap the Submit Queue ring
     420                munmap(sq.ring_ptr, sq.ring_sz);
     421
     422                // unmap the Completion Queue ring, if it is different
     423                if (cq.ring_ptr != sq.ring_ptr) {
     424                        munmap(cq.ring_ptr, cq.ring_sz);
     425                }
     426
     427                // close the file descriptor
     428                close(this.io->fd);
     429
     430                free( this.io->submit_q.ready ); // Maybe null, doesn't matter
     431                free( this.io );
     432        }
     433
     434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
    48435                bool need_sys_to_submit = false;
    49436                bool need_sys_to_complete = false;
     437                unsigned min_complete = 0;
    50438                unsigned flags = 0;
     439
    51440
    52441                TO_SUBMIT:
     
    62451                }
    63452
     453                TO_COMPLETE:
    64454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    65455                        flags |= IORING_ENTER_GETEVENTS;
     456                        if( mask ) {
     457                                need_sys_to_complete = true;
     458                                min_complete = 1;
     459                                break TO_COMPLETE;
     460                        }
    66461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    67462                                need_sys_to_complete = true;
     
    71466                int ret = 0;
    72467                if( need_sys_to_submit || need_sys_to_complete ) {
    73                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
     468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
    74469                        if( ret < 0 ) {
    75470                                switch((int)errno) {
     
    95490        static uint32_t __release_consumed_submission( struct __io_data & ring );
    96491
    97         static inline void process(struct io_uring_cqe & cqe ) {
     492        static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
    98493                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
    99494                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
    100495
    101496                data->result = cqe.res;
    102                 unpark( data->thrd __cfaabi_dbg_ctx2 );
     497                if(!id) { unpark(     data->thrd __cfaabi_dbg_ctx2 ); }
     498                else  { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
    103499        }
    104500
    105501        // Process a single completion message from the io_uring
    106502        // This is NOT thread-safe
    107         static [int, bool] __drain_io( & struct __io_data ring ) {
     503        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
    108504                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
    109505
    110506                unsigned to_submit = 0;
    111                 if( ring.poller_submits ) {
     507                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    112508                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    113509                        to_submit = __collect_submitions( ring );
    114510                }
    115511
    116                 int ret = __io_uring_enter(ring, to_submit, true);
     512                int ret = __io_uring_enter(ring, to_submit, true, mask);
    117513                if( ret < 0 ) {
    118514                        return [0, true];
     
    151547                        /* paranoid */ verify(&cqe);
    152548
    153                         process( cqe );
    154                 }
     549                        process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
     550                }
     551
     552                // Allow new submissions to happen
     553                // V(ring.submit, count);
    155554
    156555                // Mark to the kernel that the cqe has been seen
     
    162561        }
    163562
    164         void main( $io_ctx_thread & this ) {
    165                 epoll_event ev;
    166                 __ioctx_register( this, ev );
    167 
    168                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
     563        static void * __io_poller_slow( void * arg ) {
     564                #if !defined( __CFA_NO_STATISTICS__ )
     565                        __stats_t local_stats;
     566                        __init_stats( &local_stats );
     567                        kernelTLS.this_stats = &local_stats;
     568                #endif
     569
     570                cluster * cltr = (cluster *)arg;
     571                struct __io_data & ring = *cltr->io;
     572
     573                ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
     574
     575                sigset_t mask;
     576                sigfillset(&mask);
     577                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
     578                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
     579                }
     580
     581                sigdelset( &mask, SIGUSR1 );
     582
     583                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
     584                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
     585
     586                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
     587
     588                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
     589                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
     590
     591                                __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
     592
     593                                // In the user-thread approach drain and if anything was drained,
     594                                // batton pass to the user-thread
     595                                int count;
     596                                bool again;
     597                                [count, again] = __drain_io( ring, &mask );
     598
     599                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
     600
     601                                // Update statistics
     602                                __STATS__( true,
     603                                        io.complete_q.completed_avg.val += count;
     604                                        io.complete_q.completed_avg.slow_cnt += 1;
     605                                )
     606
     607                                if(again) {
     608                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
     609                                        __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
     610                                        wait( ring.poller.sem );
     611                                }
     612                        }
     613                }
     614                else {
     615                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
     616                                //In the naive approach, just poll the io completion queue directly
     617                                int count;
     618                                bool again;
     619                                [count, again] = __drain_io( ring, &mask );
     620
     621                                // Update statistics
     622                                __STATS__( true,
     623                                        io.complete_q.completed_avg.val += count;
     624                                        io.complete_q.completed_avg.slow_cnt += 1;
     625                                )
     626                        }
     627                }
     628
     629                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
     630
     631                unregister( &ring.poller.slow.id );
     632
     633                #if !defined(__CFA_NO_STATISTICS__)
     634                        __tally_stats(cltr->stats, &local_stats);
     635                #endif
     636
     637                return 0p;
     638        }
     639
     640        void main( __io_poller_fast & this ) {
     641                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
     642
     643                // Start parked
     644                park( __cfaabi_dbg_ctx );
     645
     646                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
    169647
    170648                int reset = 0;
     649
    171650                // Then loop until we need to start
    172                 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
     651                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     652
    173653                        // Drain the io
    174654                        int count;
    175655                        bool again;
    176656                        disable_interrupts();
    177                                 [count, again] = __drain_io( *this.ring );
     657                                [count, again] = __drain_io( *this.ring, 0p );
    178658
    179659                                if(!again) reset++;
     
    192672                        // We didn't get anything baton pass to the slow poller
    193673                        else {
    194                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
     674                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
    195675                                reset = 0;
    196676
    197                                 // block this thread
    198                                 __ioctx_prepare_block( this, ev );
    199                                 wait( this.sem );
     677                                // wake up the slow poller
     678                                post( this.ring->poller.sem );
     679
     680                                // park this thread
     681                                park( __cfaabi_dbg_ctx );
    200682                        }
    201683                }
    202684
    203685                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     686        }
     687
     688        static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
     689        static inline void __wake_poller( struct __io_data & ring ) {
     690                if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
     691
     692                sigval val = { 1 };
     693                pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    204694        }
    205695
     
    316806        }
    317807
    318         void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
    319                 __io_data & ring = *ctx->thrd.ring;
     808        void __submit( struct __io_data & ring, uint32_t idx ) {
    320809                // Get now the data we definetely need
    321810                uint32_t * const tail = ring.submit_q.tail;
    322                 const uint32_t mask  = *ring.submit_q.mask;
     811                const uint32_t mask = *ring.submit_q.mask;
    323812
    324813                // There are 2 submission schemes, check which one we are using
    325                 if( ring.poller_submits ) {
     814                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    326815                        // If the poller thread submits, then we just need to add this to the ready array
    327816                        __submit_to_ready_array( ring, idx, mask );
    328817
    329                         post( ctx->thrd.sem );
     818                        __wake_poller( ring );
    330819
    331820                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    332821                }
    333                 else if( ring.eager_submits ) {
     822                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
    334823                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
    335824
     
    360849                        // We got the lock
    361850                        unsigned to_submit = __collect_submitions( ring );
    362                         int ret = __io_uring_enter( ring, to_submit, false );
     851                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
    363852                        if( ret < 0 ) {
    364853                                unlock(ring.submit_q.lock);
     
    403892
    404893                        // Submit however, many entries need to be submitted
    405                         int ret = __io_uring_enter( ring, 1, false );
     894                        int ret = __io_uring_enter( ring, 1, false, 0p );
    406895                        if( ret < 0 ) {
    407896                                switch((int)errno) {
     
    469958                return count;
    470959        }
     960
     961//=============================================================================================
     962// I/O Submissions
     963//=============================================================================================
     964
     965        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
     966                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
     967                if( ret < 0 ) {
     968                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     969                }
     970
     971                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
     972        }
    471973#endif
Note: See TracChangeset for help on using the changeset viewer.