Changeset 77ff383 for benchmark/io
- Timestamp:
- Jan 15, 2021, 3:22:50 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 02b73ea
- Parents:
- 03ecdcf (diff), d46bdac (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- benchmark/io/http
- Files:
-
- 6 edited
-
filecache.cfa (modified) (3 diffs)
-
main.cfa (modified) (8 diffs)
-
options.cfa (modified) (1 diff)
-
protocol.cfa (modified) (11 diffs)
-
protocol.hfa (modified) (2 diffs)
-
worker.cfa (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/filecache.cfa
r03ecdcf r77ff383 4 4 #include <string.h> 5 5 6 #include <fstream.hfa> 6 7 #include <stdlib.hfa> 7 8 … … 182 183 conflicts += put_file( raw[i], fd ); 183 184 } 184 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);185 sout | "Filled cache from path \"" | path | "\" with" | fcount | "files"; 185 186 if( conflicts > 0 ) { 186 printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed);187 sout | "Found" | conflicts | "conflicts (seed: " | options.file_cache.hash_seed | ")"; 187 188 #if defined(REJECT_CONFLICTS) 188 189 abort("Conflicts found in the cache"); … … 191 192 192 193 if(options.file_cache.list) { 193 printf("Listing files and exiting\n");194 sout | "Listing files and exiting"; 194 195 for(i; fcount) { 195 196 int s; char u; 196 197 [s, u] = human_size(raw[i].size); 197 printf("%4d%c - %s\n", s, u, raw[i].file);198 sout | s | u | "-" | raw[i].file; 198 199 free(raw[i].file); 199 200 } -
benchmark/io/http/main.cfa
r03ecdcf r77ff383 10 10 } 11 11 12 #include <fstream.hfa> 12 13 #include <kernel.hfa> 14 #include <iofwd.hfa> 13 15 #include <stats.hfa> 14 16 #include <time.hfa> … … 50 52 51 53 //============================================================================================= 54 // Stats Printer 55 //=============================================================================================' 56 57 thread StatsPrinter {}; 58 59 void ?{}( StatsPrinter & this ) { 60 ((thread&)this){ "Stats Printer Thread" }; 61 } 62 63 void main(StatsPrinter & this) { 64 LOOP: for() { 65 waitfor( ^?{} : this) { 66 break LOOP; 67 } 68 or else {} 69 70 sleep(10`s); 71 72 print_stats_now( *options.clopts.instance, CFA_STATS_READY_Q | CFA_STATS_IO ); 73 } 74 } 75 76 //============================================================================================= 52 77 // Main 53 78 //=============================================================================================' … … 59 84 //=================== 60 85 // Open Files 61 printf("Filling cache from %s\n", path);86 sout | "Filling cache from" | path; 62 87 fill_cache( path ); 63 88 64 89 //=================== 65 90 // Open Socket 66 printf("%ld : Listening on port %d\n", getpid(), options.socket.port);91 sout | getpid() | ": Listening on port" | options.socket.port; 67 92 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 68 93 if(server_fd < 0) { … … 84 109 if(errno == EADDRINUSE) { 85 110 if(waited == 0) { 86 printf("Waiting for port\n");111 sout | "Waiting for port"; 87 112 } else { 88 printf("\r%d", waited);89 f flush(stdout);113 sout | "\r" | waited | nonl; 114 flush( sout ); 90 115 } 91 116 waited ++; … … 128 153 { 129 154 ServerProc procs[options.clopts.nprocs]; 155 StatsPrinter printer; 130 156 131 157 init_protocol(); … … 148 174 unpark( workers[i] ); 149 175 } 150 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);176 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors"; 151 177 { 152 178 char buffer[128]; 153 while( !feof(stdin)) {154 fgets(buffer, 128, stdin);179 while(int ret = cfa_read(0, buffer, 128, 0, -1`s, 0p, 0p); ret != 0) { 180 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 155 181 } 156 182 157 printf("Shutting Down\n"); 158 } 159 183 sout | "Shutdown received"; 184 } 185 186 sout | "Notifying connections"; 160 187 for(i; options.clopts.nworkers) { 161 printf("Cancelling %p\n", (void*)workers[i].cancel.target);162 188 workers[i].done = true; 163 189 cancel(workers[i].cancel); 164 190 } 165 191 166 printf("Shutting down socket\n");192 sout | "Shutting down socket"; 167 193 int ret = shutdown( server_fd, SHUT_RD ); 168 194 if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); } … … 170 196 //=================== 171 197 // Close Socket 172 printf("Closing Socket\n");198 sout | "Closing Socket"; 173 199 ret = close( server_fd ); 174 200 if(ret < 0) { 175 201 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 176 202 } 203 sout | "Stopping connection threads..." | nonl; 177 204 } 178 printf("Workers Closed\n"); 179 205 sout | "done"; 206 207 sout | "Stopping protocol threads..." | nonl; 180 208 deinit_protocol(); 181 } 182 209 sout | "done"; 210 211 sout | "Stopping processors..." | nonl; 212 } 213 sout | "done"; 214 215 sout | "Closing splice fds..." | nonl; 183 216 for(i; pipe_cnt) { 184 217 ret = close( fds[pipe_off + i] ); … … 188 221 } 189 222 free(fds); 190 191 } 223 sout | "done"; 224 225 sout | "Stopping processors..." | nonl; 226 } 227 sout | "done"; 192 228 193 229 //=================== 194 230 // Close Files 195 printf("Closing Files\n");231 sout | "Closing open files..." | nonl; 196 232 close_cache(); 197 } 233 sout | "done"; 234 } -
benchmark/io/http/options.cfa
r03ecdcf r77ff383 50 50 51 51 static cfa_option opt[] = { 52 { 'p', "port", "Port the server will listen on", options.socket.port},53 { 'c', "cpus", "Number of processors to use", options.clopts.nprocs},54 { 'L', "log", "Enable logs", options.log, parse_settrue},55 {' t', "threads", "Number of worker threads to use", options.clopts.nworkers},56 {' b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},57 {' r', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},58 {' S', "seed", "seed to use for hashing", options.file_cache.hash_seed },59 {' C', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },60 {' l', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue },61 { 's', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },62 { 'e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},63 { 'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},64 { 'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },65 { 'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },66 {' L', "submitlength", "Max number of submitions that can be submitted together", sublen },52 { 'p', "port", "Port the server will listen on", options.socket.port}, 53 { 'c', "cpus", "Number of processors to use", options.clopts.nprocs}, 54 { 't', "threads", "Number of worker threads to use", options.clopts.nworkers}, 55 {'\0', "log", "Enable logs", options.log, parse_settrue}, 56 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 57 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 58 {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed }, 59 {'\0', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size }, 60 {'\0', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue }, 61 { 's', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue }, 62 { 'e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue}, 63 { 'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue}, 64 { 'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue }, 65 { 'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue }, 66 {'\0', "submitlength", "Max number of submitions that can be submitted together", sublen }, 67 67 68 68 }; -
benchmark/io/http/protocol.cfa
r03ecdcf r77ff383 5 5 #include <fcntl.h> 6 6 } 7 8 #include <fstream.hfa> 7 9 #include <iofwd.hfa> 8 10 … … 11 13 extern "C" { 12 14 int snprintf ( char * s, size_t n, const char * format, ... ); 13 #include <linux/io_uring.h>15 // #include <linux/io_uring.h> 14 16 } 15 17 #include <string.h> … … 24 26 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 25 27 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 28 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 26 29 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 27 30 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", … … 34 37 400, 35 38 404, 39 408, 36 40 413, 37 41 414, … … 49 53 int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p); 50 54 // int ret = write(fd, it, len); 51 if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); } 55 if( ret < 0 ) { 56 if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET; 57 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN; 58 59 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); 60 } 52 61 53 62 // update it/len … … 94 103 if(ret < 0 ) { 95 104 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 96 // if( errno == EINVAL ) return [E400, true, 0, 0];105 if( errno == ECONNRESET ) return [E408, true, 0, 0]; 97 106 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 98 107 } … … 108 117 } 109 118 110 if( options.log ) printf("%.*s\n", rlen, buffer); 119 if( options.log ) { 120 write(sout, buffer, rlen); 121 sout | nl; 122 } 111 123 112 124 it = buffer; … … 119 131 } 120 132 121 voidsendfile( int pipe[2], int fd, int ans_fd, size_t count ) {133 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 122 134 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 123 135 off_t offset = 0; … … 128 140 if( ret < 0 ) { 129 141 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; 142 if( errno == ECONNRESET ) return -ECONNRESET; 143 if( errno == EPIPE ) return -EPIPE; 130 144 abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) ); 131 145 } … … 139 153 if( ret < 0 ) { 140 154 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2; 155 if( errno == ECONNRESET ) return -ECONNRESET; 156 if( errno == EPIPE ) return -EPIPE; 141 157 abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) ); 142 158 } … … 145 161 146 162 } 163 return count; 147 164 } 148 165 -
benchmark/io/http/protocol.hfa
r03ecdcf r77ff383 7 7 E400, 8 8 E404, 9 E408, 9 10 E413, 10 11 E414, … … 21 22 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *); 22 23 23 voidsendfile( int pipe[2], int fd, int ans_fd, size_t count );24 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ); -
benchmark/io/http/worker.cfa
r03ecdcf r77ff383 6 6 #include <unistd.h> 7 7 8 #include <fstream.hfa> 8 9 #include <iofwd.hfa> 9 10 … … 33 34 CONNECTION: 34 35 for() { 35 if( options.log ) printf("=== Accepting connection ===\n");36 if( options.log ) sout | "=== Accepting connection ==="; 36 37 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p ); 37 38 // int fd = accept4( this.[sockfd, addr, addrlen, flags] ); 38 39 if(fd < 0) { 39 40 if( errno == ECONNABORTED ) break; 40 if( errno == EINVAL && this.done) break;41 if( this.done && (errno == EINVAL || errno == EBADF) ) break; 41 42 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 42 43 } 43 44 44 if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);45 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 45 46 REQUEST: 46 47 for() { … … 53 54 size_t len = options.socket.buflen; 54 55 char buffer[len]; 55 if( options.log ) printf("=== Reading request ===\n");56 if( options.log ) sout | "=== Reading request ==="; 56 57 [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel); 57 58 58 59 // if we are done, break out of the loop 59 if( closed ) { 60 if( options.log ) printf("=== Connection closed ===\n"); 61 close(fd); 62 continue CONNECTION; 63 } 60 if( closed ) break REQUEST; 64 61 65 62 // If this wasn't a request retrun 400 66 63 if( code != OK200 ) { 67 printf("=== Invalid Request : %d ===\n", code_val(code));64 sout | "=== Invalid Request :" | code_val(code) | "==="; 68 65 answer_error(fd, code); 69 66 continue REQUEST; … … 71 68 72 69 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 73 if( options.log ) printf("=== Request for /plaintext ===\n");70 if( options.log ) sout | "=== Request for /plaintext ==="; 74 71 75 72 char text[] = "Hello, World!\n"; 76 73 77 74 // Send the header 78 answer_plain(fd, text, sizeof(text)); 75 int ret = answer_plain(fd, text, sizeof(text)); 76 if( ret == -ECONNRESET ) break REQUEST; 79 77 80 if( options.log ) printf("=== Answer sent ===\n");78 if( options.log ) sout | "=== Answer sent ==="; 81 79 continue REQUEST; 82 80 } 83 81 84 82 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 85 if( options.log ) printf("=== Request for /ping ===\n");83 if( options.log ) sout | "=== Request for /ping ==="; 86 84 87 85 // Send the header 88 answer_empty(fd); 86 int ret = answer_empty(fd); 87 if( ret == -ECONNRESET ) break REQUEST; 89 88 90 if( options.log ) printf("=== Answer sent ===\n");89 if( options.log ) sout | "=== Answer sent ==="; 91 90 continue REQUEST; 92 91 } 93 92 94 if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file); 93 if( options.log ) { 94 sout | "=== Request for file " | nonl; 95 write(sout, file, name_size); 96 sout | " ==="; 97 } 95 98 96 99 // Get the fd from the file cache … … 101 104 // If we can't find the file, return 404 102 105 if( ans_fd < 0 ) { 103 printf("=== File Not Found ===\n"); 106 sout | "=== File Not Found (" | nonl; 107 write(sout, file, name_size); 108 sout | ") ==="; 104 109 answer_error(fd, E404); 105 110 continue REQUEST; … … 107 112 108 113 // Send the header 109 answer_header(fd, count); 114 int ret = answer_header(fd, count); 115 if( ret == -ECONNRESET ) break REQUEST; 110 116 111 117 // Send the desired file 112 sendfile( this.pipe, fd, ans_fd, count); 118 ret = sendfile( this.pipe, fd, ans_fd, count); 119 if( ret == -ECONNRESET ) break REQUEST; 113 120 114 if( options.log ) printf("=== Answer sent ===\n");121 if( options.log ) sout | "=== Answer sent ==="; 115 122 } 123 124 if( options.log ) sout | "=== Connection closed ==="; 125 close(fd); 126 continue CONNECTION; 116 127 } 117 128 }
Note:
See TracChangeset
for help on using the changeset viewer.