Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r11054eb rdddb3dd0  
    8080        };
    8181
    82         static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want );
    83         static void __ioarbiter_submit( $io_context * , __u32 idxs[], __u32 have, bool lazy );
    84         static void __ioarbiter_flush ( $io_context & );
     82        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
     83        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy );
     84        static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );
    8585        static inline void __ioarbiter_notify( $io_context & ctx );
    8686//=============================================================================================
     
    134134                $io_context & ctx = *proc->io.ctx;
    135135
    136                 __ioarbiter_flush( ctx );
     136                if(!ctx.ext_sq.empty) {
     137                        __ioarbiter_flush( *ctx.arbiter, &ctx );
     138                }
    137139
    138140                __STATS__( true, io.calls.flush++; )
     
    261263                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
    262264
    263                 struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);
     265                struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
    264266
    265267                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     
    324326                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
    325327
    326                 __ioarbiter_submit(inctx, idxs, have, lazy);
     328                __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy);
    327329        }
    328330
     
    382384// I/O Arbiter
    383385//=============================================================================================
    384         static inline void block(__outstanding_io_queue & queue, __outstanding_io & item) {
    385                 // Lock the list, it's not thread safe
    386                 lock( queue.lock __cfaabi_dbg_ctx2 );
    387                 {
    388                         // Add our request to the list
    389                         add( queue.queue, item );
    390 
    391                         // Mark as pending
    392                         __atomic_store_n( &queue.empty, false, __ATOMIC_SEQ_CST );
    393                 }
    394                 unlock( queue.lock );
    395 
    396                 wait( item.sem );
    397         }
    398 
    399         static inline bool empty(__outstanding_io_queue & queue ) {
    400                 return __atomic_load_n( &queue.empty, __ATOMIC_SEQ_CST);
    401         }
    402 
    403         static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) {
     386        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
    404387                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
    405388
     
    407390
    408391                // No one has any resources left, wait for something to finish
    409                 // We need to add ourself to a list of pending allocs and wait for an answer
    410                 __pending_alloc pa;
    411                 pa.idxs = idxs;
    412                 pa.want = want;
    413 
    414                 block(this.pending, (__outstanding_io&)pa);
    415 
    416                 return pa.ctx;
    417 
    418         }
    419 
    420         static void __ioarbiter_notify( $io_arbiter & this, $io_context * ctx ) {
    421                 /* paranoid */ verify( !empty(this.pending.queue) );
    422 
    423                 lock( this.pending.lock __cfaabi_dbg_ctx2 );
    424                 {
    425                         while( !empty(this.pending.queue) ) {
    426                                 __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
    427                                 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
    428                                 __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );
    429 
    430                                 if( have > pa.want ) goto DONE;
    431                                 drop( this.pending.queue );
    432 
    433                                 /* paranoid */__attribute__((unused)) bool ret =
    434 
    435                                 __alloc(ctx, pa.idxs, pa.want);
    436 
    437                                 /* paranoid */ verify( ret );
    438 
    439                                 pa.ctx = ctx;
    440 
    441                                 post( pa.sem );
    442                         }
    443 
    444                         this.pending.empty = true;
    445                         DONE:;
    446                 }
    447                 unlock( this.pending.lock );
     392                // Mark as pending
     393                __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST );
     394
     395                // Wait for our turn to submit
     396                wait( this.pending.blocked, want );
     397
     398                __attribute((unused)) bool ret =
     399                __alloc( this.pending.ctx, idxs, want);
     400                /* paranoid */ verify( ret );
     401
     402                return this.pending.ctx;
     403
     404        }
     405
     406        static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) {
     407                /* paranoid */ verify( !is_empty(this.pending.blocked) );
     408                this.pending.ctx = ctx;
     409
     410                while( !is_empty(this.pending.blocked) ) {
     411                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     412                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
     413                        __u32 want = front( this.pending.blocked );
     414
     415                        if( have > want ) return;
     416
     417                        signal_block( this.pending.blocked );
     418                }
     419
     420                this.pending.flag = false;
    448421        }
    449422
    450423        static void __ioarbiter_notify( $io_context & ctx ) {
    451                 if(!empty( ctx.arbiter->pending )) {
     424                if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) {
    452425                        __ioarbiter_notify( *ctx.arbiter, &ctx );
    453426                }
     
    455428
    456429        // Simply append to the pending
    457         static void __ioarbiter_submit( $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
     430        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
    458431                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
    459432
     433                /* paranoid */ verify( &this == ctx->arbiter );
     434
     435                // Mark as pending
     436                __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
     437
    460438                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
    461439
    462                 __external_io ei;
    463                 ei.idxs = idxs;
    464                 ei.have = have;
    465                 ei.lazy = lazy;
    466 
    467                 block(ctx->ext_sq, (__outstanding_io&)ei);
     440                // Wait for our turn to submit
     441                wait( ctx->ext_sq.blocked );
     442
     443                // Submit our indexes
     444                __submit(ctx, idxs, have, lazy);
    468445
    469446                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
    470447        }
    471448
    472         static void __ioarbiter_flush( $io_context & ctx ) {
    473                 if(!empty( ctx.ext_sq )) {
    474                         __STATS__( false, io.flush.external += 1; )
    475 
    476                         __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
    477 
    478                         lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
    479                         {
    480                                 while( !empty(ctx.ext_sq.queue) ) {
    481                                         __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
    482 
    483                                         __submit(&ctx, ei.idxs, ei.have, ei.lazy);
    484 
    485                                         post( ei.sem );
    486                                 }
    487 
    488                                 ctx.ext_sq.empty = true;
    489                         }
    490                         unlock(ctx.ext_sq.lock );
    491                 }
     449        static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) {
     450                /* paranoid */ verify( &this == ctx->arbiter );
     451
     452                __STATS__( false, io.flush.external += 1; )
     453
     454                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     455
     456                condition & blcked = ctx->ext_sq.blocked;
     457                /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) );
     458                while(!is_empty( blcked )) {
     459                        signal_block( blcked );
     460                }
     461
     462                ctx->ext_sq.empty = true;
    492463        }
    493464#endif
Note: See TracChangeset for help on using the changeset viewer.