- Timestamp:
- Jun 21, 2022, 1:39:24 PM (4 years ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
- Children:
- b62d1d6
- Parents:
- 1df492a (diff), 1dbbef6 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- benchmark/io
- Files:
-
- 5 added
- 8 edited
-
http/Makefile.am (modified) (2 diffs)
-
http/main.cfa (modified) (16 diffs)
-
http/options.cfa (modified) (3 diffs)
-
http/options.hfa (modified) (1 diff)
-
http/printer.cfa (added)
-
http/printer.hfa (added)
-
http/protocol.cfa (modified) (5 diffs)
-
http/protocol.hfa (modified) (2 diffs)
-
http/socket.cfa (added)
-
http/socket.hfa (added)
-
http/worker.cfa (modified) (3 diffs)
-
http/worker.hfa (modified) (3 diffs)
-
sendfile/producer.cfa (added)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/Makefile.am
r1df492a reb5962a 21 21 include $(top_srcdir)/tools/build/cfa.make 22 22 23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread # -Werror23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread -g # -Werror 24 24 AM_CFAFLAGS = -quiet -nodebug 25 25 AM_LDFLAGS = -quiet -nodebug … … 37 37 options.cfa \ 38 38 options.hfa \ 39 printer.cfa \ 40 printer.hfa \ 39 41 protocol.cfa \ 40 42 protocol.hfa \ 43 socket.cfa \ 44 socket.hfa \ 41 45 worker.cfa \ 42 46 worker.hfa -
benchmark/io/http/main.cfa
r1df492a reb5962a 2 2 3 3 #include <errno.h> 4 #include <signal.h> 4 5 #include <stdio.h> 5 6 #include <string.h> … … 8 9 #include <sched.h> 9 10 #include <signal.h> 11 #include <sys/eventfd.h> 10 12 #include <sys/socket.h> 11 13 #include <netinet/in.h> … … 14 16 #include <fstream.hfa> 15 17 #include <kernel.hfa> 18 #include <locks.hfa> 16 19 #include <iofwd.hfa> 17 20 #include <stats.hfa> … … 21 24 #include "filecache.hfa" 22 25 #include "options.hfa" 26 #include "socket.hfa" 27 #include "printer.hfa" 23 28 #include "worker.hfa" 24 29 … … 30 35 31 36 //============================================================================================= 32 // Stats Printer33 //============================================================================================='34 35 thread StatsPrinter {36 Worker * workers;37 int worker_cnt;38 };39 40 void ?{}( StatsPrinter & this, cluster & cl ) {41 ((thread&)this){ "Stats Printer Thread", cl };42 this.worker_cnt = 0;43 }44 45 void ^?{}( StatsPrinter & mutex this ) {}46 47 #define eng3(X) (ws(3, 3, unit(eng( X ))))48 49 void main(StatsPrinter & this) {50 LOOP: for() {51 waitfor( ^?{} : this) {52 break LOOP;53 }54 or else {}55 56 sleep(10`s);57 58 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );59 if(this.worker_cnt != 0) {60 uint64_t tries = 0;61 uint64_t calls = 0;62 uint64_t header = 0;63 uint64_t splcin = 0;64 uint64_t splcot = 0;65 struct {66 volatile uint64_t calls;67 volatile uint64_t bytes;68 } avgrd[zipf_cnts];69 memset(avgrd, 0, sizeof(avgrd));70 71 for(i; this.worker_cnt) {72 tries += this.workers[i].stats.sendfile.tries;73 calls += this.workers[i].stats.sendfile.calls;74 header += this.workers[i].stats.sendfile.header;75 splcin += this.workers[i].stats.sendfile.splcin;76 splcot += this.workers[i].stats.sendfile.splcot;77 for(j; zipf_cnts) {78 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;79 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;80 }81 }82 83 double ratio = ((double)tries) / calls;84 85 sout | "----- Worker Stats -----";86 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";87 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";88 sout | " - zipf sizes:";89 for(i; zipf_cnts) {90 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;91 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";92 }93 }94 else {95 sout | "No Workers!";96 }97 }98 }99 100 //=============================================================================================101 37 // Globals 102 38 //============================================================================================= 103 struct ServerCluster {104 cluster self;105 processor * procs;106 // io_context * ctxs;107 StatsPrinter * prnt;108 109 };110 111 39 void ?{}( ServerCluster & this ) { 112 40 (this.self){ "Server Cluster", options.clopts.params }; … … 122 50 (this.procs[i]){ "Benchmark Processor", this.self }; 123 51 124 int c = 0;125 int n = 1 + (i % cnt);126 for(int j = 0; j < CPU_SETSIZE; j++) {127 if(CPU_ISSET(j, &fullset)) n--;128 if(n == 0) {129 c = j;130 break;131 }132 }133 cpu_set_t localset;134 CPU_ZERO(&localset);135 CPU_SET(c, &localset);136 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);137 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );52 // int c = 0; 53 // int n = 1 + (i % cnt); 54 // for(int j = 0; j < CPU_SETSIZE; j++) { 55 // if(CPU_ISSET(j, &fullset)) n--; 56 // if(n == 0) { 57 // c = j; 58 // break; 59 // } 60 // } 61 // cpu_set_t localset; 62 // CPU_ZERO(&localset); 63 // CPU_SET(c, &localset); 64 // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset); 65 // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret ); 138 66 139 67 #if !defined(__CFA_NO_STATISTICS__) … … 147 75 } 148 76 149 if(options.stats) {150 this.prnt = alloc();151 (*this.prnt){ this.self };152 } else {153 this.prnt = 0p;154 }155 156 77 #if !defined(__CFA_NO_STATISTICS__) 157 78 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); … … 163 84 164 85 void ^?{}( ServerCluster & this ) { 165 delete(this.prnt);166 167 86 for(i; options.clopts.nprocs) { 168 87 ^(this.procs[i]){}; … … 175 94 extern void init_protocol(void); 176 95 extern void deinit_protocol(void); 96 97 //============================================================================================= 98 // REUSEPORT 99 //============================================================================================= 100 101 size_t sockarr_size; 102 struct __attribute__((aligned(128))) Q { 103 mpsc_queue(PendingRead) q; 104 }; 105 106 //============================================================================================= 107 // Termination 108 //============================================================================================= 109 110 int closefd; 111 void cleanstop(int) { 112 eventfd_t buffer = 1; 113 char * buffer_s = (char*)&buffer; 114 int ret = write(closefd, buffer_s, sizeof(buffer)); 115 if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) ); 116 return; 117 } 177 118 178 119 //============================================================================================= … … 180 121 //=============================================================================================' 181 122 int main( int argc, char * argv[] ) { 123 int ret; 182 124 __sighandler_t s = 1p; 183 125 signal(SIGPIPE, s); … … 186 128 // Parse args 187 129 parse_options(argc, argv); 130 131 //=================== 132 // Setup non-interactive termination 133 if(!options.interactive) { 134 closefd = eventfd(0, 0); 135 if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) ); 136 137 sighandler_t prev = signal(SIGTERM, cleanstop); 138 intptr_t prev_workaround = (intptr_t) prev; 139 // can't use SIG_ERR it crashes the compiler 140 if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) ); 141 142 sout | "Signal termination ready"; 143 } 188 144 189 145 //=================== … … 197 153 // Open Socket 198 154 sout | getpid() | ": Listening on port" | options.socket.port; 199 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 200 if(server_fd < 0) { 201 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) ); 202 } 203 204 int ret = 0; 155 205 156 struct sockaddr_in address; 206 int addrlen = sizeof(address); 207 memset( (char *)&address, '\0' ); 208 address.sin_family = AF_INET; 209 address.sin_addr.s_addr = htonl(INADDR_ANY); 210 address.sin_port = htons( options.socket.port ); 211 212 int waited = 0; 213 for() { 214 int sockfd = server_fd; 215 __CONST_SOCKADDR_ARG addr; 216 addr.__sockaddr__ = (struct sockaddr *)&address; 217 socklen_t addrlen = sizeof(address); 218 ret = bind( sockfd, addr, addrlen ); 219 if(ret < 0) { 220 if(errno == EADDRINUSE) { 221 if(waited == 0) { 222 if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting"; 223 sout | "Waiting for port"; 224 } else { 225 sout | "\r" | waited | nonl; 226 flush( sout ); 227 } 228 waited ++; 229 sleep( 1`s ); 230 continue; 231 } 232 abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) ); 233 } 234 break; 235 } 236 237 ret = listen( server_fd, options.socket.backlog ); 238 if(ret < 0) { 239 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) ); 240 } 157 int addrlen = prepaddr(address); 158 159 int server_fd; 241 160 242 161 //=================== … … 257 176 258 177 { 178 // Stats printer makes a copy so this needs to persist longer than normal 179 connection ** conns; 180 AcceptWorker * aworkers = 0p; 181 ChannelWorker * cworkers = 0p; 182 Acceptor * acceptors = 0p; 183 Q * queues = 0p; 259 184 ServerCluster cl[options.clopts.nclusters]; 185 186 if(options.stats) { 187 stats_thrd = alloc(); 188 (*stats_thrd){ cl }; 189 } else { 190 stats_thrd = 0p; 191 } 260 192 261 193 init_protocol(); 262 194 { 263 Worker * workers = anew(options.clopts.nworkers); 264 cl[0].prnt->workers = workers; 265 cl[0].prnt->worker_cnt = options.clopts.nworkers; 266 for(i; options.clopts.nworkers) { 267 // if( options.file_cache.fixed_fds ) { 268 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 269 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 270 // } 271 // else 272 { 273 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 274 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 275 workers[i].sockfd = server_fd; 276 workers[i].addr = (struct sockaddr *)&address; 277 workers[i].addrlen = (socklen_t*)&addrlen; 278 workers[i].flags = 0; 279 } 280 unpark( workers[i] ); 195 int nacceptors = options.clopts.nprocs * options.clopts.nclusters; 196 conns = alloc(options.clopts.nworkers); 197 if(options.socket.reuseport) { 198 queues = alloc(nacceptors); 199 acceptors = alloc(nacceptors); 200 sout | "Creating" | nacceptors | "Acceptors"; 201 for(i; nacceptors) { 202 (acceptors[i]){ i % options.clopts.nclusters }; 203 } 204 for(i; nacceptors) { 205 (queues[i]){}; 206 { 207 acceptors[i].sockfd = listener(address, addrlen); 208 acceptors[i].addr = (struct sockaddr *)&address; 209 acceptors[i].addrlen = (socklen_t*)&addrlen; 210 acceptors[i].flags = 0; 211 acceptors[i].queue = &queues[i].q; 212 } 213 unpark( acceptors[i] ); 214 } 215 216 cworkers = anew(options.clopts.nworkers); 217 for(i; options.clopts.nworkers) { 218 { 219 cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 220 cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 221 cworkers[i].queue = &queues[i % nacceptors].q; 222 conns[i] = &cworkers[i].conn; 223 } 224 unpark( cworkers[i] ); 225 } 226 } 227 else { 228 server_fd = listener(address, addrlen); 229 aworkers = anew(options.clopts.nworkers); 230 for(i; options.clopts.nworkers) { 231 // if( options.file_cache.fixed_fds ) { 232 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 233 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 234 // } 235 // else 236 { 237 aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 238 aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 239 aworkers[i].sockfd = server_fd; 240 aworkers[i].addr = (struct sockaddr *)&address; 241 aworkers[i].addrlen = (socklen_t*)&addrlen; 242 aworkers[i].flags = 0; 243 conns[i] = &aworkers[i].conn; 244 } 245 unpark( aworkers[i] ); 246 } 281 247 } 282 248 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; … … 285 251 } 286 252 sout | nl; 287 if(!options.interactive) park();288 253 { 289 char buffer[128]; 290 for() { 291 int ret = cfa_read(0, buffer, 128, 0); 292 if(ret == 0) break; 254 if(options.interactive) { 255 char buffer[128]; 256 for() { 257 int ret = cfa_read(0, buffer, 128, 0); 258 if(ret == 0) break; 259 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 260 sout | "User wrote '" | "" | nonl; 261 write(sout, buffer, ret - 1); 262 sout | "'"; 263 } 264 } 265 else { 266 char buffer[sizeof(eventfd_t)]; 267 int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0); 293 268 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 294 sout | "User wrote '" | "" | nonl;295 write(sout, buffer, ret - 1);296 sout | "'";297 269 } 298 270 … … 300 272 } 301 273 302 sout | "Notifying connections..." | nonl; flush( sout );303 for(i; options.clopts.nworkers) {304 workers[i].done = true;305 }306 sout | "done";307 308 sout | "Shutting down socket..." | nonl; flush( sout );309 int ret = shutdown( server_fd, SHUT_RD );310 if( ret < 0 ) {311 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );312 }313 sout | "done";314 315 274 //=================== 316 // Close Socket 317 sout | "Closing Socket..." | nonl; flush( sout ); 318 ret = close( server_fd ); 319 if(ret < 0) { 320 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 321 } 322 sout | "done"; 323 324 sout | "Stopping connection threads..." | nonl; flush( sout ); 325 adelete(workers); 275 // Close Socket and join 276 if(options.socket.reuseport) { 277 sout | "Notifying connections..." | nonl; flush( sout ); 278 for(i; nacceptors) { 279 acceptors[i].done = true; 280 } 281 for(i; options.clopts.nworkers) { 282 cworkers[i].done = true; 283 } 284 sout | "done"; 285 286 sout | "Shutting down Socket..." | nonl; flush( sout ); 287 for(i; nacceptors) { 288 ret = shutdown( acceptors[i].sockfd, SHUT_RD ); 289 if( ret < 0 ) { 290 abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) ); 291 } 292 } 293 sout | "done"; 294 295 sout | "Closing Socket..." | nonl; flush( sout ); 296 for(i; nacceptors) { 297 ret = close( acceptors[i].sockfd ); 298 if( ret < 0) { 299 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 300 } 301 } 302 sout | "done"; 303 304 sout | "Stopping accept threads..." | nonl; flush( sout ); 305 for(i; nacceptors) { 306 join(acceptors[i]); 307 } 308 sout | "done"; 309 310 sout | "Draining worker queues..." | nonl; flush( sout ); 311 for(i; nacceptors) { 312 PendingRead * p = 0p; 313 while(p = pop(queues[i].q)) { 314 fulfil(p->f, -ECONNRESET); 315 } 316 } 317 sout | "done"; 318 319 sout | "Stopping worker threads..." | nonl; flush( sout ); 320 for(i; options.clopts.nworkers) { 321 for(j; 2) { 322 ret = close(cworkers[i].conn.pipe[j]); 323 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 324 } 325 join(cworkers[i]); 326 } 327 } 328 else { 329 sout | "Notifying connections..." | nonl; flush( sout ); 330 for(i; options.clopts.nworkers) { 331 aworkers[i].done = true; 332 } 333 sout | "done"; 334 335 sout | "Shutting down Socket..." | nonl; flush( sout ); 336 ret = shutdown( server_fd, SHUT_RD ); 337 if( ret < 0 ) { 338 abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) ); 339 } 340 sout | "done"; 341 342 sout | "Closing Socket..." | nonl; flush( sout ); 343 ret = close( server_fd ); 344 if(ret < 0) { 345 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 346 } 347 sout | "done"; 348 349 sout | "Stopping connection threads..." | nonl; flush( sout ); 350 for(i; options.clopts.nworkers) { 351 for(j; 2) { 352 ret = close(aworkers[i].conn.pipe[j]); 353 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 354 } 355 join(aworkers[i]); 356 } 357 } 326 358 } 327 359 sout | "done"; … … 331 363 sout | "done"; 332 364 365 sout | "Stopping printer threads..." | nonl; flush( sout ); 366 if(stats_thrd) { 367 notify_one(stats_thrd->var); 368 } 369 delete(stats_thrd); 370 sout | "done"; 371 372 // Now that the stats printer is stopped, we can reclaim this 373 adelete(aworkers); 374 adelete(cworkers); 375 adelete(acceptors); 376 adelete(queues); 377 free(conns); 378 333 379 sout | "Stopping processors/clusters..." | nonl; flush( sout ); 334 380 } 335 381 sout | "done"; 336 382 337 sout | "Closing splice fds..." | nonl; flush( sout );338 for(i; pipe_cnt) {339 ret = close( fds[pipe_off + i] );340 if(ret < 0) {341 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );342 }343 }344 383 free(fds); 345 sout | "done";346 384 347 385 sout | "Stopping processors..." | nonl; flush( sout ); -
benchmark/io/http/options.cfa
r1df492a reb5962a 35 35 36 36 { // socket 37 8080, // port 38 10, // backlog 39 1024 // buflen 37 8080, // port 38 10, // backlog 39 1024, // buflen 40 false // reuseport 40 41 }, 41 42 … … 52 53 53 54 void parse_options( int argc, char * argv[] ) { 54 // bool fixedfd = false;55 // bool sqkpoll = false;56 // bool iokpoll = false;57 55 unsigned nentries = 0; 58 56 bool isolate = false; … … 70 68 {'\0', "shell", "Disable interactive mode", options.interactive, parse_setfalse}, 71 69 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 70 {'\0', "reuseport", "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue}, 72 71 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 73 72 {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed }, -
benchmark/io/http/options.hfa
r1df492a reb5962a 27 27 int backlog; 28 28 int buflen; 29 bool reuseport; 29 30 } socket; 30 31 -
benchmark/io/http/protocol.cfa
r1df492a reb5962a 30 30 #define PLAINTEXT_NOCOPY 31 31 #define LINKED_IO 32 33 static inline __s32 wait_res( io_future_t & this ) { 34 wait( this ); 35 if( this.result < 0 ) {{ 36 errno = -this.result; 37 return -1; 38 }} 39 return this.result; 40 } 32 41 33 42 struct https_msg_str { … … 470 479 471 480 if(is_error(splice_in.res)) { 481 if(splice_in.res.error == -EPIPE) return -ECONNRESET; 472 482 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 473 483 close(fd); … … 503 513 } 504 514 505 [HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len) {515 [HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) { 506 516 char * it = buffer; 507 517 size_t count = len - 1; … … 509 519 READ: 510 520 for() { 511 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 521 int ret; 522 if( f ) { 523 ret = wait_res(*f); 524 reset(*f); 525 f = 0p; 526 } else { 527 ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 528 } 512 529 // int ret = read(fd, (void*)it, count); 513 530 if(ret == 0 ) return [OK200, true, 0, 0]; … … 570 587 571 588 void ?{}( DateFormater & this ) { 572 ((thread&)this){ "Server Date Thread" , *options.clopts.instance[0]};589 ((thread&)this){ "Server Date Thread" }; 573 590 this.idx = 0; 574 591 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) ); -
benchmark/io/http/protocol.hfa
r1df492a reb5962a 1 1 #pragma once 2 2 3 struct io_future_t; 3 4 struct sendfile_stats_t; 4 5 … … 22 23 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & ); 23 24 24 [HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len);25 [HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f); -
benchmark/io/http/worker.cfa
r1df492a reb5962a 8 8 #include <fstream.hfa> 9 9 #include <iofwd.hfa> 10 #include <mutex_stmt.hfa> 10 11 11 12 #include "options.hfa" … … 14 15 15 16 //============================================================================================= 16 // Worker Thread 17 //============================================================================================= 18 void ?{}( Worker & this ) { 17 // Generic connection handling 18 //============================================================================================= 19 static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) { 20 REQUEST: 21 for() { 22 bool closed; 23 HttpCode code; 24 const char * file; 25 size_t name_size; 26 27 // Read the http request 28 if( options.log ) mutex(sout) sout | "=== Reading request ==="; 29 [code, closed, file, name_size] = http_read(fd, buffer, len, f); 30 f = 0p; 31 32 // if we are done, break out of the loop 33 if( closed ) break REQUEST; 34 35 // If this wasn't a request retrun 400 36 if( code != OK200 ) { 37 sout | "=== Invalid Request :" | code_val(code) | "==="; 38 answer_error(fd, code); 39 continue REQUEST; 40 } 41 42 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 43 if( options.log ) mutex(sout) sout | "=== Request for /plaintext ==="; 44 45 int ret = answer_plaintext(fd); 46 if( ret == -ECONNRESET ) break REQUEST; 47 48 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; 49 continue REQUEST; 50 } 51 52 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 53 if( options.log ) mutex(sout) sout | "=== Request for /ping ==="; 54 55 // Send the header 56 int ret = answer_empty(fd); 57 if( ret == -ECONNRESET ) break REQUEST; 58 59 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; 60 continue REQUEST; 61 } 62 63 if( options.log ) { 64 sout | "=== Request for file " | nonl; 65 write(sout, file, name_size); 66 sout | " ==="; 67 } 68 69 if( !options.file_cache.path ) { 70 if( options.log ) { 71 sout | "=== File Not Found (" | nonl; 72 write(sout, file, name_size); 73 sout | ") ==="; 74 } 75 answer_error(fd, E405); 76 continue REQUEST; 77 } 78 79 // Get the fd from the file cache 80 int ans_fd; 81 size_t count; 82 [ans_fd, count] = get_file( file, name_size ); 83 84 // If we can't find the file, return 404 85 if( ans_fd < 0 ) { 86 if( options.log ) { 87 sout | "=== File Not Found (" | nonl; 88 write(sout, file, name_size); 89 sout | ") ==="; 90 } 91 answer_error(fd, E404); 92 continue REQUEST; 93 } 94 95 // Send the desired file 96 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 97 if( ret == -ECONNRESET ) break REQUEST; 98 99 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; 100 } 101 102 if (stats_thrd) { 103 unsigned long long next = rdtscl(); 104 if(next > (last + 500000000)) { 105 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) { 106 push(this.stats.sendfile, stats_thrd->stats.send); 107 unlock(stats_thrd->stats.lock); 108 last = next; 109 } 110 } 111 } 112 } 113 114 //============================================================================================= 115 // Self Accepting Worker Thread 116 //============================================================================================= 117 void ?{}( AcceptWorker & this ) { 19 118 size_t cli = rand() % options.clopts.cltr_cnt; 20 119 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 21 120 options.clopts.thrd_cnt[cli]++; 22 this.pipe[0] = -1;23 this.pipe[1] = -1;24 121 this.done = false; 25 26 this.stats.sendfile.calls = 0; 27 this.stats.sendfile.tries = 0; 28 this.stats.sendfile.header = 0; 29 this.stats.sendfile.splcin = 0; 30 this.stats.sendfile.splcot = 0; 31 for(i; zipf_cnts) { 32 this.stats.sendfile.avgrd[i].calls = 0; 33 this.stats.sendfile.avgrd[i].bytes = 0; 34 } 35 } 36 37 extern "C" { 38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 39 } 40 41 void main( Worker & this ) { 122 } 123 124 void main( AcceptWorker & this ) { 42 125 park(); 43 /* paranoid */ assert( this.pipe[0] != -1 ); 44 /* paranoid */ assert( this.pipe[1] != -1 ); 45 46 CONNECTION: 47 for() { 48 if( options.log ) sout | "=== Accepting connection ==="; 49 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY ); 126 unsigned long long last = rdtscl(); 127 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 128 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 129 for() { 130 if( options.log ) mutex(sout) sout | "=== Accepting connection ==="; 131 int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY ); 50 132 if(fd < 0) { 51 133 if( errno == ECONNABORTED ) break; … … 55 137 if(this.done) break; 56 138 139 if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 140 size_t len = options.socket.buflen; 141 char buffer[len]; 142 handle_connection( this.conn, fd, buffer, len, 0p, last ); 143 144 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; 145 } 146 } 147 148 149 //============================================================================================= 150 // Channel Worker Thread 151 //============================================================================================= 152 void ?{}( ChannelWorker & this ) { 153 size_t cli = rand() % options.clopts.cltr_cnt; 154 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 155 options.clopts.thrd_cnt[cli]++; 156 this.done = false; 157 } 158 159 void main( ChannelWorker & this ) { 160 park(); 161 unsigned long long last = rdtscl(); 162 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 163 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 164 for() { 165 size_t len = options.socket.buflen; 166 char buffer[len]; 167 PendingRead p; 168 p.next = 0p; 169 p.in.buf = (void*)buffer; 170 p.in.len = len; 171 push(*this.queue, &p); 172 173 if( options.log ) mutex(sout) sout | "=== Waiting new connection ==="; 174 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last ); 175 176 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; 177 if(this.done) break; 178 } 179 } 180 181 extern "C" { 182 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 183 } 184 185 void ?{}( Acceptor & this, int cli ) { 186 ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 }; 187 options.clopts.thrd_cnt[cli]++; 188 this.done = false; 189 } 190 191 static inline __s32 get_res( io_future_t & this ) { 192 if( this.result < 0 ) {{ 193 errno = -this.result; 194 return -1; 195 }} 196 return this.result; 197 } 198 199 static inline void push_connection( Acceptor & this, int fd ) { 200 PendingRead * p = 0p; 201 for() { 202 if(this.done) return; 203 p = pop(*this.queue); 204 if(p) break; 205 yield(); 206 this.stats.creates++; 207 }; 208 209 p->out.fd = fd; 210 async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY); 211 } 212 213 // #define ACCEPT_SPIN 214 #define ACCEPT_MANY 215 216 void main( Acceptor & this ) { 217 park(); 218 unsigned long long last = rdtscl(); 219 220 #if defined(ACCEPT_SPIN) 221 if( options.log ) sout | "=== Accepting connection ==="; 222 for() { 223 int fd = accept4(this.sockfd, this.[addr, addrlen, flags]); 224 if(fd < 0) { 225 if( errno == EWOULDBLOCK) { 226 this.stats.eagains++; 227 yield(); 228 continue; 229 } 230 if( errno == ECONNABORTED ) break; 231 if( this.done && (errno == EINVAL || errno == EBADF) ) break; 232 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 233 } 234 this.stats.accepts++; 235 236 if(this.done) return; 237 57 238 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 58 REQUEST: 59 for() { 60 bool closed; 61 HttpCode code; 62 const char * file; 63 size_t name_size; 64 65 // Read the http request 66 size_t len = options.socket.buflen; 67 char buffer[len]; 68 if( options.log ) sout | "=== Reading request ==="; 69 [code, closed, file, name_size] = http_read(fd, buffer, len); 70 71 // if we are done, break out of the loop 72 if( closed ) break REQUEST; 73 74 // If this wasn't a request retrun 400 75 if( code != OK200 ) { 76 sout | "=== Invalid Request :" | code_val(code) | "==="; 77 answer_error(fd, code); 78 continue REQUEST; 79 } 80 81 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 82 if( options.log ) sout | "=== Request for /plaintext ==="; 83 84 int ret = answer_plaintext(fd); 85 if( ret == -ECONNRESET ) break REQUEST; 86 87 if( options.log ) sout | "=== Answer sent ==="; 88 continue REQUEST; 89 } 90 91 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 92 if( options.log ) sout | "=== Request for /ping ==="; 93 94 // Send the header 95 int ret = answer_empty(fd); 96 if( ret == -ECONNRESET ) break REQUEST; 97 98 if( options.log ) sout | "=== Answer sent ==="; 99 continue REQUEST; 100 } 101 102 if( options.log ) { 103 sout | "=== Request for file " | nonl; 104 write(sout, file, name_size); 105 sout | " ==="; 106 } 107 108 if( !options.file_cache.path ) { 109 if( options.log ) { 110 sout | "=== File Not Found (" | nonl; 111 write(sout, file, name_size); 112 sout | ") ==="; 239 240 if(fd) push_connection(this, fd); 241 242 if (stats_thrd) { 243 unsigned long long next = rdtscl(); 244 if(next > (last + 500000000)) { 245 if(try_lock(stats_thrd->stats.lock)) { 246 push(this.stats, stats_thrd->stats.accpt); 247 unlock(stats_thrd->stats.lock); 248 last = next; 113 249 } 114 answer_error(fd, E405); 115 continue REQUEST; 116 } 117 118 // Get the fd from the file cache 119 int ans_fd; 120 size_t count; 121 [ans_fd, count] = get_file( file, name_size ); 122 123 // If we can't find the file, return 404 124 if( ans_fd < 0 ) { 125 if( options.log ) { 126 sout | "=== File Not Found (" | nonl; 127 write(sout, file, name_size); 128 sout | ") ==="; 250 } 251 } 252 253 if( options.log ) sout | "=== Accepting connection ==="; 254 } 255 256 #elif defined(ACCEPT_MANY) 257 const int nacc = 10; 258 io_future_t results[nacc]; 259 260 for(i; nacc) { 261 io_future_t & res = results[i]; 262 reset(res); 263 /* paranoid */ assert(!available(res)); 264 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ==="; 265 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY); 266 } 267 268 for() { 269 if (stats_thrd) { 270 unsigned long long next = rdtscl(); 271 if(next > (last + 500000000)) { 272 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) { 273 push(this.stats, stats_thrd->stats.accpt); 274 unlock(stats_thrd->stats.lock); 275 last = next; 129 276 } 130 answer_error(fd, E404); 131 continue REQUEST; 132 } 133 134 // Send the desired file 135 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 136 if( ret == -ECONNRESET ) break REQUEST; 137 138 if( options.log ) sout | "=== Answer sent ==="; 139 } 140 141 if( options.log ) sout | "=== Connection closed ==="; 142 continue CONNECTION; 143 } 144 } 277 } 278 } 279 280 for(i; nacc) { 281 io_future_t & res = results[i]; 282 if(available(res)) { 283 if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "==="; 284 int fd = get_res(res); 285 reset(res); 286 this.stats.accepts++; 287 if(fd < 0) { 288 if( errno == ECONNABORTED ) continue; 289 if( this.done && (errno == EINVAL || errno == EBADF) ) continue; 290 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 291 } 292 push_connection( this, fd ); 293 294 /* paranoid */ assert(!available(res)); 295 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ==="; 296 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY); 297 } 298 } 299 if(this.done) return; 300 301 if( options.log ) mutex(sout) sout | "=== Waiting for any accept ==="; 302 this.stats.eagains++; 303 wait_any(results, nacc); 304 305 if( options.log ) mutex(sout) { 306 sout | "=== Acceptor wake-up ==="; 307 for(i; nacc) { 308 io_future_t & res = results[i]; 309 sout | i | "available:" | available(res); 310 } 311 } 312 313 } 314 315 for(i; nacc) { 316 wait(results[i]); 317 } 318 #else 319 #error no accept algorithm specified 320 #endif 321 } -
benchmark/io/http/worker.hfa
r1df492a reb5962a 1 1 #pragma once 2 2 3 #include <iofwd.hfa> 4 #include <queueLockFree.hfa> 3 5 #include <thread.hfa> 4 6 … … 7 9 } 8 10 11 #include "printer.hfa" 12 9 13 //============================================================================================= 10 14 // Worker Thread 11 15 //============================================================================================= 12 16 13 extern const size_t zipf_sizes[]; 14 enum { zipf_cnts = 36, }; 15 16 struct sendfile_stats_t { 17 volatile uint64_t calls; 18 volatile uint64_t tries; 19 volatile uint64_t header; 20 volatile uint64_t splcin; 21 volatile uint64_t splcot; 17 struct connection { 18 int pipe[2]; 22 19 struct { 23 volatile uint64_t calls; 24 volatile uint64_t bytes; 25 } avgrd[zipf_cnts]; 20 sendfile_stats_t sendfile; 21 } stats; 26 22 }; 27 23 28 thread Worker { 29 int pipe[2]; 24 static inline void ?{}( connection & this ) { 25 this.pipe[0] = -1; 26 this.pipe[1] = -1; 27 } 28 29 thread AcceptWorker { 30 connection conn; 30 31 int sockfd; 31 32 struct sockaddr * addr; … … 33 34 int flags; 34 35 volatile bool done; 36 }; 37 void ?{}( AcceptWorker & this); 38 void main( AcceptWorker & ); 39 40 41 struct PendingRead { 42 PendingRead * volatile next; 43 io_future_t f; 35 44 struct { 36 sendfile_stats_t sendfile; 37 } stats; 45 void * buf; 46 size_t len; 47 } in; 48 struct { 49 volatile int fd; 50 } out; 38 51 }; 39 void ?{}( Worker & this); 40 void main( Worker & ); 52 53 static inline PendingRead * volatile & ?`next ( PendingRead * node ) { 54 return node->next; 55 } 56 57 thread ChannelWorker { 58 connection conn; 59 volatile bool done; 60 mpsc_queue(PendingRead) * queue; 61 }; 62 void ?{}( ChannelWorker & ); 63 void main( ChannelWorker & ); 64 65 thread Acceptor { 66 mpsc_queue(PendingRead) * queue; 67 int sockfd; 68 struct sockaddr * addr; 69 socklen_t * addrlen; 70 int flags; 71 volatile bool done; 72 acceptor_stats_t stats; 73 }; 74 void ?{}( Acceptor &, int cli ); 75 void main( Acceptor & );
Note:
See TracChangeset
for help on using the changeset viewer.