Changeset e46c753


Ignore:
Timestamp:
Jul 2, 2020, 4:17:51 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
8bb239d
Parents:
8e9e9a2
Message:

Added new io algorithm that eagerly submits while still helping

Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    r8e9e9a2 re46c753  
    182182//=============================================================================================
    183183        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
     184                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
     185                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
     186                }
     187
    184188                this.io = malloc();
    185189
     
    261265                }
    262266
    263                 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     267                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
    264268                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
    265269                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
     
    423427        // Process a single completion message from the io_uring
    424428        // This is NOT thread-safe
     429        static unsigned __collect_submitions( struct __io_data & ring );
    425430        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
     431                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
     432
    426433                unsigned to_submit = 0;
    427434                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    428 
    429435                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    430                         uint32_t tail = *ring.submit_q.tail;
    431                         const uint32_t mask = *ring.submit_q.mask;
    432 
    433                         // Go through the list of ready submissions
    434                         for( i; ring.submit_q.ready_cnt ) {
    435                                 // replace any submission with the sentinel, to consume it.
    436                                 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    437 
    438                                 // If it was already the sentinel, then we are done
    439                                 if( idx == -1ul32 ) continue;
    440 
    441                                 // If we got a real submission, append it to the list
    442                                 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    443                                 to_submit++;
    444                         }
    445 
    446                         // Increment the tail based on how many we are ready to submit
    447                         __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
     436                        to_submit = __collect_submitions( ring );
    448437                }
    449438
     
    455444                        case EAGAIN:
    456445                        case EINTR:
    457                                 return -EAGAIN;
     446                                return [0, true];
    458447                        default:
    459448                                abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
     
    467456                }
    468457
    469                 uint32_t avail = 0;
    470                 uint32_t sqe_num = *ring.submit_q.num;
    471                 for(i; sqe_num) {
    472                         if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++;
    473                 }
    474 
    475458                // update statistics
    476459                #if !defined(__CFA_NO_STATISTICS__)
    477                         __tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
    478                         __tls_stats()->io.submit_q.submit_avg.csm += ret;
    479                         __tls_stats()->io.submit_q.submit_avg.avl += avail;
    480                         __tls_stats()->io.submit_q.submit_avg.cnt += 1;
     460                        if( to_submit > 0 ) {
     461                                __tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
     462                                __tls_stats()->io.submit_q.submit_avg.csm += ret;
     463                                __tls_stats()->io.submit_q.submit_avg.cnt += 1;
     464                        }
    481465                #endif
    482466
     
    491475                // Nothing was new return 0
    492476                if (head == tail) {
    493                         return 0;
     477                        return [0, to_submit > 0];
    494478                }
    495479
     
    576560                                int count;
    577561                                bool again;
    578                                 [count, again] = __drain_io( ring, &mask, 0, true );
     562                                [count, again] = __drain_io( ring, &mask, 1, true );
    579563
    580564                                // Update statistics
     
    658642
    659643// Submition steps :
    660 // 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
    661 //     entries are available. The semaphore make sure that there is no more operations in
    662 //     progress then the number of entries in the buffer. This probably limits concurrency
    663 //     more than necessary since submitted but not completed operations don't need any
    664 //     entries in user space. However, I don't know what happens if we overflow the buffers
    665 //     because too many requests completed at once. This is a safe approach in all cases.
    666 //     Furthermore, with hundreds of entries, this may be okay.
    667 //
    668 // 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
     644// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
    669645//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
    670646//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
    671647//     need to write an allocator that allows allocating concurrently.
    672648//
    673 // 3 - Actually fill the submit entry, this is the only simple and straightforward step.
     649// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
    674650//
    675 // 4 - Append the entry index to the array and adjust the tail accordingly. This operation
     651// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
    676652//     needs to arrive to two concensus at the same time:
    677653//     A - The order in which entries are listed in the array: no two threads must pick the
     
    682658
    683659        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
    684                 verify( data != 0 );
    685 
     660                /* paranoid */ verify( data != 0 );
    686661
    687662                // Prepare the data we need
     
    762737                // update statistics
    763738                #if !defined(__CFA_NO_STATISTICS__)
    764                 disable_interrupts();
    765                         __tls_stats()->io.submit_q.look_avg.val   += len;
    766                         __tls_stats()->io.submit_q.look_avg.block += block;
    767                         __tls_stats()->io.submit_q.look_avg.cnt   += 1;
    768                 enable_interrupts( __cfaabi_dbg_ctx );
     739                        disable_interrupts();
     740                                __tls_stats()->io.submit_q.look_avg.val   += len;
     741                                __tls_stats()->io.submit_q.look_avg.block += block;
     742                                __tls_stats()->io.submit_q.look_avg.cnt   += 1;
     743                        enable_interrupts( __cfaabi_dbg_ctx );
    769744                #endif
    770745
     
    780755                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    781756                        // If the poller thread submits, then we just need to add this to the ready array
    782 
    783757                        __submit_to_ready_array( ring, idx, mask );
    784758
     
    786760
    787761                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
     762                }
     763                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
     764                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
     765
     766                        for() {
     767                                yield();
     768
     769                                // If some one else collected our index, we are done
     770                                if( ring.submit_q.ready[picked] != idx ) {
     771                                        #if !defined(__CFA_NO_STATISTICS__)
     772                                                disable_interrupts();
     773                                                        __tls_stats()->io.submit_q.helped += 1;
     774                                                enable_interrupts( __cfaabi_dbg_ctx );
     775                                        #endif
     776                                        return;
     777                                }
     778
     779                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     780                                        #if !defined(__CFA_NO_STATISTICS__)
     781                                                __tls_stats()->io.submit_q.leader += 1;
     782                                        #endif
     783                                        break;
     784                                }
     785                        }
     786
     787                        // We got the lock
     788                        unsigned to_submit = __collect_submitions( ring );
     789                        // /* paranoid */ verify( to_submit > 0 );
     790                        if( to_submit == 0 ) abort();
     791
     792                        const uint32_t smask = *ring.submit_q.mask;
     793                        uint32_t shead = *ring.submit_q.head;
     794                        int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
     795                        if( ret < 0 ) {
     796                                switch((int)errno) {
     797                                case EAGAIN:
     798                                case EINTR:
     799                                        unlock(ring.submit_q.lock);
     800                                        return;
     801                                default:
     802                                        abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
     803                                }
     804                        }
     805
     806                        /* paranoid */ verify( ret > 0 );
     807
     808                        // Release the consumed SQEs
     809                        for( i; ret ) {
     810                                uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
     811                                ring.submit_q.sqes[ idx ].user_data = 0;
     812                        }
     813
     814                        // update statistics
     815                        #if !defined(__CFA_NO_STATISTICS__)
     816                                __tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
     817                                __tls_stats()->io.submit_q.submit_avg.csm += ret;
     818                                __tls_stats()->io.submit_q.submit_avg.cnt += 1;
     819                        #endif
     820
     821                        unlock(ring.submit_q.lock);
    788822                }
    789823                else {
     
    809843                        // update statistics
    810844                        #if !defined(__CFA_NO_STATISTICS__)
    811                                 __tls_stats()->io.submit_q.submit_avg.csm += 1;
    812                                 __tls_stats()->io.submit_q.submit_avg.cnt += 1;
     845                                disable_interrupts();
     846                                        abort();
     847                                        __tls_stats()->io.submit_q.submit_avg.csm += 1;
     848                                        __tls_stats()->io.submit_q.submit_avg.cnt += 1;
     849                                enable_interrupts( __cfaabi_dbg_ctx );
    813850                        #endif
    814851
     
    820857                }
    821858        }
     859
     860        static unsigned __collect_submitions( struct __io_data & ring ) {
     861                /* paranoid */ verify( ring.submit_q.ready != 0p );
     862                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
     863
     864                unsigned to_submit = 0;
     865                uint32_t tail = *ring.submit_q.tail;
     866                const uint32_t mask = *ring.submit_q.mask;
     867
     868                // Go through the list of ready submissions
     869                for( i; ring.submit_q.ready_cnt ) {
     870                        // replace any submission with the sentinel, to consume it.
     871                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
     872
     873                        // If it was already the sentinel, then we are done
     874                        if( idx == -1ul32 ) continue;
     875
     876                        // If we got a real submission, append it to the list
     877                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
     878                        to_submit++;
     879                }
     880
     881                // Increment the tail based on how many we are ready to submit
     882                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
     883
     884                return to_submit;
     885        }
    822886#endif
  • libcfa/src/concurrency/kernel.hfa

    r8e9e9a2 re46c753  
    131131#define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
    132132#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
    133 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
     133#define CFA_CLUSTER_IO_EAGER_SUBMITS        1 << 2 // 0x4
    134134#define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
    135135
  • libcfa/src/concurrency/stats.cfa

    r8e9e9a2 re46c753  
    2727                        stats->io.submit_q.submit_avg.rdy = 0;
    2828                        stats->io.submit_q.submit_avg.csm = 0;
    29                         stats->io.submit_q.submit_avg.avl = 0;
    3029                        stats->io.submit_q.submit_avg.cnt = 0;
    3130                        stats->io.submit_q.look_avg.val   = 0;
     
    3534                        stats->io.submit_q.alloc_avg.cnt   = 0;
    3635                        stats->io.submit_q.alloc_avg.block = 0;
     36                        stats->io.submit_q.helped = 0;
     37                        stats->io.submit_q.leader = 0;
    3738                        stats->io.complete_q.completed_avg.val = 0;
    3839                        stats->io.complete_q.completed_avg.slow_cnt = 0;
     
    6869                        __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt           , proc->io.submit_q.alloc_avg.cnt           , __ATOMIC_SEQ_CST );
    6970                        __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block         , proc->io.submit_q.alloc_avg.block         , __ATOMIC_SEQ_CST );
     71                        __atomic_fetch_add( &cltr->io.submit_q.helped                  , proc->io.submit_q.helped                  , __ATOMIC_SEQ_CST );
     72                        __atomic_fetch_add( &cltr->io.submit_q.leader                  , proc->io.submit_q.leader                  , __ATOMIC_SEQ_CST );
    7073                        __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val     , proc->io.complete_q.completed_avg.val     , __ATOMIC_SEQ_CST );
    7174                        __atomic_fetch_add( &cltr->io.complete_q.completed_avg.slow_cnt, proc->io.complete_q.completed_avg.slow_cnt, __ATOMIC_SEQ_CST );
     
    120123                                double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt;
    121124                                double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt;
    122                                 double avgavl = ((double)io.submit_q.submit_avg.avl) / io.submit_q.submit_avg.cnt;
    123125
    124126                                double lavgv = 0;
     
    141143                                        "- avg ready entries      : %'18.2lf\n"
    142144                                        "- avg submitted entries  : %'18.2lf\n"
    143                                         "- avg available entries  : %'18.2lf\n"
     145                                        "- total helped entries   : %'15" PRIu64 "\n"
     146                                        "- total leader entries   : %'15" PRIu64 "\n"
    144147                                        "- total ready search     : %'15" PRIu64 "\n"
    145148                                        "- avg ready search len   : %'18.2lf\n"
     
    153156                                        , cluster ? "Cluster" : "Processor",  name, id
    154157                                        , io.submit_q.submit_avg.cnt
    155                                         , avgrdy, avgcsm, avgavl
     158                                        , avgrdy, avgcsm
     159                                        , io.submit_q.helped, io.submit_q.leader
    156160                                        , io.submit_q.look_avg.cnt
    157161                                        , lavgv, lavgb
  • libcfa/src/concurrency/stats.hfa

    r8e9e9a2 re46c753  
    8383                                        volatile uint64_t block;
    8484                                } alloc_avg;
     85                                volatile uint64_t helped;
     86                                volatile uint64_t leader;
    8587                        } submit_q;
    8688                        struct {
Note: See TracChangeset for help on using the changeset viewer.