Changeset 11054eb for libcfa/src


Ignore:
Timestamp:
Mar 17, 2021, 4:52:22 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
5c2b454
Parents:
c407434e
Message:

Fix io to no longer use monitors since some usages aren't in threads

Location:
libcfa/src/concurrency
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rc407434e r11054eb  
    8080        };
    8181
    82         static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
    83         static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy );
    84         static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );
     82        static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want );
     83        static void __ioarbiter_submit( $io_context * , __u32 idxs[], __u32 have, bool lazy );
     84        static void __ioarbiter_flush ( $io_context & );
    8585        static inline void __ioarbiter_notify( $io_context & ctx );
    8686//=============================================================================================
     
    134134                $io_context & ctx = *proc->io.ctx;
    135135
    136                 if(!ctx.ext_sq.empty) {
    137                         __ioarbiter_flush( *ctx.arbiter, &ctx );
    138                 }
     136                __ioarbiter_flush( ctx );
    139137
    140138                __STATS__( true, io.calls.flush++; )
     
    263261                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
    264262
    265                 struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
     263                struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);
    266264
    267265                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     
    326324                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
    327325
    328                 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy);
     326                __ioarbiter_submit(inctx, idxs, have, lazy);
    329327        }
    330328
     
    384382// I/O Arbiter
    385383//=============================================================================================
    386         static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
     384        static inline void block(__outstanding_io_queue & queue, __outstanding_io & item) {
     385                // Lock the list, it's not thread safe
     386                lock( queue.lock __cfaabi_dbg_ctx2 );
     387                {
     388                        // Add our request to the list
     389                        add( queue.queue, item );
     390
     391                        // Mark as pending
     392                        __atomic_store_n( &queue.empty, false, __ATOMIC_SEQ_CST );
     393                }
     394                unlock( queue.lock );
     395
     396                wait( item.sem );
     397        }
     398
     399        static inline bool empty(__outstanding_io_queue & queue ) {
     400                return __atomic_load_n( &queue.empty, __ATOMIC_SEQ_CST);
     401        }
     402
     403        static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) {
    387404                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
    388405
     
    390407
    391408                // No one has any resources left, wait for something to finish
    392                 // Mark as pending
    393                 __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST );
    394 
    395                 // Wait for our turn to submit
    396                 wait( this.pending.blocked, want );
    397 
    398                 __attribute((unused)) bool ret =
    399                 __alloc( this.pending.ctx, idxs, want);
    400                 /* paranoid */ verify( ret );
    401 
    402                 return this.pending.ctx;
    403 
    404         }
    405 
    406         static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) {
    407                 /* paranoid */ verify( !is_empty(this.pending.blocked) );
    408                 this.pending.ctx = ctx;
    409 
    410                 while( !is_empty(this.pending.blocked) ) {
    411                         __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
    412                         __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
    413                         __u32 want = front( this.pending.blocked );
    414 
    415                         if( have > want ) return;
    416 
    417                         signal_block( this.pending.blocked );
    418                 }
    419 
    420                 this.pending.flag = false;
     409                // We need to add ourself to a list of pending allocs and wait for an answer
     410                __pending_alloc pa;
     411                pa.idxs = idxs;
     412                pa.want = want;
     413
     414                block(this.pending, (__outstanding_io&)pa);
     415
     416                return pa.ctx;
     417
     418        }
     419
     420        static void __ioarbiter_notify( $io_arbiter & this, $io_context * ctx ) {
     421                /* paranoid */ verify( !empty(this.pending.queue) );
     422
     423                lock( this.pending.lock __cfaabi_dbg_ctx2 );
     424                {
     425                        while( !empty(this.pending.queue) ) {
     426                                __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     427                                __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
     428                                __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );
     429
     430                                if( have > pa.want ) goto DONE;
     431                                drop( this.pending.queue );
     432
     433                                /* paranoid */__attribute__((unused)) bool ret =
     434
     435                                __alloc(ctx, pa.idxs, pa.want);
     436
     437                                /* paranoid */ verify( ret );
     438
     439                                pa.ctx = ctx;
     440
     441                                post( pa.sem );
     442                        }
     443
     444                        this.pending.empty = true;
     445                        DONE:;
     446                }
     447                unlock( this.pending.lock );
    421448        }
    422449
    423450        static void __ioarbiter_notify( $io_context & ctx ) {
    424                 if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) {
     451                if(!empty( ctx.arbiter->pending )) {
    425452                        __ioarbiter_notify( *ctx.arbiter, &ctx );
    426453                }
     
    428455
    429456        // Simply append to the pending
    430         static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
     457        static void __ioarbiter_submit( $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
    431458                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
    432459
    433                 /* paranoid */ verify( &this == ctx->arbiter );
    434 
    435                 // Mark as pending
    436                 __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
    437 
    438460                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
    439461
    440                 // Wait for our turn to submit
    441                 wait( ctx->ext_sq.blocked );
    442 
    443                 // Submit our indexes
    444                 __submit(ctx, idxs, have, lazy);
     462                __external_io ei;
     463                ei.idxs = idxs;
     464                ei.have = have;
     465                ei.lazy = lazy;
     466
     467                block(ctx->ext_sq, (__outstanding_io&)ei);
    445468
    446469                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
    447470        }
    448471
    449         static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) {
    450                 /* paranoid */ verify( &this == ctx->arbiter );
    451 
    452                 __STATS__( false, io.flush.external += 1; )
    453 
    454                 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
    455 
    456                 condition & blcked = ctx->ext_sq.blocked;
    457                 /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) );
    458                 while(!is_empty( blcked )) {
    459                         signal_block( blcked );
    460                 }
    461 
    462                 ctx->ext_sq.empty = true;
     472        static void __ioarbiter_flush( $io_context & ctx ) {
     473                if(!empty( ctx.ext_sq )) {
     474                        __STATS__( false, io.flush.external += 1; )
     475
     476                        __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     477
     478                        lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
     479                        {
     480                                while( !empty(ctx.ext_sq.queue) ) {
     481                                        __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
     482
     483                                        __submit(&ctx, ei.idxs, ei.have, ei.lazy);
     484
     485                                        post( ei.sem );
     486                                }
     487
     488                                ctx.ext_sq.empty = true;
     489                        }
     490                        unlock(ctx.ext_sq.lock );
     491                }
    463492        }
    464493#endif
  • libcfa/src/concurrency/io/setup.cfa

    rc407434e r11054eb  
    110110                this.arbiter = cl.io.arbiter;
    111111                this.ext_sq.empty = true;
    112                 (this.ext_sq.blocked){};
     112                (this.ext_sq.queue){};
    113113                __io_uring_setup( this, cl.io.params, proc->idle );
    114114                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
     
    329329//=============================================================================================
    330330        void ?{}( $io_arbiter & this ) {
    331                 this.pending.flag = false;
    332         }
    333 
    334         void ^?{}( $io_arbiter & mutex this ) {
    335                 // /* paranoid */ verify( empty(this.assigned) );
    336                 // /* paranoid */ verify( empty(this.available) );
    337                 /* paranoid */ verify( is_empty(this.pending.blocked) );
    338         }
     331                this.pending.empty = true;
     332        }
     333
     334        void ^?{}( $io_arbiter & this ) {}
    339335
    340336        $io_arbiter * create(void) {
  • libcfa/src/concurrency/io/types.hfa

    rc407434e r11054eb  
    2222
    2323#include "bits/locks.hfa"
     24#include "bits/queue.hfa"
    2425#include "kernel/fwd.hfa"
    2526
     
    9596        };
    9697
     98        struct __outstanding_io {
     99                inline Colable;
     100                single_sem sem;
     101        };
     102        static inline __outstanding_io *& Next( __outstanding_io * n ) { return (__outstanding_io *)Next( (Colable *)n ); }
     103
     104        struct __outstanding_io_queue {
     105                __spinlock_t lock;
     106                Queue(__outstanding_io) queue;
     107                volatile bool empty;
     108        };
     109
     110        struct __external_io {
     111                inline __outstanding_io;
     112                __u32 * idxs;
     113                __u32 have;
     114                bool lazy;
     115        };
     116
     117
    97118        struct __attribute__((aligned(128))) $io_context {
    98119                $io_arbiter * arbiter;
    99120                processor * proc;
    100121
    101                 struct {
    102                         volatile bool empty;
    103                         condition blocked;
    104                 } ext_sq;
     122                __outstanding_io_queue ext_sq;
    105123
    106124                struct __sub_ring_t sq;
     
    110128        };
    111129
    112         monitor __attribute__((aligned(128))) $io_arbiter {
    113                 struct {
    114                         condition blocked;
    115                         $io_context * ctx;
    116                         volatile bool flag;
    117                 } pending;
     130        struct __pending_alloc {
     131                inline __outstanding_io;
     132                __u32 * idxs;
     133                __u32 want;
     134                $io_context * ctx;
     135        };
     136
     137        struct __attribute__((aligned(128))) $io_arbiter {
     138                __outstanding_io_queue pending;
    118139        };
    119140
Note: See TracChangeset for help on using the changeset viewer.