Ignore:
Timestamp:
May 21, 2020, 5:06:14 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
99fea48
Parents:
d47349b
Message:

Wrote proper allocator for SQEs

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rd47349b r6f121b8  
    124124
    125125                // Like head/tail but not seen by the kernel
    126                 volatile uint32_t alloc;
    127126                volatile uint32_t * ready;
    128127                uint32_t ready_cnt;
     
    141140                        struct {
    142141                                struct {
    143                                         volatile unsigned long long int val;
     142                                        volatile unsigned long long int rdy;
     143                                        volatile unsigned long long int csm;
     144                                        volatile unsigned long long int avl;
    144145                                        volatile unsigned long long int cnt;
    145                                         volatile unsigned long long int block;
    146146                                } submit_avg;
    147147                                struct {
     
    150150                                        volatile unsigned long long int block;
    151151                                } look_avg;
     152                                struct {
     153                                        volatile unsigned long long int val;
     154                                        volatile unsigned long long int cnt;
     155                                        volatile unsigned long long int block;
     156                                } alloc_avg;
    152157                        } stats;
    153158                #endif
     
    279284                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    280285                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    281                 sq.alloc = *sq.tail;
     286
     287                {
     288                        const uint32_t num = *sq.num;
     289                        for( i; num ) {
     290                                sq.sqes[i].user_data = 0ul64;
     291                        }
     292                }
    282293
    283294                if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     
    322333                // Initialize statistics
    323334                #if !defined(__CFA_NO_STATISTICS__)
    324                         this.io->submit_q.stats.submit_avg.val   = 0;
    325                         this.io->submit_q.stats.submit_avg.cnt   = 0;
    326                         this.io->submit_q.stats.submit_avg.block = 0;
     335                        this.io->submit_q.stats.submit_avg.rdy = 0;
     336                        this.io->submit_q.stats.submit_avg.csm = 0;
     337                        this.io->submit_q.stats.submit_avg.avl = 0;
     338                        this.io->submit_q.stats.submit_avg.cnt = 0;
    327339                        this.io->submit_q.stats.look_avg.val   = 0;
    328340                        this.io->submit_q.stats.look_avg.cnt   = 0;
    329341                        this.io->submit_q.stats.look_avg.block = 0;
     342                        this.io->submit_q.stats.alloc_avg.val   = 0;
     343                        this.io->submit_q.stats.alloc_avg.cnt   = 0;
     344                        this.io->submit_q.stats.alloc_avg.block = 0;
    330345                        this.io->completion_q.stats.completed_avg.val = 0;
    331346                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     
    383398                                        this.ready_queue.head = 1p;
    384399                                        thrd.next = 0p;
     400                                        __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
    385401
    386402                                        // Fixup the thread state
     
    425441                        if(this.print_stats) {
    426442                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
     443                                        double avgrdy = ((double)submit_avg.rdy) / submit_avg.cnt;
     444                                        double avgcsm = ((double)submit_avg.csm) / submit_avg.cnt;
     445                                        double avgavl = ((double)submit_avg.avl) / submit_avg.cnt;
     446
    427447                                        double lavgv = 0;
    428448                                        double lavgb = 0;
     
    432452                                        }
    433453
     454                                        double aavgv = 0;
     455                                        double aavgb = 0;
     456                                        if(alloc_avg.cnt != 0) {
     457                                                aavgv = ((double)alloc_avg.val  ) / alloc_avg.cnt;
     458                                                aavgb = ((double)alloc_avg.block) / alloc_avg.cnt;
     459                                        }
     460
    434461                                        __cfaabi_bits_print_safe( STDOUT_FILENO,
    435462                                                "----- I/O uRing Stats -----\n"
    436463                                                "- total submit calls     : %'15llu\n"
    437                                                 "- avg submit             : %'18.2lf\n"
    438                                                 "- pre-submit block %%     : %'18.2lf\n"
     464                                                "- avg ready entries      : %'18.2lf\n"
     465                                                "- avg submitted entries  : %'18.2lf\n"
     466                                                "- avg available entries  : %'18.2lf\n"
    439467                                                "- total ready search     : %'15llu\n"
    440468                                                "- avg ready search len   : %'18.2lf\n"
    441469                                                "- avg ready search block : %'18.2lf\n"
     470                                                "- total alloc search     : %'15llu\n"
     471                                                "- avg alloc search len   : %'18.2lf\n"
     472                                                "- avg alloc search block : %'18.2lf\n"
    442473                                                "- total wait calls       : %'15llu   (%'llu slow, %'llu fast)\n"
    443474                                                "- avg completion/wait    : %'18.2lf\n",
    444475                                                submit_avg.cnt,
    445                                                 ((double)submit_avg.val) / submit_avg.cnt,
    446                                                 (100.0 * submit_avg.block) / submit_avg.cnt,
     476                                                avgrdy,
     477                                                avgcsm,
     478                                                avgavl,
    447479                                                look_avg.cnt,
    448480                                                lavgv,
    449481                                                lavgb,
     482                                                alloc_avg.cnt,
     483                                                aavgv,
     484                                                aavgb,
    450485                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
    451486                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     
    493528
    494529                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    495                         uint32_t * tail = ring.submit_q.tail;
     530                        uint32_t tail = *ring.submit_q.tail;
    496531                        const uint32_t mask = *ring.submit_q.mask;
    497532
     
    505540
    506541                                // If we got a real submission, append it to the list
    507                                 ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
     542                                ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    508543                                to_submit++;
    509544                        }
    510545
    511546                        // Increment the tail based on how many we are ready to submit
    512                         __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
    513 
    514                         // update statistics
    515                         #if !defined(__CFA_NO_STATISTICS__)
    516                                 ring.submit_q.stats.submit_avg.val += to_submit;
    517                                 ring.submit_q.stats.submit_avg.cnt += 1;
    518                         #endif
    519                 }
    520 
     547                        __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
     548                }
     549
     550                const uint32_t smask = *ring.submit_q.mask;
     551                uint32_t shead = *ring.submit_q.head;
    521552                int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    522553                if( ret < 0 ) {
     
    530561                }
    531562
     563                verify( (shead + ret) == *ring.submit_q.head );
     564
     565                // Release the consumed SQEs
     566                for( i; ret ) {
     567                        uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
     568                        ring.submit_q.sqes[ idx ].user_data = 0;
     569                }
     570
     571                uint32_t avail = 0;
     572                uint32_t sqe_num = *ring.submit_q.num;
     573                for(i; sqe_num) {
     574                        if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++;
     575                }
     576
     577                // update statistics
     578                #if !defined(__CFA_NO_STATISTICS__)
     579                        ring.submit_q.stats.submit_avg.rdy += to_submit;
     580                        ring.submit_q.stats.submit_avg.csm += ret;
     581                        ring.submit_q.stats.submit_avg.avl += avail;
     582                        ring.submit_q.stats.submit_avg.cnt += 1;
     583                #endif
     584
    532585                // Drain the queue
    533586                unsigned head = *ring.completion_q.head;
    534                 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
     587                unsigned tail = *ring.completion_q.tail;
     588                const uint32_t mask = *ring.completion_q.mask;
     589
     590                // Memory barrier
     591                __atomic_thread_fence( __ATOMIC_SEQ_CST );
    535592
    536593                // Nothing was new return 0
     
    541598                uint32_t count = tail - head;
    542599                for(i; count) {
    543                         unsigned idx = (head + i) & (*ring.completion_q.mask);
     600                        unsigned idx = (head + i) & mask;
    544601                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    545602
     
    555612
    556613                // Allow new submissions to happen
    557                 V(ring.submit, count);
     614                // V(ring.submit, count);
    558615
    559616                // Mark to the kernel that the cqe has been seen
    560617                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     618                __atomic_thread_fence( __ATOMIC_SEQ_CST );
    561619                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    562620
     
    709767//
    710768
    711         static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
    712                 // Wait for a spot to be available
    713                 __attribute__((unused)) bool blocked = P(ring.submit);
    714                 #if !defined(__CFA_NO_STATISTICS__)
    715                         __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 1ul64 : 0ul64, __ATOMIC_RELAXED );
    716                 #endif
    717 
    718                 // Allocate the sqe
    719                 uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
    720 
    721                 // Mask the idx now to allow make everything easier to check
    722                 idx &= *ring.submit_q.mask;
    723 
    724                 // Return the sqe
    725                 return [&ring.submit_q.sqes[ idx ], idx];
     769        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
     770                verify( data != 0 );
     771
     772                // Prepare the data we need
     773                __attribute((unused)) int len   = 0;
     774                __attribute((unused)) int block = 0;
     775                uint32_t cnt = *ring.submit_q.num;
     776                uint32_t mask = *ring.submit_q.mask;
     777                uint32_t off = __tls_rand();
     778
     779                // Loop around looking for an available spot
     780                LOOKING: for() {
     781                        // Look through the list starting at some offset
     782                        for(i; cnt) {
     783                                uint64_t expected = 0;
     784                                uint32_t idx = (i + off) & mask;
     785                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     786                                volatile uint64_t * udata = &sqe->user_data;
     787
     788                                if( *udata == expected &&
     789                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
     790                                {
     791                                        // update statistics
     792                                        #if !defined(__CFA_NO_STATISTICS__)
     793                                                __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.val,   len,   __ATOMIC_RELAXED );
     794                                                __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.block, block, __ATOMIC_RELAXED );
     795                                                __atomic_fetch_add( &ring.submit_q.stats.alloc_avg.cnt,   1,     __ATOMIC_RELAXED );
     796                                        #endif
     797
     798                                        // Success return the data
     799                                        return [sqe, idx];
     800                                }
     801                                verify(expected != data);
     802
     803                                len ++;
     804                        }
     805
     806                        block++;
     807                        yield();
     808                }
    726809        }
    727810
     
    741824                        __attribute((unused)) int len   = 0;
    742825                        __attribute((unused)) int block = 0;
    743                         uint32_t expected = -1ul32;
    744826                        uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
    745827                        uint32_t off = __tls_rand();
     
    747829                                for(i; ring.submit_q.ready_cnt) {
    748830                                        uint32_t ii = (i + off) & ready_mask;
     831                                        uint32_t expected = -1ul32;
    749832                                        if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    750833                                                break LOOKING;
    751834                                        }
     835                                        verify(expected != idx);
    752836
    753837                                        len ++;
     
    791875                        // update statistics
    792876                        #if !defined(__CFA_NO_STATISTICS__)
    793                                 ring.submit_q.stats.submit_avg.val += 1;
     877                                ring.submit_q.stats.submit_avg.csm += 1;
    794878                                ring.submit_q.stats.submit_avg.cnt += 1;
    795879                        #endif
     
    830914
    831915        #define __submit_prelude \
    832                 struct __io_data & ring = *active_cluster()->io; \
     916                io_user_data data = { 0, active_thread() }; \
     917                struct __io_data & ring = *data.thrd->curr_cluster->io; \
    833918                struct io_uring_sqe * sqe; \
    834919                uint32_t idx; \
    835                 [sqe, idx] = __submit_alloc( ring );
     920                [sqe, idx] = __submit_alloc( ring, (uint64_t)&data );
    836921
    837922        #define __submit_wait \
    838                 io_user_data data = { 0, active_thread() }; \
    839923                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
    840                 sqe->user_data = (uint64_t)&data; \
     924                verify( sqe->user_data == (uint64_t)&data ); \
    841925                __submit( ring, idx ); \
    842926                park( __cfaabi_dbg_ctx ); \
Note: See TracChangeset for help on using the changeset viewer.