Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r93526ef reafec07  
    4141        #include "kernel/fwd.hfa"
    4242        #include "io/types.hfa"
    43 
    44         // returns true of acquired as leader or second leader
    45         static inline bool try_lock( __leaderlock_t & this ) {
    46                 const uintptr_t thrd = 1z | (uintptr_t)active_thread();
    47                 bool block;
    48                 disable_interrupts();
    49                 for() {
    50                         struct $thread * expected = this.value;
    51                         if( 1p != expected && 0p != expected ) {
    52                                 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
    53                                 enable_interrupts( __cfaabi_dbg_ctx );
    54                                 return false;
    55                         }
    56                         struct $thread * desired;
    57                         if( 0p == expected ) {
    58                                 // If the lock isn't locked acquire it, no need to block
    59                                 desired = 1p;
    60                                 block = false;
    61                         }
    62                         else {
    63                                 // If the lock is already locked try becomming the next leader
    64                                 desired = (struct $thread *)thrd;
    65                                 block = true;
    66                         }
    67                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    68                 }
    69                 if( block ) {
    70                         enable_interrupts( __cfaabi_dbg_ctx );
    71                         park( __cfaabi_dbg_ctx );
    72                         disable_interrupts();
    73                 }
    74                 return true;
    75         }
    76 
    77         static inline bool next( __leaderlock_t & this ) {
    78                 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    79                 struct $thread * nextt;
    80                 for() {
    81                         struct $thread * expected = this.value;
    82                         /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
    83 
    84                         struct $thread * desired;
    85                         if( 1p == expected ) {
    86                                 // No next leader, just unlock
    87                                 desired = 0p;
    88                                 nextt   = 0p;
    89                         }
    90                         else {
    91                                 // There is a next leader, remove but keep locked
    92                                 desired = 1p;
    93                                 nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
    94                         }
    95                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    96                 }
    97 
    98                 if(nextt) {
    99                         unpark( nextt __cfaabi_dbg_ctx2 );
    100                         enable_interrupts( __cfaabi_dbg_ctx );
    101                         return true;
    102                 }
    103                 enable_interrupts( __cfaabi_dbg_ctx );
    104                 return false;
    105         }
    10643
    10744//=============================================================================================
     
    15693//=============================================================================================
    15794        static unsigned __collect_submitions( struct __io_data & ring );
    158         static __u32 __release_consumed_submission( struct __io_data & ring );
     95        static uint32_t __release_consumed_submission( struct __io_data & ring );
    15996
    16097        static inline void process(struct io_uring_cqe & cqe ) {
     
    163100
    164101                data->result = cqe.res;
    165                 post( data->sem );
     102                unpark( data->thrd __cfaabi_dbg_ctx2 );
    166103        }
    167104
     
    199136                unsigned head = *ring.completion_q.head;
    200137                unsigned tail = *ring.completion_q.tail;
    201                 const __u32 mask = *ring.completion_q.mask;
     138                const uint32_t mask = *ring.completion_q.mask;
    202139
    203140                // Nothing was new return 0
     
    206143                }
    207144
    208                 __u32 count = tail - head;
     145                uint32_t count = tail - head;
    209146                /* paranoid */ verify( count != 0 );
    210147                for(i; count) {
     
    245182                                __STATS__( true,
    246183                                        io.complete_q.completed_avg.val += count;
    247                                         io.complete_q.completed_avg.cnt += 1;
     184                                        io.complete_q.completed_avg.fast_cnt += 1;
    248185                                )
    249186                        enable_interrupts( __cfaabi_dbg_ctx );
     
    255192                        // We didn't get anything baton pass to the slow poller
    256193                        else {
    257                                 __STATS__( false,
    258                                         io.complete_q.blocks += 1;
    259                                 )
    260194                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    261195                                reset = 0;
     
    290224//
    291225
    292         [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
     226        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
    293227                /* paranoid */ verify( data != 0 );
    294228
     
    296230                __attribute((unused)) int len   = 0;
    297231                __attribute((unused)) int block = 0;
    298                 __u32 cnt = *ring.submit_q.num;
    299                 __u32 mask = *ring.submit_q.mask;
     232                uint32_t cnt = *ring.submit_q.num;
     233                uint32_t mask = *ring.submit_q.mask;
    300234
    301235                disable_interrupts();
    302                         __u32 off = __tls_rand();
     236                        uint32_t off = __tls_rand();
    303237                enable_interrupts( __cfaabi_dbg_ctx );
    304238
     
    307241                        // Look through the list starting at some offset
    308242                        for(i; cnt) {
    309                                 __u64 expected = 0;
    310                                 __u32 idx = (i + off) & mask;
     243                                uint64_t expected = 0;
     244                                uint32_t idx = (i + off) & mask;
    311245                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    312                                 volatile __u64 * udata = &sqe->user_data;
     246                                volatile uint64_t * udata = (volatile uint64_t *)&sqe->user_data;
    313247
    314248                                if( *udata == expected &&
     
    336270        }
    337271
    338         static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
     272        static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
    339273                /* paranoid */ verify( idx <= mask   );
    340274                /* paranoid */ verify( idx != -1ul32 );
     
    343277                __attribute((unused)) int len   = 0;
    344278                __attribute((unused)) int block = 0;
    345                 __u32 ready_mask = ring.submit_q.ready_cnt - 1;
     279                uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
    346280
    347281                disable_interrupts();
    348                         __u32 off = __tls_rand();
     282                        uint32_t off = __tls_rand();
    349283                enable_interrupts( __cfaabi_dbg_ctx );
    350284
    351                 __u32 picked;
     285                uint32_t picked;
    352286                LOOKING: for() {
    353287                        for(i; ring.submit_q.ready_cnt) {
    354288                                picked = (i + off) & ready_mask;
    355                                 __u32 expected = -1ul32;
     289                                uint32_t expected = -1ul32;
    356290                                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    357291                                        break LOOKING;
     
    363297
    364298                        block++;
    365 
    366                         __u32 released = __release_consumed_submission( ring );
    367                         if( released == 0 ) {
     299                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     300                                __release_consumed_submission( ring );
     301                                unlock( ring.submit_q.lock );
     302                        }
     303                        else {
    368304                                yield();
    369305                        }
     
    380316        }
    381317
    382         void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
     318        void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
    383319                __io_data & ring = *ctx->thrd.ring;
    384320                // Get now the data we definetely need
    385                 volatile __u32 * const tail = ring.submit_q.tail;
    386                 const __u32 mask  = *ring.submit_q.mask;
     321                volatile uint32_t * const tail = ring.submit_q.tail;
     322                const uint32_t mask  = *ring.submit_q.mask;
    387323
    388324                // There are 2 submission schemes, check which one we are using
     
    396332                }
    397333                else if( ring.eager_submits ) {
    398                         __u32 picked = __submit_to_ready_array( ring, idx, mask );
    399 
    400                         #if defined(LEADER_LOCK)
    401                                 if( !try_lock(ring.submit_q.submit_lock) ) {
     334                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
     335
     336                        for() {
     337                                yield();
     338
     339                                // If some one else collected our index, we are done
     340                                #warning ABA problem
     341                                if( ring.submit_q.ready[picked] != idx ) {
    402342                                        __STATS__( false,
    403343                                                io.submit_q.helped += 1;
     
    405345                                        return;
    406346                                }
    407                                 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    408                                 __STATS__( true,
    409                                         io.submit_q.leader += 1;
     347
     348                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     349                                        __STATS__( false,
     350                                                io.submit_q.leader += 1;
     351                                        )
     352                                        break;
     353                                }
     354
     355                                __STATS__( false,
     356                                        io.submit_q.busy += 1;
    410357                                )
    411                         #else
    412                                 for() {
    413                                         yield();
    414 
    415                                         if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
    416                                                 __STATS__( false,
    417                                                         io.submit_q.leader += 1;
    418                                                 )
    419                                                 break;
    420                                         }
    421 
    422                                         // If some one else collected our index, we are done
    423                                         #warning ABA problem
    424                                         if( ring.submit_q.ready[picked] != idx ) {
    425                                                 __STATS__( false,
    426                                                         io.submit_q.helped += 1;
    427                                                 )
    428                                                 return;
    429                                         }
    430 
    431                                         __STATS__( false,
    432                                                 io.submit_q.busy += 1;
    433                                         )
    434                                 }
    435                         #endif
     358                        }
    436359
    437360                        // We got the lock
    438                         // Collect the submissions
    439361                        unsigned to_submit = __collect_submitions( ring );
    440 
    441                         // Actually submit
    442362                        int ret = __io_uring_enter( ring, to_submit, false );
    443 
    444                         #if defined(LEADER_LOCK)
    445                                 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    446                                 next(ring.submit_q.submit_lock);
    447                         #else
    448                                 unlock(ring.submit_q.submit_lock);
    449                         #endif
    450                         if( ret < 0 ) return;
     363                        if( ret < 0 ) {
     364                                unlock(ring.submit_q.lock);
     365                                return;
     366                        }
     367
     368                        /* paranoid */ verify( ret > 0 || to_submit == 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
    451369
    452370                        // Release the consumed SQEs
     
    454372
    455373                        // update statistics
    456                         __STATS__( false,
     374                        __STATS__( true,
    457375                                io.submit_q.submit_avg.rdy += to_submit;
    458376                                io.submit_q.submit_avg.csm += ret;
    459377                                io.submit_q.submit_avg.cnt += 1;
    460378                        )
     379
     380                        unlock(ring.submit_q.lock);
    461381                }
    462382                else {
    463383                        // get mutual exclusion
    464                         #if defined(LEADER_LOCK)
    465                                 while(!try_lock(ring.submit_q.submit_lock));
    466                         #else
    467                                 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
    468                         #endif
     384                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    469385
    470386                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     
    504420                        __release_consumed_submission( ring );
    505421
    506                         #if defined(LEADER_LOCK)
    507                                 next(ring.submit_q.submit_lock);
    508                         #else
    509                                 unlock(ring.submit_q.submit_lock);
    510                         #endif
     422                        unlock(ring.submit_q.lock);
    511423
    512424                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     
    514426        }
    515427
    516         // #define PARTIAL_SUBMIT 32
    517428        static unsigned __collect_submitions( struct __io_data & ring ) {
    518429                /* paranoid */ verify( ring.submit_q.ready != 0p );
     
    520431
    521432                unsigned to_submit = 0;
    522                 __u32 tail = *ring.submit_q.tail;
    523                 const __u32 mask = *ring.submit_q.mask;
    524                 #if defined(PARTIAL_SUBMIT)
    525                         #if defined(LEADER_LOCK)
    526                                 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
    527                         #endif
    528                         const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
    529                         const __u32 offset = ring.submit_q.prev_ready;
    530                         ring.submit_q.prev_ready += cnt;
    531                 #else
    532                         const __u32 cnt = ring.submit_q.ready_cnt;
    533                         const __u32 offset = 0;
    534                 #endif
     433                uint32_t tail = *ring.submit_q.tail;
     434                const uint32_t mask = *ring.submit_q.mask;
    535435
    536436                // Go through the list of ready submissions
    537                 for( c; cnt ) {
    538                         __u32 i = (offset + c) % ring.submit_q.ready_cnt;
    539 
     437                for( i; ring.submit_q.ready_cnt ) {
    540438                        // replace any submission with the sentinel, to consume it.
    541                         __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
     439                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    542440
    543441                        // If it was already the sentinel, then we are done
     
    555453        }
    556454
    557         static __u32 __release_consumed_submission( struct __io_data & ring ) {
    558                 const __u32 smask = *ring.submit_q.mask;
     455        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
     456                const uint32_t smask = *ring.submit_q.mask;
    559457
    560458                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
    561                 __u32 chead = *ring.submit_q.head;
    562                 __u32 phead = ring.submit_q.prev_head;
     459                uint32_t chead = *ring.submit_q.head;
     460                uint32_t phead = ring.submit_q.prev_head;
    563461                ring.submit_q.prev_head = chead;
    564462                unlock(ring.submit_q.release_lock);
    565463
    566                 __u32 count = chead - phead;
     464                uint32_t count = chead - phead;
    567465                for( i; count ) {
    568                         __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
     466                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
    569467                        ring.submit_q.sqes[ idx ].user_data = 0;
    570468                }
Note: See TracChangeset for help on using the changeset viewer.