Changes in / [dfc13bb:14d8a9b]


Ignore:
Location:
benchmark/io/http
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/filecache.cfa

    rdfc13bb r14d8a9b  
    7373        cache_line * entries;
    7474        size_t size;
     75        int * rawfds;
     76        int nfds;
    7577} file_cache;
    7678
     
    98100}
    99101
    100 int put_file( cache_line & entry ) {
     102int put_file( cache_line & entry, int fd ) {
    101103        uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.file_cache.hash_seed ) % file_cache.size;
    102104
     
    108110
    109111        file_cache.entries[idx] = entry;
     112        file_cache.entries[idx].fd = fd;
    110113        return i > 0 ? 1 : 0;
    111114}
     
    121124        size_t fcount = 0;
    122125        size_t fsize = 16;
    123         cache_line * raw = 0p;
    124         raw = alloc(raw, fsize, true);
     126        cache_line * raw = alloc(fsize);
    125127        // Step 1 get a dense array of all files
    126128        int walk(const char *fpath, const struct stat *sb, int typeflag) {
     
    131133                if(fcount > fsize) {
    132134                        fsize *= 2;
    133                         raw = alloc(raw, fsize, true);
     135                        raw = alloc(fsize, raw`realloc);
    134136                }
    135137
     
    162164        file_cache.entries = anew(file_cache.size);
    163165
     166        if(options.file_cache.fixed_fds) {
     167                file_cache.nfds   = fcount;
     168                file_cache.rawfds = alloc(fcount);
     169        }
     170
    164171        // Step 3 fill the cache
    165172        int conflicts = 0;
    166173        for(i; fcount) {
    167                 conflicts += put_file( raw[i] );
     174                int fd;
     175                if(options.file_cache.fixed_fds) {
     176                        file_cache.rawfds[i] = raw[i].fd;
     177                        fd = i;
     178                }
     179                else {
     180                        fd = raw[i].fd;
     181                }
     182                conflicts += put_file( raw[i], fd );
    168183        }
    169184        printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);
     
    197212        }
    198213
    199         return [aalloc(extra), 0];
     214        size_t s = file_cache.nfds + extra;
     215        int * data = alloc(s, file_cache.rawfds`realloc);
     216        return [data, file_cache.nfds];
    200217}
    201218
  • benchmark/io/http/main.cfa

    rdfc13bb r14d8a9b  
    1212#include <kernel.hfa>
    1313#include <stats.hfa>
     14#include <time.hfa>
    1415#include <thread.hfa>
    1516
    16 #include "channel.hfa"
    1717#include "filecache.hfa"
    1818#include "options.hfa"
    1919#include "worker.hfa"
    2020
     21extern void register_fixed_files( cluster &, int *, unsigned count );
     22
     23Duration default_preemption() {
     24        return 0;
     25}
     26
    2127//=============================================================================================
    2228// Globals
    2329//=============================================================================================
    24 channel & wait_connect;
    25 
    2630struct ServerProc {
    2731        processor self;
     
    8488        // Run Server Cluster
    8589        {
    86                 cluster cl = { "Server Cluster", options.clopts.flags };
     90                cluster cl = { "Server Cluster", options.clopts.params };
    8791                #if !defined(__CFA_NO_STATISTICS__)
    8892                        print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );
    8993                #endif
    9094                options.clopts.instance = &cl;
    91 
    92                 channel chan = { options.clopts.chan_size };
    93                 &wait_connect = &chan;
    9495
    9596                int pipe_cnt = options.clopts.nworkers * 2;
     
    102103                }
    103104
     105                if(options.file_cache.fixed_fds) {
     106                        register_fixed_files(cl, fds, pipe_off);
     107                }
     108
    104109                {
    105110                        ServerProc procs[options.clopts.nprocs];
     
    107112                                Worker workers[options.clopts.nworkers];
    108113                                for(i; options.clopts.nworkers) {
    109                                         if( options.file_cache.fixed_fds ) {
    110                                                 workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    111                                                 workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    112                                         }
    113                                         else {
     114                                        // if( options.file_cache.fixed_fds ) {
     115                                        //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     116                                        //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     117                                        // }
     118                                        // else
     119                                        {
    114120                                                workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    115121                                                workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
     122                                                workers[i].sockfd  = server_fd;
     123                                                workers[i].addr    = (struct sockaddr *)&address;
     124                                                workers[i].addrlen = (socklen_t*)&addrlen;
     125                                                workers[i].flags   = 0;
    116126                                        }
    117127                                        unpark( workers[i] __cfaabi_dbg_ctx2 );
     
    119129                                printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);
    120130                                {
    121                                         Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 };
    122 
    123131                                        char buffer[128];
    124132                                        while(!feof(stdin)) {
     
    127135
    128136                                        printf("Shutting Down\n");
    129                                 }
    130                                 printf("Acceptor Closed\n");
    131 
    132                                 // Clean-up the workers
    133                                 for(options.clopts.nworkers) {
    134                                         put( wait_connect, -1 );
    135137                                }
    136138                        }
  • benchmark/io/http/options.cfa

    rdfc13bb r14d8a9b  
    3131                1,     // nworkers;
    3232                0,     // flags;
    33                 10,    // chan_size;
    3433                false, // procstats
    3534                false, // viewhalts
     
    3938
    4039const char * parse_options( int argc, char * argv[] ) {
    41         bool uthrdpo = false;
    4240        bool subthrd = false;
    4341        bool eagrsub = false;
     
    5250                {'t', "threads",        "Number of worker threads to use", options.clopts.nworkers},
    5351                {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
    54                 {'B', "channel-size",   "Maximum number of accepted connection pending", options.clopts.chan_size},
    5552                {'r', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    5653                {'S', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
    5754                {'C', "cache-size",     "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },
    5855                {'l', "list-files",     "List the files in the specified path and exit", options.file_cache.list, parse_settrue },
    59                 {'u', "userthread",     "If set, cluster uses user-thread to poll I/O", uthrdpo, parse_settrue },
    6056                {'s', "submitthread",   "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },
    6157                {'e', "eagersubmit",    "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},
     
    7167        parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left );
    7268
    73         if( uthrdpo ) {
    74                 options.clopts.flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;
    75         }
    76 
    77         if( subthrd ) {
    78                 options.clopts.flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
    79         }
    80 
    81         if( eagrsub ) {
    82                 options.clopts.flags |= CFA_CLUSTER_IO_EAGER_SUBMITS;
    83         }
     69        options.clopts.params.poller_submits = subthrd;
     70        options.clopts.params.eager_submits  = eagrsub;
    8471
    8572        if( fixedfd ) {
     
    8875
    8976        if( sqkpoll ) {
    90                 options.clopts.flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
     77                options.clopts.params.poll_submit = true;
    9178                options.file_cache.fixed_fds = true;
    9279        }
    9380
    9481        if( iokpoll ) {
    95                 options.clopts.flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
     82                options.clopts.params.poll_complete = true;
    9683                options.file_cache.open_flags |= O_DIRECT;
    9784        }
    9885
    99         options.clopts.flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);
     86        options.clopts.params.num_ready = sublen;
    10087
    10188        if( left[0] == 0p ) { return "."; }
  • benchmark/io/http/options.hfa

    rdfc13bb r14d8a9b  
    22
    33#include <stdint.h>
     4
     5#include <kernel.hfa>
    46
    57struct cluster;
     
    2325                int nprocs;
    2426                int nworkers;
    25                 int flags;
    26                 int chan_size;
     27                io_context_params params;
    2728                bool procstats;
    2829                bool viewhalts;
  • benchmark/io/http/protocol.cfa

    rdfc13bb r14d8a9b  
    1111extern "C" {
    1212      int snprintf ( char * s, size_t n, const char * format, ... );
     13        #include <linux/io_uring.h>
    1314}
    1415#include <string.h>
    15 
    1616#include <errno.h>
    1717
     18#include "options.hfa"
    1819
    1920const char * http_msgs[] = {
     
    7475        READ:
    7576        for() {
    76                 int ret = cfa_read(fd, it, count);
    77                 if(ret == 0 ) return [OK200, true, 0p, 0];
     77                int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
     78                if(ret == 0 ) return [OK200, true, 0, 0];
    7879                if(ret < 0 ) {
    7980                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
     
    8889                count -= ret;
    8990
    90                 if( count < 1 ) return [E414, false, 0p, 0];
     91                if( count < 1 ) return [E414, false, 0, 0];
    9192        }
    9293
     
    9596        it = buffer;
    9697        int ret = memcmp(it, "GET /", 5);
    97         if( ret != 0 ) return [E400, false, 0p, 0];
     98        if( ret != 0 ) return [E400, false, 0, 0];
    9899        it += 5;
    99100
     
    106107        ssize_t ret;
    107108        SPLICE1: while(count > 0) {
    108                 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE);
     109                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
    109110                if( ret < 0 ) {
    110111                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     
    116117                size_t in_pipe = ret;
    117118                SPLICE2: while(in_pipe > 0) {
    118                         ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE);
     119                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
    119120                        if( ret < 0 ) {
    120121                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
  • benchmark/io/http/worker.cfa

    rdfc13bb r14d8a9b  
    2828        CONNECTION:
    2929        for() {
    30                 int fd = take(wait_connect);
    31                 if (fd < 0) break;
     30                int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
     31                if(fd < 0) {
     32                        if( errno == ECONNABORTED ) break;
     33                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     34                }
    3235
    3336                printf("New connection %d, waiting for requests\n", fd);
     
    8285        }
    8386}
    84 
    85 //=============================================================================================
    86 // Acceptor Thread
    87 //=============================================================================================
    88 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) {
    89         ((thread&)this){ "Acceptor Thread", *options.clopts.instance };
    90         this.sockfd  = sockfd;
    91         this.addr    = addr;
    92         this.addrlen = addrlen;
    93         this.flags   = flags;
    94 }
    95 
    96 void main( Acceptor & this ) {
    97         for() {
    98                 int ret = cfa_accept4( this.[sockfd, addr, addrlen, flags] );
    99                 if(ret < 0) {
    100                         if( errno == ECONNABORTED ) break;
    101                         abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    102                 }
    103 
    104                 printf("New connection accepted\n");
    105                 put( wait_connect, ret );
    106         }
    107 }
  • benchmark/io/http/worker.hfa

    rdfc13bb r14d8a9b  
    77}
    88
    9 #include "channel.hfa"
    10 
    11 extern channel & wait_connect;
    12 
    139//=============================================================================================
    1410// Worker Thread
     
    1713thread Worker {
    1814        int pipe[2];
    19 };
    20 void ?{}( Worker & this );
    21 void main( Worker & );
    22 
    23 //=============================================================================================
    24 // Acceptor Thread
    25 //=============================================================================================
    26 thread Acceptor {
    2715        int sockfd;
    2816        struct sockaddr * addr;
     
    3018        int flags;
    3119};
    32 
    33 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags );
    34 void main( Acceptor & this );
     20void ?{}( Worker & this);
     21void main( Worker & );
Note: See TracChangeset for help on using the changeset viewer.