Changeset d34575b


Ignore:
Timestamp:
Jul 11, 2020, 6:41:48 PM (4 years ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
a3d3efc
Parents:
fc9bb79 (diff), 7922158 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Files:
23 added
16 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/batch-readv.c

    rfc9bb79 rd34575b  
    11// Program to test the optimial batchsize in a single threaded process
    22extern "C" {
    3         #ifndef _GNU_SOURCE         /* See feature_test_macros(7) */
    4         #define _GNU_SOURCE         /* See feature_test_macros(7) */
    5         #endif
    6         #include <errno.h>
    7         #include <stdio.h>
    8         #include <stdint.h>
    9         #include <stdlib.h>
    10         #include <string.h>
     3        #include <getopt.h>
    114        #include <locale.h>
    12         #include <getopt.h>
    13         #include <unistd.h>
    14         #include <sys/mman.h>
    15         #include <sys/syscall.h>
    16         #include <sys/uio.h>
    17         #include <fcntl.h>
    185        #include <time.h>                                                                               // timespec
    196        #include <sys/time.h>                                                                   // timeval
    20 
    21         #include <linux/io_uring.h>
    227}
    23 
    248
    259enum { TIMEGRAN = 1000000000LL };                                       // nanosecond granularity, except for timeval
     
    2711#include <omp.h>
    2812
    29 # ifndef __NR_io_uring_setup
    30 #  define __NR_io_uring_setup           425
    31 # endif
    32 # ifndef __NR_io_uring_enter
    33 #  define __NR_io_uring_enter           426
    34 # endif
    35 # ifndef __NR_io_uring_register
    36 #  define __NR_io_uring_register        427
    37 # endif
     13#include "io_uring.h"
    3814
    39 struct io_uring_sq {
    40         // Head and tail of the ring (associated with array)
    41         volatile uint32_t * head;
    42         volatile uint32_t * tail;
    4315
    44         // The actual kernel ring which uses head/tail
    45         // indexes into the sqes arrays
    46         uint32_t * array;
    47 
    48         // number of entries and mask to go with it
    49         const uint32_t * num;
    50         const uint32_t * mask;
    51 
    52         // Submission flags (Not sure what for)
    53         uint32_t * flags;
    54 
    55         // number of sqes not submitted (whatever that means)
    56         uint32_t * dropped;
    57 
    58         // Like head/tail but not seen by the kernel
    59         volatile uint32_t alloc;
    60 
    61         // A buffer of sqes (not the actual ring)
    62         struct io_uring_sqe * sqes;
    63 
    64         // The location and size of the mmaped area
    65         void * ring_ptr;
    66         size_t ring_sz;
    67 };
    68 
    69 struct io_uring_cq {
    70         // Head and tail of the ring
    71         volatile uint32_t * head;
    72         volatile uint32_t * tail;
    73 
    74         // number of entries and mask to go with it
    75         const uint32_t * mask;
    76         const uint32_t * num;
    77 
    78         // number of cqes not submitted (whatever that means)
    79         uint32_t * overflow;
    80 
    81         // the kernel ring
    82         struct io_uring_cqe * cqes;
    83 
    84         // The location and size of the mmaped area
    85         void * ring_ptr;
    86         size_t ring_sz;
    87 };
    88 
    89 struct io_ring {
    90         struct io_uring_sq submit_q;
    91         struct io_uring_cq completion_q;
    92         uint32_t flags;
    93         int fd;
    94 };
    95 
    96 struct fred {
    97         io_ring io;
    98 };
    99 
    100 fred self;
    10116int myfd;
    10217
     
    217132        myfd = open(__FILE__, 0);
    218133
    219         // Step 1 : call to setup
    220         struct io_uring_params params;
    221         memset(&params, 0, sizeof(params));
    222 
    223         uint32_t nentries = 2048;
    224 
    225         int fd = syscall(__NR_io_uring_setup, nentries, &params );
    226         if(fd < 0) {
    227                 fprintf(stderr, "KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
    228                 abort();
    229         }
    230 
    231         // Step 2 : mmap result
    232         memset(&self.io, 0, sizeof(struct io_ring));
    233         struct io_uring_sq & sq = self.io.submit_q;
    234         struct io_uring_cq & cq = self.io.completion_q;
    235 
    236         // calculate the right ring size
    237         sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
    238         cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
    239 
    240         // Requires features
    241         // // adjust the size according to the parameters
    242         // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    243         //      cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
    244         // }
    245 
    246         // mmap the Submit Queue into existence
    247         sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
    248         if (sq.ring_ptr == (void*)MAP_FAILED) {
    249                 fprintf(stderr, "KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
    250                 abort();
    251         }
    252 
    253         // mmap the Completion Queue into existence (may or may not be needed)
    254         // Requires features
    255         // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    256         //      cq->ring_ptr = sq->ring_ptr;
    257         // }
    258         // else {
    259                 // We need multiple call to MMAP
    260                 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
    261                 if (cq.ring_ptr == (void*)MAP_FAILED) {
    262                         munmap(sq.ring_ptr, sq.ring_sz);
    263                         fprintf(stderr, "KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
    264                         abort();
    265                 }
    266         // }
    267 
    268         // mmap the submit queue entries
    269         size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
    270         sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
    271         if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
    272                 munmap(sq.ring_ptr, sq.ring_sz);
    273                 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
    274                 fprintf(stderr, "KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
    275                 abort();
    276         }
    277 
    278         // Get the pointers from the kernel to fill the structure
    279         // submit queue
    280         sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
    281         sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
    282         sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
    283         sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
    284         sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
    285         sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    286         sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    287         sq.alloc = *sq.tail;
    288 
    289         // completion queue
    290         cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
    291         cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
    292         cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
    293         cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
    294         cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
    295         cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    296 
    297         self.io.fd = fd;
     134        init_uring(2048);
    298135
    299136        // Allocate the sqe
     
    344181
    345182        printf("Took %'ld ms\n", to_miliseconds(end - start));
    346         printf("Submitted       %'llu\n", submits);
    347         printf("Completed       %'llu\n", completes);
    348         printf("Submitted / sec %'.f\n", submits   / to_fseconds(end - start));
    349         printf("Completed / sec %'.f\n", completes / to_fseconds(end - start));
     183        printf("Submitted        %'llu\n", submits);
     184        printf("Completed        %'llu\n", completes);
     185        printf("Submitted / sec  %'.f\n", submits   / to_fseconds(end - start));
     186        printf("Completed / sec  %'.f\n", completes / to_fseconds(end - start));
     187        printf("ns per Submitted %'.f\n", 1000000000.0 * to_fseconds(end - start) / (submits   / batch) );
     188        printf("ns per Completed %'.f\n", 1000000000.0 * to_fseconds(end - start) / (completes / batch) );
    350189}
  • benchmark/io/readv.cfa

    rfc9bb79 rd34575b  
     1#define _GNU_SOURCE
     2
    13#include <stdlib.h>
    24#include <stdio.h>
     
    2224extern bool traceHeapOn();
    2325extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     26extern ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     27extern void register_fixed_files( cluster &, int *, unsigned count );
    2428
    2529int fd;
     
    2832
    2933unsigned long int buflen = 50;
     34bool fixed_file = false;
    3035
    3136thread __attribute__((aligned(128))) Reader {};
    3237void ?{}( Reader & this ) {
    3338        ((thread&)this){ "Reader Thread", *the_benchmark_cluster };
     39}
     40
     41int do_read(int fd, struct iovec * iov) {
     42        if(fixed_file) {
     43                return cfa_preadv2_fixed(fd, iov, 1, 0, 0);
     44        }
     45        else {
     46                return cfa_preadv2(fd, iov, 1, 0, 0);
     47        }
    3448}
    3549
     
    4256
    4357        while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
    44                 int r = cfa_preadv2(fd, &iov, 1, 0, 0);
     58                int r = do_read(fd, &iov);
    4559                if(r < 0) abort("%s\n", strerror(-r));
    4660
     
    5266        BENCH_DECL
    5367        unsigned flags = 0;
     68        int file_flags = 0;
    5469        unsigned sublen = 16;
    5570
     
    5873                static struct option options[] = {
    5974                        BENCH_OPT_LONG
    60                         {"bufsize",      required_argument, 0, 'b'},
    61                         {"userthread",   no_argument      , 0, 'u'},
    62                         {"submitthread", no_argument      , 0, 's'},
    63                         {"submitlength", required_argument, 0, 'l'},
     75                        {"bufsize",       required_argument, 0, 'b'},
     76                        {"userthread",    no_argument      , 0, 'u'},
     77                        {"submitthread",  no_argument      , 0, 's'},
     78                        {"eagersubmit",   no_argument      , 0, 'e'},
     79                        {"kpollsubmit",   no_argument      , 0, 'k'},
     80                        {"kpollcomplete", no_argument      , 0, 'i'},
     81                        {"submitlength",  required_argument, 0, 'l'},
    6482                        {0, 0, 0, 0}
    6583                };
    6684
    6785                int idx = 0;
    68                 int opt = getopt_long(argc, argv, BENCH_OPT_SHORT "b:usl:", options, &idx);
     86                int opt = getopt_long(argc, argv, BENCH_OPT_SHORT "b:usekil:", options, &idx);
    6987
    7088                const char * arg = optarg ? optarg : "";
     
    88106                                flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
    89107                                break;
     108                        case 'e':
     109                                flags |= CFA_CLUSTER_IO_EAGER_SUBMITS;
     110                                break;
     111                        case 'k':
     112                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
     113                                fixed_file = true;
     114                                break;
     115                        case 'i':
     116                                flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
     117                                file_flags |= O_DIRECT;
     118                                break;
    90119                        case 'l':
    91120                                sublen = strtoul(arg, &end, 10);
     
    103132                                fprintf( stderr, "  -u, --userthread         If set, cluster uses user-thread to poll I/O\n" );
    104133                                fprintf( stderr, "  -s, --submitthread       If set, cluster uses polling thread to submit I/O\n" );
     134                                fprintf( stderr, "  -e, --eagersubmit        If set, cluster submits I/O eagerly but still aggregates submits\n" );
     135                                fprintf( stderr, "  -k, --kpollsubmit        If set, cluster uses IORING_SETUP_SQPOLL\n" );
     136                                fprintf( stderr, "  -i, --kpollcomplete      If set, cluster uses IORING_SETUP_IOPOLL\n" );
     137                                fprintf( stderr, "  -l, --submitlength=LEN   Max number of submitions that can be submitted together\n" );
    105138                                exit(EXIT_FAILURE);
    106139                }
    107140        }
    108141
    109         fd = open(__FILE__, 0);
    110         if(fd < 0) {
     142        int lfd = open(__FILE__, file_flags);
     143        if(lfd < 0) {
    111144                fprintf(stderr, "Could not open source file\n");
    112145                exit(EXIT_FAILURE);
     
    118151                Time start, end;
    119152                BenchCluster cl = { flags, CFA_STATS_READY_Q | CFA_STATS_IO };
     153
     154                if(fixed_file) {
     155                        fd = 0;
     156                        register_fixed_files( cl.self, &lfd, 1 );
     157                }
     158                else {
     159                        fd = lfd;
     160                }
     161
    120162                {
    121163                        BenchProc procs[nprocs];
     
    145187        }
    146188
    147         close(fd);
     189        close(lfd);
    148190}
  • libcfa/src/Makefile.am

    rfc9bb79 rd34575b  
    4040if BUILDLIB
    4141headers_nosrc = bitmanip.hfa exception.hfa math.hfa gmp.hfa time_t.hfa clock.hfa \
    42                 bits/align.hfa bits/containers.hfa bits/defs.hfa bits/debug.hfa bits/locks.hfa containers/list.hfa containers/stackLockFree.hfa
     42                bits/align.hfa bits/containers.hfa bits/defs.hfa bits/debug.hfa bits/locks.hfa \
     43                containers/list.hfa containers/stackLockFree.hfa concurrency/iofwd.hfa
     44
    4345headers = common.hfa fstream.hfa heap.hfa iostream.hfa iterator.hfa limits.hfa rational.hfa \
    4446                time.hfa stdlib.hfa memory.hfa \
  • libcfa/src/Makefile.in

    rfc9bb79 rd34575b  
    246246        bits/align.hfa bits/containers.hfa bits/defs.hfa \
    247247        bits/debug.hfa bits/locks.hfa containers/list.hfa \
    248         containers/stackLockFree.hfa concurrency/coroutine.hfa \
    249         concurrency/thread.hfa concurrency/kernel.hfa \
    250         concurrency/monitor.hfa concurrency/mutex.hfa \
    251         concurrency/invoke.h
     248        containers/stackLockFree.hfa concurrency/iofwd.hfa \
     249        concurrency/coroutine.hfa concurrency/thread.hfa \
     250        concurrency/kernel.hfa concurrency/monitor.hfa \
     251        concurrency/mutex.hfa concurrency/invoke.h
    252252HEADERS = $(nobase_cfa_include_HEADERS)
    253253am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
     
    470470#----------------------------------------------------------------------------------------------------------------
    471471@BUILDLIB_TRUE@headers_nosrc = bitmanip.hfa exception.hfa math.hfa gmp.hfa time_t.hfa clock.hfa \
    472 @BUILDLIB_TRUE@         bits/align.hfa bits/containers.hfa bits/defs.hfa bits/debug.hfa bits/locks.hfa containers/list.hfa containers/stackLockFree.hfa
     472@BUILDLIB_TRUE@         bits/align.hfa bits/containers.hfa bits/defs.hfa bits/debug.hfa bits/locks.hfa \
     473@BUILDLIB_TRUE@         containers/list.hfa containers/stackLockFree.hfa concurrency/iofwd.hfa
    473474
    474475@BUILDLIB_FALSE@headers =
  • libcfa/src/concurrency/io.cfa

    rfc9bb79 rd34575b  
    1414//
    1515
    16 // #define __CFA_DEBUG_PRINT_IO__
    17 // #define __CFA_DEBUG_PRINT_IO_CORE__
     16#if defined(__CFA_DEBUG__)
     17        // #define __CFA_DEBUG_PRINT_IO__
     18        #define __CFA_DEBUG_PRINT_IO_CORE__
     19#endif
    1820
    1921#include "kernel.hfa"
     
    109111                volatile uint32_t * head;
    110112                volatile uint32_t * tail;
     113                volatile uint32_t prev_head;
    111114
    112115                // The actual kernel ring which uses head/tail
     
    129132
    130133                __spinlock_t lock;
     134                __spinlock_t release_lock;
    131135
    132136                // A buffer of sqes (not the actual ring)
     
    182186//=============================================================================================
    183187        void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
     188                if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
     189                        abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
     190                }
     191
    184192                this.io = malloc();
    185193
     
    187195                struct io_uring_params params;
    188196                memset(&params, 0, sizeof(params));
     197                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
     198                if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
    189199
    190200                uint32_t nentries = entries_per_cluster();
     
    208218                        // adjust the size according to the parameters
    209219                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    210                                 cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
     220                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
    211221                        }
    212222                #endif
     
    222232                        // mmap the Completion Queue into existence (may or may not be needed)
    223233                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
    224                                 cq->ring_ptr = sq->ring_ptr;
     234                                cq.ring_ptr = sq.ring_ptr;
    225235                        }
    226236                        else
     
    253263                sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    254264                sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
     265                sq.prev_head = *sq.head;
    255266
    256267                {
     
    261272                }
    262273
    263                 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     274                (sq.lock){};
     275                (sq.release_lock){};
     276
     277                if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
    264278                        /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
    265279                        sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
     
    313327
    314328                // Create the poller thread
    315                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
     329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
    316330                this.io->poller.slow.blocked = false;
    317331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
     
    418432        }
    419433
     434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
     435                bool need_sys_to_submit = false;
     436                bool need_sys_to_complete = false;
     437                unsigned min_complete = 0;
     438                unsigned flags = 0;
     439
     440
     441                TO_SUBMIT:
     442                if( to_submit > 0 ) {
     443                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     444                                need_sys_to_submit = true;
     445                                break TO_SUBMIT;
     446                        }
     447                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
     448                                need_sys_to_submit = true;
     449                                flags |= IORING_ENTER_SQ_WAKEUP;
     450                        }
     451                }
     452
     453                TO_COMPLETE:
     454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     455                        flags |= IORING_ENTER_GETEVENTS;
     456                        if( mask ) {
     457                                need_sys_to_complete = true;
     458                                min_complete = 1;
     459                                break TO_COMPLETE;
     460                        }
     461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
     462                                need_sys_to_complete = true;
     463                        }
     464                }
     465
     466                int ret = 0;
     467                if( need_sys_to_submit || need_sys_to_complete ) {
     468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
     469                        if( ret < 0 ) {
     470                                switch((int)errno) {
     471                                case EAGAIN:
     472                                case EINTR:
     473                                        ret = -1;
     474                                        break;
     475                                default:
     476                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     477                                }
     478                        }
     479                }
     480
     481                // Memory barrier
     482                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     483                return ret;
     484        }
     485
    420486//=============================================================================================
    421487// I/O Polling
    422488//=============================================================================================
     489        static unsigned __collect_submitions( struct __io_data & ring );
     490        static uint32_t __release_consumed_submission( struct __io_data & ring );
     491
    423492        // Process a single completion message from the io_uring
    424493        // This is NOT thread-safe
    425         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
     494        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
     495                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
     496
    426497                unsigned to_submit = 0;
    427498                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    428 
    429499                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    430                         uint32_t tail = *ring.submit_q.tail;
    431                         const uint32_t mask = *ring.submit_q.mask;
    432 
    433                         // Go through the list of ready submissions
    434                         for( i; ring.submit_q.ready_cnt ) {
    435                                 // replace any submission with the sentinel, to consume it.
    436                                 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    437 
    438                                 // If it was already the sentinel, then we are done
    439                                 if( idx == -1ul32 ) continue;
    440 
    441                                 // If we got a real submission, append it to the list
    442                                 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    443                                 to_submit++;
    444                         }
    445 
    446                         // Increment the tail based on how many we are ready to submit
    447                         __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
    448                 }
    449 
    450                 const uint32_t smask = *ring.submit_q.mask;
    451                 uint32_t shead = *ring.submit_q.head;
    452                 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     500                        to_submit = __collect_submitions( ring );
     501                }
     502
     503                int ret = __io_uring_enter(ring, to_submit, true, mask);
    453504                if( ret < 0 ) {
    454                         switch((int)errno) {
    455                         case EAGAIN:
    456                         case EINTR:
    457                                 return -EAGAIN;
    458                         default:
    459                                 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    460                         }
     505                        return [0, true];
     506                }
     507
     508                // update statistics
     509                if (to_submit > 0) {
     510                        __STATS__( true,
     511                                if( to_submit > 0 ) {
     512                                        io.submit_q.submit_avg.rdy += to_submit;
     513                                        io.submit_q.submit_avg.csm += ret;
     514                                        io.submit_q.submit_avg.cnt += 1;
     515                                }
     516                        )
    461517                }
    462518
    463519                // Release the consumed SQEs
    464                 for( i; ret ) {
    465                         uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
    466                         ring.submit_q.sqes[ idx ].user_data = 0;
    467                 }
    468 
    469                 uint32_t avail = 0;
    470                 uint32_t sqe_num = *ring.submit_q.num;
    471                 for(i; sqe_num) {
    472                         if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++;
    473                 }
    474 
    475                 // update statistics
    476                 #if !defined(__CFA_NO_STATISTICS__)
    477                         __tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
    478                         __tls_stats()->io.submit_q.submit_avg.csm += ret;
    479                         __tls_stats()->io.submit_q.submit_avg.avl += avail;
    480                         __tls_stats()->io.submit_q.submit_avg.cnt += 1;
    481                 #endif
     520                __release_consumed_submission( ring );
    482521
    483522                // Drain the queue
     
    486525                const uint32_t mask = *ring.completion_q.mask;
    487526
    488                 // Memory barrier
    489                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    490 
    491527                // Nothing was new return 0
    492528                if (head == tail) {
    493                         return 0;
     529                        return [0, to_submit > 0];
    494530                }
    495531
    496532                uint32_t count = tail - head;
     533                /* paranoid */ verify( count != 0 );
    497534                for(i; count) {
    498535                        unsigned idx = (head + i) & mask;
     
    505542
    506543                        data->result = cqe.res;
    507                         if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    508                         else         { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
     544                        if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
     545                        else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
    509546                }
    510547
     
    554591                                int count;
    555592                                bool again;
    556                                 [count, again] = __drain_io( ring, &mask, 1, true );
     593                                [count, again] = __drain_io( ring, &mask );
    557594
    558595                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
    559596
    560597                                // Update statistics
    561                                 #if !defined(__CFA_NO_STATISTICS__)
    562                                         __tls_stats()->io.complete_q.completed_avg.val += count;
    563                                         __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
    564                                 #endif
     598                                __STATS__( true,
     599                                        io.complete_q.completed_avg.val += count;
     600                                        io.complete_q.completed_avg.slow_cnt += 1;
     601                                )
    565602
    566603                                if(again) {
     
    576613                                int count;
    577614                                bool again;
    578                                 [count, again] = __drain_io( ring, &mask, 0, true );
     615                                [count, again] = __drain_io( ring, &mask );
    579616
    580617                                // Update statistics
    581                                 #if !defined(__CFA_NO_STATISTICS__)
    582                                         __tls_stats()->io.complete_q.completed_avg.val += count;
    583                                         __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
    584                                 #endif
     618                                __STATS__( true,
     619                                        io.complete_q.completed_avg.val += count;
     620                                        io.complete_q.completed_avg.slow_cnt += 1;
     621                                )
    585622                        }
    586623                }
     
    614651                        bool again;
    615652                        disable_interrupts();
    616                                 [count, again] = __drain_io( *this.ring, 0p, 0, false );
     653                                [count, again] = __drain_io( *this.ring, 0p );
    617654
    618655                                if(!again) reset++;
    619656
    620657                                // Update statistics
    621                                 #if !defined(__CFA_NO_STATISTICS__)
    622                                         __tls_stats()->io.complete_q.completed_avg.val += count;
    623                                         __tls_stats()->io.complete_q.completed_avg.fast_cnt += 1;
    624                                 #endif
     658                                __STATS__( true,
     659                                        io.complete_q.completed_avg.val += count;
     660                                        io.complete_q.completed_avg.fast_cnt += 1;
     661                                )
    625662                        enable_interrupts( __cfaabi_dbg_ctx );
    626663
     
    658695
    659696// Submition steps :
    660 // 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
    661 //     entries are available. The semaphore make sure that there is no more operations in
    662 //     progress then the number of entries in the buffer. This probably limits concurrency
    663 //     more than necessary since submitted but not completed operations don't need any
    664 //     entries in user space. However, I don't know what happens if we overflow the buffers
    665 //     because too many requests completed at once. This is a safe approach in all cases.
    666 //     Furthermore, with hundreds of entries, this may be okay.
    667 //
    668 // 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
     697// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
    669698//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
    670699//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
    671700//     need to write an allocator that allows allocating concurrently.
    672701//
    673 // 3 - Actually fill the submit entry, this is the only simple and straightforward step.
     702// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
    674703//
    675 // 4 - Append the entry index to the array and adjust the tail accordingly. This operation
     704// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
    676705//     needs to arrive to two concensus at the same time:
    677706//     A - The order in which entries are listed in the array: no two threads must pick the
     
    682711
    683712        [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
    684                 verify( data != 0 );
    685 
     713                /* paranoid */ verify( data != 0 );
    686714
    687715                // Prepare the data we need
     
    708736                                {
    709737                                        // update statistics
    710                                         #if !defined(__CFA_NO_STATISTICS__)
    711                                                 disable_interrupts();
    712                                                         __tls_stats()->io.submit_q.alloc_avg.val   += len;
    713                                                         __tls_stats()->io.submit_q.alloc_avg.block += block;
    714                                                         __tls_stats()->io.submit_q.alloc_avg.cnt   += 1;
    715                                                 enable_interrupts( __cfaabi_dbg_ctx );
    716                                         #endif
     738                                        __STATS__( false,
     739                                                io.submit_q.alloc_avg.val   += len;
     740                                                io.submit_q.alloc_avg.block += block;
     741                                                io.submit_q.alloc_avg.cnt   += 1;
     742                                        )
    717743
    718744
     
    757783
    758784                        block++;
    759                         yield();
     785                        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     786                                __release_consumed_submission( ring );
     787                                unlock( ring.submit_q.lock );
     788                        }
     789                        else {
     790                                yield();
     791                        }
    760792                }
    761793
    762794                // update statistics
    763                 #if !defined(__CFA_NO_STATISTICS__)
    764                 disable_interrupts();
    765                         __tls_stats()->io.submit_q.look_avg.val   += len;
    766                         __tls_stats()->io.submit_q.look_avg.block += block;
    767                         __tls_stats()->io.submit_q.look_avg.cnt   += 1;
    768                 enable_interrupts( __cfaabi_dbg_ctx );
    769                 #endif
     795                __STATS__( false,
     796                        io.submit_q.look_avg.val   += len;
     797                        io.submit_q.look_avg.block += block;
     798                        io.submit_q.look_avg.cnt   += 1;
     799                )
    770800
    771801                return picked;
     
    780810                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
    781811                        // If the poller thread submits, then we just need to add this to the ready array
    782 
    783812                        __submit_to_ready_array( ring, idx, mask );
    784813
     
    786815
    787816                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
     817                }
     818                else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
     819                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
     820
     821                        for() {
     822                                yield();
     823
     824                                // If some one else collected our index, we are done
     825                                #warning ABA problem
     826                                if( ring.submit_q.ready[picked] != idx ) {
     827                                        __STATS__( false,
     828                                                io.submit_q.helped += 1;
     829                                        )
     830                                        return;
     831                                }
     832
     833                                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
     834                                        __STATS__( false,
     835                                                io.submit_q.leader += 1;
     836                                        )
     837                                        break;
     838                                }
     839
     840                                __STATS__( false,
     841                                        io.submit_q.busy += 1;
     842                                )
     843                        }
     844
     845                        // We got the lock
     846                        unsigned to_submit = __collect_submitions( ring );
     847                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
     848                        if( ret < 0 ) {
     849                                unlock(ring.submit_q.lock);
     850                                return;
     851                        }
     852
     853                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
     854
     855                        // Release the consumed SQEs
     856                        __release_consumed_submission( ring );
     857
     858                        // update statistics
     859                        __STATS__( true,
     860                                io.submit_q.submit_avg.rdy += to_submit;
     861                                io.submit_q.submit_avg.csm += ret;
     862                                io.submit_q.submit_avg.cnt += 1;
     863                        )
     864
     865                        unlock(ring.submit_q.lock);
    788866                }
    789867                else {
     
    791869                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    792870
     871                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     872                        /* paranoid */  "index %u already reclaimed\n"
     873                        /* paranoid */  "head %u, prev %u, tail %u\n"
     874                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
     875                        /* paranoid */  idx,
     876                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
     877                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
     878                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
     879                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
     880                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
     881                        /* paranoid */ );
     882
    793883                        // Append to the list of ready entries
    794884
    795885                        /* paranoid */ verify( idx <= mask );
    796 
    797                         ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     886                        ring.submit_q.array[ (*tail) & mask ] = idx;
    798887                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    799888
    800889                        // Submit however, many entries need to be submitted
    801                         int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     890                        int ret = __io_uring_enter( ring, 1, false, 0p );
    802891                        if( ret < 0 ) {
    803892                                switch((int)errno) {
     
    808897
    809898                        // update statistics
    810                         #if !defined(__CFA_NO_STATISTICS__)
    811                                 __tls_stats()->io.submit_q.submit_avg.csm += 1;
    812                                 __tls_stats()->io.submit_q.submit_avg.cnt += 1;
    813                         #endif
    814 
    815                         ring.submit_q.sqes[ idx & mask ].user_data = 0;
     899                        __STATS__( false,
     900                                io.submit_q.submit_avg.csm += 1;
     901                                io.submit_q.submit_avg.cnt += 1;
     902                        )
     903
     904                        // Release the consumed SQEs
     905                        __release_consumed_submission( ring );
    816906
    817907                        unlock(ring.submit_q.lock);
     
    820910                }
    821911        }
     912
     913        static unsigned __collect_submitions( struct __io_data & ring ) {
     914                /* paranoid */ verify( ring.submit_q.ready != 0p );
     915                /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
     916
     917                unsigned to_submit = 0;
     918                uint32_t tail = *ring.submit_q.tail;
     919                const uint32_t mask = *ring.submit_q.mask;
     920
     921                // Go through the list of ready submissions
     922                for( i; ring.submit_q.ready_cnt ) {
     923                        // replace any submission with the sentinel, to consume it.
     924                        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
     925
     926                        // If it was already the sentinel, then we are done
     927                        if( idx == -1ul32 ) continue;
     928
     929                        // If we got a real submission, append it to the list
     930                        ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
     931                        to_submit++;
     932                }
     933
     934                // Increment the tail based on how many we are ready to submit
     935                __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
     936
     937                return to_submit;
     938        }
     939
     940        static uint32_t __release_consumed_submission( struct __io_data & ring ) {
     941                const uint32_t smask = *ring.submit_q.mask;
     942
     943                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     944                uint32_t chead = *ring.submit_q.head;
     945                uint32_t phead = ring.submit_q.prev_head;
     946                ring.submit_q.prev_head = chead;
     947                unlock(ring.submit_q.release_lock);
     948
     949                uint32_t count = chead - phead;
     950                for( i; count ) {
     951                        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
     952                        ring.submit_q.sqes[ idx ].user_data = 0;
     953                }
     954                return count;
     955        }
     956
     957//=============================================================================================
     958// I/O Submissions
     959//=============================================================================================
     960
     961        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
     962                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
     963                if( ret < 0 ) {
     964                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     965                }
     966
     967                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
     968        }
    822969#endif
  • libcfa/src/concurrency/iocall.cfa

    rfc9bb79 rd34575b  
    108108
    109109        extern ssize_t read (int fd, void *buf, size_t count);
     110
     111        extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
     112        extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags);
    110113}
    111114
     
    128131                #endif
    129132        }
     133
     134        ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     135                #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
     136                        return preadv2(fd, iov, iovcnt, offset, flags);
     137                #else
     138                        __submit_prelude
     139
     140                        (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
     141                        sqe->flags |= IOSQE_FIXED_FILE;
     142
     143                        __submit_wait
     144                #endif
     145        }
    130146#endif
    131147
     
    329345}
    330346
    331 
    332347ssize_t cfa_read(int fd, void *buf, size_t count) {
    333348        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
     
    349364
    350365                (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
     366
     367                __submit_wait
     368        #endif
     369}
     370
     371ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
     372        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SPLICE)
     373                return splice( fd_in, off_in, fd_out, off_out, len, flags );
     374        #else
     375                __submit_prelude
     376
     377                (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out };
     378                sqe->splice_fd_in  = fd_in;
     379                sqe->splice_off_in = off_in;
     380                sqe->splice_flags  = flags;
     381
     382                __submit_wait
     383        #endif
     384}
     385
     386ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) {
     387        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_TEE)
     388                return tee( fd_in, fd_out, len, flags );
     389        #else
     390                __submit_prelude
     391
     392                (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 };
     393                sqe->splice_fd_in = fd_in;
     394                sqe->splice_flags = flags;
    351395
    352396                __submit_wait
     
    453497                        #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
    454498                        return IS_DEFINED(IORING_OP_WRITE);
     499
     500                if( /*func == (fptr_t)splice || */
     501                        func == (fptr_t)cfa_splice )
     502                        #define _CFA_IO_FEATURE_IORING_OP_SPLICE ,
     503                        return IS_DEFINED(IORING_OP_SPLICE);
     504
     505                if( /*func == (fptr_t)tee || */
     506                        func == (fptr_t)cfa_tee )
     507                        #define _CFA_IO_FEATURE_IORING_OP_TEE ,
     508                        return IS_DEFINED(IORING_OP_TEE);
    455509        #endif
    456510
  • libcfa/src/concurrency/kernel.hfa

    rfc9bb79 rd34575b  
    129129struct __io_data;
    130130
    131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
    132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
    133 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
     131#define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) // 0x01
     132#define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02
     133#define CFA_CLUSTER_IO_EAGER_SUBMITS         (1 << 2) // 0x04
     134#define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   (1 << 3) // 0x08
     135#define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10
    134136#define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
    135137
  • libcfa/src/concurrency/kernel_private.hfa

    rfc9bb79 rd34575b  
    286286// Statics call at the end of each thread to register statistics
    287287#if !defined(__CFA_NO_STATISTICS__)
    288 static inline struct __stats_t * __tls_stats() {
    289         /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
    290         /* paranoid */ verify( kernelTLS.this_stats );
    291         return kernelTLS.this_stats;
    292 }
     288        static inline struct __stats_t * __tls_stats() {
     289                /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
     290                /* paranoid */ verify( kernelTLS.this_stats );
     291                return kernelTLS.this_stats;
     292        }
     293
     294        #define __STATS__(in_kernel, ...) { \
     295                if( !(in_kernel) ) disable_interrupts(); \
     296                with( *__tls_stats() ) { \
     297                        __VA_ARGS__ \
     298                } \
     299                if( !(in_kernel) ) enable_interrupts( __cfaabi_dbg_ctx ); \
     300        }
     301#else
     302        #define __STATS__(in_kernel, ...)
    293303#endif
    294304
  • libcfa/src/concurrency/preemption.cfa

    rfc9bb79 rd34575b  
    186186        void enable_interrupts( __cfaabi_dbg_ctx_param ) {
    187187                processor   * proc = kernelTLS.this_processor; // Cache the processor now since interrupts can start happening after the atomic store
     188                /* paranoid */ verify( proc );
    188189
    189190                with( kernelTLS.preemption_state ){
  • libcfa/src/concurrency/stats.cfa

    rfc9bb79 rd34575b  
    2727                        stats->io.submit_q.submit_avg.rdy = 0;
    2828                        stats->io.submit_q.submit_avg.csm = 0;
    29                         stats->io.submit_q.submit_avg.avl = 0;
    3029                        stats->io.submit_q.submit_avg.cnt = 0;
    3130                        stats->io.submit_q.look_avg.val   = 0;
     
    3534                        stats->io.submit_q.alloc_avg.cnt   = 0;
    3635                        stats->io.submit_q.alloc_avg.block = 0;
     36                        stats->io.submit_q.helped = 0;
     37                        stats->io.submit_q.leader = 0;
     38                        stats->io.submit_q.busy   = 0;
    3739                        stats->io.complete_q.completed_avg.val = 0;
    3840                        stats->io.complete_q.completed_avg.slow_cnt = 0;
     
    6870                        __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt           , proc->io.submit_q.alloc_avg.cnt           , __ATOMIC_SEQ_CST );
    6971                        __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block         , proc->io.submit_q.alloc_avg.block         , __ATOMIC_SEQ_CST );
     72                        __atomic_fetch_add( &cltr->io.submit_q.helped                  , proc->io.submit_q.helped                  , __ATOMIC_SEQ_CST );
     73                        __atomic_fetch_add( &cltr->io.submit_q.leader                  , proc->io.submit_q.leader                  , __ATOMIC_SEQ_CST );
     74                        __atomic_fetch_add( &cltr->io.submit_q.busy                    , proc->io.submit_q.busy                    , __ATOMIC_SEQ_CST );
    7075                        __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val     , proc->io.complete_q.completed_avg.val     , __ATOMIC_SEQ_CST );
    7176                        __atomic_fetch_add( &cltr->io.complete_q.completed_avg.slow_cnt, proc->io.complete_q.completed_avg.slow_cnt, __ATOMIC_SEQ_CST );
     
    120125                                double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt;
    121126                                double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt;
    122                                 double avgavl = ((double)io.submit_q.submit_avg.avl) / io.submit_q.submit_avg.cnt;
    123127
    124128                                double lavgv = 0;
     
    141145                                        "- avg ready entries      : %'18.2lf\n"
    142146                                        "- avg submitted entries  : %'18.2lf\n"
    143                                         "- avg available entries  : %'18.2lf\n"
     147                                        "- total helped entries   : %'15" PRIu64 "\n"
     148                                        "- total leader entries   : %'15" PRIu64 "\n"
     149                                        "- total busy submit      : %'15" PRIu64 "\n"
    144150                                        "- total ready search     : %'15" PRIu64 "\n"
    145151                                        "- avg ready search len   : %'18.2lf\n"
     
    153159                                        , cluster ? "Cluster" : "Processor",  name, id
    154160                                        , io.submit_q.submit_avg.cnt
    155                                         , avgrdy, avgcsm, avgavl
     161                                        , avgrdy, avgcsm
     162                                        , io.submit_q.helped, io.submit_q.leader, io.submit_q.busy
    156163                                        , io.submit_q.look_avg.cnt
    157164                                        , lavgv, lavgb
  • libcfa/src/concurrency/stats.hfa

    rfc9bb79 rd34575b  
    8383                                        volatile uint64_t block;
    8484                                } alloc_avg;
     85                                volatile uint64_t helped;
     86                                volatile uint64_t leader;
     87                                volatile uint64_t busy;
    8588                        } submit_q;
    8689                        struct {
  • src/Makefile.in

    rfc9bb79 rd34575b  
    626626
    627627BUILT_SOURCES = Parser/parser.hh
    628 AM_YFLAGS = -d -t -v
     628AM_YFLAGS = -d -t -v -Wno-yacc
    629629SRC_RESOLVEXPR = \
    630630      ResolvExpr/AdjustExprType.cc \
  • src/Parser/module.mk

    rfc9bb79 rd34575b  
    1717BUILT_SOURCES = Parser/parser.hh
    1818
    19 AM_YFLAGS = -d -t -v
     19AM_YFLAGS = -d -t -v -Wno-yacc
    2020
    2121SRC += \
  • tests/pybin/test_run.py

    rfc9bb79 rd34575b  
    6969                        else :                                          text = "FAILED with code %d" % retcode
    7070
    71                 text += "    C%s - R%s" % (cls.fmtDur(duration[0]), cls.fmtDur(duration[1]))
     71                text += "    C%s - R%s" % (fmtDur(duration[0]), fmtDur(duration[1]))
    7272                return text
    73 
    74         @staticmethod
    75         def fmtDur( duration ):
    76                 if duration :
    77                         hours, rem = divmod(duration, 3600)
    78                         minutes, rem = divmod(rem, 60)
    79                         seconds, millis = divmod(rem, 1)
    80                         return "%2d:%02d.%03d" % (minutes, seconds, millis * 1000)
    81                 return " n/a"
  • tests/pybin/tools.py

    rfc9bb79 rd34575b  
    387387                while True:
    388388                        yield i.next(max(expire - time.time(), 0))
     389
     390def fmtDur( duration ):
     391        if duration :
     392                hours, rem = divmod(duration, 3600)
     393                minutes, rem = divmod(rem, 60)
     394                seconds, millis = divmod(rem, 1)
     395                return "%2d:%02d.%03d" % (minutes, seconds, millis * 1000)
     396        return " n/a"
  • tests/test.py

    rfc9bb79 rd34575b  
    361361
    362362                # for each build configurations, run the test
    363                 for arch, debug, install in itertools.product(settings.all_arch, settings.all_debug, settings.all_install):
    364                         settings.arch    = arch
    365                         settings.debug   = debug
    366                         settings.install = install
    367 
    368                         # filter out the tests for a different architecture
    369                         # tests are the same across debug/install
    370                         local_tests = settings.arch.filter( tests )
    371                         options.jobs, forceJobs = job_count( options, local_tests )
    372                         settings.update_make_cmd(forceJobs, options.jobs)
    373 
    374                         # check the build configuration works
    375                         settings.validate()
    376 
    377                         # print configuration
    378                         print('%s %i tests on %i cores (%s:%s)' % (
    379                                 'Regenerating' if settings.generating else 'Running',
    380                                 len(local_tests),
    381                                 options.jobs,
    382                                 settings.arch.string,
    383                                 settings.debug.string
    384                         ))
    385 
    386                         # otherwise run all tests and make sure to return the correct error code
    387                         failed = run_tests(local_tests, options.jobs)
    388                         if failed:
    389                                 result = 1
    390                                 if not settings.continue_:
    391                                         break
    392 
    393 
     363                with Timed() as total_dur:
     364                        for arch, debug, install in itertools.product(settings.all_arch, settings.all_debug, settings.all_install):
     365                                settings.arch    = arch
     366                                settings.debug   = debug
     367                                settings.install = install
     368
     369                                # filter out the tests for a different architecture
     370                                # tests are the same across debug/install
     371                                local_tests = settings.arch.filter( tests )
     372                                options.jobs, forceJobs = job_count( options, local_tests )
     373                                settings.update_make_cmd(forceJobs, options.jobs)
     374
     375                                # check the build configuration works
     376                                settings.validate()
     377
     378                                # print configuration
     379                                print('%s %i tests on %i cores (%s:%s)' % (
     380                                        'Regenerating' if settings.generating else 'Running',
     381                                        len(local_tests),
     382                                        options.jobs,
     383                                        settings.arch.string,
     384                                        settings.debug.string
     385                                ))
     386
     387                                # otherwise run all tests and make sure to return the correct error code
     388                                failed = run_tests(local_tests, options.jobs)
     389                                if failed:
     390                                        result = 1
     391                                        if not settings.continue_:
     392                                                break
     393
     394                print('Tests took %s' % fmtDur( total_dur.duration ))
    394395                sys.exit( failed )
Note: See TracChangeset for help on using the changeset viewer.