Changes in / [e93cbfa:289a21c]


Ignore:
Location:
benchmark/io/http
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/filecache.cfa

    re93cbfa r289a21c  
    8282
    8383[int fd, size_t size] get_file( * const char file, size_t len ) {
    84         uint32_t idx = murmur3_32( (const uint8_t *)file, len, options.hash_seed ) % file_cache.size;
     84        uint32_t idx = murmur3_32( (const uint8_t *)file, len, options.file_cache.hash_seed ) % file_cache.size;
    8585
    8686        for(int i = 0;; i++) {
     
    9999
    100100int put_file( cache_line & entry ) {
    101         uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.hash_seed ) % file_cache.size;
     101        uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.file_cache.hash_seed ) % file_cache.size;
    102102
    103103        int i = 0;
     
    136136                raw[idx].file = strdup(fpath+2);
    137137                raw[idx].size = sb->st_size;
    138                 if( !options.file_cache_list ) {
    139                         raw[idx].fd = open( fpath, options.open_flags );
     138                if( !options.file_cache.list ) {
     139                        raw[idx].fd = open( fpath, options.file_cache.open_flags );
    140140                        if(raw[idx].fd < 0) {
    141141                                abort( "open file error: (%d) %s\n", (int)errno, strerror(errno) );
     
    154154        }
    155155
    156         if(options.file_cache_list) {
     156        // Step 2 create the cache
     157        file_cache.size = options.file_cache.size > 0 ? options.file_cache.size : fsize;
     158        if( file_cache.size < fcount ) {
     159                abort("File Cache too small\n");
     160        }
     161
     162        file_cache.entries = anew(file_cache.size);
     163
     164        // Step 3 fill the cache
     165        int conflicts = 0;
     166        for(i; fcount) {
     167                conflicts += put_file( raw[i] );
     168        }
     169        printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);
     170        if( conflicts > 0 ) {
     171                printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed);
     172                #if defined(REJECT_CONFLICTS)
     173                        abort("Conflicts found in the cache");
     174                #endif
     175        }
     176
     177        if(options.file_cache.list) {
    157178                printf("Listing files and exiting\n");
    158179                for(i; fcount) {
     
    163184                }
    164185                free(raw);
     186                adelete(file_cache.size, file_cache.entries);
    165187                exit(0);
    166         }
    167 
    168         // Step 2 create the cache
    169         file_cache.size = options.file_cache_size > 0 ? options.file_cache_size : fsize;
    170         if( file_cache.size < fcount ) {
    171                 abort("File Cache too small\n");
    172         }
    173 
    174         file_cache.entries = anew(fsize);
    175 
    176         // Step 3 fill the cache
    177         int conflicts = 0;
    178         for(i; fcount) {
    179                 printf("Added file %s\n", raw[i].file);
    180                 conflicts += put_file( raw[i] );
    181         }
    182         printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);
    183         if( conflicts > 0 ) {
    184                 printf("Found %d conflicts (seed: %u)\n", conflicts, options.hash_seed);
    185                 #if defined(REJECT_CONFLICTS)
    186                         abort("Conflicts found in the cache");
    187                 #endif
    188188        }
    189189
  • benchmark/io/http/main.cfa

    re93cbfa r289a21c  
    1717#include "filecache.hfa"
    1818#include "options.hfa"
    19 #include "parseargs.hfa"
    2019#include "worker.hfa"
    2120
     
    2322// Globals
    2423//=============================================================================================
    25 Options options @= {
    26         0,     //   open_flags;
    27         42u,   //       hash_seed;
    28         0,     //   file_cache_size;
    29         false, //       file_cache_list;
    30         false, //       procstats;
    31         false, //       viewhalts;
    32         0      //       the_cluster;
    33 };
    34 
    3524channel & wait_connect;
    3625
     
    4029
    4130void ?{}( ServerProc & this ) {
    42         /* paranoid */ assert( options.the_cluster != 0p );
    43         (this.self){ "Benchmark Processor", *options.the_cluster };
     31        /* paranoid */ assert( options.clopts.instance != 0p );
     32        (this.self){ "Benchmark Processor", *options.clopts.instance };
    4433
    4534        #if !defined(__CFA_NO_STATISTICS__)
    46                 if( options.procstats ) {
    47                         print_stats_at_exit( this.self, options.the_cluster->print_stats );
     35                if( options.clopts.procstats ) {
     36                        print_stats_at_exit( this.self, options.clopts.instance->print_stats );
    4837                }
    49                 if( options.viewhalts ) {
     38                if( options.clopts.viewhalts ) {
    5039                        print_halts( this.self );
    5140                }
     
    5746//============================================================================================='
    5847int main( int argc, char * argv[] ) {
    59         int port      = 8080;
    60         int backlog   = 10;
    61         int nprocs    = 1;
    62         int nworkers  = 1;
    63         int cl_flags  = 0;
    64         int chan_size = 10;
    65         const char * path = ".";
    6648        //===================
    6749        // Parse args
    68         static cfa_option opt[] = {
    69                 {'p', "port", "Port the server will listen on", port},
    70                 {'c', "cpus", "Number of processors to use", nprocs},
    71                 {'t', "threads", "Number of worker threads to use", nworkers},
    72                 {'b', "accept-backlog", "Maximum number of pending accepts", backlog},
    73                 {'B', "channel-size", "Maximum number of accepted connection pending", chan_size},
    74                 {'S', "seed", "seed to use for hashing", options.hash_seed },
    75                 {'C', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache_size },
    76                 {'l', "list-files", "List the files in the specified path and exit", options.file_cache_list, parse_settrue }
    77 
    78         };
    79         int opt_cnt = sizeof(opt) / sizeof(cfa_option);
    80 
    81         char **left;
    82       parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left );
    83         if( left[0] != 0p ) {
    84                 path = left[0];
    85                 left++;
    86         }
    87         if( left[0] != 0p ) {
    88                 abort("Too many trailing arguments!\n");
    89         }
    90 
     50        const char * path = parse_options(argc, argv);
    9151
    9252        //===================
     
    9757        //===================
    9858        // Open Socket
    99         printf("Listening on port %d\n", port);
     59        printf("Listening on port %d\n", options.socket.port);
    10060        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    10161        if(server_fd < 0) {
     
    10969        address.sin_family = AF_INET;
    11070        address.sin_addr.s_addr = htonl(INADDR_ANY);
    111         address.sin_port = htons( port );
     71        address.sin_port = htons( options.socket.port );
    11272
    11373        ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
     
    11676        }
    11777
    118         ret = listen( server_fd, backlog );
     78        ret = listen( server_fd, options.socket.backlog );
    11979        if(ret < 0) {
    12080                abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
     
    12484        // Run Server Cluster
    12585        {
    126                 cluster cl = { "Server Cluster", cl_flags };
     86                cluster cl = { "Server Cluster", options.clopts.flags };
    12787                #if !defined(__CFA_NO_STATISTICS__)
    12888                        print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );
    12989                #endif
    130                 options.the_cluster = &cl;
     90                options.clopts.instance = &cl;
    13191
    132                 channel chan = { chan_size };
     92                channel chan = { options.clopts.chan_size };
    13393                &wait_connect = &chan;
    13494
    13595                {
    136                         ServerProc procs[nprocs];
     96                        ServerProc procs[options.clopts.nprocs];
    13797                        {
    138                                 Worker workers[nworkers];
    139                                 printf("%d workers started on %d processors\n", nworkers, nprocs);
     98                                Worker workers[options.clopts.nworkers];
     99                                printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);
    140100                                {
    141101                                        Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 };
     102
     103                                        char buffer[128];
     104                                        while(!feof(stdin)) {
     105                                                fgets(buffer, 128, stdin);
     106                                        }
     107
     108                                        printf("Shutting Down\n");
    142109                                }
    143                                 printf("Shutting Down\n");
     110                                printf("Acceptor Closed\n");
    144111
    145112                                // Clean-up the workers
    146                                 for(nworkers) {
     113                                for(options.clopts.nworkers) {
    147114                                        put( wait_connect, -1 );
    148115                                }
    149116                        }
     117                        printf("Workers Closed\n");
    150118                }
    151119        }
  • benchmark/io/http/options.hfa

    re93cbfa r289a21c  
    66
    77struct Options {
    8         int open_flags;
    9         uint32_t hash_seed;
    10         size_t file_cache_size;
    11         bool file_cache_list;
    12         bool procstats;
    13         bool viewhalts;
    14         cluster * the_cluster;
     8        struct {
     9                int open_flags;
     10                uint32_t hash_seed;
     11                size_t size;
     12                bool list;
     13                bool fixed_fds;
     14        } file_cache;
     15
     16        struct {
     17                int port;
     18                int backlog;
     19        } socket;
     20
     21        struct {
     22                int nprocs;
     23                int nworkers;
     24                int flags;
     25                int chan_size;
     26                bool procstats;
     27                bool viewhalts;
     28                cluster * instance;
     29        } clopts;
    1530};
    1631
    1732extern Options options;
     33
     34const char * parse_options( int argc, char * argv[] );
  • benchmark/io/http/protocol.cfa

    re93cbfa r289a21c  
    2525};
    2626
    27 _Static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])));
     27_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
     28
     29const int http_codes[] = {
     30        200,
     31        400,
     32        404,
     33        413,
     34        414,
     35};
     36
     37_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
     38
     39int code_val(HttpCode code) {
     40        return http_codes[code];
     41}
    2842
    2943static inline int answer( int fd, const char * it, int len) {
     
    6175        for() {
    6276                int ret = cfa_read(fd, it, count);
     77                if(ret == 0 ) return [OK200, true, 0p, 0];
    6378                if(ret < 0 ) {
    64                         if( errno ) return [OK200, true, 0p, 0];
    6579                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
    6680                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
  • benchmark/io/http/protocol.hfa

    re93cbfa r289a21c  
    1010};
    1111
     12int code_val(HttpCode code);
     13
    1214int answer_error( int fd, HttpCode code );
    1315int answer_header( int fd, size_t size );
  • benchmark/io/http/worker.cfa

    re93cbfa r289a21c  
    2626//=============================================================================================
    2727void ?{}( Worker & this ) {
    28         ((thread&)this){ "Server Worker Thread", *options.the_cluster };
     28        ((thread&)this){ "Server Worker Thread", *options.clopts.instance };
    2929        int ret = pipe(this.pipe);
    3030        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
     
    3333void main( Worker & this ) {
    3434        CONNECTION:
    35         while( int fd = take(wait_connect); fd >= 0) {
    36             printf("New connection, waiting for requests\n");
     35        for() {
     36                int fd = take(wait_connect);
     37                if (fd < 0) break;
     38
     39                printf("New connection %d, waiting for requests\n", fd);
    3740                REQUEST:
    3841                for() {
     
    5659                        // If this wasn't a request retrun 400
    5760                        if( code != OK200 ) {
    58                                 printf("Invalid Request\n");
     61                                printf("Invalid Request : %d\n", code_val(code));
    5962                                answer_error(fd, code);
    6063                                continue REQUEST;
    6164                        }
    6265
    63                         printf("Request for file %.*s\n", name_size, file);
     66                        printf("Request for file %.*s\n", (int)name_size, file);
    6467
    6568                        // Get the fd from the file cache
     
    9093//=============================================================================================
    9194void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) {
    92         ((thread&)this){ "Acceptor Thread", *options.the_cluster };
     95        ((thread&)this){ "Acceptor Thread", *options.clopts.instance };
    9396        this.sockfd  = sockfd;
    9497        this.addr    = addr;
     
    105108                }
    106109
    107             printf("New connection accepted\n");
     110                printf("New connection accepted\n");
    108111                put( wait_connect, ret );
    109       }
     112        }
    110113}
Note: See TracChangeset for help on using the changeset viewer.