Changes in / [7d94bfe3:ffa48a8]
- Location:
- benchmark/io/http
- Files:
-
- 1 deleted
- 7 edited
-
filecache.cfa (modified) (5 diffs)
-
main.cfa (modified) (8 diffs)
-
options.cfa (deleted)
-
options.hfa (modified) (1 diff)
-
parseargs.cfa (modified) (6 diffs)
-
protocol.cfa (modified) (2 diffs)
-
protocol.hfa (modified) (1 diff)
-
worker.cfa (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/filecache.cfa
r7d94bfe3 rffa48a8 82 82 83 83 [int fd, size_t size] get_file( * const char file, size_t len ) { 84 uint32_t idx = murmur3_32( (const uint8_t *)file, len, options. file_cache.hash_seed ) % file_cache.size;84 uint32_t idx = murmur3_32( (const uint8_t *)file, len, options.hash_seed ) % file_cache.size; 85 85 86 86 for(int i = 0;; i++) { … … 99 99 100 100 int put_file( cache_line & entry ) { 101 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options. file_cache.hash_seed ) % file_cache.size;101 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.hash_seed ) % file_cache.size; 102 102 103 103 int i = 0; … … 136 136 raw[idx].file = strdup(fpath+2); 137 137 raw[idx].size = sb->st_size; 138 if( !options.file_cache .list ) {139 raw[idx].fd = open( fpath, options. file_cache.open_flags );138 if( !options.file_cache_list ) { 139 raw[idx].fd = open( fpath, options.open_flags ); 140 140 if(raw[idx].fd < 0) { 141 141 abort( "open file error: (%d) %s\n", (int)errno, strerror(errno) ); … … 154 154 } 155 155 156 // Step 2 create the cache 157 file_cache.size = options.file_cache.size > 0 ? options.file_cache.size : fsize; 158 if( file_cache.size < fcount ) { 159 abort("File Cache too small\n"); 160 } 161 162 file_cache.entries = anew(file_cache.size); 163 164 // Step 3 fill the cache 165 int conflicts = 0; 166 for(i; fcount) { 167 conflicts += put_file( raw[i] ); 168 } 169 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount); 170 if( conflicts > 0 ) { 171 printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed); 172 #if defined(REJECT_CONFLICTS) 173 abort("Conflicts found in the cache"); 174 #endif 175 } 176 177 if(options.file_cache.list) { 156 if(options.file_cache_list) { 178 157 printf("Listing files and exiting\n"); 179 158 for(i; fcount) { … … 184 163 } 185 164 free(raw); 186 adelete(file_cache.size, file_cache.entries);187 165 exit(0); 166 } 167 168 // Step 2 create the cache 169 file_cache.size = options.file_cache_size > 0 ? options.file_cache_size : fsize; 170 if( file_cache.size < fcount ) { 171 abort("File Cache too small\n"); 172 } 173 174 file_cache.entries = anew(fsize); 175 176 // Step 3 fill the cache 177 int conflicts = 0; 178 for(i; fcount) { 179 printf("Added file %s\n", raw[i].file); 180 conflicts += put_file( raw[i] ); 181 } 182 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount); 183 if( conflicts > 0 ) { 184 printf("Found %d conflicts (seed: %u)\n", conflicts, options.hash_seed); 185 #if defined(REJECT_CONFLICTS) 186 abort("Conflicts found in the cache"); 187 #endif 188 188 } 189 189 -
benchmark/io/http/main.cfa
r7d94bfe3 rffa48a8 17 17 #include "filecache.hfa" 18 18 #include "options.hfa" 19 #include "parseargs.hfa" 19 20 #include "worker.hfa" 20 21 … … 22 23 // Globals 23 24 //============================================================================================= 25 Options options @= { 26 0, // open_flags; 27 42u, // hash_seed; 28 0, // file_cache_size; 29 false, // file_cache_list; 30 false, // procstats; 31 false, // viewhalts; 32 0 // the_cluster; 33 }; 34 24 35 channel & wait_connect; 25 36 … … 29 40 30 41 void ?{}( ServerProc & this ) { 31 /* paranoid */ assert( options. clopts.instance!= 0p );32 (this.self){ "Benchmark Processor", *options. clopts.instance};42 /* paranoid */ assert( options.the_cluster != 0p ); 43 (this.self){ "Benchmark Processor", *options.the_cluster }; 33 44 34 45 #if !defined(__CFA_NO_STATISTICS__) 35 if( options. clopts.procstats ) {36 print_stats_at_exit( this.self, options. clopts.instance->print_stats );46 if( options.procstats ) { 47 print_stats_at_exit( this.self, options.the_cluster->print_stats ); 37 48 } 38 if( options. clopts.viewhalts ) {49 if( options.viewhalts ) { 39 50 print_halts( this.self ); 40 51 } … … 46 57 //=============================================================================================' 47 58 int main( int argc, char * argv[] ) { 59 int port = 8080; 60 int backlog = 10; 61 int nprocs = 1; 62 int nworkers = 1; 63 int cl_flags = 0; 64 int chan_size = 10; 65 const char * path = "."; 48 66 //=================== 49 67 // Parse args 50 const char * path = parse_options(argc, argv); 68 static cfa_option opt[] = { 69 {'p', "port", "Port the server will listen on", port}, 70 {'c', "cpus", "Number of processors to use", nprocs}, 71 {'t', "threads", "Number of worker threads to use", nworkers}, 72 {'b', "accept-backlog", "Maximum number of pending accepts", backlog}, 73 {'B', "channel-size", "Maximum number of accepted connection pending", chan_size}, 74 {'S', "seed", "seed to use for hashing", options.hash_seed }, 75 {'C', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache_size }, 76 {'l', "list-files", "List the files in the specified path and exit", options.file_cache_list, parse_settrue } 77 78 }; 79 int opt_cnt = sizeof(opt) / sizeof(cfa_option); 80 81 char **left; 82 parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left ); 83 if( left[0] != 0p ) { 84 path = left[0]; 85 left++; 86 } 87 if( left[0] != 0p ) { 88 abort("Too many trailing arguments!\n"); 89 } 90 51 91 52 92 //=================== … … 57 97 //=================== 58 98 // Open Socket 59 printf("Listening on port %d\n", options.socket.port);99 printf("Listening on port %d\n", port); 60 100 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 61 101 if(server_fd < 0) { … … 69 109 address.sin_family = AF_INET; 70 110 address.sin_addr.s_addr = htonl(INADDR_ANY); 71 address.sin_port = htons( options.socket.port );111 address.sin_port = htons( port ); 72 112 73 113 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) ); … … 76 116 } 77 117 78 ret = listen( server_fd, options.socket.backlog );118 ret = listen( server_fd, backlog ); 79 119 if(ret < 0) { 80 120 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) ); … … 84 124 // Run Server Cluster 85 125 { 86 cluster cl = { "Server Cluster", options.clopts.flags };126 cluster cl = { "Server Cluster", cl_flags }; 87 127 #if !defined(__CFA_NO_STATISTICS__) 88 128 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO ); 89 129 #endif 90 options. clopts.instance= &cl;130 options.the_cluster = &cl; 91 131 92 channel chan = { options.clopts.chan_size };132 channel chan = { chan_size }; 93 133 &wait_connect = &chan; 94 134 95 135 { 96 ServerProc procs[ options.clopts.nprocs];136 ServerProc procs[nprocs]; 97 137 { 98 Worker workers[ options.clopts.nworkers];99 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);138 Worker workers[nworkers]; 139 printf("%d workers started on %d processors\n", nworkers, nprocs); 100 140 { 101 141 Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 }; 102 103 char buffer[128];104 while(!feof(stdin)) {105 fgets(buffer, 128, stdin);106 }107 108 printf("Shutting Down\n");109 142 } 110 printf(" Acceptor Closed\n");143 printf("Shutting Down\n"); 111 144 112 145 // Clean-up the workers 113 for( options.clopts.nworkers) {146 for(nworkers) { 114 147 put( wait_connect, -1 ); 115 148 } 116 149 } 117 printf("Workers Closed\n");118 150 } 119 151 } -
benchmark/io/http/options.hfa
r7d94bfe3 rffa48a8 6 6 7 7 struct Options { 8 struct { 9 int open_flags; 10 uint32_t hash_seed; 11 size_t size; 12 bool list; 13 bool fixed_fds; 14 } file_cache; 15 16 struct { 17 int port; 18 int backlog; 19 int buflen; 20 } socket; 21 22 struct { 23 int nprocs; 24 int nworkers; 25 int flags; 26 int chan_size; 27 bool procstats; 28 bool viewhalts; 29 cluster * instance; 30 } clopts; 8 int open_flags; 9 uint32_t hash_seed; 10 size_t file_cache_size; 11 bool file_cache_list; 12 bool procstats; 13 bool viewhalts; 14 cluster * the_cluster; 31 15 }; 32 16 33 17 extern Options options; 34 35 const char * parse_options( int argc, char * argv[] ); -
benchmark/io/http/parseargs.cfa
r7d94bfe3 rffa48a8 3 3 // #include <stdio.h> 4 4 // #include <stdlib.h> 5 #include <errno.h>6 5 #include <string.h> 7 #include <unistd.h>8 6 extern "C" { 9 7 #include <getopt.h> 10 #include <sys/ioctl.h>11 8 12 9 struct FILE; … … 14 11 extern FILE * stdout; 15 12 16 extern int fileno(FILE *stream);17 18 13 extern int fprintf ( FILE * stream, const char * format, ... ); 19 14 20 15 extern long long int strtoll (const char* str, char** endptr, int base); 21 16 extern unsigned long long int strtoull(const char* str, char** endptr, int base); 22 }23 24 #include <common.hfa>25 #include <limits.hfa>26 27 void printopt(FILE * out, int width, int max, char sn, const char * ln, const char * help) {28 int hwidth = max - (11 + width);29 if(hwidth <= 0) hwidth = max;30 31 fprintf(out, " -%c, --%-*s %.*s\n", sn, width, ln, hwidth, help);32 for() {33 help += min(strlen(help), hwidth);34 if('\0' == *help) break;35 fprintf(out, "%*s%.*s\n", width + 11, "", hwidth, help);36 }37 17 } 38 18 … … 47 27 struct option optarr[opt_count + 2]; 48 28 int width = 0; 49 int max_width = 1_000_000;50 29 { 51 30 int idx = 0; … … 116 95 } 117 96 118 USAGE:; 119 int outfd = fileno(out); 120 if(isatty(outfd)) { 121 struct winsize size; 122 int ret = ioctl(outfd, TIOCGWINSZ, &size); 123 if(ret < 0) abort( "ioctl error: (%d) %s\n", (int)errno, strerror(errno) ); 124 max_width = size.ws_col; 125 } 126 97 USAGE: 127 98 fprintf(out, "Usage:\n %s %s\n", argv[0], usage); 128 99 129 100 for(i; opt_count) { 130 printopt(out, width, max_width, options[i].short_name, options[i].long_name, options[i].help);101 fprintf(out, " -%c, --%-*s %s\n", options[i].short_name, width, options[i].long_name, options[i].help); 131 102 } 132 103 fprintf(out, " -%c, --%-*s %s\n", 'h', width, "help", "print this help message"); … … 167 138 unsigned long long int r = strtoull(arg, &end, 10); 168 139 if(*end != '\0') return false; 169 if(r > (unsigned)MAX) return false; 140 #warning not checking max 170 141 171 142 value = r; … … 177 148 unsigned long long int r = strtoull(arg, &end, 10); 178 149 if(*end != '\0') return false; 179 if(r > (size_t)MAX) return false; 150 #warning not checking max 180 151 181 152 value = r; -
benchmark/io/http/protocol.cfa
r7d94bfe3 rffa48a8 25 25 }; 26 26 27 _Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 28 29 const int http_codes[] = { 30 200, 31 400, 32 404, 33 413, 34 414, 35 }; 36 37 _Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0]))); 38 39 int code_val(HttpCode code) { 40 return http_codes[code]; 41 } 27 _Static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0]))); 42 28 43 29 static inline int answer( int fd, const char * it, int len) { … … 75 61 for() { 76 62 int ret = cfa_read(fd, it, count); 77 if(ret == 0 ) return [OK200, true, 0p, 0];78 63 if(ret < 0 ) { 64 if( errno ) return [OK200, true, 0p, 0]; 79 65 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 80 66 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); -
benchmark/io/http/protocol.hfa
r7d94bfe3 rffa48a8 10 10 }; 11 11 12 int code_val(HttpCode code);13 14 12 int answer_error( int fd, HttpCode code ); 15 13 int answer_header( int fd, size_t size ); -
benchmark/io/http/worker.cfa
r7d94bfe3 rffa48a8 26 26 //============================================================================================= 27 27 void ?{}( Worker & this ) { 28 ((thread&)this){ "Server Worker Thread", *options. clopts.instance};28 ((thread&)this){ "Server Worker Thread", *options.the_cluster }; 29 29 int ret = pipe(this.pipe); 30 30 if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); } … … 33 33 void main( Worker & this ) { 34 34 CONNECTION: 35 for() { 36 int fd = take(wait_connect); 37 if (fd < 0) break; 38 39 printf("New connection %d, waiting for requests\n", fd); 35 while( int fd = take(wait_connect); fd >= 0) { 36 printf("New connection, waiting for requests\n"); 40 37 REQUEST: 41 38 for() { … … 46 43 47 44 // Read the http request 48 size_t len = options.socket.buflen;45 size_t len = 1024; 49 46 char buffer[len]; 50 47 printf("Reading request\n"); … … 59 56 // If this wasn't a request retrun 400 60 57 if( code != OK200 ) { 61 printf("Invalid Request : %d\n", code_val(code));58 printf("Invalid Request\n"); 62 59 answer_error(fd, code); 63 60 continue REQUEST; 64 61 } 65 62 66 printf("Request for file %.*s\n", (int)name_size, file);63 printf("Request for file %.*s\n", name_size, file); 67 64 68 65 // Get the fd from the file cache … … 93 90 //============================================================================================= 94 91 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) { 95 ((thread&)this){ "Acceptor Thread", *options. clopts.instance};92 ((thread&)this){ "Acceptor Thread", *options.the_cluster }; 96 93 this.sockfd = sockfd; 97 94 this.addr = addr; … … 108 105 } 109 106 110 printf("New connection accepted\n");107 printf("New connection accepted\n"); 111 108 put( wait_connect, ret ); 112 }109 } 113 110 }
Note:
See TracChangeset
for help on using the changeset viewer.