Changes in / [14d8a9b:dfc13bb]


Ignore:
Location:
benchmark/io/http
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/filecache.cfa

    r14d8a9b rdfc13bb  
    7373        cache_line * entries;
    7474        size_t size;
    75         int * rawfds;
    76         int nfds;
    7775} file_cache;
    7876
     
    10098}
    10199
    102 int put_file( cache_line & entry, int fd ) {
     100int put_file( cache_line & entry ) {
    103101        uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.file_cache.hash_seed ) % file_cache.size;
    104102
     
    110108
    111109        file_cache.entries[idx] = entry;
    112         file_cache.entries[idx].fd = fd;
    113110        return i > 0 ? 1 : 0;
    114111}
     
    124121        size_t fcount = 0;
    125122        size_t fsize = 16;
    126         cache_line * raw = alloc(fsize);
     123        cache_line * raw = 0p;
     124        raw = alloc(raw, fsize, true);
    127125        // Step 1 get a dense array of all files
    128126        int walk(const char *fpath, const struct stat *sb, int typeflag) {
     
    133131                if(fcount > fsize) {
    134132                        fsize *= 2;
    135                         raw = alloc(fsize, raw`realloc);
     133                        raw = alloc(raw, fsize, true);
    136134                }
    137135
     
    164162        file_cache.entries = anew(file_cache.size);
    165163
    166         if(options.file_cache.fixed_fds) {
    167                 file_cache.nfds   = fcount;
    168                 file_cache.rawfds = alloc(fcount);
    169         }
    170 
    171164        // Step 3 fill the cache
    172165        int conflicts = 0;
    173166        for(i; fcount) {
    174                 int fd;
    175                 if(options.file_cache.fixed_fds) {
    176                         file_cache.rawfds[i] = raw[i].fd;
    177                         fd = i;
    178                 }
    179                 else {
    180                         fd = raw[i].fd;
    181                 }
    182                 conflicts += put_file( raw[i], fd );
     167                conflicts += put_file( raw[i] );
    183168        }
    184169        printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);
     
    212197        }
    213198
    214         size_t s = file_cache.nfds + extra;
    215         int * data = alloc(s, file_cache.rawfds`realloc);
    216         return [data, file_cache.nfds];
     199        return [aalloc(extra), 0];
    217200}
    218201
  • benchmark/io/http/main.cfa

    r14d8a9b rdfc13bb  
    1212#include <kernel.hfa>
    1313#include <stats.hfa>
    14 #include <time.hfa>
    1514#include <thread.hfa>
    1615
     16#include "channel.hfa"
    1717#include "filecache.hfa"
    1818#include "options.hfa"
    1919#include "worker.hfa"
    2020
    21 extern void register_fixed_files( cluster &, int *, unsigned count );
    22 
    23 Duration default_preemption() {
    24         return 0;
    25 }
    26 
    2721//=============================================================================================
    2822// Globals
    2923//=============================================================================================
     24channel & wait_connect;
     25
    3026struct ServerProc {
    3127        processor self;
     
    8884        // Run Server Cluster
    8985        {
    90                 cluster cl = { "Server Cluster", options.clopts.params };
     86                cluster cl = { "Server Cluster", options.clopts.flags };
    9187                #if !defined(__CFA_NO_STATISTICS__)
    9288                        print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );
    9389                #endif
    9490                options.clopts.instance = &cl;
     91
     92                channel chan = { options.clopts.chan_size };
     93                &wait_connect = &chan;
    9594
    9695                int pipe_cnt = options.clopts.nworkers * 2;
     
    103102                }
    104103
    105                 if(options.file_cache.fixed_fds) {
    106                         register_fixed_files(cl, fds, pipe_off);
    107                 }
    108 
    109104                {
    110105                        ServerProc procs[options.clopts.nprocs];
     
    112107                                Worker workers[options.clopts.nworkers];
    113108                                for(i; options.clopts.nworkers) {
    114                                         // if( options.file_cache.fixed_fds ) {
    115                                         //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    116                                         //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    117                                         // }
    118                                         // else
    119                                         {
     109                                        if( options.file_cache.fixed_fds ) {
     110                                                workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     111                                                workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     112                                        }
     113                                        else {
    120114                                                workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    121115                                                workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    122                                                 workers[i].sockfd  = server_fd;
    123                                                 workers[i].addr    = (struct sockaddr *)&address;
    124                                                 workers[i].addrlen = (socklen_t*)&addrlen;
    125                                                 workers[i].flags   = 0;
    126116                                        }
    127117                                        unpark( workers[i] __cfaabi_dbg_ctx2 );
     
    129119                                printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);
    130120                                {
     121                                        Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 };
     122
    131123                                        char buffer[128];
    132124                                        while(!feof(stdin)) {
     
    135127
    136128                                        printf("Shutting Down\n");
     129                                }
     130                                printf("Acceptor Closed\n");
     131
     132                                // Clean-up the workers
     133                                for(options.clopts.nworkers) {
     134                                        put( wait_connect, -1 );
    137135                                }
    138136                        }
  • benchmark/io/http/options.cfa

    r14d8a9b rdfc13bb  
    3131                1,     // nworkers;
    3232                0,     // flags;
     33                10,    // chan_size;
    3334                false, // procstats
    3435                false, // viewhalts
     
    3839
    3940const char * parse_options( int argc, char * argv[] ) {
     41        bool uthrdpo = false;
    4042        bool subthrd = false;
    4143        bool eagrsub = false;
     
    5052                {'t', "threads",        "Number of worker threads to use", options.clopts.nworkers},
    5153                {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
     54                {'B', "channel-size",   "Maximum number of accepted connection pending", options.clopts.chan_size},
    5255                {'r', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    5356                {'S', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
    5457                {'C', "cache-size",     "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },
    5558                {'l', "list-files",     "List the files in the specified path and exit", options.file_cache.list, parse_settrue },
     59                {'u', "userthread",     "If set, cluster uses user-thread to poll I/O", uthrdpo, parse_settrue },
    5660                {'s', "submitthread",   "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },
    5761                {'e', "eagersubmit",    "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},
     
    6771        parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left );
    6872
    69         options.clopts.params.poller_submits = subthrd;
    70         options.clopts.params.eager_submits  = eagrsub;
     73        if( uthrdpo ) {
     74                options.clopts.flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;
     75        }
     76
     77        if( subthrd ) {
     78                options.clopts.flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS;
     79        }
     80
     81        if( eagrsub ) {
     82                options.clopts.flags |= CFA_CLUSTER_IO_EAGER_SUBMITS;
     83        }
    7184
    7285        if( fixedfd ) {
     
    7588
    7689        if( sqkpoll ) {
    77                 options.clopts.params.poll_submit = true;
     90                options.clopts.flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;
    7891                options.file_cache.fixed_fds = true;
    7992        }
    8093
    8194        if( iokpoll ) {
    82                 options.clopts.params.poll_complete = true;
     95                options.clopts.flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;
    8396                options.file_cache.open_flags |= O_DIRECT;
    8497        }
    8598
    86         options.clopts.params.num_ready = sublen;
     99        options.clopts.flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);
    87100
    88101        if( left[0] == 0p ) { return "."; }
  • benchmark/io/http/options.hfa

    r14d8a9b rdfc13bb  
    22
    33#include <stdint.h>
    4 
    5 #include <kernel.hfa>
    64
    75struct cluster;
     
    2523                int nprocs;
    2624                int nworkers;
    27                 io_context_params params;
     25                int flags;
     26                int chan_size;
    2827                bool procstats;
    2928                bool viewhalts;
  • benchmark/io/http/protocol.cfa

    r14d8a9b rdfc13bb  
    1111extern "C" {
    1212      int snprintf ( char * s, size_t n, const char * format, ... );
    13         #include <linux/io_uring.h>
    1413}
    1514#include <string.h>
     15
    1616#include <errno.h>
    1717
    18 #include "options.hfa"
    1918
    2019const char * http_msgs[] = {
     
    7574        READ:
    7675        for() {
    77                 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
    78                 if(ret == 0 ) return [OK200, true, 0, 0];
     76                int ret = cfa_read(fd, it, count);
     77                if(ret == 0 ) return [OK200, true, 0p, 0];
    7978                if(ret < 0 ) {
    8079                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
     
    8988                count -= ret;
    9089
    91                 if( count < 1 ) return [E414, false, 0, 0];
     90                if( count < 1 ) return [E414, false, 0p, 0];
    9291        }
    9392
     
    9695        it = buffer;
    9796        int ret = memcmp(it, "GET /", 5);
    98         if( ret != 0 ) return [E400, false, 0, 0];
     97        if( ret != 0 ) return [E400, false, 0p, 0];
    9998        it += 5;
    10099
     
    107106        ssize_t ret;
    108107        SPLICE1: while(count > 0) {
    109                 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
     108                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE);
    110109                if( ret < 0 ) {
    111110                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     
    117116                size_t in_pipe = ret;
    118117                SPLICE2: while(in_pipe > 0) {
    119                         ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
     118                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE);
    120119                        if( ret < 0 ) {
    121120                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
  • benchmark/io/http/worker.cfa

    r14d8a9b rdfc13bb  
    2828        CONNECTION:
    2929        for() {
    30                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
    31                 if(fd < 0) {
    32                         if( errno == ECONNABORTED ) break;
    33                         abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    34                 }
     30                int fd = take(wait_connect);
     31                if (fd < 0) break;
    3532
    3633                printf("New connection %d, waiting for requests\n", fd);
     
    8582        }
    8683}
     84
     85//=============================================================================================
     86// Acceptor Thread
     87//=============================================================================================
     88void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) {
     89        ((thread&)this){ "Acceptor Thread", *options.clopts.instance };
     90        this.sockfd  = sockfd;
     91        this.addr    = addr;
     92        this.addrlen = addrlen;
     93        this.flags   = flags;
     94}
     95
     96void main( Acceptor & this ) {
     97        for() {
     98                int ret = cfa_accept4( this.[sockfd, addr, addrlen, flags] );
     99                if(ret < 0) {
     100                        if( errno == ECONNABORTED ) break;
     101                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     102                }
     103
     104                printf("New connection accepted\n");
     105                put( wait_connect, ret );
     106        }
     107}
  • benchmark/io/http/worker.hfa

    r14d8a9b rdfc13bb  
    77}
    88
     9#include "channel.hfa"
     10
     11extern channel & wait_connect;
     12
    913//=============================================================================================
    1014// Worker Thread
     
    1317thread Worker {
    1418        int pipe[2];
     19};
     20void ?{}( Worker & this );
     21void main( Worker & );
     22
     23//=============================================================================================
     24// Acceptor Thread
     25//=============================================================================================
     26thread Acceptor {
    1527        int sockfd;
    1628        struct sockaddr * addr;
     
    1830        int flags;
    1931};
    20 void ?{}( Worker & this);
    21 void main( Worker & );
     32
     33void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags );
     34void main( Acceptor & this );
Note: See TracChangeset for help on using the changeset viewer.