Changeset dddb3dd0


Ignore:
Timestamp:
Mar 2, 2021, 1:58:12 PM (9 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
2cd784a
Parents:
6047b00
Message:

Changed io to use ring per kernel threads.

Location:
libcfa/src
Files:
11 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/bits/defs.hfa

    r6047b00 rdddb3dd0  
    7474        #error unsupported architecture
    7575#endif
     76
     77#define CFA_IO_LAZY (1_l64u << 32_l64u)
  • libcfa/src/concurrency/io.cfa

    r6047b00 rdddb3dd0  
    3232        extern "C" {
    3333                #include <sys/syscall.h>
     34                #include <sys/eventfd.h>
    3435
    3536                #include <linux/io_uring.h>
     
    7980        };
    8081
    81 //=============================================================================================
    82 // I/O Syscall
    83 //=============================================================================================
    84         static int __io_uring_enter( struct $io_context & ctx, unsigned to_submit, bool get ) {
    85                 __STATS__( false, io.calls.count++; )
    86                 bool need_sys_to_submit = false;
    87                 bool need_sys_to_complete = false;
    88                 unsigned flags = 0;
    89 
    90                 TO_SUBMIT:
    91                 if( to_submit > 0 ) {
    92                         if( !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
    93                                 need_sys_to_submit = true;
    94                                 break TO_SUBMIT;
    95                         }
    96                         if( (*ctx.sq.flags) & IORING_SQ_NEED_WAKEUP ) {
    97                                 need_sys_to_submit = true;
    98                                 flags |= IORING_ENTER_SQ_WAKEUP;
    99                         }
    100                 }
    101 
    102                 if( get && !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
    103                         flags |= IORING_ENTER_GETEVENTS;
    104                         if( (ctx.ring_flags & IORING_SETUP_IOPOLL) ) {
    105                                 need_sys_to_complete = true;
    106                         }
    107                 }
    108 
    109                 int ret = 0;
    110                 if( need_sys_to_submit || need_sys_to_complete ) {
    111                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ctx.fd, to_submit, flags);
    112                         __STATS__( false, io.calls.blocks++; )
    113                         ret = syscall( __NR_io_uring_enter, ctx.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    114                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ctx.fd, ret);
    115                 }
    116 
    117                 // Memory barrier
    118                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    119                 return ret;
    120         }
    121 
     82        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
     83        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy );
     84        static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );
     85        static inline void __ioarbiter_notify( $io_context & ctx );
    12286//=============================================================================================
    12387// I/O Polling
     
    12690        static inline __u32 __release_sqes( struct $io_context & );
    12791
    128         static bool __drain_io( struct  $io_context & ctx ) {
    129                 unsigned to_submit = __flush( ctx );
    130                 int ret = __io_uring_enter( ctx, to_submit, true );
     92        void __cfa_io_drain( processor * proc ) {
     93                /* paranoid */ verify( ! __preemption_enabled() );
     94                /* paranoid */ verify( proc );
     95                /* paranoid */ verify( proc->io.ctx );
     96
     97                // Drain the queue
     98                $io_context * ctx = proc->io.ctx;
     99                unsigned head = *ctx->cq.head;
     100                unsigned tail = *ctx->cq.tail;
     101                const __u32 mask = *ctx->cq.mask;
     102
     103                __u32 count = tail - head;
     104                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     105
     106                for(i; count) {
     107                        unsigned idx = (head + i) & mask;
     108                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
     109
     110                        /* paranoid */ verify(&cqe);
     111
     112                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     113                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     114
     115                        fulfil( *future, cqe.res );
     116                }
     117
     118                __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
     119
     120                // Mark to the kernel that the cqe has been seen
     121                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     122                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     123
     124                /* paranoid */ verify( ! __preemption_enabled() );
     125
     126                return;
     127        }
     128
     129        void __cfa_io_flush( processor * proc ) {
     130                /* paranoid */ verify( ! __preemption_enabled() );
     131                /* paranoid */ verify( proc );
     132                /* paranoid */ verify( proc->io.ctx );
     133
     134                $io_context & ctx = *proc->io.ctx;
     135
     136                if(!ctx.ext_sq.empty) {
     137                        __ioarbiter_flush( *ctx.arbiter, &ctx );
     138                }
     139
     140                __STATS__( true, io.calls.flush++; )
     141                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8);
    131142                if( ret < 0 ) {
    132143                        switch((int)errno) {
     
    136147                                // Update statistics
    137148                                __STATS__( false, io.calls.errors.busy ++; )
    138                                 return true;
    139                                 break;
     149                                return;
    140150                        default:
    141151                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     
    143153                }
    144154
    145                 // update statistics
    146                 if (to_submit > 0) {
    147                         __STATS__( false, io.calls.submitted += ret; )
    148                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    149 
    150                         /* paranoid */ verify( ctx.sq.to_submit >= ret );
    151                         ctx.sq.to_submit -= ret;
    152 
    153                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    154 
    155                         if(ret) {
    156                                 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring\n", ret);
    157                         }
    158                 }
     155                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     156                __STATS__( true, io.calls.submitted += ret; )
     157                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     158                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     159
     160                ctx.sq.to_submit -= ret;
     161
     162                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    159163
    160164                // Release the consumed SQEs
    161165                __release_sqes( ctx );
    162166
    163                 // Drain the queue
    164                 unsigned head = *ctx.cq.head;
    165                 unsigned tail = *ctx.cq.tail;
    166                 const __u32 mask = *ctx.cq.mask;
    167 
    168                 // Nothing was new return 0
    169                 if (head == tail) {
    170                         return ctx.sq.to_submit > 0;
    171                 }
    172 
    173                 __u32 count = tail - head;
    174                 /* paranoid */ verify( count != 0 );
    175                 __STATS__( false, io.calls.completed += count; )
    176 
    177                 for(i; count) {
    178                         unsigned idx = (head + i) & mask;
    179                         volatile struct io_uring_cqe & cqe = ctx.cq.cqes[idx];
    180 
    181                         /* paranoid */ verify(&cqe);
    182 
    183                         struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    184                         __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    185 
    186                         fulfil( *future, cqe.res );
    187                 }
    188 
    189                 if(count) {
    190                         __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    191                 }
    192 
    193                 // Mark to the kernel that the cqe has been seen
    194                 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    195                 __atomic_store_n( ctx.cq.head, head + count, __ATOMIC_SEQ_CST );
    196 
    197                 return count > 0 || to_submit > 0;
    198         }
    199 
    200         void main( $io_context & this ) {
    201                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.fd, &this);
    202 
    203                 const int reset_cnt = 5;
    204                 int reset = reset_cnt;
    205                 // Then loop until we need to start
    206                 LOOP:
    207                 while() {
    208                         waitfor( ^?{} : this) {
    209                                 break LOOP;
    210                         }
    211                         or else {}
    212 
    213                         // Drain the io
    214                         bool again = __drain_io( this );
    215 
    216                         if(!again) reset--;
    217 
    218                         // If we got something, just yield and check again
    219                         if(reset > 1) {
    220                                 yield();
    221                                 continue LOOP;
    222                         }
    223 
    224                         // We alread failed to find completed entries a few time.
    225                         if(reset == 1) {
    226                                 // Rearm the context so it can block
    227                                 // but don't block right away
    228                                 // we need to retry one last time in case
    229                                 // something completed *just now*
    230                                 __ioctx_prepare_block( this );
    231                                 continue LOOP;
    232                         }
    233 
    234                         __STATS__( false,
    235                                 io.poller.sleeps += 1;
    236                         )
    237                         __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.fd, &this);
    238 
    239                         // block this thread
    240                         wait( this.sem );
    241 
    242                         // restore counter
    243                         reset = reset_cnt;
    244                 }
    245 
    246                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.fd, &this);
     167                /* paranoid */ verify( ! __preemption_enabled() );
     168
     169                ctx.proc->io.pending = false;
    247170        }
    248171
     
    266189//         head and tail must be fully filled and shouldn't ever be touched again.
    267190//
    268 
    269         static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
    270         static void __ioarbiter_submit  ( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have );
    271         static void __ioarbiter_flush   ( $io_arbiter & mutex this, $io_context * );
    272         static inline void __ioarbiter_notify( $io_context & ctx );
    273 
    274191        //=============================================================================================
    275192        // Allocation
     
    278195                struct io_uring_sqe * sqes = ctx->sq.sqes;
    279196                for(i; want) {
     197                        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
    280198                        out_sqes[i] = &sqes[idxs[i]];
    281199                }
     
    295213                // copy all the indexes we want from the available list
    296214                for(i; want) {
     215                        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
    297216                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
    298217                }
     
    315234                disable_interrupts();
    316235                processor * proc = __cfaabi_tls.this_processor;
     236                $io_context * ctx = proc->io.ctx;
    317237                /* paranoid */ verify( __cfaabi_tls.this_processor );
    318                 /* paranoid */ verify( proc->io.lock == false );
    319 
    320                 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
    321                 $io_context * ctx = proc->io.ctx;
     238                /* paranoid */ verify( ctx );
     239
     240                __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     241
     242                // We can proceed to the fast path
     243                if( __alloc(ctx, idxs, want) ) {
     244                        // Allocation was successful
     245                        __STATS__( true, io.alloc.fast += 1; )
     246                        enable_interrupts( __cfaabi_dbg_ctx );
     247
     248                        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     249
     250                        __fill( sqes, want, idxs, ctx );
     251                        return ctx;
     252                }
     253                // The fast path failed, fallback
     254                __STATS__( true, io.alloc.fail += 1; )
     255
     256                // Fast path failed, fallback on arbitration
     257                __STATS__( true, io.alloc.slow += 1; )
     258                enable_interrupts( __cfaabi_dbg_ctx );
     259
    322260                $io_arbiter * ioarb = proc->cltr->io.arbiter;
    323261                /* paranoid */ verify( ioarb );
    324262
    325                 // Can we proceed to the fast path
    326                 if(  ctx                                // We alreay have an instance?
    327                 &&  !ctx->revoked )             // Our instance is still valid?
    328                 {
    329                         __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
    330 
    331                         // We can proceed to the fast path
    332                         if( __alloc(ctx, idxs, want) ) {
    333                                 // Allocation was successful
    334                                 // Mark the instance as no longer in-use and re-enable interrupts
    335                                 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    336                                 __STATS__( true, io.alloc.fast += 1; )
    337                                 enable_interrupts( __cfaabi_dbg_ctx );
    338 
    339                                 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful\n");
    340 
    341                                 __fill( sqes, want, idxs, ctx );
    342                                 return ctx;
    343                         }
    344                         // The fast path failed, fallback
    345                         __STATS__( true, io.alloc.fail += 1; )
    346                 }
    347 
    348                 // Fast path failed, fallback on arbitration
    349                 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    350                 __STATS__( true, io.alloc.slow += 1; )
    351                 enable_interrupts( __cfaabi_dbg_ctx );
    352 
    353263                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
    354264
    355265                struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
    356266
    357                 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed\n");
     267                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
    358268
    359269                __fill( sqes, want, idxs,ret );
     
    364274        //=============================================================================================
    365275        // submission
    366         static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have) {
     276        static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
    367277                // We can proceed to the fast path
    368278                // Get the right objects
    369279                __sub_ring_t & sq = ctx->sq;
    370280                const __u32 mask  = *sq.mask;
    371                 __u32 tail = sq.kring.ready;
     281                __u32 tail = *sq.kring.tail;
    372282
    373283                // Add the sqes to the array
    374284                for( i; have ) {
     285                        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
    375286                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
    376287                }
    377288
    378289                // Make the sqes visible to the submitter
    379                 __atomic_store_n(&sq.kring.ready, tail + have, __ATOMIC_RELEASE);
    380 
    381                 // Make sure the poller is awake
    382                 __cfadbg_print_safe(io, "Kernel I/O : waking the poller\n");
    383                 post( ctx->sem );
    384         }
    385 
    386         void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have ) __attribute__((nonnull (1))) {
    387                 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u\n", have);
     290                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
     291                sq.to_submit++;
     292
     293                ctx->proc->io.pending = true;
     294                ctx->proc->io.dirty   = true;
     295                if(sq.to_submit > 30 || !lazy) {
     296                        __cfa_io_flush( ctx->proc );
     297                }
     298        }
     299
     300        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
     301                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
    388302
    389303                disable_interrupts();
    390304                processor * proc = __cfaabi_tls.this_processor;
     305                $io_context * ctx = proc->io.ctx;
    391306                /* paranoid */ verify( __cfaabi_tls.this_processor );
    392                 /* paranoid */ verify( proc->io.lock == false );
    393 
    394                 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
    395                 $io_context * ctx = proc->io.ctx;
     307                /* paranoid */ verify( ctx );
    396308
    397309                // Can we proceed to the fast path
    398                 if(  ctx                                // We alreay have an instance?
    399                 &&  !ctx->revoked               // Our instance is still valid?
    400                 &&   ctx == inctx )             // We have the right instance?
     310                if( ctx == inctx )              // We have the right instance?
    401311                {
    402                         __submit(ctx, idxs, have);
     312                        __submit(ctx, idxs, have, lazy);
    403313
    404314                        // Mark the instance as no longer in-use, re-enable interrupts and return
    405                         __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    406315                        __STATS__( true, io.submit.fast += 1; )
    407316                        enable_interrupts( __cfaabi_dbg_ctx );
     
    412321
    413322                // Fast path failed, fallback on arbitration
    414                 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    415323                __STATS__( true, io.submit.slow += 1; )
    416324                enable_interrupts( __cfaabi_dbg_ctx );
     
    418326                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
    419327
    420                 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have);
     328                __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy);
    421329        }
    422330
    423331        //=============================================================================================
    424332        // Flushing
    425         static unsigned __flush( struct $io_context & ctx ) {
    426                 // First check for external
    427                 if( !__atomic_load_n(&ctx.ext_sq.empty, __ATOMIC_SEQ_CST) ) {
    428                         // We have external submissions, delegate to the arbiter
    429                         __ioarbiter_flush( *ctx.arbiter, &ctx );
    430                 }
    431 
    432                 __u32 tail  = *ctx.sq.kring.tail;
    433                 __u32 ready = ctx.sq.kring.ready;
    434 
    435                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    436                 ctx.sq.to_submit += (ready - tail);
    437                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    438 
    439                 if(ctx.sq.to_submit) {
    440                         __cfadbg_print_safe(io, "Kernel I/O : %u ready to submit\n", ctx.sq.to_submit);
    441                 }
    442 
    443                 __atomic_store_n(ctx.sq.kring.tail, ready, __ATOMIC_RELEASE);
    444 
    445                 return ctx.sq.to_submit;
    446         }
    447 
    448 
    449333        // Go through the ring's submit queue and release everything that has already been consumed
    450334        // by io_uring
     
    484368                // go through the range and release the sqes
    485369                for( i; count ) {
     370                        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
    486371                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
    487372                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     
    499384// I/O Arbiter
    500385//=============================================================================================
    501         static inline void __revoke( $io_arbiter & this, $io_context * ctx ) {
    502                 if(ctx->revoked) return;
    503 
    504                 /* paranoid */ verify( ctx->proc );
    505                 remove( this.assigned, *ctx );
    506 
    507                 // Mark as revoked
    508                 __atomic_store_n(&ctx->revoked, true, __ATOMIC_SEQ_CST);
    509 
    510                 // Wait for the processor to no longer use it
    511                 while(ctx->proc->io.lock) Pause();
    512 
    513                 // Remove the coupling with the processor
    514                 ctx->proc->io.ctx = 0p;
    515                 ctx->proc = 0p;
    516 
    517                 // add to available contexts
    518                 addHead( this.available, *ctx );
    519         }
    520 
    521         static inline void __assign( $io_arbiter & this, $io_context * ctx, processor * proc ) {
    522                 remove( this.available, *ctx );
    523 
    524                 ctx->revoked = false;
    525                 ctx->proc = proc;
    526                 __atomic_store_n(&proc->io.ctx, ctx, __ATOMIC_SEQ_CST);
    527 
    528                 // add to assigned contexts
    529                 addTail( this.assigned, *ctx );
    530         }
    531 
    532386        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
    533387                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
    534 
    535                 SeqIter($io_context) iter;
    536                 $io_context & ci;
    537                 // Do we already have something available?
    538                 for( over( iter, this.available ); iter | ci;) {
    539                         __cfadbg_print_safe(io, "Kernel I/O : attempting available context\n");
    540 
    541                         $io_context * c = &ci;
    542                         if(__alloc(c, idxs, want)) {
    543                                 __assign( this, c, proc);
    544                                 return c;
    545                         }
    546                 }
    547 
    548 
    549                 // Otherwise, we have no choice but to revoke everyone to check if other instance have available data
    550                 for( over( iter, this.assigned ); iter | ci; ) {
    551                         __cfadbg_print_safe(io, "Kernel I/O : revoking context for allocation\n");
    552 
    553                         $io_context * c = &ci;
    554                         __revoke( this, c );
    555 
    556                         __STATS__( false, io.alloc.revoke += 1; )
    557 
    558                         if(__alloc(c, idxs, want)) {
    559                                 __assign( this, c, proc);
    560                                 return c;
    561                         }
    562                 }
    563 
    564                 __cfadbg_print_safe(io, "Kernel I/O : waiting for available resources\n");
    565388
    566389                __STATS__( false, io.alloc.block += 1; )
     
    577400                /* paranoid */ verify( ret );
    578401
    579                 __assign( this, this.pending.ctx, proc);
    580402                return this.pending.ctx;
     403
    581404        }
    582405
     
    586409
    587410                while( !is_empty(this.pending.blocked) ) {
     411                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
    588412                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
    589413                        __u32 want = front( this.pending.blocked );
     
    604428
    605429        // Simply append to the pending
    606         static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have ) {
     430        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
    607431                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
    608432
     
    612436                __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
    613437
    614                 // Wake-up the poller
    615                 post( ctx->sem );
    616 
    617438                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
    618439
     
    621442
    622443                // Submit our indexes
    623                 __submit(ctx, idxs, have);
     444                __submit(ctx, idxs, have, lazy);
    624445
    625446                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
     
    630451
    631452                __STATS__( false, io.flush.external += 1; )
    632 
    633                 __revoke( this, ctx );
    634453
    635454                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     
    643462                ctx->ext_sq.empty = true;
    644463        }
    645 
    646         void __ioarbiter_register( $io_arbiter & mutex this, $io_context & ctx ) {
    647                 __cfadbg_print_safe(io, "Kernel I/O : registering new context\n");
    648 
    649                 ctx.arbiter = &this;
    650 
    651                 // add to available contexts
    652                 addHead( this.available, ctx );
    653 
    654                 // Check if this solves pending allocations
    655                 if(this.pending.flag) {
    656                         __ioarbiter_notify( ctx );
    657                 }
    658         }
    659 
    660         void __ioarbiter_unregister( $io_arbiter & mutex this, $io_context & ctx ) {
    661                 /* paranoid */ verify( &this == ctx.arbiter );
    662 
    663                 __revoke( this, &ctx );
    664 
    665                 remove( this.available, ctx );
    666         }
    667464#endif
  • libcfa/src/concurrency/io/call.cfa.in

    r6047b00 rdddb3dd0  
    7575
    7676        extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
    77         extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));
     77        extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
    7878#endif
    7979
     
    185185                return ', '.join(args_a)
    186186
    187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags) {{
     187AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{
    188188        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
    189189                ssize_t res = {name}({args});
     
    216216
    217217                verify( sqe->user_data == (__u64)(uintptr_t)&future );
    218                 cfa_io_submit( ctx, &idx, 1 );
     218                cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
    219219        #endif
    220220}}"""
    221221
    222 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags) {{
     222SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{
    223223        io_future_t future;
    224224
     
    388388        if c.define:
    389389                print("""#if defined({define})
    390         {ret} cfa_{name}({params}, int submit_flags);
     390        {ret} cfa_{name}({params}, __u64 submit_flags);
    391391#endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
    392392        else:
    393                 print("{ret} cfa_{name}({params}, int submit_flags);"
     393                print("{ret} cfa_{name}({params}, __u64 submit_flags);"
    394394                .format(ret=c.ret, name=c.name, params=c.params))
    395395
     
    399399        if c.define:
    400400                print("""#if defined({define})
    401         void async_{name}(io_future_t & future, {params}, int submit_flags);
     401        void async_{name}(io_future_t & future, {params}, __u64 submit_flags);
    402402#endif""".format(define=c.define,name=c.name, params=c.params))
    403403        else:
    404                 print("void async_{name}(io_future_t & future, {params}, int submit_flags);"
     404                print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"
    405405                .format(name=c.name, params=c.params))
    406406print("\n")
  • libcfa/src/concurrency/io/setup.cfa

    r6047b00 rdddb3dd0  
    2626
    2727#if !defined(CFA_HAVE_LINUX_IO_URING_H)
    28         void __kernel_io_startup() {
    29                 // Nothing to do without io_uring
    30         }
    31 
    32         void __kernel_io_shutdown() {
    33                 // Nothing to do without io_uring
    34         }
    35 
    3628        void ?{}(io_context_params & this) {}
    3729
     
    9789
    9890//=============================================================================================
    99 // I/O Startup / Shutdown logic + Master Poller
    100 //=============================================================================================
    101 
    102         // IO Master poller loop forward
    103         static void * iopoll_loop( __attribute__((unused)) void * args );
    104 
    105         static struct {
    106                       pthread_t  thrd;    // pthread handle to io poller thread
    107                       void *     stack;   // pthread stack for io poller thread
    108                       int        epollfd; // file descriptor to the epoll instance
    109                 volatile     bool run;     // Whether or not to continue
    110                 volatile     bool stopped; // Whether the poller has finished running
    111                 volatile uint64_t epoch;   // Epoch used for memory reclamation
    112         } iopoll;
    113 
    114         void __kernel_io_startup(void) {
    115                 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
    116 
    117                 iopoll.epollfd = epoll_create1(0);
    118                 if (iopoll.epollfd == -1) {
    119                         abort( "internal error, epoll_create1\n");
    120                 }
    121 
    122                 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
    123 
    124                 iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
    125                 iopoll.run     = true;
    126                 iopoll.stopped = false;
    127                 iopoll.epoch   = 0;
    128         }
    129 
    130         void __kernel_io_shutdown(void) {
    131                 // Notify the io poller thread of the shutdown
    132                 iopoll.run = false;
    133                 sigval val = { 1 };
    134                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    135 
    136                 // Wait for the io poller thread to finish
    137 
    138                 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
    139 
    140                 int ret = close(iopoll.epollfd);
    141                 if (ret == -1) {
    142                         abort( "internal error, close epoll\n");
    143                 }
    144 
    145                 // Io polling is now fully stopped
    146 
    147                 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
    148         }
    149 
    150         static void * iopoll_loop( __attribute__((unused)) void * args ) {
    151                 __processor_id_t id;
    152                 id.full_proc = false;
    153                 id.id = doregister(&id);
    154                 __cfaabi_tls.this_proc_id = &id;
    155                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
    156 
    157                 // Block signals to control when they arrive
    158                 sigset_t mask;
    159                 sigfillset(&mask);
    160                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    161                 abort( "internal error, pthread_sigmask" );
    162                 }
    163 
    164                 sigdelset( &mask, SIGUSR1 );
    165 
    166                 // Create sufficient events
    167                 struct epoll_event events[10];
    168                 // Main loop
    169                 while( iopoll.run ) {
    170                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
    171 
    172                         // increment the epoch to notify any deleters we are starting a new cycle
    173                         __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
    174 
    175                         // Wait for events
    176                         int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
    177 
    178                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    179 
    180                         // Check if an error occured
    181                         if (nfds == -1) {
    182                                 if( errno == EINTR ) continue;
    183                                 abort( "internal error, pthread_sigmask" );
    184                         }
    185 
    186                         for(i; nfds) {
    187                                 $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
    188                                 /* paranoid */ verify( io_ctx );
    189                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
    190                                 #if !defined( __CFA_NO_STATISTICS__ )
    191                                         __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    192                                 #endif
    193 
    194                                 eventfd_t v;
    195                                 eventfd_read(io_ctx->efd, &v);
    196 
    197                                 post( io_ctx->sem );
    198                         }
    199                 }
    200 
    201                 __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
    202 
    203                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
    204                 unregister(&id);
    205                 return 0p;
    206         }
    207 
    208 //=============================================================================================
    20991// I/O Context Constrution/Destruction
    21092//=============================================================================================
    21193
    212         static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
     94
     95
     96        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd );
    21397        static void __io_uring_teardown( $io_context & this );
    21498        static void __epoll_register($io_context & ctx);
     
    217101        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
    218102
    219         void ?{}($io_context & this, struct cluster & cl) {
    220                 (this.self){ "IO Poller", cl };
     103        void ?{}($io_context & this, processor * proc, struct cluster & cl) {
     104                /* paranoid */ verify( cl.io.arbiter );
     105                this.proc = proc;
     106                this.arbiter = cl.io.arbiter;
    221107                this.ext_sq.empty = true;
    222                 this.revoked = true;
    223                 __io_uring_setup( this, cl.io.params );
     108                (this.ext_sq.blocked){};
     109                __io_uring_setup( this, cl.io.params, proc->idle );
    224110                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
    225 
    226                 __epoll_register(this);
    227 
    228                 __ioarbiter_register(*cl.io.arbiter, this);
    229 
    230                 __thrd_start( this, main );
    231                 __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
    232         }
    233 
    234         void ^?{}($io_context & mutex this) {
     111        }
     112
     113        void ^?{}($io_context & this) {
    235114                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
    236 
    237                 ^(this.self){};
    238                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
    239 
    240                 __ioarbiter_unregister(*this.arbiter, this);
    241 
    242                 __epoll_unregister(this);
    243115
    244116                __io_uring_teardown( this );
     
    246118        }
    247119
    248         void ?{}(io_context & this, struct cluster & cl) {
    249                 // this.ctx = new(cl);
    250                 this.ctx = alloc();
    251                 (*this.ctx){ cl };
    252 
    253                 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
    254         }
    255 
    256         void ^?{}(io_context & this) {
    257                 post( this.ctx->sem );
    258 
    259                 delete(this.ctx);
    260         }
    261 
    262120        extern void __disable_interrupts_hard();
    263121        extern void __enable_interrupts_hard();
    264122
    265         static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
     123        static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) {
    266124                // Step 1 : call to setup
    267125                struct io_uring_params params;
     
    339197                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    340198
    341                 sq.kring.ready = 0;
    342199                sq.kring.released = 0;
    343200
     
    362219                // io_uring_register is so f*cking slow on some machine that it
    363220                // will never succeed if preemption isn't hard blocked
     221                __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
     222
    364223                __disable_interrupts_hard();
    365224
    366                 int efd = eventfd(0, 0);
    367                 if (efd < 0) {
    368                         abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
    369                 }
    370 
    371                 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
     225                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1);
    372226                if (ret < 0) {
    373227                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     
    375229
    376230                __enable_interrupts_hard();
     231
     232                __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);
    377233
    378234                // some paranoid checks
     
    390246                this.ring_flags = 0;
    391247                this.fd         = fd;
    392                 this.efd        = efd;
    393248        }
    394249
     
    411266                // close the file descriptor
    412267                close(this.fd);
    413                 close(this.efd);
    414268
    415269                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
    416270        }
    417271
     272        void __cfa_io_start( processor * proc ) {
     273                proc->io.ctx = alloc();
     274                (*proc->io.ctx){proc, *proc->cltr};
     275        }
     276        void __cfa_io_stop ( processor * proc ) {
     277                ^(*proc->io.ctx){};
     278                free(proc->io.ctx);
     279        }
     280
    418281//=============================================================================================
    419282// I/O Context Sleep
    420283//=============================================================================================
    421         static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
    422                 struct epoll_event ev;
    423                 ev.events = EPOLLIN | EPOLLONESHOT;
    424                 ev.data.u64 = (__u64)&ctx;
    425                 int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
    426                 if (ret < 0) {
    427                         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
    428                 }
    429         }
    430 
    431         static void __epoll_register($io_context & ctx) {
    432                 __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
    433         }
    434 
    435         static void __epoll_unregister($io_context & ctx) {
    436                 // Read the current epoch so we know when to stop
    437                 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
    438 
    439                 // Remove the fd from the iopoller
    440                 __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
    441 
    442                 // Notify the io poller thread of the shutdown
    443                 iopoll.run = false;
    444                 sigval val = { 1 };
    445                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    446 
    447                 // Make sure all this is done
    448                 __atomic_thread_fence(__ATOMIC_SEQ_CST);
    449 
    450                 // Wait for the next epoch
    451                 while(curr == iopoll.epoch && !iopoll.stopped) Pause();
    452         }
    453 
    454         void __ioctx_prepare_block($io_context & ctx) {
    455                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
    456                 __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
    457         }
     284        // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
     285        //      struct epoll_event ev;
     286        //      ev.events = EPOLLIN | EPOLLONESHOT;
     287        //      ev.data.u64 = (__u64)&ctx;
     288        //      int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
     289        //      if (ret < 0) {
     290        //              abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
     291        //      }
     292        // }
     293
     294        // static void __epoll_register($io_context & ctx) {
     295        //      __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
     296        // }
     297
     298        // static void __epoll_unregister($io_context & ctx) {
     299        //      // Read the current epoch so we know when to stop
     300        //      size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
     301
     302        //      // Remove the fd from the iopoller
     303        //      __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
     304
     305        //      // Notify the io poller thread of the shutdown
     306        //      iopoll.run = false;
     307        //      sigval val = { 1 };
     308        //      pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
     309
     310        //      // Make sure all this is done
     311        //      __atomic_thread_fence(__ATOMIC_SEQ_CST);
     312
     313        //      // Wait for the next epoch
     314        //      while(curr == iopoll.epoch && !iopoll.stopped) Pause();
     315        // }
     316
     317        // void __ioctx_prepare_block($io_context & ctx) {
     318        //      __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
     319        //      __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
     320        // }
    458321
    459322
     
    466329
    467330        void ^?{}( $io_arbiter & mutex this ) {
    468                 /* paranoid */ verify( empty(this.assigned) );
    469                 /* paranoid */ verify( empty(this.available) );
     331                // /* paranoid */ verify( empty(this.assigned) );
     332                // /* paranoid */ verify( empty(this.available) );
    470333                /* paranoid */ verify( is_empty(this.pending.blocked) );
    471334        }
  • libcfa/src/concurrency/io/types.hfa

    r6047b00 rdddb3dd0  
    3838                        volatile __u32 * head;   // one passed last index consumed by the kernel
    3939                        volatile __u32 * tail;   // one passed last index visible to the kernel
    40                         volatile __u32 ready;    // one passed last index added to array ()
    4140                        volatile __u32 released; // one passed last index released back to the free list
    4241
     
    9796
    9897        struct __attribute__((aligned(128))) $io_context {
    99                 inline Seqable;
    100 
    101                 volatile bool revoked;
     98                $io_arbiter * arbiter;
    10299                processor * proc;
    103 
    104                 $io_arbiter * arbiter;
    105100
    106101                struct {
     
    113108                __u32 ring_flags;
    114109                int fd;
    115                 int efd;
    116 
    117                 single_sem sem;
    118                 $thread self;
    119110        };
    120 
    121         void main( $io_context & this );
    122         static inline $thread  * get_thread ( $io_context & this ) __attribute__((const)) { return &this.self; }
    123         static inline $monitor * get_monitor( $io_context & this ) __attribute__((const)) { return &this.self.self_mon; }
    124         static inline $io_context *& Back( $io_context * n ) { return ($io_context *)Back( (Seqable *)n ); }
    125         static inline $io_context *& Next( $io_context * n ) { return ($io_context *)Next( (Colable *)n ); }
    126         void ^?{}( $io_context & mutex this );
    127111
    128112        monitor __attribute__((aligned(128))) $io_arbiter {
     
    132116                        volatile bool flag;
    133117                } pending;
    134 
    135                 Sequence($io_context) assigned;
    136 
    137                 Sequence($io_context) available;
    138118        };
    139119
     
    167147        #endif
    168148
    169         void __ioctx_prepare_block($io_context & ctx);
     149        // void __ioctx_prepare_block($io_context & ctx);
    170150#endif
    171151
  • libcfa/src/concurrency/iofwd.hfa

    r6047b00 rdddb3dd0  
    5959// underlying calls
    6060extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
    61 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));
     61extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
    6262
    6363//----------
    6464// synchronous calls
    6565#if defined(CFA_HAVE_PREADV2)
    66         extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
     66        extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    6767#endif
    6868#if defined(CFA_HAVE_PWRITEV2)
    69         extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
     69        extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    7070#endif
    71 extern int cfa_fsync(int fd, int submit_flags);
    72 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags);
    73 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags);
    74 extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags);
    75 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags);
    76 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags);
    77 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags);
    78 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags);
    79 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags);
    80 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags);
    81 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags);
    82 extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags);
    83 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags);
     71extern int cfa_fsync(int fd, __u64 submit_flags);
     72extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     73extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
     74extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
     75extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
     76extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
     77extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
     78extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
     79extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     80extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
     81extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
     82extern int cfa_madvise(void *addr, size_t length, int advice, __u64 submit_flags);
     83extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
    8484#if defined(CFA_HAVE_OPENAT2)
    85         extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags);
     85        extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
    8686#endif
    87 extern int cfa_close(int fd, int submit_flags);
     87extern int cfa_close(int fd, __u64 submit_flags);
    8888#if defined(CFA_HAVE_STATX)
    89         extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags);
     89        extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
    9090#endif
    91 extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags);
    92 extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags);
    93 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags);
    94 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags);
     91extern ssize_t cfa_read(int fd, void * buf, size_t count, __u64 submit_flags);
     92extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags);
     93extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     94extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    9595
    9696//----------
    9797// asynchronous calls
    9898#if defined(CFA_HAVE_PREADV2)
    99         extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
     99        extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    100100#endif
    101101#if defined(CFA_HAVE_PWRITEV2)
    102         extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
     102        extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    103103#endif
    104 extern void async_fsync(io_future_t & future, int fd, int submit_flags);
    105 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags);
    106 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags);
    107 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags);
    108 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags);
    109 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags);
    110 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags);
    111 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags);
    112 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags);
    113 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags);
    114 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags);
    115 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags);
    116 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags);
     104extern void async_fsync(io_future_t & future, int fd, __u64 submit_flags);
     105extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     106extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
     107extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
     108extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
     109extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
     110extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
     111extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
     112extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     113extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
     114extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
     115extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, __u64 submit_flags);
     116extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
    117117#if defined(CFA_HAVE_OPENAT2)
    118         extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags);
     118        extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
    119119#endif
    120 extern void async_close(io_future_t & future, int fd, int submit_flags);
     120extern void async_close(io_future_t & future, int fd, __u64 submit_flags);
    121121#if defined(CFA_HAVE_STATX)
    122         extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags);
     122        extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
    123123#endif
    124 void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags);
    125 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags);
    126 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags);
    127 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags);
     124void async_read(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
     125extern void async_write(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
     126extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     127extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    128128
    129129
  • libcfa/src/concurrency/kernel.cfa

    r6047b00 rdddb3dd0  
    2222#include <signal.h>
    2323#include <unistd.h>
     24extern "C" {
     25        #include <sys/eventfd.h>
     26}
    2427
    2528//CFA Includes
     
    109112static void __run_thread(processor * this, $thread * dst);
    110113static void __wake_one(cluster * cltr);
    111 static void wait(__bin_sem_t & this);
    112114
    113115static void push  (__cluster_idles & idles, processor & proc);
     
    115117static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
    116118
     119extern void __cfa_io_start( processor * );
     120extern void __cfa_io_drain( processor * );
     121extern void __cfa_io_flush( processor * );
     122extern void __cfa_io_stop ( processor * );
     123static inline void __maybe_io_drain( processor * );
     124
     125extern void __disable_interrupts_hard();
     126extern void __enable_interrupts_hard();
    117127
    118128//=============================================================================================
     
    130140        verify(this);
    131141
     142        __cfa_io_start( this );
     143
    132144        __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this);
    133145        #if !defined(__CFA_NO_STATISTICS__)
     
    151163                MAIN_LOOP:
    152164                for() {
     165                        // Check if there is pending io
     166                        __maybe_io_drain( this );
     167
    153168                        // Try to get the next thread
    154169                        readyThread = __next_thread( this->cltr );
    155170
    156171                        if( !readyThread ) {
     172                                __cfa_io_flush( this );
    157173                                readyThread = __next_thread_slow( this->cltr );
    158174                        }
     
    190206                                #endif
    191207
    192                                 wait( this->idle );
     208                                __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle);
     209
     210                                __disable_interrupts_hard();
     211                                eventfd_t val;
     212                                eventfd_read( this->idle, &val );
     213                                __enable_interrupts_hard();
    193214
    194215                                #if !defined(__CFA_NO_STATISTICS__)
     
    206227
    207228                        /* paranoid */ verify( readyThread );
     229
     230                        // Reset io dirty bit
     231                        this->io.dirty = false;
    208232
    209233                        // We found a thread run it
     
    220244                                }
    221245                        #endif
     246
     247                        if(this->io.pending && !this->io.dirty) {
     248                                __cfa_io_flush( this );
     249                        }
    222250                }
    223251
     
    225253        }
    226254
     255        __cfa_io_stop( this );
     256
    227257        post( this->terminated );
     258
    228259
    229260        if(this == mainProcessor) {
     
    248279        /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );
    249280        __builtin_prefetch( thrd_dst->context.SP );
     281
     282        __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name);
    250283
    251284        $coroutine * proc_cor = get_coroutine(this->runner);
     
    330363        // Just before returning to the processor, set the processor coroutine to active
    331364        proc_cor->state = Active;
     365
     366        __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);
    332367
    333368        /* paranoid */ verify( ! __preemption_enabled() );
     
    549584// Kernel Idle Sleep
    550585//=============================================================================================
    551 extern "C" {
    552         char * strerror(int);
    553 }
    554 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }
    555 
    556 static void wait(__bin_sem_t & this) with( this ) {
    557         verify(__cfaabi_dbg_in_kernel());
    558         CHECKED( pthread_mutex_lock(&lock) );
    559                 while(val < 1) {
    560                         pthread_cond_wait(&cond, &lock);
    561                 }
    562                 val -= 1;
    563         CHECKED( pthread_mutex_unlock(&lock) );
    564 }
    565 
    566 static bool post(__bin_sem_t & this) with( this ) {
    567         bool needs_signal = false;
    568 
    569         CHECKED( pthread_mutex_lock(&lock) );
    570                 if(val < 1) {
    571                         val += 1;
    572                         pthread_cond_signal(&cond);
    573                         needs_signal = true;
    574                 }
    575         CHECKED( pthread_mutex_unlock(&lock) );
    576 
    577         return needs_signal;
    578 }
    579 
    580 #undef CHECKED
    581 
    582586// Wake a thread from the front if there are any
    583587static void __wake_one(cluster * this) {
     
    595599
    596600        // We found a processor, wake it up
    597         post( p->idle );
     601        eventfd_t val;
     602        val = 1;
     603        eventfd_write( p->idle, val );
    598604
    599605        #if !defined(__CFA_NO_STATISTICS__)
     
    613619        disable_interrupts();
    614620                /* paranoid */ verify( ! __preemption_enabled() );
    615                 post( this->idle );
     621                eventfd_t val;
     622                val = 1;
     623                eventfd_read( this->idle, &val );
    616624        enable_interrupts( __cfaabi_dbg_ctx );
    617625}
     
    696704// Kernel Utilities
    697705//=============================================================================================
     706#if defined(CFA_HAVE_LINUX_IO_URING_H)
     707#include "io/types.hfa"
     708#endif
     709
     710static inline void __maybe_io_drain( processor * proc ) {
     711        #if defined(CFA_HAVE_LINUX_IO_URING_H)
     712                __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);
     713
     714                // Check if we should drain the queue
     715                $io_context * ctx = proc->io.ctx;
     716                unsigned head = *ctx->cq.head;
     717                unsigned tail = *ctx->cq.tail;
     718                if(head != tail) __cfa_io_drain( proc );
     719        #endif
     720}
     721
    698722//-----------------------------------------------------------------------------
    699723// Debug
  • libcfa/src/concurrency/kernel.hfa

    r6047b00 rdddb3dd0  
    2828}
    2929
    30 //-----------------------------------------------------------------------------
    31 // Underlying Locks
    3230#ifdef __CFA_WITH_VERIFY__
    3331        extern bool __cfaabi_dbg_in_kernel();
    3432#endif
    35 
    36 struct __bin_sem_t {
    37         pthread_mutex_t         lock;
    38         pthread_cond_t          cond;
    39         int                     val;
    40 };
    4133
    4234//-----------------------------------------------------------------------------
     
    5244void  ?{}(io_context_params & this);
    5345
    54 struct io_context {
    55         $io_context * ctx;
    56         cluster * cltr;
    57 };
    58 void  ?{}(io_context & this, struct cluster & cl);
    59 void ^?{}(io_context & this);
    60 
    6146//-----------------------------------------------------------------------------
    6247// Processor
     
    9883
    9984        struct {
    100                 $io_context * volatile ctx;
    101                 volatile bool lock;
     85                $io_context * ctx;
     86                bool pending;
     87                bool dirty;
    10288        } io;
    10389
     
    11096
    11197        // Idle lock (kernel semaphore)
    112         __bin_sem_t idle;
     98        int idle;
    11399
    114100        // Termination synchronisation (user semaphore)
  • libcfa/src/concurrency/kernel/startup.cfa

    r6047b00 rdddb3dd0  
    2222extern "C" {
    2323      #include <limits.h>       // PTHREAD_STACK_MIN
     24        #include <sys/eventfd.h>  // eventfd
    2425      #include <sys/mman.h>     // mprotect
    2526      #include <sys/resource.h> // getrlimit
     
    8081static void ?{}(processorCtx_t & this) {}
    8182static void ?{}(processorCtx_t & this, processor * proc, current_stack_info_t * info);
    82 static void ?{}(__bin_sem_t & this);
    83 static void ^?{}(__bin_sem_t & this);
    8483
    8584#if defined(__CFA_WITH_VERIFY__)
     
    9190extern void __kernel_alarm_startup(void);
    9291extern void __kernel_alarm_shutdown(void);
    93 extern void __kernel_io_startup (void);
    94 extern void __kernel_io_shutdown(void);
    9592
    9693//-----------------------------------------------------------------------------
     
    104101KERNEL_STORAGE($thread,              mainThread);
    105102KERNEL_STORAGE(__stack_t,            mainThreadCtx);
    106 KERNEL_STORAGE(io_context,           mainIoContext);
    107103KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
    108104#if !defined(__CFA_NO_STATISTICS__)
     
    200196
    201197        void ?{}(processor & this) with( this ) {
    202                 ( this.idle ){};
    203198                ( this.terminated ){};
    204199                ( this.runner ){};
     
    228223        __kernel_alarm_startup();
    229224
    230         // Start IO
    231         __kernel_io_startup();
    232 
    233         io_context * mainio = (io_context *)&storage_mainIoContext;
    234         (*mainio){ *mainCluster };
    235 
    236225        // Add the main thread to the ready queue
    237226        // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread
     
    255244
    256245static void __kernel_shutdown(void) {
    257         //Before we start shutting things down, wait for systems that need threading to shutdown
    258         io_context * mainio = (io_context *)&storage_mainIoContext;
    259         ^(*mainio){};
    260 
    261246        /* paranoid */ verify( __preemption_enabled() );
    262247        disable_interrupts();
     
    276261        // Disable preemption
    277262        __kernel_alarm_shutdown();
    278 
    279         // Stop IO
    280         __kernel_io_shutdown();
    281263
    282264        // Destroy the main processor and its context in reverse order of construction
     
    479461
    480462        this.io.ctx = 0p;
    481         this.io.lock = false;
     463        this.io.pending = false;
     464        this.io.dirty   = false;
     465
     466        this.idle = eventfd(0, 0);
     467        if (idle < 0) {
     468                abort("KERNEL ERROR: PROCESSOR EVENTFD - %s\n", strerror(errno));
     469        }
    482470
    483471        #if !defined(__CFA_NO_STATISTICS__)
     
    521509        // Finally we don't need the read_lock any more
    522510        unregister((__processor_id_t*)&this);
     511
     512        close(this.idle);
    523513}
    524514
    525515void ?{}(processor & this, const char name[], cluster & _cltr) {
    526         ( this.idle ){};
    527516        ( this.terminated ){};
    528517        ( this.runner ){};
     
    726715}
    727716
    728 extern "C" {
    729         char * strerror(int);
    730 }
    731 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }
    732 
    733 static void ?{}(__bin_sem_t & this) with( this ) {
    734         // Create the mutex with error checking
    735         pthread_mutexattr_t mattr;
    736         pthread_mutexattr_init( &mattr );
    737         pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
    738         pthread_mutex_init(&lock, &mattr);
    739 
    740         pthread_cond_init (&cond, (const pthread_condattr_t *)0p);  // workaround trac#208: cast should not be required
    741         val = 0;
    742 }
    743 
    744 static void ^?{}(__bin_sem_t & this) with( this ) {
    745         CHECKED( pthread_mutex_destroy(&lock) );
    746         CHECKED( pthread_cond_destroy (&cond) );
    747 }
    748 
    749 #undef CHECKED
    750 
    751717#if defined(__CFA_WITH_VERIFY__)
    752718static bool verify_fwd_bck_rng(void) {
  • libcfa/src/concurrency/stats.cfa

    r6047b00 rdddb3dd0  
    3333                        stats->io.submit.slow       = 0;
    3434                        stats->io.flush.external    = 0;
    35                         stats->io.calls.count       = 0;
     35                        stats->io.calls.flush       = 0;
    3636                        stats->io.calls.submitted   = 0;
     37                        stats->io.calls.drain       = 0;
    3738                        stats->io.calls.completed   = 0;
    38                         stats->io.calls.blocks      = 0;
    3939                        stats->io.calls.errors.busy = 0;
    4040                        stats->io.poller.sleeps     = 0;
     
    6767                        __atomic_fetch_add( &cltr->io.submit.slow      , proc->io.submit.slow      , __ATOMIC_SEQ_CST ); proc->io.submit.slow       = 0;
    6868                        __atomic_fetch_add( &cltr->io.flush.external   , proc->io.flush.external   , __ATOMIC_SEQ_CST ); proc->io.flush.external    = 0;
    69                         __atomic_fetch_add( &cltr->io.calls.count      , proc->io.calls.count      , __ATOMIC_SEQ_CST ); proc->io.calls.count       = 0;
     69                        __atomic_fetch_add( &cltr->io.calls.flush      , proc->io.calls.flush      , __ATOMIC_SEQ_CST ); proc->io.calls.flush       = 0;
    7070                        __atomic_fetch_add( &cltr->io.calls.submitted  , proc->io.calls.submitted  , __ATOMIC_SEQ_CST ); proc->io.calls.submitted   = 0;
     71                        __atomic_fetch_add( &cltr->io.calls.drain      , proc->io.calls.drain      , __ATOMIC_SEQ_CST ); proc->io.calls.drain       = 0;
    7172                        __atomic_fetch_add( &cltr->io.calls.completed  , proc->io.calls.completed  , __ATOMIC_SEQ_CST ); proc->io.calls.completed   = 0;
    72                         __atomic_fetch_add( &cltr->io.calls.blocks     , proc->io.calls.blocks     , __ATOMIC_SEQ_CST ); proc->io.calls.blocks      = 0;
    7373                        __atomic_fetch_add( &cltr->io.calls.errors.busy, proc->io.calls.errors.busy, __ATOMIC_SEQ_CST ); proc->io.calls.errors.busy = 0;
    7474                        __atomic_fetch_add( &cltr->io.poller.sleeps    , proc->io.poller.sleeps    , __ATOMIC_SEQ_CST ); proc->io.poller.sleeps     = 0;
     
    110110                                double avgfasts = ((double)io.submit.fast) / total_submits;
    111111
    112                                 double avgsubs = ((double)io.calls.submitted) / io.calls.count;
    113                                 double avgcomp = ((double)io.calls.completed) / io.calls.count;
     112                                double avgsubs = ((double)io.calls.submitted) / io.calls.flush;
     113                                double avgcomp = ((double)io.calls.completed) / io.calls.drain;
    114114
    115115                                __cfaabi_bits_print_safe( STDOUT_FILENO,
     
    129129                                        , io.submit.fast, io.submit.slow, avgfasts
    130130                                        , io.flush.external
    131                                         , io.calls.count, io.calls.blocks, io.calls.errors.busy
     131                                        , io.calls.flush, io.calls.drain, io.calls.errors.busy
    132132                                        , io.calls.submitted, avgsubs
    133133                                        , io.calls.completed, avgcomp
  • libcfa/src/concurrency/stats.hfa

    r6047b00 rdddb3dd0  
    8080                        } flush;
    8181                        struct {
    82                                 volatile uint64_t count;
     82                                volatile uint64_t drain;
     83                                volatile uint64_t completed;
     84                                volatile uint64_t flush;
    8385                                volatile uint64_t submitted;
    84                                 volatile uint64_t completed;
    85                                 volatile uint64_t blocks;
    8686                                struct {
    8787                                        volatile uint64_t busy;
Note: See TracChangeset for help on using the changeset viewer.