- Timestamp:
- Mar 4, 2021, 7:40:25 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 77d601f
- Parents:
- 342af53 (diff), a5040fe (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - Location:
- benchmark
- Files:
-
- 4 added
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/Makefile.am
r342af53 r8e4aa05 502 502 503 503 compile-io$(EXEEXT): 504 $(CFACOMPILE) -DNO_COMPILED_PRAGMA -fsyntax-only -w $(testdir)/io 1.cfa504 $(CFACOMPILE) -DNO_COMPILED_PRAGMA -fsyntax-only -w $(testdir)/io/io.cfa 505 505 506 506 compile-monitor$(EXEEXT): -
benchmark/io/http/filecache.cfa
r342af53 r8e4aa05 4 4 #include <string.h> 5 5 6 #include <fstream.hfa> 6 7 #include <stdlib.hfa> 7 8 … … 182 183 conflicts += put_file( raw[i], fd ); 183 184 } 184 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);185 sout | "Filled cache from path \"" | path | "\" with" | fcount | "files"; 185 186 if( conflicts > 0 ) { 186 printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed);187 sout | "Found" | conflicts | "conflicts (seed: " | options.file_cache.hash_seed | ")"; 187 188 #if defined(REJECT_CONFLICTS) 188 189 abort("Conflicts found in the cache"); … … 191 192 192 193 if(options.file_cache.list) { 193 printf("Listing files and exiting\n");194 sout | "Listing files and exiting"; 194 195 for(i; fcount) { 195 196 int s; char u; 196 197 [s, u] = human_size(raw[i].size); 197 printf("%4d%c - %s\n", s, u, raw[i].file);198 sout | s | u | "-" | raw[i].file; 198 199 free(raw[i].file); 199 200 } … … 208 209 209 210 [int *, int] filefds(int extra) { 211 if(!options.file_cache.path) { 212 int * data = alloc(extra); 213 return [data, 0]; 214 } 215 210 216 if(!file_cache.entries) { 211 217 abort("File cache not filled!\n"); -
benchmark/io/http/main.cfa
r342af53 r8e4aa05 6 6 #include <unistd.h> 7 7 extern "C" { 8 #include <signal.h> 8 9 #include <sys/socket.h> 9 10 #include <netinet/in.h> 10 11 } 11 12 13 #include <fstream.hfa> 12 14 #include <kernel.hfa> 15 #include <iofwd.hfa> 13 16 #include <stats.hfa> 14 17 #include <time.hfa> … … 26 29 27 30 //============================================================================================= 31 // Stats Printer 32 //=============================================================================================' 33 34 thread StatsPrinter {}; 35 36 void ?{}( StatsPrinter & this, cluster & cl ) { 37 ((thread&)this){ "Stats Printer Thread", cl }; 38 } 39 40 void ^?{}( StatsPrinter & mutex this ) {} 41 42 void main(StatsPrinter & this) { 43 LOOP: for() { 44 waitfor( ^?{} : this) { 45 break LOOP; 46 } 47 or else {} 48 49 sleep(10`s); 50 51 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 52 } 53 } 54 55 //============================================================================================= 28 56 // Globals 29 57 //============================================================================================= 30 struct ServerProc { 31 processor self; 58 struct ServerCluster { 59 cluster self; 60 processor * procs; 61 // io_context * ctxs; 62 StatsPrinter * prnt; 63 32 64 }; 33 65 34 void ?{}( ServerProc & this ) { 35 /* paranoid */ assert( options.clopts.instance != 0p ); 36 (this.self){ "Benchmark Processor", *options.clopts.instance }; 66 void ?{}( ServerCluster & this ) { 67 (this.self){ "Server Cluster", options.clopts.params }; 68 69 this.procs = alloc(options.clopts.nprocs); 70 for(i; options.clopts.nprocs) { 71 (this.procs[i]){ "Benchmark Processor", this.self }; 72 73 #if !defined(__CFA_NO_STATISTICS__) 74 if( options.clopts.procstats ) { 75 print_stats_at_exit( *this.procs, this.self.print_stats ); 76 } 77 if( options.clopts.viewhalts ) { 78 print_halts( *this.procs ); 79 } 80 #endif 81 } 82 83 if(options.stats) { 84 this.prnt = alloc(); 85 (*this.prnt){ this.self }; 86 } else { 87 this.prnt = 0p; 88 } 37 89 38 90 #if !defined(__CFA_NO_STATISTICS__) 39 if( options.clopts.procstats ) { 40 print_stats_at_exit( this.self, options.clopts.instance->print_stats ); 41 } 42 if( options.clopts.viewhalts ) { 43 print_halts( this.self ); 44 } 91 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); 45 92 #endif 93 94 options.clopts.instance[options.clopts.cltr_cnt] = &this.self; 95 options.clopts.cltr_cnt++; 96 } 97 98 void ^?{}( ServerCluster & this ) { 99 delete(this.prnt); 100 101 for(i; options.clopts.nprocs) { 102 ^(this.procs[i]){}; 103 } 104 free(this.procs); 105 106 ^(this.self){}; 46 107 } 47 108 … … 53 114 //=============================================================================================' 54 115 int main( int argc, char * argv[] ) { 116 __sighandler_t s = 1p; 117 signal(SIGPIPE, s); 118 55 119 //=================== 56 120 // Parse args 57 const char * path =parse_options(argc, argv);121 parse_options(argc, argv); 58 122 59 123 //=================== 60 124 // Open Files 61 printf("Filling cache from %s\n", path); 62 fill_cache( path ); 125 if( options.file_cache.path ) { 126 sout | "Filling cache from" | options.file_cache.path; 127 fill_cache( options.file_cache.path ); 128 } 63 129 64 130 //=================== 65 131 // Open Socket 66 printf("%ld : Listening on port %d\n", getpid(), options.socket.port);132 sout | getpid() | ": Listening on port" | options.socket.port; 67 133 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 68 134 if(server_fd < 0) { … … 84 150 if(errno == EADDRINUSE) { 85 151 if(waited == 0) { 86 printf("Waiting for port\n");152 sout | "Waiting for port"; 87 153 } else { 88 printf("\r%d", waited);89 f flush(stdout);154 sout | "\r" | waited | nonl; 155 flush( sout ); 90 156 } 91 157 waited ++; … … 106 172 // Run Server Cluster 107 173 { 108 cluster cl = { "Server Cluster", options.clopts.params };109 #if !defined(__CFA_NO_STATISTICS__)110 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );111 #endif112 options.clopts.instance = &cl;113 114 115 174 int pipe_cnt = options.clopts.nworkers * 2; 116 175 int pipe_off; … … 122 181 } 123 182 124 if(options.file_cache.fixed_fds) {125 register_fixed_files(cl, fds, pipe_off);126 }183 // if(options.file_cache.path && options.file_cache.fixed_fds) { 184 // register_fixed_files(cl, fds, pipe_off); 185 // } 127 186 128 187 { 129 Server Proc procs[options.clopts.nprocs];188 ServerCluster cl[options.clopts.nclusters]; 130 189 131 190 init_protocol(); … … 148 207 unpark( workers[i] ); 149 208 } 150 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs); 209 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; 210 for(i; options.clopts.nclusters) { 211 sout | options.clopts.thrd_cnt[i] | nonl; 212 } 213 sout | nl; 151 214 { 152 215 char buffer[128]; 153 while(!feof(stdin)) { 154 fgets(buffer, 128, stdin); 216 for() { 217 int ret = cfa_read(0, buffer, 128, 0); 218 if(ret == 0) break; 219 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 220 sout | "User wrote '" | "" | nonl; 221 write(sout, buffer, ret - 1); 222 sout | "'"; 155 223 } 156 224 157 printf("Shutting Down\n"); 158 } 159 225 sout | "Shutdown received"; 226 } 227 228 sout | "Notifying connections..." | nonl; flush( sout ); 160 229 for(i; options.clopts.nworkers) { 161 printf("Cancelling %p\n", (void*)workers[i].cancel.target);162 230 workers[i].done = true; 163 cancel(workers[i].cancel);164 }165 166 printf("Shutting down socket\n");231 } 232 sout | "done"; 233 234 sout | "Shutting down socket..." | nonl; flush( sout ); 167 235 int ret = shutdown( server_fd, SHUT_RD ); 168 if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); } 236 if( ret < 0 ) { 237 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); 238 } 239 sout | "done"; 169 240 170 241 //=================== 171 242 // Close Socket 172 printf("Closing Socket\n");243 sout | "Closing Socket..." | nonl; flush( sout ); 173 244 ret = close( server_fd ); 174 245 if(ret < 0) { 175 246 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 176 247 } 177 } 178 printf("Workers Closed\n"); 179 248 sout | "done"; 249 250 sout | "Stopping connection threads..." | nonl; flush( sout ); 251 } 252 sout | "done"; 253 254 sout | "Stopping protocol threads..." | nonl; flush( sout ); 180 255 deinit_protocol(); 181 } 182 256 sout | "done"; 257 258 sout | "Stopping processors/clusters..." | nonl; flush( sout ); 259 } 260 sout | "done"; 261 262 sout | "Closing splice fds..." | nonl; flush( sout ); 183 263 for(i; pipe_cnt) { 184 264 ret = close( fds[pipe_off + i] ); … … 188 268 } 189 269 free(fds); 190 191 } 270 sout | "done"; 271 272 sout | "Stopping processors..." | nonl; flush( sout ); 273 } 274 sout | "done"; 192 275 193 276 //=================== 194 277 // Close Files 195 printf("Closing Files\n"); 196 close_cache(); 197 } 278 if( options.file_cache.path ) { 279 sout | "Closing open files..." | nonl; flush( sout ); 280 close_cache(); 281 sout | "done"; 282 } 283 } -
benchmark/io/http/options.cfa
r342af53 r8e4aa05 9 9 } 10 10 11 #include <bitmanip.hfa> 12 #include <fstream.hfa> 11 13 #include <kernel.hfa> 12 14 #include <parseargs.hfa> 15 #include <stdlib.hfa> 13 16 17 #include <stdlib.h> 14 18 #include <string.h> 15 19 16 20 Options options @= { 17 21 false, // log 22 false, // stats 18 23 19 24 { // file_cache 25 0, // path 20 26 0, // open_flags; 21 27 42u, // hash_seed; … … 32 38 33 39 { // cluster 40 1, // nclusters; 34 41 1, // nprocs; 35 42 1, // nworkers; 36 0, // flags;43 {}, // params; 37 44 false, // procstats 38 45 false, // viewhalts … … 41 48 }; 42 49 43 const char *parse_options( int argc, char * argv[] ) {44 bool subthrd = false;45 bool eagrsub= false;46 bool fixedfd= false;47 bool sqkpoll = false;48 bool i okpoll= false;49 unsigned sublen = 16; 50 void parse_options( int argc, char * argv[] ) { 51 // bool fixedfd = false; 52 // bool sqkpoll = false; 53 // bool iokpoll = false; 54 unsigned nentries = 16; 55 bool isolate = false; 56 50 57 51 58 static cfa_option opt[] = { 52 { 'p', "port", "Port the server will listen on", options.socket.port},53 { 'c', "cpus", "Number of processors to use", options.clopts.nprocs},54 { 'L', "log", "Enable logs", options.log, parse_settrue},55 {' t', "threads", "Number of worker threads to use", options.clopts.nworkers},56 {' b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},57 {' r', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},58 {' S', "seed", "seed to use for hashing", options.file_cache.hash_seed},59 {' C', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size},60 {' l', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue},61 {' s', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },62 {' e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},63 {'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},64 {'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },65 {'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },66 {' L', "submitlength", "Max number of submitions that can be submitted together", sublen},59 { 'p', "port", "Port the server will listen on", options.socket.port}, 60 { 'c', "cpus", "Number of processors to use", options.clopts.nprocs}, 61 { 't', "threads", "Number of worker threads to use", options.clopts.nworkers}, 62 {'\0', "isolate", "Create one cluster per processor", isolate, parse_settrue}, 63 {'\0', "log", "Enable logs", options.log, parse_settrue}, 64 {'\0', "stats", "Enable statistics", options.stats, parse_settrue}, 65 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 66 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 67 {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed }, 68 {'\0', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size }, 69 {'\0', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue }, 70 // { 'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue}, 71 // { 'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue }, 72 // { 'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue }, 73 {'e', "numentries", "Number of I/O entries", nentries }, 67 74 68 75 }; … … 72 79 parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left ); 73 80 74 options.clopts.params.poller_submits = subthrd; 75 options.clopts.params.eager_submits = eagrsub; 76 77 if( fixedfd ) { 78 options.file_cache.fixed_fds = true; 81 if( !is_pow2(nentries) ) { 82 unsigned v = nentries; 83 v--; 84 v |= v >> 1; 85 v |= v >> 2; 86 v |= v >> 4; 87 v |= v >> 8; 88 v |= v >> 16; 89 v++; 90 serr | "Warning: num_entries not a power of 2" | '(' | nentries | ')' | "raising to " | v; 91 nentries = v; 92 } 93 if(isolate) { 94 options.clopts.nclusters = options.clopts.nprocs; 95 options.clopts.nprocs = 1; 96 } 97 options.clopts.params.num_entries = nentries; 98 options.clopts.instance = alloc(options.clopts.nclusters); 99 options.clopts.thrd_cnt = alloc(options.clopts.nclusters); 100 options.clopts.cltr_cnt = 0; 101 for(i; options.clopts.nclusters) { 102 options.clopts.thrd_cnt[i] = 0; 79 103 } 80 104 81 if( sqkpoll ) {82 options.clopts.params.poll_submit = true;83 options.file_cache.fixed_fds = true;84 }85 105 86 if( iokpoll ) { 87 options.clopts.params.poll_complete = true; 88 options.file_cache.open_flags |= O_DIRECT; 89 } 106 // if( fixedfd ) { 107 // options.file_cache.fixed_fds = true; 108 // } 90 109 91 options.clopts.params.num_ready = sublen; 110 // if( sqkpoll ) { 111 // options.file_cache.fixed_fds = true; 112 // } 92 113 93 if( left[0] == 0p ) { return "."; } 114 // if( iokpoll ) { 115 // options.file_cache.open_flags |= O_DIRECT; 116 // } 117 118 if( left[0] == 0p ) { return; } 94 119 95 120 const char * path = left[0]; … … 97 122 98 123 if( left[0] != 0p ) { 99 abort("Too many trailing arguments!\n"); 124 serr | "Too many trailing arguments!" | '\'' | path | '\''; 125 while(left[0] != 0p) { 126 serr | " - " | left[0]; 127 left++; 128 } 129 exit(EXIT_FAILURE); 100 130 } 101 131 102 returnpath;132 options.file_cache.path = path; 103 133 } -
benchmark/io/http/options.hfa
r342af53 r8e4aa05 9 9 struct Options { 10 10 bool log; 11 bool stats; 11 12 12 13 struct { 14 const char * path; 13 15 int open_flags; 14 16 uint32_t hash_seed; … … 25 27 26 28 struct { 29 int nclusters; 27 30 int nprocs; 28 31 int nworkers; … … 30 33 bool procstats; 31 34 bool viewhalts; 32 cluster * instance; 35 cluster ** instance; 36 size_t * thrd_cnt; 37 size_t cltr_cnt; 33 38 } clopts; 34 39 }; … … 36 41 extern Options options; 37 42 38 const char *parse_options( int argc, char * argv[] );43 void parse_options( int argc, char * argv[] ); -
benchmark/io/http/protocol.cfa
r342af53 r8e4aa05 5 5 #include <fcntl.h> 6 6 } 7 8 #include <fstream.hfa> 7 9 #include <iofwd.hfa> 8 10 … … 11 13 extern "C" { 12 14 int snprintf ( char * s, size_t n, const char * format, ... ); 13 #include <linux/io_uring.h>15 // #include <linux/io_uring.h> 14 16 } 15 17 #include <string.h> … … 18 20 #include "options.hfa" 19 21 20 const char * volatile date = 0p; 21 22 const char * http_msgs[] = { 23 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n", 24 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",25 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",26 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 27 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 28 };22 #define PLAINTEXT_1WRITE 23 #define PLAINTEXT_NOCOPY 24 25 struct https_msg_str { 26 char msg[512]; 27 size_t len; 28 }; 29 30 const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 }; 29 31 30 32 _Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 31 33 32 const int http_codes[] = { 34 const int http_codes[KNOWN_CODES] = { 35 200, 33 36 200, 34 37 400, 35 38 404, 39 405, 40 408, 36 41 413, 37 42 414, … … 47 52 while(len > 0) { 48 53 // Call write 49 int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p); 50 // int ret = write(fd, it, len); 51 if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); } 54 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 55 if( ret < 0 ) { 56 if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET; 57 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN; 58 59 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); 60 } 52 61 53 62 // update it/len … … 61 70 /* paranoid */ assert( code < KNOWN_CODES && code != OK200 ); 62 71 int idx = (int)code; 63 return answer( fd, http_msgs[idx] , strlen( http_msgs[idx] ));72 return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len ); 64 73 } 65 74 66 75 int answer_header( int fd, size_t size ) { 67 const char * fmt = http_msgs[OK200]; 68 int len = 200; 69 char buffer[len]; 70 len = snprintf(buffer, len, fmt, date, size); 76 char buffer[512]; 77 char * it = buffer; 78 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 79 it += http_msgs[OK200]->len; 80 int len = http_msgs[OK200]->len; 81 len += snprintf(it, 512 - len, "%d \n\n", size); 71 82 return answer( fd, buffer, len ); 72 83 } 73 84 74 int answer_plain( int fd, char buffer[], size_t size ) { 75 int ret = answer_header(fd, size); 85 #if defined(PLAINTEXT_NOCOPY) 86 int answer_plaintext( int fd ) { 87 return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len + 1); // +1 cause snprintf doesn't count nullterminator 88 } 89 #elif defined(PLAINTEXT_1WRITE) 90 int answer_plaintext( int fd ) { 91 char text[] = "Hello, World!\n"; 92 char buffer[512 + sizeof(text)]; 93 char * it = buffer; 94 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 95 it += http_msgs[OK200]->len; 96 int len = http_msgs[OK200]->len; 97 int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text)); 98 it += r; 99 len += r; 100 memcpy(it, text, sizeof(text)); 101 return answer(fd, buffer, len + sizeof(text)); 102 } 103 #else 104 int answer_plaintext( int fd ) { 105 char text[] = "Hello, World!\n"; 106 int ret = answer_header(fd, sizeof(text)); 76 107 if( ret < 0 ) return ret; 77 return answer(fd, buffer, size); 78 } 108 return answer(fd, text, sizeof(text)); 109 } 110 #endif 79 111 80 112 int answer_empty( int fd ) { … … 83 115 84 116 85 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len , io_cancellation * cancel) {117 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 86 118 char * it = buffer; 87 119 size_t count = len - 1; … … 89 121 READ: 90 122 for() { 91 int ret = cfa_re ad(fd, (void*)it, count, 0, -1`s, cancel, 0p);123 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 92 124 // int ret = read(fd, (void*)it, count); 93 125 if(ret == 0 ) return [OK200, true, 0, 0]; 94 126 if(ret < 0 ) { 95 127 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 96 // if( errno == EINVAL ) return [E400, true, 0, 0]; 128 if( errno == ECONNRESET ) return [E408, true, 0, 0]; 129 if( errno == EPIPE ) return [E408, true, 0, 0]; 97 130 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 98 131 } … … 108 141 } 109 142 110 if( options.log ) printf("%.*s\n", rlen, buffer); 143 if( options.log ) { 144 write(sout, buffer, rlen); 145 sout | nl; 146 } 111 147 112 148 it = buffer; … … 119 155 } 120 156 121 voidsendfile( int pipe[2], int fd, int ans_fd, size_t count ) {157 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 122 158 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 123 159 off_t offset = 0; 124 160 ssize_t ret; 125 161 SPLICE1: while(count > 0) { 126 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p); 127 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 162 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 128 163 if( ret < 0 ) { 129 164 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; 165 if( errno == ECONNRESET ) return -ECONNRESET; 166 if( errno == EPIPE ) return -EPIPE; 130 167 abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) ); 131 168 } … … 135 172 size_t in_pipe = ret; 136 173 SPLICE2: while(in_pipe > 0) { 137 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p); 138 // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 174 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 139 175 if( ret < 0 ) { 140 176 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2; 177 if( errno == ECONNRESET ) return -ECONNRESET; 178 if( errno == EPIPE ) return -EPIPE; 141 179 abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) ); 142 180 } … … 145 183 146 184 } 185 return count; 147 186 } 148 187 … … 153 192 #include <thread.hfa> 154 193 194 const char * original_http_msgs[] = { 195 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ", 196 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n", 197 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 198 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 199 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 200 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 201 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 202 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 203 }; 204 155 205 struct date_buffer { 156 char buff[100];206 https_msg_str strs[KNOWN_CODES]; 157 207 }; 158 208 … … 163 213 164 214 void ?{}( DateFormater & this ) { 165 ((thread&)this){ "Server Date Thread", *options.clopts.instance };215 ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] }; 166 216 this.idx = 0; 167 memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );168 memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );217 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) ); 218 memset( &this.buffers[1], 0, sizeof(this.buffers[1]) ); 169 219 } 170 220 … … 176 226 or else {} 177 227 228 229 char buff[100]; 178 230 Time now = getTimeNsec(); 179 180 strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 181 182 char * next = this.buffers[this.idx].buff; 183 __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST); 231 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 232 sout | "Updated date to '" | buff | "'"; 233 234 for(i; KNOWN_CODES) { 235 size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff ); 236 this.buffers[this.idx].strs[i].len = len; 237 } 238 239 for(i; KNOWN_CODES) { 240 https_msg_str * next = &this.buffers[this.idx].strs[i]; 241 __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST); 242 } 184 243 this.idx = (this.idx + 1) % 2; 244 245 sout | "Date thread sleeping"; 185 246 186 247 sleep(1`s); -
benchmark/io/http/protocol.hfa
r342af53 r8e4aa05 1 1 #pragma once 2 3 struct io_cancellation;4 2 5 3 enum HttpCode { 6 4 OK200 = 0, 5 OK200_PlainText, 7 6 E400, 8 7 E404, 8 E405, 9 E408, 9 10 E413, 10 11 E414, … … 16 17 int answer_error( int fd, HttpCode code ); 17 18 int answer_header( int fd, size_t size ); 18 int answer_plain ( int fd, char buffer [], size_t size);19 int answer_plaintext( int fd ); 19 20 int answer_empty( int fd ); 20 21 21 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len , io_cancellation *);22 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); 22 23 23 voidsendfile( int pipe[2], int fd, int ans_fd, size_t count );24 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ); -
benchmark/io/http/worker.cfa
r342af53 r8e4aa05 6 6 #include <unistd.h> 7 7 8 #include <fstream.hfa> 8 9 #include <iofwd.hfa> 9 10 … … 16 17 //============================================================================================= 17 18 void ?{}( Worker & this ) { 18 ((thread&)this){ "Server Worker Thread", *options.clopts.instance }; 19 size_t cli = rand() % options.clopts.cltr_cnt; 20 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli] }; 21 options.clopts.thrd_cnt[cli]++; 19 22 this.pipe[0] = -1; 20 23 this.pipe[1] = -1; … … 33 36 CONNECTION: 34 37 for() { 35 if( options.log ) printf("=== Accepting connection ===\n"); 36 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p ); 37 // int fd = accept4( this.[sockfd, addr, addrlen, flags] ); 38 if( options.log ) sout | "=== Accepting connection ==="; 39 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY ); 38 40 if(fd < 0) { 39 41 if( errno == ECONNABORTED ) break; 40 if( errno == EINVAL && this.done) break;42 if( this.done && (errno == EINVAL || errno == EBADF) ) break; 41 43 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 42 44 } 45 if(this.done) break; 43 46 44 if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);47 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 45 48 REQUEST: 46 49 for() { … … 53 56 size_t len = options.socket.buflen; 54 57 char buffer[len]; 55 if( options.log ) printf("=== Reading request ===\n");56 [code, closed, file, name_size] = http_read(fd, buffer, len , &this.cancel);58 if( options.log ) sout | "=== Reading request ==="; 59 [code, closed, file, name_size] = http_read(fd, buffer, len); 57 60 58 61 // if we are done, break out of the loop 59 if( closed ) { 60 if( options.log ) printf("=== Connection closed ===\n"); 61 close(fd); 62 continue CONNECTION; 63 } 62 if( closed ) break REQUEST; 64 63 65 64 // If this wasn't a request retrun 400 66 65 if( code != OK200 ) { 67 printf("=== Invalid Request : %d ===\n", code_val(code));66 sout | "=== Invalid Request :" | code_val(code) | "==="; 68 67 answer_error(fd, code); 69 68 continue REQUEST; … … 71 70 72 71 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 73 if( options.log ) printf("=== Request for /plaintext ===\n");72 if( options.log ) sout | "=== Request for /plaintext ==="; 74 73 75 char text[] = "Hello, World!\n"; 74 int ret = answer_plaintext(fd); 75 if( ret == -ECONNRESET ) break REQUEST; 76 76 77 // Send the header 78 answer_plain(fd, text, sizeof(text)); 79 80 if( options.log ) printf("=== Answer sent ===\n"); 77 if( options.log ) sout | "=== Answer sent ==="; 81 78 continue REQUEST; 82 79 } 83 80 84 81 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 85 if( options.log ) printf("=== Request for /ping ===\n");82 if( options.log ) sout | "=== Request for /ping ==="; 86 83 87 84 // Send the header 88 answer_empty(fd); 85 int ret = answer_empty(fd); 86 if( ret == -ECONNRESET ) break REQUEST; 89 87 90 if( options.log ) printf("=== Answer sent ===\n");88 if( options.log ) sout | "=== Answer sent ==="; 91 89 continue REQUEST; 92 90 } 93 91 94 if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file); 92 if( options.log ) { 93 sout | "=== Request for file " | nonl; 94 write(sout, file, name_size); 95 sout | " ==="; 96 } 97 98 if( !options.file_cache.path ) { 99 if( options.log ) { 100 sout | "=== File Not Found (" | nonl; 101 write(sout, file, name_size); 102 sout | ") ==="; 103 } 104 answer_error(fd, E405); 105 continue REQUEST; 106 } 95 107 96 108 // Get the fd from the file cache … … 101 113 // If we can't find the file, return 404 102 114 if( ans_fd < 0 ) { 103 printf("=== File Not Found ===\n"); 115 if( options.log ) { 116 sout | "=== File Not Found (" | nonl; 117 write(sout, file, name_size); 118 sout | ") ==="; 119 } 104 120 answer_error(fd, E404); 105 121 continue REQUEST; … … 107 123 108 124 // Send the header 109 answer_header(fd, count); 125 int ret = answer_header(fd, count); 126 if( ret == -ECONNRESET ) break REQUEST; 110 127 111 128 // Send the desired file 112 sendfile( this.pipe, fd, ans_fd, count); 129 ret = sendfile( this.pipe, fd, ans_fd, count); 130 if( ret == -ECONNRESET ) break REQUEST; 113 131 114 if( options.log ) printf("=== Answer sent ===\n");132 if( options.log ) sout | "=== Answer sent ==="; 115 133 } 134 135 if( options.log ) sout | "=== Connection closed ==="; 136 close(fd); 137 continue CONNECTION; 116 138 } 117 139 } -
benchmark/io/http/worker.hfa
r342af53 r8e4aa05 17 17 socklen_t * addrlen; 18 18 int flags; 19 io_cancellation cancel;20 19 volatile bool done; 21 20 };
Note:
See TracChangeset
for help on using the changeset viewer.