Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r5751a56 r3e2b9c9  
    1414//
    1515
     16#define __cforall_thread__
     17
    1618#if defined(__CFA_DEBUG__)
    1719        // #define __CFA_DEBUG_PRINT_IO__
     
    1921#endif
    2022
    21 #include "kernel.hfa"
    22 #include "bitmanip.hfa"
    23 
    24 #if !defined(CFA_HAVE_LINUX_IO_URING_H)
    25         void __kernel_io_startup( cluster &, unsigned, bool ) {
    26                 // Nothing to do without io_uring
    27         }
    28 
    29         void __kernel_io_finish_start( cluster & ) {
    30                 // Nothing to do without io_uring
    31         }
    32 
    33         void __kernel_io_prepare_stop( cluster & ) {
    34                 // Nothing to do without io_uring
    35         }
    36 
    37         void __kernel_io_shutdown( cluster &, bool ) {
    38                 // Nothing to do without io_uring
    39         }
    40 
    41 #else
     23
     24#if defined(CFA_HAVE_LINUX_IO_URING_H)
    4225        #define _GNU_SOURCE         /* See feature_test_macros(7) */
    4326        #include <errno.h>
     27        #include <signal.h>
    4428        #include <stdint.h>
    4529        #include <string.h>
    4630        #include <unistd.h>
    47         #include <sys/mman.h>
    4831
    4932        extern "C" {
     33                #include <sys/epoll.h>
    5034                #include <sys/syscall.h>
    5135
     
    5337        }
    5438
    55         #include "bits/signal.hfa"
    56         #include "kernel_private.hfa"
    57         #include "thread.hfa"
    58 
    59         uint32_t entries_per_cluster() {
    60                 return 256;
    61         }
    62 
    63         static void * __io_poller_slow( void * arg );
    64 
    65         // Weirdly, some systems that do support io_uring don't actually define these
    66         #ifdef __alpha__
    67                 /*
    68                 * alpha is the only exception, all other architectures
    69                 * have common numbers for new system calls.
    70                 */
    71                 #ifndef __NR_io_uring_setup
    72                         #define __NR_io_uring_setup           535
    73                 #endif
    74                 #ifndef __NR_io_uring_enter
    75                         #define __NR_io_uring_enter           536
    76                 #endif
    77                 #ifndef __NR_io_uring_register
    78                         #define __NR_io_uring_register        537
    79                 #endif
    80         #else /* !__alpha__ */
    81                 #ifndef __NR_io_uring_setup
    82                         #define __NR_io_uring_setup           425
    83                 #endif
    84                 #ifndef __NR_io_uring_enter
    85                         #define __NR_io_uring_enter           426
    86                 #endif
    87                 #ifndef __NR_io_uring_register
    88                         #define __NR_io_uring_register        427
    89                 #endif
    90         #endif
    91 
    92         // Fast poller user-thread
    93         // Not using the "thread" keyword because we want to control
    94         // more carefully when to start/stop it
    95         struct __io_poller_fast {
    96                 struct __io_data * ring;
    97                 $thread thrd;
    98         };
    99 
    100         void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    101                 this.ring = cltr.io;
    102                 (this.thrd){ "Fast I/O Poller", cltr };
    103         }
    104         void ^?{}( __io_poller_fast & mutex this );
    105         void main( __io_poller_fast & this );
    106         static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    107         void ^?{}( __io_poller_fast & mutex this ) {}
    108 
    109         struct __submition_data {
    110                 // Head and tail of the ring (associated with array)
    111                 volatile uint32_t * head;
    112                 volatile uint32_t * tail;
    113                 volatile uint32_t prev_head;
    114 
    115                 // The actual kernel ring which uses head/tail
    116                 // indexes into the sqes arrays
    117                 uint32_t * array;
    118 
    119                 // number of entries and mask to go with it
    120                 const uint32_t * num;
    121                 const uint32_t * mask;
    122 
    123                 // Submission flags (Not sure what for)
    124                 uint32_t * flags;
    125 
    126                 // number of sqes not submitted (whatever that means)
    127                 uint32_t * dropped;
    128 
    129                 // Like head/tail but not seen by the kernel
    130                 volatile uint32_t * ready;
    131                 uint32_t ready_cnt;
    132 
    133                 __spinlock_t lock;
    134                 __spinlock_t release_lock;
    135 
    136                 // A buffer of sqes (not the actual ring)
    137                 struct io_uring_sqe * sqes;
    138 
    139                 // The location and size of the mmaped area
    140                 void * ring_ptr;
    141                 size_t ring_sz;
    142         };
    143 
    144         struct __completion_data {
    145                 // Head and tail of the ring
    146                 volatile uint32_t * head;
    147                 volatile uint32_t * tail;
    148 
    149                 // number of entries and mask to go with it
    150                 const uint32_t * mask;
    151                 const uint32_t * num;
    152 
    153                 // number of cqes not submitted (whatever that means)
    154                 uint32_t * overflow;
    155 
    156                 // the kernel ring
    157                 struct io_uring_cqe * cqes;
    158 
    159                 // The location and size of the mmaped area
    160                 void * ring_ptr;
    161                 size_t ring_sz;
    162         };
    163 
    164         struct __io_data {
    165                 struct __submition_data submit_q;
    166                 struct __completion_data completion_q;
    167                 uint32_t ring_flags;
    168                 int cltr_flags;
    169                 int fd;
    170                 semaphore submit;
    171                 volatile bool done;
    172                 struct {
    173                         struct {
    174                                 __processor_id_t id;
    175                                 void * stack;
    176                                 pthread_t kthrd;
    177                                 volatile bool blocked;
    178                         } slow;
    179                         __io_poller_fast fast;
    180                         __bin_sem_t sem;
    181                 } poller;
    182         };
    183 
    184 //=============================================================================================
    185 // I/O Startup / Shutdown logic
    186 //=============================================================================================
    187         void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
    188                 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
    189                         abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
    190                 }
    191 
    192                 this.io = malloc();
    193 
    194                 // Step 1 : call to setup
    195                 struct io_uring_params params;
    196                 memset(&params, 0, sizeof(params));
    197                 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
    198                 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
    199 
    200                 uint32_t nentries = entries_per_cluster();
    201 
    202                 int fd = syscall(__NR_io_uring_setup, nentries, &params );
    203                 if(fd < 0) {
    204                         abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
    205                 }
    206 
    207                 // Step 2 : mmap result
    208                 memset( this.io, 0, sizeof(struct __io_data) );
    209                 struct __submition_data  & sq = this.io->submit_q;
    210                 struct __completion_data & cq = this.io->completion_q;
    211 
    212                 // calculate the right ring size
    213                 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
    214                 cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
    215 
    216                 // Requires features
    217                 #if defined(IORING_FEAT_SINGLE_MMAP)
    218                         // adjust the size according to the parameters
    219                         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    220                                 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
    221                         }
    222                 #endif
    223 
    224                 // mmap the Submit Queue into existence
    225                 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
    226                 if (sq.ring_ptr == (void*)MAP_FAILED) {
    227                         abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
    228                 }
    229 
    230                 // Requires features
    231                 #if defined(IORING_FEAT_SINGLE_MMAP)
    232                         // mmap the Completion Queue into existence (may or may not be needed)
    233                         if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    234                                 cq.ring_ptr = sq.ring_ptr;
    235                         }
    236                         else
    237                 #endif
    238                 {
    239                         // We need multiple call to MMAP
    240                         cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
    241                         if (cq.ring_ptr == (void*)MAP_FAILED) {
    242                                 munmap(sq.ring_ptr, sq.ring_sz);
    243                                 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
    244                         }
    245                 }
    246 
    247                 // mmap the submit queue entries
    248                 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
    249                 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
    250                 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
    251                         munmap(sq.ring_ptr, sq.ring_sz);
    252                         if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
    253                         abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
    254                 }
    255 
    256                 // Get the pointers from the kernel to fill the structure
    257                 // submit queue
    258                 sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
    259                 sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
    260                 sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
    261                 sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
    262                 sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
    263                 sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    264                 sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    265                 sq.prev_head = *sq.head;
    266 
    267                 {
    268                         const uint32_t num = *sq.num;
    269                         for( i; num ) {
    270                                 sq.sqes[i].user_data = 0ul64;
    271                         }
    272                 }
    273 
    274                 (sq.lock){};
    275                 (sq.release_lock){};
    276 
    277                 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
    278                         /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
    279                         sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
    280                         sq.ready = alloc_align( 64, sq.ready_cnt );
    281                         for(i; sq.ready_cnt) {
    282                                 sq.ready[i] = -1ul32;
    283                         }
    284                 }
    285                 else {
    286                         sq.ready_cnt = 0;
    287                         sq.ready = 0p;
    288                 }
    289 
    290                 // completion queue
    291                 cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
    292                 cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
    293                 cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
    294                 cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
    295                 cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
    296                 cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    297 
    298                 // some paranoid checks
    299                 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
    300                 /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
    301                 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
    302                 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
    303 
    304                 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
    305                 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
    306                 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
    307                 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
    308 
    309                 // Update the global ring info
    310                 this.io->ring_flags = params.flags;
    311                 this.io->cltr_flags = io_flags;
    312                 this.io->fd         = fd;
    313                 this.io->done       = false;
    314                 (this.io->submit){ min(*sq.num, *cq.num) };
    315 
    316                 if(!main_cluster) {
    317                         __kernel_io_finish_start( this );
    318                 }
    319         }
    320 
    321         void __kernel_io_finish_start( cluster & this ) {
    322                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    323                         __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
    324                         (this.io->poller.fast){ this };
    325                         __thrd_start( this.io->poller.fast, main );
    326                 }
    327 
    328                 // Create the poller thread
    329                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
    330                 this.io->poller.slow.blocked = false;
    331                 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    332         }
    333 
    334         void __kernel_io_prepare_stop( cluster & this ) {
    335                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
    336                 // Notify the poller thread of the shutdown
    337                 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
    338 
    339                 // Stop the IO Poller
    340                 sigval val = { 1 };
    341                 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
    342                 post( this.io->poller.sem );
    343 
    344                 // Wait for the poller thread to finish
    345                 pthread_join( this.io->poller.slow.kthrd, 0p );
    346                 free( this.io->poller.slow.stack );
    347 
    348                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
    349 
    350                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    351                         with( this.io->poller.fast ) {
    352                                 /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
    353                                 /* paranoid */ verify( !ready_mutate_islocked() );
    354 
    355                                 // We need to adjust the clean-up based on where the thread is
    356                                 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    357 
    358                                         ready_schedule_lock( (struct __processor_id_t *)active_processor() );
    359 
    360                                                 // This is the tricky case
    361                                                 // The thread was preempted and now it is on the ready queue
    362                                                 // The thread should be the last on the list
    363                                                 /* paranoid */ verify( thrd.link.next != 0p );
    364 
    365                                                 // Remove the thread from the ready queue of this cluster
    366                                                 __attribute__((unused)) bool removed = remove_head( &this, &thrd );
    367                                                 /* paranoid */ verify( removed );
    368                                                 thrd.link.next = 0p;
    369                                                 thrd.link.prev = 0p;
    370                                                 __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
    371 
    372                                                 // Fixup the thread state
    373                                                 thrd.state = Blocked;
    374                                                 thrd.ticket = 0;
    375                                                 thrd.preempted = __NO_PREEMPTION;
    376 
    377                                         ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
    378 
    379                                         // Pretend like the thread was blocked all along
    380                                 }
    381                                 // !!! This is not an else if !!!
    382                                 if( thrd.state == Blocked ) {
    383 
    384                                         // This is the "easy case"
    385                                         // The thread is parked and can easily be moved to active cluster
    386                                         verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
    387                                         thrd.curr_cluster = active_cluster();
    388 
    389                                         // unpark the fast io_poller
    390                                         unpark( &thrd __cfaabi_dbg_ctx2 );
    391                                 }
    392                                 else {
    393 
    394                                         // The thread is in a weird state
    395                                         // I don't know what to do here
    396                                         abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
    397                                 }
    398 
    399                         }
    400 
    401                         ^(this.io->poller.fast){};
    402 
    403                         __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
    404                 }
    405         }
    406 
    407         void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
    408                 if(!main_cluster) {
    409                         __kernel_io_prepare_stop( this );
    410                 }
    411 
    412                 // Shutdown the io rings
    413                 struct __submition_data  & sq = this.io->submit_q;
    414                 struct __completion_data & cq = this.io->completion_q;
    415 
    416                 // unmap the submit queue entries
    417                 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
    418 
    419                 // unmap the Submit Queue ring
    420                 munmap(sq.ring_ptr, sq.ring_sz);
    421 
    422                 // unmap the Completion Queue ring, if it is different
    423                 if (cq.ring_ptr != sq.ring_ptr) {
    424                         munmap(cq.ring_ptr, cq.ring_sz);
    425                 }
    426 
    427                 // close the file descriptor
    428                 close(this.io->fd);
    429 
    430                 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    431                 free( this.io );
    432         }
    433 
    434         int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
     39        #include "stats.hfa"
     40        #include "kernel.hfa"
     41        #include "kernel/fwd.hfa"
     42        #include "io/types.hfa"
     43
     44//=============================================================================================
     45// I/O Syscall
     46//=============================================================================================
     47        static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    43548                bool need_sys_to_submit = false;
    43649                bool need_sys_to_complete = false;
    437                 unsigned min_complete = 0;
    43850                unsigned flags = 0;
    439 
    44051
    44152                TO_SUBMIT:
     
    45162                }
    45263
    453                 TO_COMPLETE:
    45464                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    45565                        flags |= IORING_ENTER_GETEVENTS;
    456                         if( mask ) {
    457                                 need_sys_to_complete = true;
    458                                 min_complete = 1;
    459                                 break TO_COMPLETE;
    460                         }
    46166                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    46267                                need_sys_to_complete = true;
     
    46671                int ret = 0;
    46772                if( need_sys_to_submit || need_sys_to_complete ) {
    468                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
     73                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
    46974                        if( ret < 0 ) {
    47075                                switch((int)errno) {
     
    49095        static uint32_t __release_consumed_submission( struct __io_data & ring );
    49196
    492         static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
     97        static inline void process(struct io_uring_cqe & cqe ) {
    49398                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
    49499                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
    495100
    496101                data->result = cqe.res;
    497                 if(!id) { unpark(     data->thrd __cfaabi_dbg_ctx2 ); }
    498                 else  { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
     102                unpark( data->thrd __cfaabi_dbg_ctx2 );
    499103        }
    500104
    501105        // Process a single completion message from the io_uring
    502106        // This is NOT thread-safe
    503         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
     107        static [int, bool] __drain_io( & struct __io_data ring ) {
    504108                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
    505109
    506110                unsigned to_submit = 0;
    507                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     111                if( ring.poller_submits ) {
    508112                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    509113                        to_submit = __collect_submitions( ring );
    510114                }
    511115
    512                 int ret = __io_uring_enter(ring, to_submit, true, mask);
     116                int ret = __io_uring_enter(ring, to_submit, true);
    513117                if( ret < 0 ) {
    514118                        return [0, true];
     
    547151                        /* paranoid */ verify(&cqe);
    548152
    549                         process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
    550                 }
    551 
    552                 // Allow new submissions to happen
    553                 // V(ring.submit, count);
     153                        process( cqe );
     154                }
    554155
    555156                // Mark to the kernel that the cqe has been seen
     
    561162        }
    562163
    563         static void * __io_poller_slow( void * arg ) {
    564                 #if !defined( __CFA_NO_STATISTICS__ )
    565                         __stats_t local_stats;
    566                         __init_stats( &local_stats );
    567                         kernelTLS.this_stats = &local_stats;
    568                 #endif
    569 
    570                 cluster * cltr = (cluster *)arg;
    571                 struct __io_data & ring = *cltr->io;
    572 
    573                 ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
    574 
    575                 sigset_t mask;
    576                 sigfillset(&mask);
    577                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    578                         abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
    579                 }
    580 
    581                 sigdelset( &mask, SIGUSR1 );
    582 
    583                 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
    584                 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
    585 
    586                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
    587 
    588                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    589                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    590 
    591                                 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
    592 
    593                                 // In the user-thread approach drain and if anything was drained,
    594                                 // batton pass to the user-thread
    595                                 int count;
    596                                 bool again;
    597                                 [count, again] = __drain_io( ring, &mask );
    598 
    599                                 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
    600 
    601                                 // Update statistics
    602                                 __STATS__( true,
    603                                         io.complete_q.completed_avg.val += count;
    604                                         io.complete_q.completed_avg.slow_cnt += 1;
    605                                 )
    606 
    607                                 if(again) {
    608                                         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    609                                         __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
    610                                         wait( ring.poller.sem );
    611                                 }
    612                         }
    613                 }
    614                 else {
    615                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    616                                 //In the naive approach, just poll the io completion queue directly
    617                                 int count;
    618                                 bool again;
    619                                 [count, again] = __drain_io( ring, &mask );
    620 
    621                                 // Update statistics
    622                                 __STATS__( true,
    623                                         io.complete_q.completed_avg.val += count;
    624                                         io.complete_q.completed_avg.slow_cnt += 1;
    625                                 )
    626                         }
    627                 }
    628 
    629                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
    630 
    631                 unregister( &ring.poller.slow.id );
    632 
    633                 #if !defined(__CFA_NO_STATISTICS__)
    634                         __tally_stats(cltr->stats, &local_stats);
    635                 #endif
    636 
    637                 return 0p;
    638         }
    639 
    640         void main( __io_poller_fast & this ) {
    641                 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
    642 
    643                 // Start parked
    644                 park( __cfaabi_dbg_ctx );
    645 
    646                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
     164        void main( $io_ctx_thread & this ) {
     165                epoll_event ev;
     166                __ioctx_register( this, ev );
     167
     168                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
    647169
    648170                int reset = 0;
    649 
    650171                // Then loop until we need to start
    651                 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    652 
     172                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    653173                        // Drain the io
    654174                        int count;
    655175                        bool again;
    656176                        disable_interrupts();
    657                                 [count, again] = __drain_io( *this.ring, 0p );
     177                                [count, again] = __drain_io( *this.ring );
    658178
    659179                                if(!again) reset++;
     
    672192                        // We didn't get anything baton pass to the slow poller
    673193                        else {
    674                                 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     194                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    675195                                reset = 0;
    676196
    677                                 // wake up the slow poller
    678                                 post( this.ring->poller.sem );
    679 
    680                                 // park this thread
    681                                 park( __cfaabi_dbg_ctx );
     197                                // block this thread
     198                                __ioctx_prepare_block( this, ev );
     199                                wait( this.sem );
    682200                        }
    683201                }
    684202
    685203                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    686         }
    687 
    688         static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
    689         static inline void __wake_poller( struct __io_data & ring ) {
    690                 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
    691 
    692                 sigval val = { 1 };
    693                 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    694204        }
    695205
     
    806316        }
    807317
    808         void __submit( struct __io_data & ring, uint32_t idx ) {
     318        void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
     319                __io_data & ring = *ctx->thrd.ring;
    809320                // Get now the data we definetely need
    810321                uint32_t * const tail = ring.submit_q.tail;
    811                 const uint32_t mask = *ring.submit_q.mask;
     322                const uint32_t mask  = *ring.submit_q.mask;
    812323
    813324                // There are 2 submission schemes, check which one we are using
    814                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     325                if( ring.poller_submits ) {
    815326                        // If the poller thread submits, then we just need to add this to the ready array
    816327                        __submit_to_ready_array( ring, idx, mask );
    817328
    818                         __wake_poller( ring );
     329                        post( ctx->thrd.sem );
    819330
    820331                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    821332                }
    822                 else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
     333                else if( ring.eager_submits ) {
    823334                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
    824335
     
    849360                        // We got the lock
    850361                        unsigned to_submit = __collect_submitions( ring );
    851                         int ret = __io_uring_enter( ring, to_submit, false, 0p );
     362                        int ret = __io_uring_enter( ring, to_submit, false );
    852363                        if( ret < 0 ) {
    853364                                unlock(ring.submit_q.lock);
     
    892403
    893404                        // Submit however, many entries need to be submitted
    894                         int ret = __io_uring_enter( ring, 1, false, 0p );
     405                        int ret = __io_uring_enter( ring, 1, false );
    895406                        if( ret < 0 ) {
    896407                                switch((int)errno) {
     
    958469                return count;
    959470        }
    960 
    961 //=============================================================================================
    962 // I/O Submissions
    963 //=============================================================================================
    964 
    965         void register_fixed_files( cluster & cl, int * files, unsigned count ) {
    966                 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
    967                 if( ret < 0 ) {
    968                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    969                 }
    970 
    971                 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
    972         }
    973471#endif
Note: See TracChangeset for help on using the changeset viewer.