Changes in / [e93cbfa:289a21c]
- Location:
- benchmark/io/http
- Files:
-
- 1 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/filecache.cfa
re93cbfa r289a21c 82 82 83 83 [int fd, size_t size] get_file( * const char file, size_t len ) { 84 uint32_t idx = murmur3_32( (const uint8_t *)file, len, options. hash_seed ) % file_cache.size;84 uint32_t idx = murmur3_32( (const uint8_t *)file, len, options.file_cache.hash_seed ) % file_cache.size; 85 85 86 86 for(int i = 0;; i++) { … … 99 99 100 100 int put_file( cache_line & entry ) { 101 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options. hash_seed ) % file_cache.size;101 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.file_cache.hash_seed ) % file_cache.size; 102 102 103 103 int i = 0; … … 136 136 raw[idx].file = strdup(fpath+2); 137 137 raw[idx].size = sb->st_size; 138 if( !options.file_cache _list ) {139 raw[idx].fd = open( fpath, options. open_flags );138 if( !options.file_cache.list ) { 139 raw[idx].fd = open( fpath, options.file_cache.open_flags ); 140 140 if(raw[idx].fd < 0) { 141 141 abort( "open file error: (%d) %s\n", (int)errno, strerror(errno) ); … … 154 154 } 155 155 156 if(options.file_cache_list) { 156 // Step 2 create the cache 157 file_cache.size = options.file_cache.size > 0 ? options.file_cache.size : fsize; 158 if( file_cache.size < fcount ) { 159 abort("File Cache too small\n"); 160 } 161 162 file_cache.entries = anew(file_cache.size); 163 164 // Step 3 fill the cache 165 int conflicts = 0; 166 for(i; fcount) { 167 conflicts += put_file( raw[i] ); 168 } 169 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount); 170 if( conflicts > 0 ) { 171 printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed); 172 #if defined(REJECT_CONFLICTS) 173 abort("Conflicts found in the cache"); 174 #endif 175 } 176 177 if(options.file_cache.list) { 157 178 printf("Listing files and exiting\n"); 158 179 for(i; fcount) { … … 163 184 } 164 185 free(raw); 186 adelete(file_cache.size, file_cache.entries); 165 187 exit(0); 166 }167 168 // Step 2 create the cache169 file_cache.size = options.file_cache_size > 0 ? options.file_cache_size : fsize;170 if( file_cache.size < fcount ) {171 abort("File Cache too small\n");172 }173 174 file_cache.entries = anew(fsize);175 176 // Step 3 fill the cache177 int conflicts = 0;178 for(i; fcount) {179 printf("Added file %s\n", raw[i].file);180 conflicts += put_file( raw[i] );181 }182 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);183 if( conflicts > 0 ) {184 printf("Found %d conflicts (seed: %u)\n", conflicts, options.hash_seed);185 #if defined(REJECT_CONFLICTS)186 abort("Conflicts found in the cache");187 #endif188 188 } 189 189 -
benchmark/io/http/main.cfa
re93cbfa r289a21c 17 17 #include "filecache.hfa" 18 18 #include "options.hfa" 19 #include "parseargs.hfa"20 19 #include "worker.hfa" 21 20 … … 23 22 // Globals 24 23 //============================================================================================= 25 Options options @= {26 0, // open_flags;27 42u, // hash_seed;28 0, // file_cache_size;29 false, // file_cache_list;30 false, // procstats;31 false, // viewhalts;32 0 // the_cluster;33 };34 35 24 channel & wait_connect; 36 25 … … 40 29 41 30 void ?{}( ServerProc & this ) { 42 /* paranoid */ assert( options. the_cluster!= 0p );43 (this.self){ "Benchmark Processor", *options. the_cluster};31 /* paranoid */ assert( options.clopts.instance != 0p ); 32 (this.self){ "Benchmark Processor", *options.clopts.instance }; 44 33 45 34 #if !defined(__CFA_NO_STATISTICS__) 46 if( options. procstats ) {47 print_stats_at_exit( this.self, options. the_cluster->print_stats );35 if( options.clopts.procstats ) { 36 print_stats_at_exit( this.self, options.clopts.instance->print_stats ); 48 37 } 49 if( options. viewhalts ) {38 if( options.clopts.viewhalts ) { 50 39 print_halts( this.self ); 51 40 } … … 57 46 //=============================================================================================' 58 47 int main( int argc, char * argv[] ) { 59 int port = 8080;60 int backlog = 10;61 int nprocs = 1;62 int nworkers = 1;63 int cl_flags = 0;64 int chan_size = 10;65 const char * path = ".";66 48 //=================== 67 49 // Parse args 68 static cfa_option opt[] = { 69 {'p', "port", "Port the server will listen on", port}, 70 {'c', "cpus", "Number of processors to use", nprocs}, 71 {'t', "threads", "Number of worker threads to use", nworkers}, 72 {'b', "accept-backlog", "Maximum number of pending accepts", backlog}, 73 {'B', "channel-size", "Maximum number of accepted connection pending", chan_size}, 74 {'S', "seed", "seed to use for hashing", options.hash_seed }, 75 {'C', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache_size }, 76 {'l', "list-files", "List the files in the specified path and exit", options.file_cache_list, parse_settrue } 77 78 }; 79 int opt_cnt = sizeof(opt) / sizeof(cfa_option); 80 81 char **left; 82 parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left ); 83 if( left[0] != 0p ) { 84 path = left[0]; 85 left++; 86 } 87 if( left[0] != 0p ) { 88 abort("Too many trailing arguments!\n"); 89 } 90 50 const char * path = parse_options(argc, argv); 91 51 92 52 //=================== … … 97 57 //=================== 98 58 // Open Socket 99 printf("Listening on port %d\n", port);59 printf("Listening on port %d\n", options.socket.port); 100 60 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 101 61 if(server_fd < 0) { … … 109 69 address.sin_family = AF_INET; 110 70 address.sin_addr.s_addr = htonl(INADDR_ANY); 111 address.sin_port = htons( port );71 address.sin_port = htons( options.socket.port ); 112 72 113 73 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) ); … … 116 76 } 117 77 118 ret = listen( server_fd, backlog );78 ret = listen( server_fd, options.socket.backlog ); 119 79 if(ret < 0) { 120 80 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) ); … … 124 84 // Run Server Cluster 125 85 { 126 cluster cl = { "Server Cluster", cl_flags };86 cluster cl = { "Server Cluster", options.clopts.flags }; 127 87 #if !defined(__CFA_NO_STATISTICS__) 128 88 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO ); 129 89 #endif 130 options. the_cluster= &cl;90 options.clopts.instance = &cl; 131 91 132 channel chan = { chan_size };92 channel chan = { options.clopts.chan_size }; 133 93 &wait_connect = &chan; 134 94 135 95 { 136 ServerProc procs[ nprocs];96 ServerProc procs[options.clopts.nprocs]; 137 97 { 138 Worker workers[ nworkers];139 printf("%d workers started on %d processors\n", nworkers,nprocs);98 Worker workers[options.clopts.nworkers]; 99 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs); 140 100 { 141 101 Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 }; 102 103 char buffer[128]; 104 while(!feof(stdin)) { 105 fgets(buffer, 128, stdin); 106 } 107 108 printf("Shutting Down\n"); 142 109 } 143 printf(" Shutting Down\n");110 printf("Acceptor Closed\n"); 144 111 145 112 // Clean-up the workers 146 for( nworkers) {113 for(options.clopts.nworkers) { 147 114 put( wait_connect, -1 ); 148 115 } 149 116 } 117 printf("Workers Closed\n"); 150 118 } 151 119 } -
benchmark/io/http/options.hfa
re93cbfa r289a21c 6 6 7 7 struct Options { 8 int open_flags; 9 uint32_t hash_seed; 10 size_t file_cache_size; 11 bool file_cache_list; 12 bool procstats; 13 bool viewhalts; 14 cluster * the_cluster; 8 struct { 9 int open_flags; 10 uint32_t hash_seed; 11 size_t size; 12 bool list; 13 bool fixed_fds; 14 } file_cache; 15 16 struct { 17 int port; 18 int backlog; 19 } socket; 20 21 struct { 22 int nprocs; 23 int nworkers; 24 int flags; 25 int chan_size; 26 bool procstats; 27 bool viewhalts; 28 cluster * instance; 29 } clopts; 15 30 }; 16 31 17 32 extern Options options; 33 34 const char * parse_options( int argc, char * argv[] ); -
benchmark/io/http/protocol.cfa
re93cbfa r289a21c 25 25 }; 26 26 27 _Static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0]))); 27 _Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 28 29 const int http_codes[] = { 30 200, 31 400, 32 404, 33 413, 34 414, 35 }; 36 37 _Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0]))); 38 39 int code_val(HttpCode code) { 40 return http_codes[code]; 41 } 28 42 29 43 static inline int answer( int fd, const char * it, int len) { … … 61 75 for() { 62 76 int ret = cfa_read(fd, it, count); 77 if(ret == 0 ) return [OK200, true, 0p, 0]; 63 78 if(ret < 0 ) { 64 if( errno ) return [OK200, true, 0p, 0];65 79 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 66 80 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); -
benchmark/io/http/protocol.hfa
re93cbfa r289a21c 10 10 }; 11 11 12 int code_val(HttpCode code); 13 12 14 int answer_error( int fd, HttpCode code ); 13 15 int answer_header( int fd, size_t size ); -
benchmark/io/http/worker.cfa
re93cbfa r289a21c 26 26 //============================================================================================= 27 27 void ?{}( Worker & this ) { 28 ((thread&)this){ "Server Worker Thread", *options. the_cluster};28 ((thread&)this){ "Server Worker Thread", *options.clopts.instance }; 29 29 int ret = pipe(this.pipe); 30 30 if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); } … … 33 33 void main( Worker & this ) { 34 34 CONNECTION: 35 while( int fd = take(wait_connect); fd >= 0) { 36 printf("New connection, waiting for requests\n"); 35 for() { 36 int fd = take(wait_connect); 37 if (fd < 0) break; 38 39 printf("New connection %d, waiting for requests\n", fd); 37 40 REQUEST: 38 41 for() { … … 56 59 // If this wasn't a request retrun 400 57 60 if( code != OK200 ) { 58 printf("Invalid Request \n");61 printf("Invalid Request : %d\n", code_val(code)); 59 62 answer_error(fd, code); 60 63 continue REQUEST; 61 64 } 62 65 63 printf("Request for file %.*s\n", name_size, file);66 printf("Request for file %.*s\n", (int)name_size, file); 64 67 65 68 // Get the fd from the file cache … … 90 93 //============================================================================================= 91 94 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) { 92 ((thread&)this){ "Acceptor Thread", *options. the_cluster};95 ((thread&)this){ "Acceptor Thread", *options.clopts.instance }; 93 96 this.sockfd = sockfd; 94 97 this.addr = addr; … … 105 108 } 106 109 107 110 printf("New connection accepted\n"); 108 111 put( wait_connect, ret ); 109 112 } 110 113 }
Note: See TracChangeset
for help on using the changeset viewer.