Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rec19b21 rfe9468e2  
    3131
    3232        extern "C" {
     33                #include <sys/epoll.h>
    3334                #include <sys/syscall.h>
    3435
     
    4041        #include "kernel/fwd.hfa"
    4142        #include "io/types.hfa"
    42 
    43         static const char * opcodes[] = {
    44                 "OP_NOP",
    45                 "OP_READV",
    46                 "OP_WRITEV",
    47                 "OP_FSYNC",
    48                 "OP_READ_FIXED",
    49                 "OP_WRITE_FIXED",
    50                 "OP_POLL_ADD",
    51                 "OP_POLL_REMOVE",
    52                 "OP_SYNC_FILE_RANGE",
    53                 "OP_SENDMSG",
    54                 "OP_RECVMSG",
    55                 "OP_TIMEOUT",
    56                 "OP_TIMEOUT_REMOVE",
    57                 "OP_ACCEPT",
    58                 "OP_ASYNC_CANCEL",
    59                 "OP_LINK_TIMEOUT",
    60                 "OP_CONNECT",
    61                 "OP_FALLOCATE",
    62                 "OP_OPENAT",
    63                 "OP_CLOSE",
    64                 "OP_FILES_UPDATE",
    65                 "OP_STATX",
    66                 "OP_READ",
    67                 "OP_WRITE",
    68                 "OP_FADVISE",
    69                 "OP_MADVISE",
    70                 "OP_SEND",
    71                 "OP_RECV",
    72                 "OP_OPENAT2",
    73                 "OP_EPOLL_CTL",
    74                 "OP_SPLICE",
    75                 "OP_PROVIDE_BUFFERS",
    76                 "OP_REMOVE_BUFFERS",
    77                 "OP_TEE",
    78                 "INVALID_OP"
    79         };
    8043
    8144        // returns true of acquired as leader or second leader
     
    171134                int ret = 0;
    172135                if( need_sys_to_submit || need_sys_to_complete ) {
    173                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174136                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    175137                        if( ret < 0 ) {
     
    195157        static unsigned __collect_submitions( struct __io_data & ring );
    196158        static __u32 __release_consumed_submission( struct __io_data & ring );
    197         static inline void __clean( volatile struct io_uring_sqe * sqe );
     159
     160        static inline void process(struct io_uring_cqe & cqe ) {
     161                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     162                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     163
     164                fulfil( *future, cqe.res );
     165        }
    198166
    199167        // Process a single completion message from the io_uring
    200168        // This is NOT thread-safe
    201         static inline void process( volatile struct io_uring_cqe & cqe ) {
    202                 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    203                 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    204 
    205                 fulfil( *future, cqe.res );
    206         }
    207 
    208169        static [int, bool] __drain_io( & struct __io_data ring ) {
    209170                /* paranoid */ verify( ! __preemption_enabled() );
     
    231192                }
    232193
    233                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    234 
    235194                // Release the consumed SQEs
    236195                __release_consumed_submission( ring );
     
    250209                for(i; count) {
    251210                        unsigned idx = (head + i) & mask;
    252                         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     211                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    253212
    254213                        /* paranoid */ verify(&cqe);
     
    259218                // Mark to the kernel that the cqe has been seen
    260219                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    261                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
     220                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     221                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    262222
    263223                return [count, count > 0 || to_submit > 0];
     
    265225
    266226        void main( $io_ctx_thread & this ) {
    267                 __ioctx_register( this );
    268 
    269                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
    270 
    271                 const int reset_cnt = 5;
    272                 int reset = reset_cnt;
     227                epoll_event ev;
     228                __ioctx_register( this, ev );
     229
     230                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
     231
     232                int reset = 0;
    273233                // Then loop until we need to start
    274                 LOOP:
    275234                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    276235                        // Drain the io
     
    280239                                [count, again] = __drain_io( *this.ring );
    281240
    282                                 if(!again) reset--;
     241                                if(!again) reset++;
    283242
    284243                                // Update statistics
     
    290249
    291250                        // If we got something, just yield and check again
    292                         if(reset > 1) {
     251                        if(reset < 5) {
    293252                                yield();
    294                                 continue LOOP;
    295                         }
    296 
    297                         // We alread failed to find completed entries a few time.
    298                         if(reset == 1) {
    299                                 // Rearm the context so it can block
    300                                 // but don't block right away
    301                                 // we need to retry one last time in case
    302                                 // something completed *just now*
    303                                 __ioctx_prepare_block( this );
    304                                 continue LOOP;
    305                         }
    306 
     253                        }
     254                        // We didn't get anything baton pass to the slow poller
     255                        else {
    307256                                __STATS__( false,
    308257                                        io.complete_q.blocks += 1;
    309258                                )
    310                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
     259                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
     260                                reset = 0;
    311261
    312262                                // block this thread
     263                                __ioctx_prepare_block( this, ev );
    313264                                wait( this.sem );
    314 
    315                         // restore counter
    316                         reset = reset_cnt;
    317                 }
    318 
    319                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
     265                        }
     266                }
     267
     268                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    320269        }
    321270
     
    340289//
    341290
    342         // Allocate an submit queue entry.
    343         // The kernel cannot see these entries until they are submitted, but other threads must be
    344         // able to see which entries can be used and which are already un used by an other thread
    345         // for convenience, return both the index and the pointer to the sqe
    346         // sqe == &sqes[idx]
    347         [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
     291        [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    348292                /* paranoid */ verify( data != 0 );
    349293
     
    360304                        // Look through the list starting at some offset
    361305                        for(i; cnt) {
    362                                 __u64 expected = 3;
    363                                 __u32 idx = (i + off) & mask; // Get an index from a random
    364                                 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     306                                __u64 expected = 0;
     307                                __u32 idx = (i + off) & mask;
     308                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    365309                                volatile __u64 * udata = &sqe->user_data;
    366310
    367                                 // Allocate the entry by CASing the user_data field from 0 to the future address
    368311                                if( *udata == expected &&
    369312                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
     
    376319                                        )
    377320
    378                                         // debug log
    379                                         __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    380321
    381322                                        // Success return the data
     
    384325                                verify(expected != data);
    385326
    386                                 // This one was used
    387327                                len ++;
    388328                        }
    389329
    390330                        block++;
    391 
    392                         abort( "Kernel I/O : all submit queue entries used, yielding\n" );
    393 
    394331                        yield();
    395332                }
     
    440377        void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    441378                __io_data & ring = *ctx->thrd.ring;
    442 
    443                 {
    444                         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    445                         __cfadbg_print_safe( io,
    446                                 "Kernel I/O : submitting %u (%p) for %p\n"
    447                                 "    data: %p\n"
    448                                 "    opcode: %s\n"
    449                                 "    fd: %d\n"
    450                                 "    flags: %d\n"
    451                                 "    prio: %d\n"
    452                                 "    off: %p\n"
    453                                 "    addr: %p\n"
    454                                 "    len: %d\n"
    455                                 "    other flags: %d\n"
    456                                 "    splice fd: %d\n"
    457                                 "    pad[0]: %llu\n"
    458                                 "    pad[1]: %llu\n"
    459                                 "    pad[2]: %llu\n",
    460                                 idx, sqe,
    461                                 active_thread(),
    462                                 (void*)sqe->user_data,
    463                                 opcodes[sqe->opcode],
    464                                 sqe->fd,
    465                                 sqe->flags,
    466                                 sqe->ioprio,
    467                                 sqe->off,
    468                                 sqe->addr,
    469                                 sqe->len,
    470                                 sqe->accept_flags,
    471                                 sqe->splice_fd_in,
    472                                 sqe->__pad2[0],
    473                                 sqe->__pad2[1],
    474                                 sqe->__pad2[2]
    475                         );
    476                 }
    477 
    478 
    479379                // Get now the data we definetely need
    480380                volatile __u32 * const tail = ring.submit_q.tail;
     
    543443                                unlock(ring.submit_q.submit_lock);
    544444                        #endif
    545                         if( ret < 0 ) {
    546                                 return;
    547                         }
     445                        if( ret < 0 ) return;
    548446
    549447                        // Release the consumed SQEs
     
    556454                                io.submit_q.submit_avg.cnt += 1;
    557455                        )
    558 
    559                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    560                 }
    561                 else
    562                 {
     456                }
     457                else {
    563458                        // get mutual exclusion
    564459                        #if defined(LEADER_LOCK)
     
    568463                        #endif
    569464
    570                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
     465                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
    571466                        /* paranoid */  "index %u already reclaimed\n"
    572467                        /* paranoid */  "head %u, prev %u, tail %u\n"
     
    595490                        }
    596491
    597                         /* paranoid */ verify(ret == 1);
    598 
    599492                        // update statistics
    600493                        __STATS__( false,
     
    603496                        )
    604497
    605                         {
    606                                 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
    607                                 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
    608                                 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
    609 
    610                                 __cfadbg_print_safe( io,
    611                                         "Kernel I/O : last submitted is %u (%p)\n"
    612                                         "    data: %p\n"
    613                                         "    opcode: %s\n"
    614                                         "    fd: %d\n"
    615                                         "    flags: %d\n"
    616                                         "    prio: %d\n"
    617                                         "    off: %p\n"
    618                                         "    addr: %p\n"
    619                                         "    len: %d\n"
    620                                         "    other flags: %d\n"
    621                                         "    splice fd: %d\n"
    622                                         "    pad[0]: %llu\n"
    623                                         "    pad[1]: %llu\n"
    624                                         "    pad[2]: %llu\n",
    625                                         last_idx, sqe,
    626                                         (void*)sqe->user_data,
    627                                         opcodes[sqe->opcode],
    628                                         sqe->fd,
    629                                         sqe->flags,
    630                                         sqe->ioprio,
    631                                         sqe->off,
    632                                         sqe->addr,
    633                                         sqe->len,
    634                                         sqe->accept_flags,
    635                                         sqe->splice_fd_in,
    636                                         sqe->__pad2[0],
    637                                         sqe->__pad2[1],
    638                                         sqe->__pad2[2]
    639                                 );
    640                         }
    641 
    642                         __atomic_thread_fence( __ATOMIC_SEQ_CST );
    643498                        // Release the consumed SQEs
    644499                        __release_consumed_submission( ring );
    645                         // ring.submit_q.sqes[idx].user_data = 3ul64;
    646500
    647501                        #if defined(LEADER_LOCK)
     
    651505                        #endif
    652506
    653                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
     507                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    654508                }
    655509        }
    656510
    657511        // #define PARTIAL_SUBMIT 32
    658 
    659         // go through the list of submissions in the ready array and moved them into
    660         // the ring's submit queue
    661512        static unsigned __collect_submitions( struct __io_data & ring ) {
    662513                /* paranoid */ verify( ring.submit_q.ready != 0p );
     
    699550        }
    700551
    701         // Go through the ring's submit queue and release everything that has already been consumed
    702         // by io_uring
    703552        static __u32 __release_consumed_submission( struct __io_data & ring ) {
    704553                const __u32 smask = *ring.submit_q.mask;
    705554
    706                 // We need to get the lock to copy the old head and new head
    707555                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
    708                 __attribute__((unused))
    709                 __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
    710                 __u32 chead = *ring.submit_q.head;              // get the current head of the queue
    711                 __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
    712                 ring.submit_q.prev_head = chead;                // note up to were we processed
     556                __u32 chead = *ring.submit_q.head;
     557                __u32 phead = ring.submit_q.prev_head;
     558                ring.submit_q.prev_head = chead;
    713559                unlock(ring.submit_q.release_lock);
    714560
    715                 // the 3 fields are organized like this diagram
    716                 // except it's are ring
    717                 // ---+--------+--------+----
    718                 // ---+--------+--------+----
    719                 //    ^        ^        ^
    720                 // phead    chead    ctail
    721 
    722                 // make sure ctail doesn't wrap around and reach phead
    723                 /* paranoid */ verify(
    724                            (ctail >= chead && chead >= phead)
    725                         || (chead >= phead && phead >= ctail)
    726                         || (phead >= ctail && ctail >= chead)
    727                 );
    728 
    729                 // find the range we need to clear
    730561                __u32 count = chead - phead;
    731 
    732                 // We acquired an previous-head/current-head range
    733                 // go through the range and release the sqes
    734562                for( i; count ) {
    735563                        __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    736 
    737                         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
    738                         __clean( &ring.submit_q.sqes[ idx ] );
     564                        ring.submit_q.sqes[ idx ].user_data = 0;
    739565                }
    740566                return count;
    741567        }
    742 
    743         void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
    744                 __clean( sqe );
    745         }
    746 
    747         static inline void __clean( volatile struct io_uring_sqe * sqe ) {
    748                 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
    749                 __cfaabi_dbg_debug_do(
    750                         memset(sqe, 0xde, sizeof(*sqe));
    751                         sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
    752                 );
    753 
    754                 // Mark the entry as unused
    755                 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
    756         }
    757568#endif
Note: See TracChangeset for help on using the changeset viewer.