Ignore:
Timestamp:
Mar 2, 2021, 1:58:12 PM (8 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
2cd784a
Parents:
6047b00
Message:

Changed io to use ring per kernel threads.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r6047b00 rdddb3dd0  
    3232        extern "C" {
    3333                #include <sys/syscall.h>
     34                #include <sys/eventfd.h>
    3435
    3536                #include <linux/io_uring.h>
     
    7980        };
    8081
    81 //=============================================================================================
    82 // I/O Syscall
    83 //=============================================================================================
    84         static int __io_uring_enter( struct $io_context & ctx, unsigned to_submit, bool get ) {
    85                 __STATS__( false, io.calls.count++; )
    86                 bool need_sys_to_submit = false;
    87                 bool need_sys_to_complete = false;
    88                 unsigned flags = 0;
    89 
    90                 TO_SUBMIT:
    91                 if( to_submit > 0 ) {
    92                         if( !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
    93                                 need_sys_to_submit = true;
    94                                 break TO_SUBMIT;
    95                         }
    96                         if( (*ctx.sq.flags) & IORING_SQ_NEED_WAKEUP ) {
    97                                 need_sys_to_submit = true;
    98                                 flags |= IORING_ENTER_SQ_WAKEUP;
    99                         }
    100                 }
    101 
    102                 if( get && !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) {
    103                         flags |= IORING_ENTER_GETEVENTS;
    104                         if( (ctx.ring_flags & IORING_SETUP_IOPOLL) ) {
    105                                 need_sys_to_complete = true;
    106                         }
    107                 }
    108 
    109                 int ret = 0;
    110                 if( need_sys_to_submit || need_sys_to_complete ) {
    111                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ctx.fd, to_submit, flags);
    112                         __STATS__( false, io.calls.blocks++; )
    113                         ret = syscall( __NR_io_uring_enter, ctx.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    114                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ctx.fd, ret);
    115                 }
    116 
    117                 // Memory barrier
    118                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    119                 return ret;
    120         }
    121 
     82        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
     83        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy );
     84        static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );
     85        static inline void __ioarbiter_notify( $io_context & ctx );
    12286//=============================================================================================
    12387// I/O Polling
     
    12690        static inline __u32 __release_sqes( struct $io_context & );
    12791
    128         static bool __drain_io( struct  $io_context & ctx ) {
    129                 unsigned to_submit = __flush( ctx );
    130                 int ret = __io_uring_enter( ctx, to_submit, true );
     92        void __cfa_io_drain( processor * proc ) {
     93                /* paranoid */ verify( ! __preemption_enabled() );
     94                /* paranoid */ verify( proc );
     95                /* paranoid */ verify( proc->io.ctx );
     96
     97                // Drain the queue
     98                $io_context * ctx = proc->io.ctx;
     99                unsigned head = *ctx->cq.head;
     100                unsigned tail = *ctx->cq.tail;
     101                const __u32 mask = *ctx->cq.mask;
     102
     103                __u32 count = tail - head;
     104                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     105
     106                for(i; count) {
     107                        unsigned idx = (head + i) & mask;
     108                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
     109
     110                        /* paranoid */ verify(&cqe);
     111
     112                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     113                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     114
     115                        fulfil( *future, cqe.res );
     116                }
     117
     118                __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
     119
     120                // Mark to the kernel that the cqe has been seen
     121                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     122                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     123
     124                /* paranoid */ verify( ! __preemption_enabled() );
     125
     126                return;
     127        }
     128
     129        void __cfa_io_flush( processor * proc ) {
     130                /* paranoid */ verify( ! __preemption_enabled() );
     131                /* paranoid */ verify( proc );
     132                /* paranoid */ verify( proc->io.ctx );
     133
     134                $io_context & ctx = *proc->io.ctx;
     135
     136                if(!ctx.ext_sq.empty) {
     137                        __ioarbiter_flush( *ctx.arbiter, &ctx );
     138                }
     139
     140                __STATS__( true, io.calls.flush++; )
     141                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8);
    131142                if( ret < 0 ) {
    132143                        switch((int)errno) {
     
    136147                                // Update statistics
    137148                                __STATS__( false, io.calls.errors.busy ++; )
    138                                 return true;
    139                                 break;
     149                                return;
    140150                        default:
    141151                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     
    143153                }
    144154
    145                 // update statistics
    146                 if (to_submit > 0) {
    147                         __STATS__( false, io.calls.submitted += ret; )
    148                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    149 
    150                         /* paranoid */ verify( ctx.sq.to_submit >= ret );
    151                         ctx.sq.to_submit -= ret;
    152 
    153                         /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    154 
    155                         if(ret) {
    156                                 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring\n", ret);
    157                         }
    158                 }
     155                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     156                __STATS__( true, io.calls.submitted += ret; )
     157                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     158                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     159
     160                ctx.sq.to_submit -= ret;
     161
     162                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    159163
    160164                // Release the consumed SQEs
    161165                __release_sqes( ctx );
    162166
    163                 // Drain the queue
    164                 unsigned head = *ctx.cq.head;
    165                 unsigned tail = *ctx.cq.tail;
    166                 const __u32 mask = *ctx.cq.mask;
    167 
    168                 // Nothing was new return 0
    169                 if (head == tail) {
    170                         return ctx.sq.to_submit > 0;
    171                 }
    172 
    173                 __u32 count = tail - head;
    174                 /* paranoid */ verify( count != 0 );
    175                 __STATS__( false, io.calls.completed += count; )
    176 
    177                 for(i; count) {
    178                         unsigned idx = (head + i) & mask;
    179                         volatile struct io_uring_cqe & cqe = ctx.cq.cqes[idx];
    180 
    181                         /* paranoid */ verify(&cqe);
    182 
    183                         struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    184                         __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    185 
    186                         fulfil( *future, cqe.res );
    187                 }
    188 
    189                 if(count) {
    190                         __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    191                 }
    192 
    193                 // Mark to the kernel that the cqe has been seen
    194                 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    195                 __atomic_store_n( ctx.cq.head, head + count, __ATOMIC_SEQ_CST );
    196 
    197                 return count > 0 || to_submit > 0;
    198         }
    199 
    200         void main( $io_context & this ) {
    201                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.fd, &this);
    202 
    203                 const int reset_cnt = 5;
    204                 int reset = reset_cnt;
    205                 // Then loop until we need to start
    206                 LOOP:
    207                 while() {
    208                         waitfor( ^?{} : this) {
    209                                 break LOOP;
    210                         }
    211                         or else {}
    212 
    213                         // Drain the io
    214                         bool again = __drain_io( this );
    215 
    216                         if(!again) reset--;
    217 
    218                         // If we got something, just yield and check again
    219                         if(reset > 1) {
    220                                 yield();
    221                                 continue LOOP;
    222                         }
    223 
    224                         // We alread failed to find completed entries a few time.
    225                         if(reset == 1) {
    226                                 // Rearm the context so it can block
    227                                 // but don't block right away
    228                                 // we need to retry one last time in case
    229                                 // something completed *just now*
    230                                 __ioctx_prepare_block( this );
    231                                 continue LOOP;
    232                         }
    233 
    234                         __STATS__( false,
    235                                 io.poller.sleeps += 1;
    236                         )
    237                         __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.fd, &this);
    238 
    239                         // block this thread
    240                         wait( this.sem );
    241 
    242                         // restore counter
    243                         reset = reset_cnt;
    244                 }
    245 
    246                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.fd, &this);
     167                /* paranoid */ verify( ! __preemption_enabled() );
     168
     169                ctx.proc->io.pending = false;
    247170        }
    248171
     
    266189//         head and tail must be fully filled and shouldn't ever be touched again.
    267190//
    268 
    269         static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
    270         static void __ioarbiter_submit  ( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have );
    271         static void __ioarbiter_flush   ( $io_arbiter & mutex this, $io_context * );
    272         static inline void __ioarbiter_notify( $io_context & ctx );
    273 
    274191        //=============================================================================================
    275192        // Allocation
     
    278195                struct io_uring_sqe * sqes = ctx->sq.sqes;
    279196                for(i; want) {
     197                        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
    280198                        out_sqes[i] = &sqes[idxs[i]];
    281199                }
     
    295213                // copy all the indexes we want from the available list
    296214                for(i; want) {
     215                        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
    297216                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
    298217                }
     
    315234                disable_interrupts();
    316235                processor * proc = __cfaabi_tls.this_processor;
     236                $io_context * ctx = proc->io.ctx;
    317237                /* paranoid */ verify( __cfaabi_tls.this_processor );
    318                 /* paranoid */ verify( proc->io.lock == false );
    319 
    320                 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
    321                 $io_context * ctx = proc->io.ctx;
     238                /* paranoid */ verify( ctx );
     239
     240                __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     241
     242                // We can proceed to the fast path
     243                if( __alloc(ctx, idxs, want) ) {
     244                        // Allocation was successful
     245                        __STATS__( true, io.alloc.fast += 1; )
     246                        enable_interrupts( __cfaabi_dbg_ctx );
     247
     248                        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     249
     250                        __fill( sqes, want, idxs, ctx );
     251                        return ctx;
     252                }
     253                // The fast path failed, fallback
     254                __STATS__( true, io.alloc.fail += 1; )
     255
     256                // Fast path failed, fallback on arbitration
     257                __STATS__( true, io.alloc.slow += 1; )
     258                enable_interrupts( __cfaabi_dbg_ctx );
     259
    322260                $io_arbiter * ioarb = proc->cltr->io.arbiter;
    323261                /* paranoid */ verify( ioarb );
    324262
    325                 // Can we proceed to the fast path
    326                 if(  ctx                                // We alreay have an instance?
    327                 &&  !ctx->revoked )             // Our instance is still valid?
    328                 {
    329                         __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
    330 
    331                         // We can proceed to the fast path
    332                         if( __alloc(ctx, idxs, want) ) {
    333                                 // Allocation was successful
    334                                 // Mark the instance as no longer in-use and re-enable interrupts
    335                                 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    336                                 __STATS__( true, io.alloc.fast += 1; )
    337                                 enable_interrupts( __cfaabi_dbg_ctx );
    338 
    339                                 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful\n");
    340 
    341                                 __fill( sqes, want, idxs, ctx );
    342                                 return ctx;
    343                         }
    344                         // The fast path failed, fallback
    345                         __STATS__( true, io.alloc.fail += 1; )
    346                 }
    347 
    348                 // Fast path failed, fallback on arbitration
    349                 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    350                 __STATS__( true, io.alloc.slow += 1; )
    351                 enable_interrupts( __cfaabi_dbg_ctx );
    352 
    353263                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
    354264
    355265                struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
    356266
    357                 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed\n");
     267                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
    358268
    359269                __fill( sqes, want, idxs,ret );
     
    364274        //=============================================================================================
    365275        // submission
    366         static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have) {
     276        static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
    367277                // We can proceed to the fast path
    368278                // Get the right objects
    369279                __sub_ring_t & sq = ctx->sq;
    370280                const __u32 mask  = *sq.mask;
    371                 __u32 tail = sq.kring.ready;
     281                __u32 tail = *sq.kring.tail;
    372282
    373283                // Add the sqes to the array
    374284                for( i; have ) {
     285                        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
    375286                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
    376287                }
    377288
    378289                // Make the sqes visible to the submitter
    379                 __atomic_store_n(&sq.kring.ready, tail + have, __ATOMIC_RELEASE);
    380 
    381                 // Make sure the poller is awake
    382                 __cfadbg_print_safe(io, "Kernel I/O : waking the poller\n");
    383                 post( ctx->sem );
    384         }
    385 
    386         void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have ) __attribute__((nonnull (1))) {
    387                 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u\n", have);
     290                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
     291                sq.to_submit++;
     292
     293                ctx->proc->io.pending = true;
     294                ctx->proc->io.dirty   = true;
     295                if(sq.to_submit > 30 || !lazy) {
     296                        __cfa_io_flush( ctx->proc );
     297                }
     298        }
     299
     300        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
     301                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
    388302
    389303                disable_interrupts();
    390304                processor * proc = __cfaabi_tls.this_processor;
     305                $io_context * ctx = proc->io.ctx;
    391306                /* paranoid */ verify( __cfaabi_tls.this_processor );
    392                 /* paranoid */ verify( proc->io.lock == false );
    393 
    394                 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST );
    395                 $io_context * ctx = proc->io.ctx;
     307                /* paranoid */ verify( ctx );
    396308
    397309                // Can we proceed to the fast path
    398                 if(  ctx                                // We alreay have an instance?
    399                 &&  !ctx->revoked               // Our instance is still valid?
    400                 &&   ctx == inctx )             // We have the right instance?
     310                if( ctx == inctx )              // We have the right instance?
    401311                {
    402                         __submit(ctx, idxs, have);
     312                        __submit(ctx, idxs, have, lazy);
    403313
    404314                        // Mark the instance as no longer in-use, re-enable interrupts and return
    405                         __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    406315                        __STATS__( true, io.submit.fast += 1; )
    407316                        enable_interrupts( __cfaabi_dbg_ctx );
     
    412321
    413322                // Fast path failed, fallback on arbitration
    414                 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );
    415323                __STATS__( true, io.submit.slow += 1; )
    416324                enable_interrupts( __cfaabi_dbg_ctx );
     
    418326                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
    419327
    420                 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have);
     328                __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy);
    421329        }
    422330
    423331        //=============================================================================================
    424332        // Flushing
    425         static unsigned __flush( struct $io_context & ctx ) {
    426                 // First check for external
    427                 if( !__atomic_load_n(&ctx.ext_sq.empty, __ATOMIC_SEQ_CST) ) {
    428                         // We have external submissions, delegate to the arbiter
    429                         __ioarbiter_flush( *ctx.arbiter, &ctx );
    430                 }
    431 
    432                 __u32 tail  = *ctx.sq.kring.tail;
    433                 __u32 ready = ctx.sq.kring.ready;
    434 
    435                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    436                 ctx.sq.to_submit += (ready - tail);
    437                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    438 
    439                 if(ctx.sq.to_submit) {
    440                         __cfadbg_print_safe(io, "Kernel I/O : %u ready to submit\n", ctx.sq.to_submit);
    441                 }
    442 
    443                 __atomic_store_n(ctx.sq.kring.tail, ready, __ATOMIC_RELEASE);
    444 
    445                 return ctx.sq.to_submit;
    446         }
    447 
    448 
    449333        // Go through the ring's submit queue and release everything that has already been consumed
    450334        // by io_uring
     
    484368                // go through the range and release the sqes
    485369                for( i; count ) {
     370                        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
    486371                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
    487372                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     
    499384// I/O Arbiter
    500385//=============================================================================================
    501         static inline void __revoke( $io_arbiter & this, $io_context * ctx ) {
    502                 if(ctx->revoked) return;
    503 
    504                 /* paranoid */ verify( ctx->proc );
    505                 remove( this.assigned, *ctx );
    506 
    507                 // Mark as revoked
    508                 __atomic_store_n(&ctx->revoked, true, __ATOMIC_SEQ_CST);
    509 
    510                 // Wait for the processor to no longer use it
    511                 while(ctx->proc->io.lock) Pause();
    512 
    513                 // Remove the coupling with the processor
    514                 ctx->proc->io.ctx = 0p;
    515                 ctx->proc = 0p;
    516 
    517                 // add to available contexts
    518                 addHead( this.available, *ctx );
    519         }
    520 
    521         static inline void __assign( $io_arbiter & this, $io_context * ctx, processor * proc ) {
    522                 remove( this.available, *ctx );
    523 
    524                 ctx->revoked = false;
    525                 ctx->proc = proc;
    526                 __atomic_store_n(&proc->io.ctx, ctx, __ATOMIC_SEQ_CST);
    527 
    528                 // add to assigned contexts
    529                 addTail( this.assigned, *ctx );
    530         }
    531 
    532386        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
    533387                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
    534 
    535                 SeqIter($io_context) iter;
    536                 $io_context & ci;
    537                 // Do we already have something available?
    538                 for( over( iter, this.available ); iter | ci;) {
    539                         __cfadbg_print_safe(io, "Kernel I/O : attempting available context\n");
    540 
    541                         $io_context * c = &ci;
    542                         if(__alloc(c, idxs, want)) {
    543                                 __assign( this, c, proc);
    544                                 return c;
    545                         }
    546                 }
    547 
    548 
    549                 // Otherwise, we have no choice but to revoke everyone to check if other instance have available data
    550                 for( over( iter, this.assigned ); iter | ci; ) {
    551                         __cfadbg_print_safe(io, "Kernel I/O : revoking context for allocation\n");
    552 
    553                         $io_context * c = &ci;
    554                         __revoke( this, c );
    555 
    556                         __STATS__( false, io.alloc.revoke += 1; )
    557 
    558                         if(__alloc(c, idxs, want)) {
    559                                 __assign( this, c, proc);
    560                                 return c;
    561                         }
    562                 }
    563 
    564                 __cfadbg_print_safe(io, "Kernel I/O : waiting for available resources\n");
    565388
    566389                __STATS__( false, io.alloc.block += 1; )
     
    577400                /* paranoid */ verify( ret );
    578401
    579                 __assign( this, this.pending.ctx, proc);
    580402                return this.pending.ctx;
     403
    581404        }
    582405
     
    586409
    587410                while( !is_empty(this.pending.blocked) ) {
     411                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
    588412                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
    589413                        __u32 want = front( this.pending.blocked );
     
    604428
    605429        // Simply append to the pending
    606         static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have ) {
     430        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
    607431                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
    608432
     
    612436                __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
    613437
    614                 // Wake-up the poller
    615                 post( ctx->sem );
    616 
    617438                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
    618439
     
    621442
    622443                // Submit our indexes
    623                 __submit(ctx, idxs, have);
     444                __submit(ctx, idxs, have, lazy);
    624445
    625446                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
     
    630451
    631452                __STATS__( false, io.flush.external += 1; )
    632 
    633                 __revoke( this, ctx );
    634453
    635454                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     
    643462                ctx->ext_sq.empty = true;
    644463        }
    645 
    646         void __ioarbiter_register( $io_arbiter & mutex this, $io_context & ctx ) {
    647                 __cfadbg_print_safe(io, "Kernel I/O : registering new context\n");
    648 
    649                 ctx.arbiter = &this;
    650 
    651                 // add to available contexts
    652                 addHead( this.available, ctx );
    653 
    654                 // Check if this solves pending allocations
    655                 if(this.pending.flag) {
    656                         __ioarbiter_notify( ctx );
    657                 }
    658         }
    659 
    660         void __ioarbiter_unregister( $io_arbiter & mutex this, $io_context & ctx ) {
    661                 /* paranoid */ verify( &this == ctx.arbiter );
    662 
    663                 __revoke( this, &ctx );
    664 
    665                 remove( this.available, ctx );
    666         }
    667464#endif
Note: See TracChangeset for help on using the changeset viewer.