Changeset 402658b1 for libcfa/src


Ignore:
Timestamp:
Jan 13, 2021, 10:23:07 PM (5 years ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
9153e53
Parents:
bace538 (diff), a00bc5b (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rbace538 r402658b1  
    3131
    3232        extern "C" {
    33                 #include <sys/epoll.h>
    3433                #include <sys/syscall.h>
    3534
     
    4140        #include "kernel/fwd.hfa"
    4241        #include "io/types.hfa"
     42
     43        static const char * opcodes[] = {
     44                "OP_NOP",
     45                "OP_READV",
     46                "OP_WRITEV",
     47                "OP_FSYNC",
     48                "OP_READ_FIXED",
     49                "OP_WRITE_FIXED",
     50                "OP_POLL_ADD",
     51                "OP_POLL_REMOVE",
     52                "OP_SYNC_FILE_RANGE",
     53                "OP_SENDMSG",
     54                "OP_RECVMSG",
     55                "OP_TIMEOUT",
     56                "OP_TIMEOUT_REMOVE",
     57                "OP_ACCEPT",
     58                "OP_ASYNC_CANCEL",
     59                "OP_LINK_TIMEOUT",
     60                "OP_CONNECT",
     61                "OP_FALLOCATE",
     62                "OP_OPENAT",
     63                "OP_CLOSE",
     64                "OP_FILES_UPDATE",
     65                "OP_STATX",
     66                "OP_READ",
     67                "OP_WRITE",
     68                "OP_FADVISE",
     69                "OP_MADVISE",
     70                "OP_SEND",
     71                "OP_RECV",
     72                "OP_OPENAT2",
     73                "OP_EPOLL_CTL",
     74                "OP_SPLICE",
     75                "OP_PROVIDE_BUFFERS",
     76                "OP_REMOVE_BUFFERS",
     77                "OP_TEE",
     78                "INVALID_OP"
     79        };
    4380
    4481        // returns true of acquired as leader or second leader
     
    134171                int ret = 0;
    135172                if( need_sys_to_submit || need_sys_to_complete ) {
     173                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    136174                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    137175                        if( ret < 0 ) {
     
    157195        static unsigned __collect_submitions( struct __io_data & ring );
    158196        static __u32 __release_consumed_submission( struct __io_data & ring );
    159 
    160         static inline void process(struct io_uring_cqe & cqe ) {
     197        static inline void __clean( volatile struct io_uring_sqe * sqe );
     198
     199        // Process a single completion message from the io_uring
     200        // This is NOT thread-safe
     201        static inline void process( volatile struct io_uring_cqe & cqe ) {
    161202                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    162203                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     
    165206        }
    166207
    167         // Process a single completion message from the io_uring
    168         // This is NOT thread-safe
    169208        static [int, bool] __drain_io( & struct __io_data ring ) {
    170209                /* paranoid */ verify( ! __preemption_enabled() );
     
    192231                }
    193232
     233                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     234
    194235                // Release the consumed SQEs
    195236                __release_consumed_submission( ring );
     
    209250                for(i; count) {
    210251                        unsigned idx = (head + i) & mask;
    211                         struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     252                        volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    212253
    213254                        /* paranoid */ verify(&cqe);
     
    218259                // Mark to the kernel that the cqe has been seen
    219260                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    220                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    221                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
     261                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
    222262
    223263                return [count, count > 0 || to_submit > 0];
     
    225265
    226266        void main( $io_ctx_thread & this ) {
    227                 epoll_event ev;
    228                 __ioctx_register( this, ev );
    229 
    230                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
    231 
    232                 int reset = 0;
     267                __ioctx_register( this );
     268
     269                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
     270
     271                const int reset_cnt = 5;
     272                int reset = reset_cnt;
    233273                // Then loop until we need to start
     274                LOOP:
    234275                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    235276                        // Drain the io
     
    239280                                [count, again] = __drain_io( *this.ring );
    240281
    241                                 if(!again) reset++;
     282                                if(!again) reset--;
    242283
    243284                                // Update statistics
     
    249290
    250291                        // If we got something, just yield and check again
    251                         if(reset < 5) {
     292                        if(reset > 1) {
    252293                                yield();
    253                         }
    254                         // We didn't get anything baton pass to the slow poller
    255                         else {
     294                                continue LOOP;
     295                        }
     296
     297                        // We alread failed to find completed entries a few time.
     298                        if(reset == 1) {
     299                                // Rearm the context so it can block
     300                                // but don't block right away
     301                                // we need to retry one last time in case
     302                                // something completed *just now*
     303                                __ioctx_prepare_block( this );
     304                                continue LOOP;
     305                        }
     306
    256307                                __STATS__( false,
    257308                                        io.complete_q.blocks += 1;
    258309                                )
    259                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    260                                 reset = 0;
     310                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    261311
    262312                                // block this thread
    263                                 __ioctx_prepare_block( this, ev );
    264313                                wait( this.sem );
    265                         }
    266                 }
    267 
    268                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     314
     315                        // restore counter
     316                        reset = reset_cnt;
     317                }
     318
     319                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
    269320        }
    270321
     
    289340//
    290341
    291         [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
     342        // Allocate an submit queue entry.
     343        // The kernel cannot see these entries until they are submitted, but other threads must be
     344        // able to see which entries can be used and which are already un used by an other thread
     345        // for convenience, return both the index and the pointer to the sqe
     346        // sqe == &sqes[idx]
     347        [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    292348                /* paranoid */ verify( data != 0 );
    293349
     
    304360                        // Look through the list starting at some offset
    305361                        for(i; cnt) {
    306                                 __u64 expected = 0;
    307                                 __u32 idx = (i + off) & mask;
    308                                 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     362                                __u64 expected = 3;
     363                                __u32 idx = (i + off) & mask; // Get an index from a random
     364                                volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    309365                                volatile __u64 * udata = &sqe->user_data;
    310366
     367                                // Allocate the entry by CASing the user_data field from 0 to the future address
    311368                                if( *udata == expected &&
    312369                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
     
    319376                                        )
    320377
     378                                        // debug log
     379                                        __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    321380
    322381                                        // Success return the data
     
    325384                                verify(expected != data);
    326385
     386                                // This one was used
    327387                                len ++;
    328388                        }
    329389
    330390                        block++;
     391
     392                        abort( "Kernel I/O : all submit queue entries used, yielding\n" );
     393
    331394                        yield();
    332395                }
     
    377440        void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    378441                __io_data & ring = *ctx->thrd.ring;
     442
     443                {
     444                        __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     445                        __cfadbg_print_safe( io,
     446                                "Kernel I/O : submitting %u (%p) for %p\n"
     447                                "    data: %p\n"
     448                                "    opcode: %s\n"
     449                                "    fd: %d\n"
     450                                "    flags: %d\n"
     451                                "    prio: %d\n"
     452                                "    off: %p\n"
     453                                "    addr: %p\n"
     454                                "    len: %d\n"
     455                                "    other flags: %d\n"
     456                                "    splice fd: %d\n"
     457                                "    pad[0]: %llu\n"
     458                                "    pad[1]: %llu\n"
     459                                "    pad[2]: %llu\n",
     460                                idx, sqe,
     461                                active_thread(),
     462                                (void*)sqe->user_data,
     463                                opcodes[sqe->opcode],
     464                                sqe->fd,
     465                                sqe->flags,
     466                                sqe->ioprio,
     467                                sqe->off,
     468                                sqe->addr,
     469                                sqe->len,
     470                                sqe->accept_flags,
     471                                sqe->splice_fd_in,
     472                                sqe->__pad2[0],
     473                                sqe->__pad2[1],
     474                                sqe->__pad2[2]
     475                        );
     476                }
     477
     478
    379479                // Get now the data we definetely need
    380480                volatile __u32 * const tail = ring.submit_q.tail;
     
    443543                                unlock(ring.submit_q.submit_lock);
    444544                        #endif
    445                         if( ret < 0 ) return;
     545                        if( ret < 0 ) {
     546                                return;
     547                        }
    446548
    447549                        // Release the consumed SQEs
     
    454556                                io.submit_q.submit_avg.cnt += 1;
    455557                        )
    456                 }
    457                 else {
     558
     559                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
     560                }
     561                else
     562                {
    458563                        // get mutual exclusion
    459564                        #if defined(LEADER_LOCK)
     
    463568                        #endif
    464569
    465                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     570                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    466571                        /* paranoid */  "index %u already reclaimed\n"
    467572                        /* paranoid */  "head %u, prev %u, tail %u\n"
     
    490595                        }
    491596
     597                        /* paranoid */ verify(ret == 1);
     598
    492599                        // update statistics
    493600                        __STATS__( false,
     
    496603                        )
    497604
     605                        {
     606                                __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
     607                                __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
     608                                __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
     609
     610                                __cfadbg_print_safe( io,
     611                                        "Kernel I/O : last submitted is %u (%p)\n"
     612                                        "    data: %p\n"
     613                                        "    opcode: %s\n"
     614                                        "    fd: %d\n"
     615                                        "    flags: %d\n"
     616                                        "    prio: %d\n"
     617                                        "    off: %p\n"
     618                                        "    addr: %p\n"
     619                                        "    len: %d\n"
     620                                        "    other flags: %d\n"
     621                                        "    splice fd: %d\n"
     622                                        "    pad[0]: %llu\n"
     623                                        "    pad[1]: %llu\n"
     624                                        "    pad[2]: %llu\n",
     625                                        last_idx, sqe,
     626                                        (void*)sqe->user_data,
     627                                        opcodes[sqe->opcode],
     628                                        sqe->fd,
     629                                        sqe->flags,
     630                                        sqe->ioprio,
     631                                        sqe->off,
     632                                        sqe->addr,
     633                                        sqe->len,
     634                                        sqe->accept_flags,
     635                                        sqe->splice_fd_in,
     636                                        sqe->__pad2[0],
     637                                        sqe->__pad2[1],
     638                                        sqe->__pad2[2]
     639                                );
     640                        }
     641
     642                        __atomic_thread_fence( __ATOMIC_SEQ_CST );
    498643                        // Release the consumed SQEs
    499644                        __release_consumed_submission( ring );
     645                        // ring.submit_q.sqes[idx].user_data = 3ul64;
    500646
    501647                        #if defined(LEADER_LOCK)
     
    505651                        #endif
    506652
    507                         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     653                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    508654                }
    509655        }
    510656
    511657        // #define PARTIAL_SUBMIT 32
     658
     659        // go through the list of submissions in the ready array and moved them into
     660        // the ring's submit queue
    512661        static unsigned __collect_submitions( struct __io_data & ring ) {
    513662                /* paranoid */ verify( ring.submit_q.ready != 0p );
     
    550699        }
    551700
     701        // Go through the ring's submit queue and release everything that has already been consumed
     702        // by io_uring
    552703        static __u32 __release_consumed_submission( struct __io_data & ring ) {
    553704                const __u32 smask = *ring.submit_q.mask;
    554705
     706                // We need to get the lock to copy the old head and new head
    555707                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
    556                 __u32 chead = *ring.submit_q.head;
    557                 __u32 phead = ring.submit_q.prev_head;
    558                 ring.submit_q.prev_head = chead;
     708                __attribute__((unused))
     709                __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
     710                __u32 chead = *ring.submit_q.head;              // get the current head of the queue
     711                __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
     712                ring.submit_q.prev_head = chead;                // note up to were we processed
    559713                unlock(ring.submit_q.release_lock);
    560714
     715                // the 3 fields are organized like this diagram
     716                // except it's are ring
     717                // ---+--------+--------+----
     718                // ---+--------+--------+----
     719                //    ^        ^        ^
     720                // phead    chead    ctail
     721
     722                // make sure ctail doesn't wrap around and reach phead
     723                /* paranoid */ verify(
     724                           (ctail >= chead && chead >= phead)
     725                        || (chead >= phead && phead >= ctail)
     726                        || (phead >= ctail && ctail >= chead)
     727                );
     728
     729                // find the range we need to clear
    561730                __u32 count = chead - phead;
     731
     732                // We acquired an previous-head/current-head range
     733                // go through the range and release the sqes
    562734                for( i; count ) {
    563735                        __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    564                         ring.submit_q.sqes[ idx ].user_data = 0;
     736
     737                        /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
     738                        __clean( &ring.submit_q.sqes[ idx ] );
    565739                }
    566740                return count;
    567741        }
     742
     743        void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
     744                __clean( sqe );
     745        }
     746
     747        static inline void __clean( volatile struct io_uring_sqe * sqe ) {
     748                // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
     749                __cfaabi_dbg_debug_do(
     750                        memset(sqe, 0xde, sizeof(*sqe));
     751                        sqe->opcode = IORING_OP_LAST;
     752                );
     753
     754                // Mark the entry as unused
     755                __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
     756        }
    568757#endif
  • libcfa/src/concurrency/io/call.cfa.in

    rbace538 r402658b1  
    7474        ;
    7575
    76         extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
     76        extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
    7777        extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
    7878
     
    222222                __u32 idx;
    223223                struct io_uring_sqe * sqe;
    224                 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    225 
    226                 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     224                [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     225
    227226                sqe->opcode = IORING_OP_{op};
    228                 sqe->flags = sflags;{body}
     227                sqe->flags = sflags;
     228                sqe->ioprio = 0;
     229                sqe->fd = 0;
     230                sqe->off = 0;
     231                sqe->addr = 0;
     232                sqe->len = 0;
     233                sqe->accept_flags = 0;
     234                sqe->__pad2[0] = 0;
     235                sqe->__pad2[1] = 0;
     236                sqe->__pad2[2] = 0;{body}
     237
     238                asm volatile("": : :"memory");
    229239
    230240                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     
    312322        }),
    313323        # CFA_HAVE_IORING_OP_ACCEPT
    314         Call('ACCEPT4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
     324        Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
    315325                'fd': 'sockfd',
    316                 'addr': 'addr',
    317                 'addr2': 'addrlen',
     326                'addr': '(__u64)addr',
     327                'addr2': '(__u64)addrlen',
    318328                'accept_flags': 'flags'
    319329        }),
     
    464474
    465475print("""
     476//-----------------------------------------------------------------------------
     477bool cancel(io_cancellation & this) {
     478        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
     479                return false;
     480        #else
     481                io_future_t future;
     482
     483                io_context * context = __get_io_context();
     484
     485                __u8 sflags = 0;
     486                struct __io_data & ring = *context->thrd.ring;
     487
     488                __u32 idx;
     489                volatile struct io_uring_sqe * sqe;
     490                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     491
     492                sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     493                sqe->opcode = IORING_OP_ASYNC_CANCEL;
     494                sqe->flags = sflags;
     495                sqe->addr = this.target;
     496
     497                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     498                __submit( context, idx );
     499
     500                wait(future);
     501
     502                if( future.result == 0 ) return true; // Entry found
     503                if( future.result == -EALREADY) return true; // Entry found but in progress
     504                if( future.result == -ENOENT ) return false; // Entry not found
     505                return false;
     506        #endif
     507}
     508
    466509//-----------------------------------------------------------------------------
    467510// Check if a function is has asynchronous
  • libcfa/src/concurrency/io/setup.cfa

    rbace538 r402658b1  
    5252                #include <pthread.h>
    5353                #include <sys/epoll.h>
     54                #include <sys/eventfd.h>
    5455                #include <sys/mman.h>
    5556                #include <sys/syscall.h>
     
    169170                // Main loop
    170171                while( iopoll.run ) {
     172                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
     173
    171174                        // Wait for events
    172175                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
     176
     177                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    173178
    174179                        // Check if an error occured
     
    181186                                $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    182187                                /* paranoid */ verify( io_ctx );
    183                                 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
     188                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
    184189                                #if !defined( __CFA_NO_STATISTICS__ )
    185190                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    186191                                #endif
     192
     193                                eventfd_t v;
     194                                eventfd_read(io_ctx->ring->efd, &v);
     195
    187196                                post( io_ctx->sem );
    188197                        }
     
    233242                $thread & thrd = this.thrd.self;
    234243                if( cluster_context ) {
     244                        // We are about to do weird things with the threads
     245                        // we don't need interrupts to complicate everything
     246                        disable_interrupts();
     247
     248                        // Get cluster info
    235249                        cluster & cltr = *thrd.curr_cluster;
    236250                        /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
     
    239253                        // We need to adjust the clean-up based on where the thread is
    240254                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     255                                // This is the tricky case
     256                                // The thread was preempted or ready to run and now it is on the ready queue
     257                                // but the cluster is shutting down, so there aren't any processors to run the ready queue
     258                                // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    241259
    242260                                ready_schedule_lock();
    243 
    244                                         // This is the tricky case
    245                                         // The thread was preempted and now it is on the ready queue
     261                                        // The thread should on the list
     262                                        /* paranoid */ verify( thrd.link.next != 0p );
     263
     264                                        // Remove the thread from the ready queue of this cluster
    246265                                        // The thread should be the last on the list
    247                                         /* paranoid */ verify( thrd.link.next != 0p );
    248 
    249                                         // Remove the thread from the ready queue of this cluster
    250266                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    251267                                        /* paranoid */ verify( removed );
     
    263279                        }
    264280                        // !!! This is not an else if !!!
     281                        // Ok, now the thread is blocked (whether we cheated to get here or not)
    265282                        if( thrd.state == Blocked ) {
    266 
    267283                                // This is the "easy case"
    268284                                // The thread is parked and can easily be moved to active cluster
     
    274290                        }
    275291                        else {
    276 
    277292                                // The thread is in a weird state
    278293                                // I don't know what to do here
    279294                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    280295                        }
     296
     297                        // The weird thread kidnapping stuff is over, restore interrupts.
     298                        enable_interrupts( __cfaabi_dbg_ctx );
    281299                } else {
    282300                        post( this.thrd.sem );
     
    365383                }
    366384
     385                // Step 3 : Initialize the data structure
    367386                // Get the pointers from the kernel to fill the structure
    368387                // submit queue
     
    379398                        const __u32 num = *sq.num;
    380399                        for( i; num ) {
    381                                 sq.sqes[i].user_data = 0ul64;
     400                                sq.sqes[i].opcode = IORING_OP_LAST;
     401                                sq.sqes[i].user_data = 3ul64;
    382402                        }
    383403                }
     
    409429                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    410430
     431                // Step 4 : eventfd
     432                int efd;
     433                for() {
     434                        efd = eventfd(0, 0);
     435                        if (efd < 0) {
     436                                if (errno == EINTR) continue;
     437                                abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
     438                        }
     439                        break;
     440                }
     441
     442                int ret;
     443                for() {
     444                        ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
     445                        if (ret < 0) {
     446                                if (errno == EINTR) continue;
     447                                abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     448                        }
     449                        break;
     450                }
     451
    411452                // some paranoid checks
    412453                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
     
    423464                this.ring_flags = params.flags;
    424465                this.fd         = fd;
     466                this.efd        = efd;
    425467                this.eager_submits  = params_in.eager_submits;
    426468                this.poller_submits = params_in.poller_submits;
     
    445487                // close the file descriptor
    446488                close(this.fd);
     489                close(this.efd);
    447490
    448491                free( this.submit_q.ready ); // Maybe null, doesn't matter
     
    452495// I/O Context Sleep
    453496//=============================================================================================
    454 
    455         void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
    456                 ev.events = EPOLLIN | EPOLLONESHOT;
     497        #define IOEVENTS EPOLLIN | EPOLLONESHOT
     498
     499        static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
     500                struct epoll_event ev;
     501                ev.events = IOEVENTS;
    457502                ev.data.u64 = (__u64)&ctx;
    458                 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
     503                int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
    459504                if (ret < 0) {
    460                         abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
    461                 }
    462         }
    463 
    464         void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
    465                 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
    466                 if (ret < 0) {
    467                         abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
    468                 }
     505                        abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
     506                }
     507        }
     508
     509        void __ioctx_register($io_ctx_thread & ctx) {
     510                __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
     511        }
     512
     513        void __ioctx_prepare_block($io_ctx_thread & ctx) {
     514                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
     515                __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
    469516        }
    470517
  • libcfa/src/concurrency/io/types.hfa

    rbace538 r402658b1  
    6565
    6666                // A buffer of sqes (not the actual ring)
    67                 struct io_uring_sqe * sqes;
     67                volatile struct io_uring_sqe * sqes;
    6868
    6969                // The location and size of the mmaped area
     
    8585
    8686                // the kernel ring
    87                 struct io_uring_cqe * cqes;
     87                volatile struct io_uring_cqe * cqes;
    8888
    8989                // The location and size of the mmaped area
     
    9797                __u32 ring_flags;
    9898                int fd;
     99                int efd;
    99100                bool eager_submits:1;
    100101                bool poller_submits:1;
     
    130131        #endif
    131132
    132         struct epoll_event;
    133133        struct $io_ctx_thread;
    134         void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev);
    135         void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev);
     134        void __ioctx_register($io_ctx_thread & ctx);
     135        void __ioctx_prepare_block($io_ctx_thread & ctx);
     136        void __sqe_clean( volatile struct io_uring_sqe * sqe );
    136137#endif
    137138
Note: See TracChangeset for help on using the changeset viewer.