- Timestamp:
- Jul 23, 2020, 3:37:05 PM (6 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- f4ec4a90
- Parents:
- f0c3120 (diff), e262b5e (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- benchmark/io/http
- Files:
-
- 1 added
- 9 edited
-
filecache.cfa (modified) (7 diffs)
-
filecache.hfa (modified) (1 diff)
-
main.cfa (modified) (8 diffs)
-
options.cfa (added)
-
options.hfa (modified) (1 diff)
-
parseargs.cfa (modified) (5 diffs)
-
parseargs.hfa (modified) (1 diff)
-
protocol.cfa (modified) (2 diffs)
-
protocol.hfa (modified) (1 diff)
-
worker.cfa (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/filecache.cfa
rf0c3120 r04b73b6 56 56 } 57 57 58 static inline [unsigned size, char unit] human_size( size_t size ) { 59 int idx = 0; 60 static char units [] = { ' ', 'K', 'M', 'G', 'T' }; 61 while( size >= 1024 ) { 62 idx++; 63 size /= 1024; 64 if(idx >= 5) { 65 abort("File too large to print\n"); 66 } 67 } 68 69 return [size, units[idx]]; 70 } 58 71 59 72 struct { … … 69 82 70 83 [int fd, size_t size] get_file( * const char file, size_t len ) { 71 uint32_t idx = murmur3_32( (const uint8_t *)file, len, options. hash_seed ) % file_cache.size;84 uint32_t idx = murmur3_32( (const uint8_t *)file, len, options.file_cache.hash_seed ) % file_cache.size; 72 85 73 86 for(int i = 0;; i++) { … … 86 99 87 100 int put_file( cache_line & entry ) { 88 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options. hash_seed ) % file_cache.size;101 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.file_cache.hash_seed ) % file_cache.size; 89 102 90 103 int i = 0; … … 101 114 void fill_cache( const char * path ) { 102 115 int ret; 116 ret = chdir(path); 117 if(ret < 0) { 118 abort( "chdir error: (%d) %s\n", (int)errno, strerror(errno) ); 119 } 120 103 121 size_t fcount = 0; 104 122 size_t fsize = 16; … … 118 136 raw[idx].file = strdup(fpath+2); 119 137 raw[idx].size = sb->st_size; 120 raw[idx].fd = open( fpath, options.open_flags ); 121 if(raw[idx].fd < 0) { 122 abort( "open file error: (%d) %s\n", (int)errno, strerror(errno) ); 138 if( !options.file_cache.list ) { 139 raw[idx].fd = open( fpath, options.file_cache.open_flags ); 140 if(raw[idx].fd < 0) { 141 abort( "open file error: (%d) %s\n", (int)errno, strerror(errno) ); 142 } 123 143 } 124 144 return 0; 125 145 } 126 146 127 ret = ftw( path, walk, 10);147 ret = ftw(".", walk, 10); 128 148 if(ret < 0) { 129 149 abort( "ftw error: (%d) %s\n", (int)errno, strerror(errno) ); … … 135 155 136 156 // Step 2 create the cache 137 file_cache.size = options.file_cache _size > 0 ? options.file_cache_size : fsize;157 file_cache.size = options.file_cache.size > 0 ? options.file_cache.size : fsize; 138 158 if( file_cache.size < fcount ) { 139 159 abort("File Cache too small\n"); 140 160 } 141 161 142 file_cache.entries = anew(f size);162 file_cache.entries = anew(file_cache.size); 143 163 144 164 // Step 3 fill the cache 145 165 int conflicts = 0; 146 166 for(i; fcount) { 147 printf("Added file %s\n", raw[i].file);148 167 conflicts += put_file( raw[i] ); 149 168 } 150 169 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount); 151 170 if( conflicts > 0 ) { 152 printf("Found %d conflicts (seed: %u)\n", conflicts, options. hash_seed);171 printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed); 153 172 #if defined(REJECT_CONFLICTS) 154 173 abort("Conflicts found in the cache"); … … 156 175 } 157 176 177 if(options.file_cache.list) { 178 printf("Listing files and exiting\n"); 179 for(i; fcount) { 180 int s; char u; 181 [s, u] = human_size(raw[i].size); 182 printf("%4d%c - %s\n", s, u, raw[i].file); 183 free(raw[i].file); 184 } 185 free(raw); 186 adelete(file_cache.size, file_cache.entries); 187 exit(0); 188 } 189 158 190 // Step 4 clean up 159 191 free( raw ); 160 192 } 193 194 [int *, int] filefds(int extra) { 195 if(!file_cache.entries) { 196 abort("File cache not filled!\n"); 197 } 198 199 return [aalloc(extra), 0]; 200 } 201 161 202 162 203 void close_cache() { -
benchmark/io/http/filecache.hfa
rf0c3120 r04b73b6 11 11 [int fd, size_t size] get_file( * const char file, size_t len ); 12 12 void fill_cache( const char * path ); 13 [int *, int] filefds( int extra ); 13 14 void close_cache(); -
benchmark/io/http/main.cfa
rf0c3120 r04b73b6 17 17 #include "filecache.hfa" 18 18 #include "options.hfa" 19 #include "parseargs.hfa"20 19 #include "worker.hfa" 21 20 … … 23 22 // Globals 24 23 //============================================================================================= 25 Options options @= {26 0,27 42u,28 0,29 false,30 false,31 032 };33 34 24 channel & wait_connect; 35 25 … … 39 29 40 30 void ?{}( ServerProc & this ) { 41 /* paranoid */ assert( options. the_cluster!= 0p );42 (this.self){ "Benchmark Processor", *options. the_cluster};31 /* paranoid */ assert( options.clopts.instance != 0p ); 32 (this.self){ "Benchmark Processor", *options.clopts.instance }; 43 33 44 34 #if !defined(__CFA_NO_STATISTICS__) 45 if( options. procstats ) {46 print_stats_at_exit( this.self, options. the_cluster->print_stats );35 if( options.clopts.procstats ) { 36 print_stats_at_exit( this.self, options.clopts.instance->print_stats ); 47 37 } 48 if( options. viewhalts ) {38 if( options.clopts.viewhalts ) { 49 39 print_halts( this.self ); 50 40 } … … 56 46 //=============================================================================================' 57 47 int main( int argc, char * argv[] ) { 58 int port = 8080;59 int backlog = 10;60 int nprocs = 1;61 int nworkers = 1;62 int cl_flags = 0;63 int chan_size = 10;64 const char * path = ".";65 48 //=================== 66 49 // Parse args 67 static cfa_option opt[] = { 68 {'p', "port", "Port the server will listen on", port}, 69 {'c', "cpus", "Number of processors to use", nprocs}, 70 {'t', "threads", "Number of worker threads to use", nworkers}, 71 {'b', "accept-backlog", "Maximum number of pending accepts", backlog}, 72 {'B', "channel-size", "Maximum number of accepted connection pending", chan_size} 73 }; 74 int opt_cnt = sizeof(opt) / sizeof(cfa_option); 75 76 char **left; 77 parse_args( argc, argv, opt, opt_cnt, "[OPTIONS] [PATH] -- cforall http server", left ); 78 50 const char * path = parse_options(argc, argv); 79 51 80 52 //=================== … … 85 57 //=================== 86 58 // Open Socket 87 printf("Listening on port %d\n", port);59 printf("Listening on port %d\n", options.socket.port); 88 60 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 89 61 if(server_fd < 0) { … … 97 69 address.sin_family = AF_INET; 98 70 address.sin_addr.s_addr = htonl(INADDR_ANY); 99 address.sin_port = htons( port );71 address.sin_port = htons( options.socket.port ); 100 72 101 73 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) ); … … 104 76 } 105 77 106 ret = listen( server_fd, backlog );78 ret = listen( server_fd, options.socket.backlog ); 107 79 if(ret < 0) { 108 80 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) ); … … 112 84 // Run Server Cluster 113 85 { 114 cluster cl = { "Server Cluster", cl_flags };86 cluster cl = { "Server Cluster", options.clopts.flags }; 115 87 #if !defined(__CFA_NO_STATISTICS__) 116 88 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO ); 117 89 #endif 118 options. the_cluster= &cl;90 options.clopts.instance = &cl; 119 91 120 channel chan = { chan_size };92 channel chan = { options.clopts.chan_size }; 121 93 &wait_connect = &chan; 122 94 95 int pipe_cnt = options.clopts.nworkers * 2; 96 int pipe_off; 97 int * fds; 98 [fds, pipe_off] = filefds( pipe_cnt ); 99 for(i; 0 ~ pipe_cnt ~ 2) { 100 int ret = pipe(&fds[pipe_off + i]); 101 if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); } 102 } 103 123 104 { 124 ServerProc procs[ nprocs];105 ServerProc procs[options.clopts.nprocs]; 125 106 { 126 Worker workers[nworkers]; 127 printf("%d workers started on %d processors\n", nworkers, nprocs); 107 Worker workers[options.clopts.nworkers]; 108 for(i; options.clopts.nworkers) { 109 if( options.file_cache.fixed_fds ) { 110 workers[i].pipe[0] = pipe_off + (i * 2) + 0; 111 workers[i].pipe[1] = pipe_off + (i * 2) + 1; 112 } 113 else { 114 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 115 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 116 } 117 unpark( workers[i] __cfaabi_dbg_ctx2 ); 118 } 119 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs); 128 120 { 129 121 Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 }; 122 123 char buffer[128]; 124 while(!feof(stdin)) { 125 fgets(buffer, 128, stdin); 126 } 127 128 printf("Shutting Down\n"); 130 129 } 131 printf(" Shutting Down\n");130 printf("Acceptor Closed\n"); 132 131 133 132 // Clean-up the workers 134 for( nworkers) {133 for(options.clopts.nworkers) { 135 134 put( wait_connect, -1 ); 136 135 } 137 136 } 137 printf("Workers Closed\n"); 138 138 } 139 140 for(i; pipe_cnt) { 141 ret = close( fds[pipe_off + i] ); 142 if(ret < 0) { 143 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) ); 144 } 145 } 146 free(fds); 139 147 } 140 148 -
benchmark/io/http/options.hfa
rf0c3120 r04b73b6 6 6 7 7 struct Options { 8 int open_flags; 9 uint32_t hash_seed; 10 size_t file_cache_size; 11 bool procstats; 12 bool viewhalts; 13 cluster * the_cluster; 8 struct { 9 int open_flags; 10 uint32_t hash_seed; 11 size_t size; 12 bool list; 13 bool fixed_fds; 14 } file_cache; 15 16 struct { 17 int port; 18 int backlog; 19 int buflen; 20 } socket; 21 22 struct { 23 int nprocs; 24 int nworkers; 25 int flags; 26 int chan_size; 27 bool procstats; 28 bool viewhalts; 29 cluster * instance; 30 } clopts; 14 31 }; 15 32 16 33 extern Options options; 34 35 const char * parse_options( int argc, char * argv[] ); -
benchmark/io/http/parseargs.cfa
rf0c3120 r04b73b6 3 3 // #include <stdio.h> 4 4 // #include <stdlib.h> 5 #include <errno.h> 5 6 #include <string.h> 7 #include <unistd.h> 6 8 extern "C" { 7 9 #include <getopt.h> 10 #include <sys/ioctl.h> 8 11 9 12 struct FILE; … … 11 14 extern FILE * stdout; 12 15 16 extern int fileno(FILE *stream); 17 13 18 extern int fprintf ( FILE * stream, const char * format, ... ); 14 extern long long int strtoll (const char* str, char** endptr, int base); 19 20 extern long long int strtoll (const char* str, char** endptr, int base); 21 extern unsigned long long int strtoull(const char* str, char** endptr, int base); 22 } 23 24 #include <common.hfa> 25 #include <limits.hfa> 26 27 void printopt(FILE * out, int width, int max, char sn, const char * ln, const char * help) { 28 int hwidth = max - (11 + width); 29 if(hwidth <= 0) hwidth = max; 30 31 fprintf(out, " -%c, --%-*s %.*s\n", sn, width, ln, hwidth, help); 32 for() { 33 help += min(strlen(help), hwidth); 34 if('\0' == *help) break; 35 fprintf(out, "%*s%.*s\n", width + 11, "", hwidth, help); 36 } 15 37 } 16 38 … … 25 47 struct option optarr[opt_count + 2]; 26 48 int width = 0; 49 int max_width = 1_000_000; 27 50 { 28 51 int idx = 0; … … 93 116 } 94 117 95 USAGE: 96 fprintf(out, "%s\n", usage); 118 USAGE:; 119 int outfd = fileno(out); 120 if(isatty(outfd)) { 121 struct winsize size; 122 int ret = ioctl(outfd, TIOCGWINSZ, &size); 123 if(ret < 0) abort( "ioctl error: (%d) %s\n", (int)errno, strerror(errno) ); 124 max_width = size.ws_col; 125 } 126 127 fprintf(out, "Usage:\n %s %s\n", argv[0], usage); 97 128 98 129 for(i; opt_count) { 99 fprintf(out, " -%c, --%-*s %s\n", options[i].short_name, width, options[i].long_name, options[i].help);130 printopt(out, width, max_width, options[i].short_name, options[i].long_name, options[i].help); 100 131 } 101 132 fprintf(out, " -%c, --%-*s %s\n", 'h', width, "help", "print this help message"); … … 132 163 } 133 164 165 bool parse(const char * arg, unsigned & value) { 166 char * end; 167 unsigned long long int r = strtoull(arg, &end, 10); 168 if(*end != '\0') return false; 169 if(r > (unsigned)MAX) return false; 170 171 value = r; 172 return true; 173 } 174 175 bool parse(const char * arg, size_t & value) { 176 char * end; 177 unsigned long long int r = strtoull(arg, &end, 10); 178 if(*end != '\0') return false; 179 if(r > (size_t)MAX) return false; 180 181 value = r; 182 return true; 183 } 184 134 185 bool parse(const char * arg, int & value) { 135 186 char * end; -
benchmark/io/http/parseargs.hfa
rf0c3120 r04b73b6 38 38 39 39 bool parse(const char *, const char * & ); 40 bool parse(const char *, unsigned & ); 41 bool parse(const char *, size_t & ); 40 42 bool parse(const char *, int & ); -
benchmark/io/http/protocol.cfa
rf0c3120 r04b73b6 25 25 }; 26 26 27 _Static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0]))); 27 _Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 28 29 const int http_codes[] = { 30 200, 31 400, 32 404, 33 413, 34 414, 35 }; 36 37 _Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0]))); 38 39 int code_val(HttpCode code) { 40 return http_codes[code]; 41 } 28 42 29 43 static inline int answer( int fd, const char * it, int len) { … … 61 75 for() { 62 76 int ret = cfa_read(fd, it, count); 77 if(ret == 0 ) return [OK200, true, 0p, 0]; 63 78 if(ret < 0 ) { 64 if( errno ) return [OK200, true, 0p, 0];65 79 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 66 80 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); -
benchmark/io/http/protocol.hfa
rf0c3120 r04b73b6 10 10 }; 11 11 12 int code_val(HttpCode code); 13 12 14 int answer_error( int fd, HttpCode code ); 13 15 int answer_header( int fd, size_t size ); -
benchmark/io/http/worker.cfa
rf0c3120 r04b73b6 12 12 #include "filecache.hfa" 13 13 14 extern "C" {15 // extern ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);16 extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);17 }18 19 ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count) {20 return splice(in_fd, offset, out_fd, 0p, count, 0);21 }22 23 24 14 //============================================================================================= 25 15 // Worker Thread 26 16 //============================================================================================= 27 17 void ?{}( Worker & this ) { 28 ((thread&)this){ "Server Worker Thread", *options. the_cluster};29 int ret = pipe(this.pipe);30 if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }18 ((thread&)this){ "Server Worker Thread", *options.clopts.instance }; 19 this.pipe[0] = -1; 20 this.pipe[1] = -1; 31 21 } 32 22 33 23 void main( Worker & this ) { 24 park( __cfaabi_dbg_ctx ); 25 /* paranoid */ assert( this.pipe[0] != -1 ); 26 /* paranoid */ assert( this.pipe[1] != -1 ); 27 34 28 CONNECTION: 35 while( int fd = take(wait_connect); fd >= 0) { 36 printf("New connection, waiting for requests\n"); 29 for() { 30 int fd = take(wait_connect); 31 if (fd < 0) break; 32 33 printf("New connection %d, waiting for requests\n", fd); 37 34 REQUEST: 38 35 for() { … … 43 40 44 41 // Read the http request 45 size_t len = 1024;42 size_t len = options.socket.buflen; 46 43 char buffer[len]; 47 44 printf("Reading request\n"); … … 56 53 // If this wasn't a request retrun 400 57 54 if( code != OK200 ) { 58 printf("Invalid Request \n");55 printf("Invalid Request : %d\n", code_val(code)); 59 56 answer_error(fd, code); 60 57 continue REQUEST; 61 58 } 62 59 63 printf("Request for file %.*s\n", name_size, file);60 printf("Request for file %.*s\n", (int)name_size, file); 64 61 65 62 // Get the fd from the file cache … … 90 87 //============================================================================================= 91 88 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) { 92 ((thread&)this){ "Acceptor Thread", *options. the_cluster};89 ((thread&)this){ "Acceptor Thread", *options.clopts.instance }; 93 90 this.sockfd = sockfd; 94 91 this.addr = addr; … … 105 102 } 106 103 107 printf("New connection accepted\n");104 printf("New connection accepted\n"); 108 105 put( wait_connect, ret ); 109 }106 } 110 107 }
Note:
See TracChangeset
for help on using the changeset viewer.