Changes in / [dbe2533:d672350]
- Location:
- benchmark/io/http
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
rdbe2533 rd672350 33 33 //=============================================================================================' 34 34 35 thread StatsPrinter {}; 35 thread StatsPrinter { 36 Worker * workers; 37 int worker_cnt; 38 }; 36 39 37 40 void ?{}( StatsPrinter & this, cluster & cl ) { 38 41 ((thread&)this){ "Stats Printer Thread", cl }; 42 this.worker_cnt = 0; 39 43 } 40 44 41 45 void ^?{}( StatsPrinter & mutex this ) {} 46 47 #define eng3(X) (ws(3, 3, unit(eng( X )))) 42 48 43 49 void main(StatsPrinter & this) { … … 51 57 52 58 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 59 if(this.worker_cnt != 0) { 60 uint64_t tries = 0; 61 uint64_t calls = 0; 62 uint64_t header = 0; 63 uint64_t splcin = 0; 64 uint64_t splcot = 0; 65 struct { 66 volatile uint64_t calls; 67 volatile uint64_t bytes; 68 } avgrd[zipf_cnts]; 69 memset(avgrd, 0, sizeof(avgrd)); 70 71 for(i; this.worker_cnt) { 72 tries += this.workers[i].stats.sendfile.tries; 73 calls += this.workers[i].stats.sendfile.calls; 74 header += this.workers[i].stats.sendfile.header; 75 splcin += this.workers[i].stats.sendfile.splcin; 76 splcot += this.workers[i].stats.sendfile.splcot; 77 for(j; zipf_cnts) { 78 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls; 79 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes; 80 } 81 } 82 83 double ratio = ((double)tries) / calls; 84 85 sout | "----- Worker Stats -----"; 86 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)"; 87 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out"; 88 sout | " - zipf sizes:"; 89 for(i; zipf_cnts) { 90 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0; 91 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written"; 92 } 93 } 94 else { 95 sout | "No Workers!"; 96 } 53 97 } 54 98 } … … 218 262 { 219 263 Worker * workers = anew(options.clopts.nworkers); 264 cl[0].prnt->workers = workers; 265 cl[0].prnt->worker_cnt = options.clopts.nworkers; 220 266 for(i; options.clopts.nworkers) { 221 267 // if( options.file_cache.fixed_fds ) { … … 311 357 } 312 358 } 359 360 const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 }; 361 static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0])); -
benchmark/io/http/parhttperf
rdbe2533 rd672350 6 6 7 7 mkdir -p out 8 rm -v out/* 8 rm out/* 9 echo "httperf --client [0-$(($NTHREADS - 1))]/$NTHREADS $@ > out/result.[0-$(($NTHREADS - 1))].out" 9 10 for ((i=0; i<$NTHREADS; i++)) 10 11 do 11 # echo "httperf --client $i/$NTHREADS $@ > out/result.$i.out"12 12 httperf --client $i/$NTHREADS $@ > out/result.$i.out & 13 13 done -
benchmark/io/http/protocol.cfa
rdbe2533 rd672350 24 24 25 25 #include "options.hfa" 26 #include "worker.hfa" 26 27 27 28 #define PLAINTEXT_1WRITE … … 156 157 157 158 count -= ret; 158 offset += ret;159 159 size_t in_pipe = ret; 160 160 SPLICE2: while(in_pipe > 0) { … … 249 249 } 250 250 251 static inline int wait_and_process(header_g & this ) {251 static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) { 252 252 wait(this.f); 253 253 … … 278 278 } 279 279 280 stats.header++; 281 280 282 // It must be a Short read 281 283 this.len -= this.f.result; … … 289 291 io_future_t f; 290 292 int fd; int pipe; size_t len; off_t off; 293 short zipf_idx; 291 294 FSM_Result res; 292 295 }; … … 297 300 this.len = len; 298 301 this.off = 0; 302 this.zipf_idx = -1; 303 STATS: for(i; zipf_cnts) { 304 if(len <= zipf_sizes[i]) { 305 this.zipf_idx = i; 306 break STATS; 307 } 308 } 309 if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file"; 299 310 } 300 311 … … 312 323 } 313 324 314 static inline int wait_and_process(splice_in_t & this ) {325 static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) { 315 326 wait(this.f); 316 327 … … 328 339 return error(this.res, -ECONNRESET); 329 340 } 341 mutex(serr) serr | "SPLICE IN got" | error | ", WTF!"; 342 return error(this.res, -ECONNRESET); 330 343 } 331 344 … … 340 353 return done(this.res); 341 354 } 355 356 stats.splcin++; 357 stats.avgrd[this.zipf_idx].calls++; 358 stats.avgrd[this.zipf_idx].bytes += this.f.result; 342 359 343 360 // It must be a Short read … … 381 398 } 382 399 383 static inline void wait_and_process(splice_out_g & this ) {400 static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) { 384 401 wait(this.f); 385 402 … … 397 414 return error(this, -ECONNRESET); 398 415 } 416 mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!"; 417 return error(this, -ECONNRESET); 399 418 } 400 419 … … 411 430 412 431 SHORT_WRITE: 432 stats.splcot++; 433 413 434 // It must be a Short Write 414 435 this.len -= this.f.result; … … 417 438 } 418 439 419 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) { 440 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) { 441 stats.calls++; 420 442 #if defined(LINKED_IO) 421 443 char buffer[512]; … … 426 448 427 449 RETRY_LOOP: for() { 450 stats.tries++; 428 451 int have = need(header.res) + need(splice_in.res) + 1; 429 452 int idx = 0; … … 444 467 // we may need to kill the connection if it fails 445 468 // If it already completed, this is a no-op 446 wait_and_process(splice_in );469 wait_and_process(splice_in, stats); 447 470 448 471 if(is_error(splice_in.res)) { … … 452 475 453 476 // Process the other 2 454 wait_and_process(header );455 wait_and_process(splice_out );477 wait_and_process(header, stats); 478 wait_and_process(splice_out, stats); 456 479 457 480 if(is_done(splice_out.res)) { … … 473 496 return len + fsize; 474 497 #else 498 stats.tries++; 475 499 int ret = answer_header(fd, fsize); 476 500 if( ret < 0 ) { close(fd); return ret; } -
benchmark/io/http/protocol.hfa
rdbe2533 rd672350 1 1 #pragma once 2 3 struct sendfile_stats_t; 2 4 3 5 enum HttpCode { … … 18 20 int answer_plaintext( int fd ); 19 21 int answer_empty( int fd ); 20 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count );22 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & ); 21 23 22 24 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); -
benchmark/io/http/worker.cfa
rdbe2533 rd672350 23 23 this.pipe[1] = -1; 24 24 this.done = false; 25 26 this.stats.sendfile.calls = 0; 27 this.stats.sendfile.tries = 0; 28 this.stats.sendfile.header = 0; 29 this.stats.sendfile.splcin = 0; 30 this.stats.sendfile.splcot = 0; 31 for(i; zipf_cnts) { 32 this.stats.sendfile.avgrd[i].calls = 0; 33 this.stats.sendfile.avgrd[i].bytes = 0; 34 } 25 35 } 26 36 … … 123 133 124 134 // Send the desired file 125 int ret = answer_sendfile( this.pipe, fd, ans_fd, count );135 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 126 136 if( ret == -ECONNRESET ) break REQUEST; 127 137 -
benchmark/io/http/worker.hfa
rdbe2533 rd672350 11 11 //============================================================================================= 12 12 13 extern const size_t zipf_sizes[]; 14 enum { zipf_cnts = 36, }; 15 16 struct sendfile_stats_t { 17 volatile uint64_t calls; 18 volatile uint64_t tries; 19 volatile uint64_t header; 20 volatile uint64_t splcin; 21 volatile uint64_t splcot; 22 struct { 23 volatile uint64_t calls; 24 volatile uint64_t bytes; 25 } avgrd[zipf_cnts]; 26 }; 27 13 28 thread Worker { 14 29 int pipe[2]; … … 18 33 int flags; 19 34 volatile bool done; 35 struct { 36 sendfile_stats_t sendfile; 37 } stats; 20 38 }; 21 39 void ?{}( Worker & this);
Note: See TracChangeset
for help on using the changeset viewer.