Changes in / [49cad912:3a32b3a]


Ignore:
Files:
1 added
8 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/batch-readv.c

    r49cad912 r3a32b3a  
    11// Program to test the optimial batchsize in a single threaded process
    22extern "C" {
    3         #ifndef _GNU_SOURCE         /* See feature_test_macros(7) */
    4         #define _GNU_SOURCE         /* See feature_test_macros(7) */
    5         #endif
    6         #include <errno.h>
    7         #include <stdio.h>
    8         #include <stdint.h>
    9         #include <stdlib.h>
    10         #include <string.h>
     3        #include <getopt.h>
    114        #include <locale.h>
    12         #include <getopt.h>
    13         #include <unistd.h>
    14         #include <sys/mman.h>
    15         #include <sys/syscall.h>
    16         #include <sys/uio.h>
    17         #include <fcntl.h>
    185        #include <time.h>                                                                               // timespec
    196        #include <sys/time.h>                                                                   // timeval
    20 
    21         #include <linux/io_uring.h>
    227}
    23 
    248
    259enum { TIMEGRAN = 1000000000LL };                                       // nanosecond granularity, except for timeval
     
    2711#include <omp.h>
    2812
    29 # ifndef __NR_io_uring_setup
    30 #  define __NR_io_uring_setup           425
    31 # endif
    32 # ifndef __NR_io_uring_enter
    33 #  define __NR_io_uring_enter           426
    34 # endif
    35 # ifndef __NR_io_uring_register
    36 #  define __NR_io_uring_register        427
    37 # endif
     13#include "io_uring.h"
    3814
    39 struct io_uring_sq {
    40         // Head and tail of the ring (associated with array)
    41         volatile uint32_t * head;
    42         volatile uint32_t * tail;
    4315
    44         // The actual kernel ring which uses head/tail
    45         // indexes into the sqes arrays
    46         uint32_t * array;
    47 
    48         // number of entries and mask to go with it
    49         const uint32_t * num;
    50         const uint32_t * mask;
    51 
    52         // Submission flags (Not sure what for)
    53         uint32_t * flags;
    54 
    55         // number of sqes not submitted (whatever that means)
    56         uint32_t * dropped;
    57 
    58         // Like head/tail but not seen by the kernel
    59         volatile uint32_t alloc;
    60 
    61         // A buffer of sqes (not the actual ring)
    62         struct io_uring_sqe * sqes;
    63 
    64         // The location and size of the mmaped area
    65         void * ring_ptr;
    66         size_t ring_sz;
    67 };
    68 
    69 struct io_uring_cq {
    70         // Head and tail of the ring
    71         volatile uint32_t * head;
    72         volatile uint32_t * tail;
    73 
    74         // number of entries and mask to go with it
    75         const uint32_t * mask;
    76         const uint32_t * num;
    77 
    78         // number of cqes not submitted (whatever that means)
    79         uint32_t * overflow;
    80 
    81         // the kernel ring
    82         struct io_uring_cqe * cqes;
    83 
    84         // The location and size of the mmaped area
    85         void * ring_ptr;
    86         size_t ring_sz;
    87 };
    88 
    89 struct io_ring {
    90         struct io_uring_sq submit_q;
    91         struct io_uring_cq completion_q;
    92         uint32_t flags;
    93         int fd;
    94 };
    95 
    96 struct fred {
    97         io_ring io;
    98 };
    99 
    100 fred self;
    10116int myfd;
    10217
     
    217132        myfd = open(__FILE__, 0);
    218133
    219         // Step 1 : call to setup
    220         struct io_uring_params params;
    221         memset(&params, 0, sizeof(params));
    222 
    223         uint32_t nentries = 2048;
    224 
    225         int fd = syscall(__NR_io_uring_setup, nentries, &params );
    226         if(fd < 0) {
    227                 fprintf(stderr, "KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
    228                 abort();
    229         }
    230 
    231         // Step 2 : mmap result
    232         memset(&self.io, 0, sizeof(struct io_ring));
    233         struct io_uring_sq & sq = self.io.submit_q;
    234         struct io_uring_cq & cq = self.io.completion_q;
    235 
    236         // calculate the right ring size
    237         sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
    238         cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
    239 
    240         // Requires features
    241         // // adjust the size according to the parameters
    242         // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    243         //      cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
    244         // }
    245 
    246         // mmap the Submit Queue into existence
    247         sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
    248         if (sq.ring_ptr == (void*)MAP_FAILED) {
    249                 fprintf(stderr, "KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
    250                 abort();
    251         }
    252 
    253         // mmap the Completion Queue into existence (may or may not be needed)
    254         // Requires features
    255         // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    256         //      cq->ring_ptr = sq->ring_ptr;
    257         // }
    258         // else {
    259                 // We need multiple call to MMAP
    260                 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
    261                 if (cq.ring_ptr == (void*)MAP_FAILED) {
    262                         munmap(sq.ring_ptr, sq.ring_sz);
    263                         fprintf(stderr, "KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
    264                         abort();
    265                 }
    266         // }
    267 
    268         // mmap the submit queue entries
    269         size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
    270         sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
    271         if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
    272                 munmap(sq.ring_ptr, sq.ring_sz);
    273                 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
    274                 fprintf(stderr, "KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
    275                 abort();
    276         }
    277 
    278         // Get the pointers from the kernel to fill the structure
    279         // submit queue
    280         sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
    281         sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
    282         sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
    283         sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
    284         sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
    285         sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    286         sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    287         sq.alloc = *sq.tail;
    288 
    289         // completion queue
    290         cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
    291         cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
    292         cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
    293         cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
    294         cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
    295         cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    296 
    297         self.io.fd = fd;
     134        init_uring(2048);
    298135
    299136        // Allocate the sqe
     
    344181
    345182        printf("Took %'ld ms\n", to_miliseconds(end - start));
    346         printf("Submitted       %'llu\n", submits);
    347         printf("Completed       %'llu\n", completes);
    348         printf("Submitted / sec %'.f\n", submits   / to_fseconds(end - start));
    349         printf("Completed / sec %'.f\n", completes / to_fseconds(end - start));
     183        printf("Submitted        %'llu\n", submits);
     184        printf("Completed        %'llu\n", completes);
     185        printf("Submitted / sec  %'.f\n", submits   / to_fseconds(end - start));
     186        printf("Completed / sec  %'.f\n", completes / to_fseconds(end - start));
     187        printf("ns per Submitted %'.f\n", 1000000000.0 * to_fseconds(end - start) / (submits   / batch) );
     188        printf("ns per Completed %'.f\n", 1000000000.0 * to_fseconds(end - start) / (completes / batch) );
    350189}
  • benchmark/io/readv.cfa

    r49cad912 r3a32b3a  
    5858                static struct option options[] = {
    5959                        BENCH_OPT_LONG
    60                         {"bufsize",      required_argument, 0, 'b'},
    61                         {"userthread",   no_argument      , 0, 'u'},
    62                         {"submitthread", no_argument      , 0, 's'},
    63                         {"submitlength", required_argument, 0, 'l'},
     60                        {"bufsize",       required_argument, 0, 'b'},
     61                        {"userthread",    no_argument      , 0, 'u'},
     62                        {"submitthread",  no_argument      , 0, 's'},
     63                        {"eagersubmit",   no_argument      , 0, 'e'},
     64                        {"kpollsubmit",   no_argument      , 0, 'k'},
     65                        {"kpollcomplete", no_argument      , 0, 'i'},
     66                        {"submitlength",  required_argument, 0, 'l'},
    6467                        {0, 0, 0, 0}
    6568                };
    6669
    6770                int idx = 0;
    68                 int opt = getopt_long(argc, argv, BENCH_OPT_SHORT "b:usl:", options, &idx);
     71                int opt = getopt_long(argc, argv, BENCH_OPT_SHORT "b:usekil:", options, &idx);
    6972
    7073                const char * arg = optarg ? optarg : "";
     
    8891                                flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
    8992                                break;
     93                        case 'e':
     94                                flags |= CFA_CLUSTER_IO_EAGER_SUBMITS;
     95                                break;
     96                        case 'k':
     97                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
     98                                break;
     99                        case 'i':
     100                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
     101                                break;
    90102                        case 'l':
    91103                                sublen = strtoul(arg, &end, 10);
     
    103115                                fprintf( stderr, "  -u, --userthread         If set, cluster uses user-thread to poll I/O\n" );
    104116                                fprintf( stderr, "  -s, --submitthread       If set, cluster uses polling thread to submit I/O\n" );
     117                                fprintf( stderr, "  -e, --eagersubmit        If set, cluster submits I/O eagerly but still aggregates submits\n" );
     118                                fprintf( stderr, "  -k, --kpollsubmit        If set, cluster uses IORING_SETUP_SQPOLL\n" );
     119                                fprintf( stderr, "  -i, --kpollcomplete      If set, cluster uses IORING_SETUP_IOPOLL\n" );
     120                                fprintf( stderr, "  -l, --submitlength=LEN   Max number of submitions that can be submitted together\n" );
    105121                                exit(EXIT_FAILURE);
    106122                }
  • libcfa/src/concurrency/io.cfa

    r49cad912 r3a32b3a  
    109109                volatile uint32_t * head;
    110110                volatile uint32_t * tail;
     111                volatile uint32_t prev_head;
    111112
    112113                // The actual kernel ring which uses head/tail
     
    129130
    130131                __spinlock_t lock;
     132                __spinlock_t release_lock;
    131133
    132134                // A buffer of sqes (not the actual ring)
     
    182184//=============================================================================================
    183185        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
     186                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
     187                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
     188                }
     189
    184190                this.io = malloc();
    185191
     
    187193                struct io_uring_params params;
    188194                memset(&params, 0, sizeof(params));
     195                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
     196                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
    189197
    190198                uint32_t nentries = entries_per_cluster();
     
    253261                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    254262                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
     263                sq.prev_head = *sq.head;
    255264
    256265                {
     
    261270                }
    262271
    263                 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     272                (sq.lock){};
     273                (sq.release_lock){};
     274
     275                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
    264276                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
    265277                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
     
    421433// I/O Polling
    422434//=============================================================================================
     435        static unsigned __collect_submitions( struct __io_data & ring );
     436        static uint32_t __release_consumed_submission( struct __io_data & ring );
     437
    423438        // Process a single completion message from the io_uring
    424439        // This is NOT thread-safe
    425440        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
     441                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
     442                const uint32_t smask = *ring.submit_q.mask;
     443
    426444                unsigned to_submit = 0;
    427445                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    428 
    429446                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    430                         uint32_t tail = *ring.submit_q.tail;
    431                         const uint32_t mask = *ring.submit_q.mask;
    432 
    433                         // Go through the list of ready submissions
    434                         for( i; ring.submit_q.ready_cnt ) {
    435                                 // replace any submission with the sentinel, to consume it.
    436                                 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    437 
    438                                 // If it was already the sentinel, then we are done
    439                                 if( idx == -1ul32 ) continue;
    440 
    441                                 // If we got a real submission, append it to the list
    442                                 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    443                                 to_submit++;
    444                         }
    445 
    446                         // Increment the tail based on how many we are ready to submit
    447                         __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
    448                 }
    449 
    450                 const uint32_t smask = *ring.submit_q.mask;
    451                 uint32_t shead = *ring.submit_q.head;
    452                 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    453                 if( ret < 0 ) {
    454                         switch((int)errno) {
    455                         case EAGAIN:
    456                         case EINTR:
    457                                 return -EAGAIN;
    458                         default:
    459                                 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    460                         }
    461                 }
    462 
    463                 // Release the consumed SQEs
    464                 for( i; ret ) {
    465                         uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
    466                         ring.submit_q.sqes[ idx ].user_data = 0;
    467                 }
    468 
    469                 uint32_t avail = 0;
    470                 uint32_t sqe_num = *ring.submit_q.num;
    471                 for(i; sqe_num) {
    472                         if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++;
    473                 }
    474 
    475                 // update statistics
    476                 #if !defined(__CFA_NO_STATISTICS__)
    477                         __tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
    478                         __tls_stats()->io.submit_q.submit_avg.csm += ret;
    479                         __tls_stats()->io.submit_q.submit_avg.avl += avail;
    480                         __tls_stats()->io.submit_q.submit_avg.cnt += 1;
    481                 #endif
     447                        to_submit = __collect_submitions( ring );
     448                }
     449
     450                if (to_submit > 0 || waitcnt > 0) {
     451                        int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     452                        if( ret < 0 ) {
     453                                switch((int)errno) {
     454                                case EAGAIN:
     455                                case EINTR:
     456                                        return [0, true];
     457                                default:
     458                                        abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
     459                                }
     460                        }
     461
     462                        // Release the consumed SQEs
     463                        __release_consumed_submission( ring );
     464
     465                        // update statistics
     466                        __STATS__( true,
     467                                if( to_submit > 0 ) {
     468                                        io.submit_q.submit_avg.rdy += to_submit;
     469                                        io.submit_q.submit_avg.csm += ret;
     470                                        io.submit_q.submit_avg.cnt += 1;
     471                                }
     472                        )
     473                }
     474
     475                // Memory barrier
     476                __atomic_thread_fence( __ATOMIC_SEQ_CST );
    482477
    483478                // Drain the queue
     
    486481                const uint32_t mask = *ring.completion_q.mask;
    487482
    488                 // Memory barrier
    489                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    490 
    491483                // Nothing was new return 0
    492484                if (head == tail) {
    493                         return 0;
     485                        return [0, to_submit > 0];
    494486                }
    495487
    496488                uint32_t count = tail - head;
     489                /* paranoid */ verify( count != 0 );
    497490                for(i; count) {
    498491                        unsigned idx = (head + i) & mask;
     
    559552
    560553                                // Update statistics
    561                                 #if !defined(__CFA_NO_STATISTICS__)
    562                                         __tls_stats()->io.complete_q.completed_avg.val += count;
    563                                         __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
    564                                 #endif
     554                                __STATS__( true,
     555                                        io.complete_q.completed_avg.val += count;
     556                                        io.complete_q.completed_avg.slow_cnt += 1;
     557                                )
    565558
    566559                                if(again) {
     
    576569                                int count;
    577570                                bool again;
    578                                 [count, again] = __drain_io( ring, &mask, 0, true );
     571                                [count, again] = __drain_io( ring, &mask, 1, true );
    579572
    580573                                // Update statistics
    581                                 #if !defined(__CFA_NO_STATISTICS__)
    582                                         __tls_stats()->io.complete_q.completed_avg.val += count;
    583                                         __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
    584                                 #endif
     574                                __STATS__( true,
     575                                        io.complete_q.completed_avg.val += count;
     576                                        io.complete_q.completed_avg.slow_cnt += 1;
     577                                )
    585578                        }
    586579                }
     
    619612
    620613                                // Update statistics
    621                                 #if !defined(__CFA_NO_STATISTICS__)
    622                                         __tls_stats()->io.complete_q.completed_avg.val += count;
    623                                         __tls_stats()->io.complete_q.completed_avg.fast_cnt += 1;
    624                                 #endif
     614                                __STATS__( true,
     615                                        io.complete_q.completed_avg.val += count;
     616                                        io.complete_q.completed_avg.fast_cnt += 1;
     617                                )
    625618                        enable_interrupts( __cfaabi_dbg_ctx );
    626619
     
    658651
    659652// Submition steps :
    660 // 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
    661 //     entries are available. The semaphore make sure that there is no more operations in
    662 //     progress then the number of entries in the buffer. This probably limits concurrency
    663 //     more than necessary since submitted but not completed operations don't need any
    664 //     entries in user space. However, I don't know what happens if we overflow the buffers
    665 //     because too many requests completed at once. This is a safe approach in all cases.
    666 //     Furthermore, with hundreds of entries, this may be okay.
    667 //
    668 // 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
     653// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
    669654//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
    670655//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
    671656//     need to write an allocator that allows allocating concurrently.
    672657//
    673 // 3 - Actually fill the submit entry, this is the only simple and straightforward step.
     658// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
    674659//
    675 // 4 - Append the entry index to the array and adjust the tail accordingly. This operation
     660// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
    676661//     needs to arrive to two concensus at the same time:
    677662//     A - The order in which entries are listed in the array: no two threads must pick the
     
    682667
    683668        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
    684                 verify( data != 0 );
    685 
     669                /* paranoid */ verify( data != 0 );
    686670
    687671                // Prepare the data we need
     
    708692                                {
    709693                                        // update statistics
    710                                         #if !defined(__CFA_NO_STATISTICS__)
    711                                                 disable_interrupts();
    712                                                         __tls_stats()->io.submit_q.alloc_avg.val   += len;
    713                                                         __tls_stats()->io.submit_q.alloc_avg.block += block;
    714                                                         __tls_stats()->io.submit_q.alloc_avg.cnt   += 1;
    715                                                 enable_interrupts( __cfaabi_dbg_ctx );
    716                                         #endif
     694                                        __STATS__( false,
     695                                                io.submit_q.alloc_avg.val   += len;
     696                                                io.submit_q.alloc_avg.block += block;
     697                                                io.submit_q.alloc_avg.cnt   += 1;
     698                                        )
    717699
    718700
     
    757739
    758740                        block++;
    759                         yield();
     741                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     742                                __release_consumed_submission( ring );
     743                                unlock( ring.submit_q.lock );
     744                        }
     745                        else {
     746                                yield();
     747                        }
    760748                }
    761749
    762750                // update statistics
    763                 #if !defined(__CFA_NO_STATISTICS__)
    764                 disable_interrupts();
    765                         __tls_stats()->io.submit_q.look_avg.val   += len;
    766                         __tls_stats()->io.submit_q.look_avg.block += block;
    767                         __tls_stats()->io.submit_q.look_avg.cnt   += 1;
    768                 enable_interrupts( __cfaabi_dbg_ctx );
    769                 #endif
     751                __STATS__( false,
     752                        io.submit_q.look_avg.val   += len;
     753                        io.submit_q.look_avg.block += block;
     754                        io.submit_q.look_avg.cnt   += 1;
     755                )
    770756
    771757                return picked;
     
    780766                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    781767                        // If the poller thread submits, then we just need to add this to the ready array
    782 
    783768                        __submit_to_ready_array( ring, idx, mask );
    784769
     
    786771
    787772                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
     773                }
     774                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
     775                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
     776
     777                        for() {
     778                                yield();
     779
     780                                // If some one else collected our index, we are done
     781                                #warning ABA problem
     782                                if( ring.submit_q.ready[picked] != idx ) {
     783                                        __STATS__( false,
     784                                                io.submit_q.helped += 1;
     785                                        )
     786                                        return;
     787                                }
     788
     789                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     790                                        __STATS__( false,
     791                                                io.submit_q.leader += 1;
     792                                        )
     793                                        break;
     794                                }
     795
     796                                __STATS__( false,
     797                                        io.submit_q.busy += 1;
     798                                )
     799                        }
     800
     801                        // We got the lock
     802                        unsigned to_submit = __collect_submitions( ring );
     803                        int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
     804                        if( ret < 0 ) {
     805                                switch((int)errno) {
     806                                case EAGAIN:
     807                                case EINTR:
     808                                        unlock(ring.submit_q.lock);
     809                                        return;
     810                                default:
     811                                        abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
     812                                }
     813                        }
     814
     815                        /* paranoid */ verify( ret > 0 );
     816
     817                        // Release the consumed SQEs
     818                        __release_consumed_submission( ring );
     819
     820                        // update statistics
     821                        __STATS__( true,
     822                                io.submit_q.submit_avg.rdy += to_submit;
     823                                io.submit_q.submit_avg.csm += ret;
     824                                io.submit_q.submit_avg.cnt += 1;
     825                        )
     826
     827                        unlock(ring.submit_q.lock);
    788828                }
    789829                else {
     
    797837                        ring.submit_q.array[ (*tail) & mask ] = idx & mask;
    798838                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
     839
     840                        /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );
    799841
    800842                        // Submit however, many entries need to be submitted
     
    808850
    809851                        // update statistics
    810                         #if !defined(__CFA_NO_STATISTICS__)
    811                                 __tls_stats()->io.submit_q.submit_avg.csm += 1;
    812                                 __tls_stats()->io.submit_q.submit_avg.cnt += 1;
    813                         #endif
    814 
    815                         ring.submit_q.sqes[ idx & mask ].user_data = 0;
     852                        __STATS__( false,
     853                                io.submit_q.submit_avg.csm += 1;
     854                                io.submit_q.submit_avg.cnt += 1;
     855                        )
     856
     857                        // Release the consumed SQEs
     858                        __release_consumed_submission( ring );
    816859
    817860                        unlock(ring.submit_q.lock);
     
    820863                }
    821864        }
     865
     866        static unsigned __collect_submitions( struct __io_data & ring ) {
     867                /* paranoid */ verify( ring.submit_q.ready != 0p );
     868                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
     869
     870                unsigned to_submit = 0;
     871                uint32_t tail = *ring.submit_q.tail;
     872                const uint32_t mask = *ring.submit_q.mask;
     873
     874                // Go through the list of ready submissions
     875                for( i; ring.submit_q.ready_cnt ) {
     876                        // replace any submission with the sentinel, to consume it.
     877                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
     878
     879                        // If it was already the sentinel, then we are done
     880                        if( idx == -1ul32 ) continue;
     881
     882                        // If we got a real submission, append it to the list
     883                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
     884                        to_submit++;
     885                }
     886
     887                // Increment the tail based on how many we are ready to submit
     888                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
     889
     890                return to_submit;
     891        }
     892
     893        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
     894                const uint32_t smask = *ring.submit_q.mask;
     895
     896                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     897                uint32_t chead = *ring.submit_q.head;
     898                uint32_t phead = ring.submit_q.prev_head;
     899                ring.submit_q.prev_head = chead;
     900                unlock(ring.submit_q.release_lock);
     901
     902                uint32_t count = chead - phead;
     903                for( i; count ) {
     904                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
     905                        ring.submit_q.sqes[ idx ].user_data = 0;
     906                }
     907                return count;
     908        }
    822909#endif
  • libcfa/src/concurrency/kernel.hfa

    r49cad912 r3a32b3a  
    129129struct __io_data;
    130130
    131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
    132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
    133 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
     131#define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) // 0x01
     132#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02
     133#define CFA_CLUSTER_IO_EAGER_SUBMITS         (1 << 2) // 0x04
     134#define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   (1 << 3) // 0x08
     135#define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10
    134136#define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
    135137
  • libcfa/src/concurrency/kernel_private.hfa

    r49cad912 r3a32b3a  
    286286// Statics call at the end of each thread to register statistics
    287287#if !defined(__CFA_NO_STATISTICS__)
    288 static inline struct __stats_t * __tls_stats() {
    289         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    290         /* paranoid */ verify( kernelTLS.this_stats );
    291         return kernelTLS.this_stats;
    292 }
     288        static inline struct __stats_t * __tls_stats() {
     289                /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     290                /* paranoid */ verify( kernelTLS.this_stats );
     291                return kernelTLS.this_stats;
     292        }
     293
     294        #define __STATS__(in_kernel, ...) { \
     295                if( !(in_kernel) ) disable_interrupts(); \
     296                with( *__tls_stats() ) { \
     297                        __VA_ARGS__ \
     298                } \
     299                if( !(in_kernel) ) enable_interrupts( __cfaabi_dbg_ctx ); \
     300        }
     301#else
     302        #define __STATS__(in_kernel, ...)
    293303#endif
    294304
  • libcfa/src/concurrency/preemption.cfa

    r49cad912 r3a32b3a  
    186186        void enable_interrupts( __cfaabi_dbg_ctx_param ) {
    187187                processor   * proc = kernelTLS.this_processor; // Cache the processor now since interrupts can start happening after the atomic store
     188                /* paranoid */ verify( proc );
    188189
    189190                with( kernelTLS.preemption_state ){
  • libcfa/src/concurrency/stats.cfa

    r49cad912 r3a32b3a  
    2727                        stats->io.submit_q.submit_avg.rdy = 0;
    2828                        stats->io.submit_q.submit_avg.csm = 0;
    29                         stats->io.submit_q.submit_avg.avl = 0;
    3029                        stats->io.submit_q.submit_avg.cnt = 0;
    3130                        stats->io.submit_q.look_avg.val   = 0;
     
    3534                        stats->io.submit_q.alloc_avg.cnt   = 0;
    3635                        stats->io.submit_q.alloc_avg.block = 0;
     36                        stats->io.submit_q.helped = 0;
     37                        stats->io.submit_q.leader = 0;
     38                        stats->io.submit_q.busy   = 0;
    3739                        stats->io.complete_q.completed_avg.val = 0;
    3840                        stats->io.complete_q.completed_avg.slow_cnt = 0;
     
    6870                        __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt           , proc->io.submit_q.alloc_avg.cnt           , __ATOMIC_SEQ_CST );
    6971                        __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block         , proc->io.submit_q.alloc_avg.block         , __ATOMIC_SEQ_CST );
     72                        __atomic_fetch_add( &cltr->io.submit_q.helped                  , proc->io.submit_q.helped                  , __ATOMIC_SEQ_CST );
     73                        __atomic_fetch_add( &cltr->io.submit_q.leader                  , proc->io.submit_q.leader                  , __ATOMIC_SEQ_CST );
     74                        __atomic_fetch_add( &cltr->io.submit_q.busy                    , proc->io.submit_q.busy                    , __ATOMIC_SEQ_CST );
    7075                        __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val     , proc->io.complete_q.completed_avg.val     , __ATOMIC_SEQ_CST );
    7176                        __atomic_fetch_add( &cltr->io.complete_q.completed_avg.slow_cnt, proc->io.complete_q.completed_avg.slow_cnt, __ATOMIC_SEQ_CST );
     
    120125                                double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt;
    121126                                double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt;
    122                                 double avgavl = ((double)io.submit_q.submit_avg.avl) / io.submit_q.submit_avg.cnt;
    123127
    124128                                double lavgv = 0;
     
    141145                                        "- avg ready entries      : %'18.2lf\n"
    142146                                        "- avg submitted entries  : %'18.2lf\n"
    143                                         "- avg available entries  : %'18.2lf\n"
     147                                        "- total helped entries   : %'15" PRIu64 "\n"
     148                                        "- total leader entries   : %'15" PRIu64 "\n"
     149                                        "- total busy submit      : %'15" PRIu64 "\n"
    144150                                        "- total ready search     : %'15" PRIu64 "\n"
    145151                                        "- avg ready search len   : %'18.2lf\n"
     
    153159                                        , cluster ? "Cluster" : "Processor",  name, id
    154160                                        , io.submit_q.submit_avg.cnt
    155                                         , avgrdy, avgcsm, avgavl
     161                                        , avgrdy, avgcsm
     162                                        , io.submit_q.helped, io.submit_q.leader, io.submit_q.busy
    156163                                        , io.submit_q.look_avg.cnt
    157164                                        , lavgv, lavgb
  • libcfa/src/concurrency/stats.hfa

    r49cad912 r3a32b3a  
    8383                                        volatile uint64_t block;
    8484                                } alloc_avg;
     85                                volatile uint64_t helped;
     86                                volatile uint64_t leader;
     87                                volatile uint64_t busy;
    8588                        } submit_q;
    8689                        struct {
Note: See TracChangeset for help on using the changeset viewer.