Changeset 426f60c


Ignore:
Timestamp:
Jan 12, 2021, 12:34:08 PM (8 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
58f99b3
Parents:
77fde9d
Message:

Web server seems to work

Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r77fde9d r426f60c  
    3232        extern "C" {
    3333                #include <sys/epoll.h>
     34                #include <sys/eventfd.h>
    3435                #include <sys/syscall.h>
    3536
     
    4142        #include "kernel/fwd.hfa"
    4243        #include "io/types.hfa"
     44
     45        static const char * opcodes[] = {
     46                "OP_NOP",
     47                "OP_READV",
     48                "OP_WRITEV",
     49                "OP_FSYNC",
     50                "OP_READ_FIXED",
     51                "OP_WRITE_FIXED",
     52                "OP_POLL_ADD",
     53                "OP_POLL_REMOVE",
     54                "OP_SYNC_FILE_RANGE",
     55                "OP_SENDMSG",
     56                "OP_RECVMSG",
     57                "OP_TIMEOUT",
     58                "OP_TIMEOUT_REMOVE",
     59                "OP_ACCEPT",
     60                "OP_ASYNC_CANCEL",
     61                "OP_LINK_TIMEOUT",
     62                "OP_CONNECT",
     63                "OP_FALLOCATE",
     64                "OP_OPENAT",
     65                "OP_CLOSE",
     66                "OP_FILES_UPDATE",
     67                "OP_STATX",
     68                "OP_READ",
     69                "OP_WRITE",
     70                "OP_FADVISE",
     71                "OP_MADVISE",
     72                "OP_SEND",
     73                "OP_RECV",
     74                "OP_OPENAT2",
     75                "OP_EPOLL_CTL",
     76                "OP_SPLICE",
     77                "OP_PROVIDE_BUFFERS",
     78                "OP_REMOVE_BUFFERS",
     79                "OP_TEE",
     80                "INVALID_OP"
     81        };
    4382
    4483        // returns true of acquired as leader or second leader
     
    159198        static __u32 __release_consumed_submission( struct __io_data & ring );
    160199
    161         static inline void process(struct io_uring_cqe & cqe ) {
     200        // Process a single completion message from the io_uring
     201        // This is NOT thread-safe
     202        static inline void process( volatile struct io_uring_cqe & cqe ) {
    162203                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    163204                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     
    166207        }
    167208
    168         // Process a single completion message from the io_uring
    169         // This is NOT thread-safe
    170209        static [int, bool] __drain_io( & struct __io_data ring ) {
    171210                /* paranoid */ verify( ! __preemption_enabled() );
     
    193232                }
    194233
     234                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     235
    195236                // Release the consumed SQEs
    196237                __release_consumed_submission( ring );
     
    210251                for(i; count) {
    211252                        unsigned idx = (head + i) & mask;
    212                         struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     253                        volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    213254
    214255                        /* paranoid */ verify(&cqe);
     
    219260                // Mark to the kernel that the cqe has been seen
    220261                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    221                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    222                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
     262                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
    223263
    224264                return [count, count > 0 || to_submit > 0];
     
    229269                __ioctx_register( this, ev );
    230270
    231                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
     271                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
    232272
    233273                const int reset_cnt = 5;
     
    257297                        }
    258298
    259                         // We alread failed to find events a few time.
     299                        // We alread failed to find completed entries a few time.
    260300                        if(reset == 1) {
    261301                                // Rearm the context so it can block
     
    270310                                        io.complete_q.blocks += 1;
    271311                                )
    272                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
     312                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    273313
    274314                                // block this thread
    275315                                wait( this.sem );
    276316
     317                                eventfd_t v;
     318                                eventfd_read(this.ring->efd, &v);
     319
    277320                        // restore counter
    278321                        reset = reset_cnt;
    279322                }
    280323
    281                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     324                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
    282325        }
    283326
     
    302345//
    303346
    304         [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
     347        // Allocate an submit queue entry.
     348        // The kernel cannot see these entries until they are submitted, but other threads must be
     349        // able to see which entries can be used and which are already un used by an other thread
     350        // for convenience, return both the index and the pointer to the sqe
     351        // sqe == &sqes[idx]
     352        [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    305353                /* paranoid */ verify( data != 0 );
    306354
     
    317365                        // Look through the list starting at some offset
    318366                        for(i; cnt) {
    319                                 __u64 expected = 0;
    320                                 __u32 idx = (i + off) & mask;
    321                                 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     367                                __u64 expected = 3;
     368                                __u32 idx = (i + off) & mask; // Get an index from a random
     369                                volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    322370                                volatile __u64 * udata = &sqe->user_data;
    323371
     372                                // Allocate the entry by CASing the user_data field from 0 to the future address
    324373                                if( *udata == expected &&
    325374                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
     
    332381                                        )
    333382
     383                                        // debug log
    334384                                        __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    335385
    336386                                        // Success return the data
     387                                        sqe->opcode = 0;
     388                                        sqe->flags = 0;
     389                                        sqe->ioprio = 0;
     390                                        sqe->fd = 0;
     391                                        sqe->off = 0;
     392                                        sqe->addr = 0;
     393                                        sqe->len = 0;
     394                                        sqe->accept_flags = 0;
     395                                        sqe->__pad2[0] = 0;
     396                                        sqe->__pad2[1] = 0;
     397                                        sqe->__pad2[2] = 0;
    337398                                        return [sqe, idx];
    338399                                }
    339400                                verify(expected != data);
    340401
     402                                // This one was used
    341403                                len ++;
    342404                        }
    343405
    344406                        block++;
     407
     408                        abort( "Kernel I/O : all submit queue entries used, yielding\n" );
     409
    345410                        yield();
    346411                }
     
    390455
    391456        void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    392                 __cfadbg_print_safe( io, "Kernel I/O : submitting %u for %p\n", idx, active_thread() );
    393 
    394457                __io_data & ring = *ctx->thrd.ring;
     458
     459                {
     460                        __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     461                        __cfadbg_print_safe( io,
     462                                "Kernel I/O : submitting %u (%p) for %p\n"
     463                                "    data: %p\n"
     464                                "    opcode: %s\n"
     465                                "    fd: %d\n"
     466                                "    flags: %d\n"
     467                                "    prio: %d\n"
     468                                "    off: %p\n"
     469                                "    addr: %p\n"
     470                                "    len: %d\n"
     471                                "    other flags: %d\n"
     472                                "    splice fd: %d\n"
     473                                "    pad[0]: %llu\n"
     474                                "    pad[1]: %llu\n"
     475                                "    pad[2]: %llu\n",
     476                                idx, sqe,
     477                                active_thread(),
     478                                (void*)sqe->user_data,
     479                                opcodes[sqe->opcode],
     480                                sqe->fd,
     481                                sqe->flags,
     482                                sqe->ioprio,
     483                                sqe->off,
     484                                sqe->addr,
     485                                sqe->len,
     486                                sqe->accept_flags,
     487                                sqe->splice_fd_in,
     488                                sqe->__pad2[0],
     489                                sqe->__pad2[1],
     490                                sqe->__pad2[2]
     491                        );
     492                }
     493
     494
    395495                // Get now the data we definetely need
    396496                volatile __u32 * const tail = ring.submit_q.tail;
     
    475575                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    476576                }
    477                 else {
     577                else
     578                {
    478579                        // get mutual exclusion
    479580                        #if defined(LEADER_LOCK)
     
    483584                        #endif
    484585
    485                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     586                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    486587                        /* paranoid */  "index %u already reclaimed\n"
    487588                        /* paranoid */  "head %u, prev %u, tail %u\n"
     
    510611                        }
    511612
     613                        /* paranoid */ verify(ret == 1);
     614
    512615                        // update statistics
    513616                        __STATS__( false,
     
    516619                        )
    517620
     621                        {
     622                                __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
     623                                __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
     624                                __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
     625
     626                                __cfadbg_print_safe( io,
     627                                        "Kernel I/O : last submitted is %u (%p)\n"
     628                                        "    data: %p\n"
     629                                        "    opcode: %s\n"
     630                                        "    fd: %d\n"
     631                                        "    flags: %d\n"
     632                                        "    prio: %d\n"
     633                                        "    off: %p\n"
     634                                        "    addr: %p\n"
     635                                        "    len: %d\n"
     636                                        "    other flags: %d\n"
     637                                        "    splice fd: %d\n"
     638                                        "    pad[0]: %llu\n"
     639                                        "    pad[1]: %llu\n"
     640                                        "    pad[2]: %llu\n",
     641                                        last_idx, sqe,
     642                                        (void*)sqe->user_data,
     643                                        opcodes[sqe->opcode],
     644                                        sqe->fd,
     645                                        sqe->flags,
     646                                        sqe->ioprio,
     647                                        sqe->off,
     648                                        sqe->addr,
     649                                        sqe->len,
     650                                        sqe->accept_flags,
     651                                        sqe->splice_fd_in,
     652                                        sqe->__pad2[0],
     653                                        sqe->__pad2[1],
     654                                        sqe->__pad2[2]
     655                                );
     656                        }
     657
     658                        __atomic_thread_fence( __ATOMIC_SEQ_CST );
    518659                        // Release the consumed SQEs
    519660                        __release_consumed_submission( ring );
     661                        // ring.submit_q.sqes[idx].user_data = 3ul64;
    520662
    521663                        #if defined(LEADER_LOCK)
     
    525667                        #endif
    526668
    527                         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     669                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    528670                }
    529671        }
    530672
    531673        // #define PARTIAL_SUBMIT 32
     674
     675        // go through the list of submissions in the ready array and moved them into
     676        // the ring's submit queue
    532677        static unsigned __collect_submitions( struct __io_data & ring ) {
    533678                /* paranoid */ verify( ring.submit_q.ready != 0p );
     
    570715        }
    571716
     717        // Go through the ring's submit queue and release everything that has already been consumed
     718        // by io_uring
    572719        static __u32 __release_consumed_submission( struct __io_data & ring ) {
    573720                const __u32 smask = *ring.submit_q.mask;
    574721
     722                // We need to get the lock to copy the old head and new head
    575723                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
    576                 __u32 chead = *ring.submit_q.head;
    577                 __u32 phead = ring.submit_q.prev_head;
    578                 ring.submit_q.prev_head = chead;
     724                __attribute__((unused))
     725                __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
     726                __u32 chead = *ring.submit_q.head;              // get the current head of the queue
     727                __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
     728                ring.submit_q.prev_head = chead;                // note up to were we processed
    579729                unlock(ring.submit_q.release_lock);
    580730
     731                // the 3 fields are organized like this diagram
     732                // except it's are ring
     733                // ---+--------+--------+----
     734                // ---+--------+--------+----
     735                //    ^        ^        ^
     736                // phead    chead    ctail
     737
     738                // make sure ctail doesn't wrap around and reach phead
     739                /* paranoid */ verify(
     740                           (ctail >= chead && chead >= phead)
     741                        || (chead >= phead && phead >= ctail)
     742                        || (phead >= ctail && ctail >= chead)
     743                );
     744
     745                // find the range we need to clear
    581746                __u32 count = chead - phead;
     747
     748                // We acquired an previous-head/current-head range
     749                // go through the range and release the sqes
    582750                for( i; count ) {
    583751                        __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    584                         ring.submit_q.sqes[ idx ].user_data = 0;
     752
     753                        /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
     754                        ring.submit_q.sqes[ idx ].user_data = 3ul64;
    585755                }
    586756                return count;
  • libcfa/src/concurrency/io/call.cfa.in

    r77fde9d r426f60c  
    7474        ;
    7575
    76         extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
     76        extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
    7777        extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
    7878
     
    221221
    222222                __u32 idx;
    223                 struct io_uring_sqe * sqe;
     223                volatile struct io_uring_sqe * sqe;
    224224                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    225225
     
    314314        Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
    315315                'fd': 'sockfd',
     316                'ioprio': '0',
    316317                'addr': '(__u64)addr',
    317318                'addr2': '(__u64)addrlen',
     
    373374        Call('READ', 'ssize_t read(int fd, void * buf, size_t count)', {
    374375                'fd': 'fd',
     376                'off': '0',
    375377                'addr': '(__u64)buf',
    376378                'len': 'count'
     
    379381        Call('WRITE', 'ssize_t write(int fd, void * buf, size_t count)', {
    380382                'fd': 'fd',
     383                'off': '0',
    381384                'addr': '(__u64)buf',
    382385                'len': 'count'
     
    477480
    478481                __u32 idx;
    479                 struct io_uring_sqe * sqe;
     482                volatile struct io_uring_sqe * sqe;
    480483                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    481484
  • libcfa/src/concurrency/io/setup.cfa

    r77fde9d r426f60c  
    5252                #include <pthread.h>
    5353                #include <sys/epoll.h>
     54                #include <sys/eventfd.h>
    5455                #include <sys/mman.h>
    5556                #include <sys/syscall.h>
     
    185186                                $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    186187                                /* paranoid */ verify( io_ctx );
    187                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %p\n", io_ctx);
     188                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
    188189                                #if !defined( __CFA_NO_STATISTICS__ )
    189190                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
     
    309310        }
    310311
     312        extern void signal_unblock( int sig );
     313        extern void signal_block  ( int sig );
     314
    311315        static void __io_create( __io_data & this, const io_context_params & params_in ) {
    312316                // Step 1 : call to setup
     
    377381                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
    378382                }
    379 
     383                memset(sq.sqes, 0xde, size);
     384
     385                verify( 0 != (params.features & IORING_FEAT_NODROP) );
     386
     387                // Step 3 : Initialize the data structure
    380388                // Get the pointers from the kernel to fill the structure
    381389                // submit queue
     
    392400                        const __u32 num = *sq.num;
    393401                        for( i; num ) {
    394                                 sq.sqes[i].user_data = 0ul64;
     402                                sq.sqes[i].opcode = IORING_OP_LAST;
     403                                sq.sqes[i].user_data = 3ul64;
    395404                        }
    396405                }
     
    422431                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    423432
     433                signal_block( SIGUSR1 );
     434
     435                // Step 4 : eventfd
     436                int efd = eventfd(0, 0);
     437                if (efd < 0) {
     438                        abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
     439                }
     440
     441                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
     442                if (ret < 0) {
     443                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     444                }
     445
     446                signal_unblock( SIGUSR1 );
     447
    424448                // some paranoid checks
    425449                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
     
    436460                this.ring_flags = params.flags;
    437461                this.fd         = fd;
     462                this.efd        = efd;
    438463                this.eager_submits  = params_in.eager_submits;
    439464                this.poller_submits = params_in.poller_submits;
     
    458483                // close the file descriptor
    459484                close(this.fd);
     485                close(this.efd);
    460486
    461487                free( this.submit_q.ready ); // Maybe null, doesn't matter
     
    467493
    468494        void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
    469                 ev.events = EPOLLIN | EPOLLONESHOT;
     495                ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    470496                ev.data.u64 = (__u64)&ctx;
    471                 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
     497                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->efd, &ev);
    472498                if (ret < 0) {
    473499                        abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
     
    476502
    477503        void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
    478                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %p\n", &ctx);
    479                 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
     504                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
     505                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->efd, &ev);
    480506                if (ret < 0) {
    481507                        abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
  • libcfa/src/concurrency/io/types.hfa

    r77fde9d r426f60c  
    6565
    6666                // A buffer of sqes (not the actual ring)
    67                 struct io_uring_sqe * sqes;
     67                volatile struct io_uring_sqe * sqes;
    6868
    6969                // The location and size of the mmaped area
     
    8585
    8686                // the kernel ring
    87                 struct io_uring_cqe * cqes;
     87                volatile struct io_uring_cqe * cqes;
    8888
    8989                // The location and size of the mmaped area
     
    9797                __u32 ring_flags;
    9898                int fd;
     99                int efd;
    99100                bool eager_submits:1;
    100101                bool poller_submits:1;
Note: See TracChangeset for help on using the changeset viewer.