Changes in / [d672350:dbe2533]
- Location:
- benchmark/io/http
- Files:
-
- 6 edited
-
main.cfa (modified) (4 diffs)
-
parhttperf (modified) (1 diff)
-
protocol.cfa (modified) (17 diffs)
-
protocol.hfa (modified) (2 diffs)
-
worker.cfa (modified) (2 diffs)
-
worker.hfa (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
rd672350 rdbe2533 33 33 //=============================================================================================' 34 34 35 thread StatsPrinter { 36 Worker * workers; 37 int worker_cnt; 38 }; 35 thread StatsPrinter {}; 39 36 40 37 void ?{}( StatsPrinter & this, cluster & cl ) { 41 38 ((thread&)this){ "Stats Printer Thread", cl }; 42 this.worker_cnt = 0;43 39 } 44 40 45 41 void ^?{}( StatsPrinter & mutex this ) {} 46 47 #define eng3(X) (ws(3, 3, unit(eng( X ))))48 42 49 43 void main(StatsPrinter & this) { … … 57 51 58 52 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 59 if(this.worker_cnt != 0) {60 uint64_t tries = 0;61 uint64_t calls = 0;62 uint64_t header = 0;63 uint64_t splcin = 0;64 uint64_t splcot = 0;65 struct {66 volatile uint64_t calls;67 volatile uint64_t bytes;68 } avgrd[zipf_cnts];69 memset(avgrd, 0, sizeof(avgrd));70 71 for(i; this.worker_cnt) {72 tries += this.workers[i].stats.sendfile.tries;73 calls += this.workers[i].stats.sendfile.calls;74 header += this.workers[i].stats.sendfile.header;75 splcin += this.workers[i].stats.sendfile.splcin;76 splcot += this.workers[i].stats.sendfile.splcot;77 for(j; zipf_cnts) {78 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;79 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;80 }81 }82 83 double ratio = ((double)tries) / calls;84 85 sout | "----- Worker Stats -----";86 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";87 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";88 sout | " - zipf sizes:";89 for(i; zipf_cnts) {90 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;91 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";92 }93 }94 else {95 sout | "No Workers!";96 }97 53 } 98 54 } … … 262 218 { 263 219 Worker * workers = anew(options.clopts.nworkers); 264 cl[0].prnt->workers = workers;265 cl[0].prnt->worker_cnt = options.clopts.nworkers;266 220 for(i; options.clopts.nworkers) { 267 221 // if( options.file_cache.fixed_fds ) { … … 357 311 } 358 312 } 359 360 const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };361 static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0])); -
benchmark/io/http/parhttperf
rd672350 rdbe2533 6 6 7 7 mkdir -p out 8 rm out/* 9 echo "httperf --client [0-$(($NTHREADS - 1))]/$NTHREADS $@ > out/result.[0-$(($NTHREADS - 1))].out" 8 rm -v out/* 10 9 for ((i=0; i<$NTHREADS; i++)) 11 10 do 11 # echo "httperf --client $i/$NTHREADS $@ > out/result.$i.out" 12 12 httperf --client $i/$NTHREADS $@ > out/result.$i.out & 13 13 done -
benchmark/io/http/protocol.cfa
rd672350 rdbe2533 24 24 25 25 #include "options.hfa" 26 #include "worker.hfa"27 26 28 27 #define PLAINTEXT_1WRITE … … 157 156 158 157 count -= ret; 158 offset += ret; 159 159 size_t in_pipe = ret; 160 160 SPLICE2: while(in_pipe > 0) { … … 249 249 } 250 250 251 static inline int wait_and_process(header_g & this , sendfile_stats_t & stats) {251 static inline int wait_and_process(header_g & this) { 252 252 wait(this.f); 253 253 … … 278 278 } 279 279 280 stats.header++;281 282 280 // It must be a Short read 283 281 this.len -= this.f.result; … … 291 289 io_future_t f; 292 290 int fd; int pipe; size_t len; off_t off; 293 short zipf_idx;294 291 FSM_Result res; 295 292 }; … … 300 297 this.len = len; 301 298 this.off = 0; 302 this.zipf_idx = -1;303 STATS: for(i; zipf_cnts) {304 if(len <= zipf_sizes[i]) {305 this.zipf_idx = i;306 break STATS;307 }308 }309 if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";310 299 } 311 300 … … 323 312 } 324 313 325 static inline int wait_and_process(splice_in_t & this , sendfile_stats_t & stats) {314 static inline int wait_and_process(splice_in_t & this) { 326 315 wait(this.f); 327 316 … … 339 328 return error(this.res, -ECONNRESET); 340 329 } 341 mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";342 return error(this.res, -ECONNRESET);343 330 } 344 331 … … 353 340 return done(this.res); 354 341 } 355 356 stats.splcin++;357 stats.avgrd[this.zipf_idx].calls++;358 stats.avgrd[this.zipf_idx].bytes += this.f.result;359 342 360 343 // It must be a Short read … … 398 381 } 399 382 400 static inline void wait_and_process(splice_out_g & this , sendfile_stats_t & stats) {383 static inline void wait_and_process(splice_out_g & this) { 401 384 wait(this.f); 402 385 … … 414 397 return error(this, -ECONNRESET); 415 398 } 416 mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";417 return error(this, -ECONNRESET);418 399 } 419 400 … … 430 411 431 412 SHORT_WRITE: 432 stats.splcot++;433 434 413 // It must be a Short Write 435 414 this.len -= this.f.result; … … 438 417 } 439 418 440 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) { 441 stats.calls++; 419 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) { 442 420 #if defined(LINKED_IO) 443 421 char buffer[512]; … … 448 426 449 427 RETRY_LOOP: for() { 450 stats.tries++;451 428 int have = need(header.res) + need(splice_in.res) + 1; 452 429 int idx = 0; … … 467 444 // we may need to kill the connection if it fails 468 445 // If it already completed, this is a no-op 469 wait_and_process(splice_in , stats);446 wait_and_process(splice_in); 470 447 471 448 if(is_error(splice_in.res)) { … … 475 452 476 453 // Process the other 2 477 wait_and_process(header , stats);478 wait_and_process(splice_out , stats);454 wait_and_process(header); 455 wait_and_process(splice_out); 479 456 480 457 if(is_done(splice_out.res)) { … … 496 473 return len + fsize; 497 474 #else 498 stats.tries++;499 475 int ret = answer_header(fd, fsize); 500 476 if( ret < 0 ) { close(fd); return ret; } -
benchmark/io/http/protocol.hfa
rd672350 rdbe2533 1 1 #pragma once 2 3 struct sendfile_stats_t;4 2 5 3 enum HttpCode { … … 20 18 int answer_plaintext( int fd ); 21 19 int answer_empty( int fd ); 22 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count , struct sendfile_stats_t &);20 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count ); 23 21 24 22 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); -
benchmark/io/http/worker.cfa
rd672350 rdbe2533 23 23 this.pipe[1] = -1; 24 24 this.done = false; 25 26 this.stats.sendfile.calls = 0;27 this.stats.sendfile.tries = 0;28 this.stats.sendfile.header = 0;29 this.stats.sendfile.splcin = 0;30 this.stats.sendfile.splcot = 0;31 for(i; zipf_cnts) {32 this.stats.sendfile.avgrd[i].calls = 0;33 this.stats.sendfile.avgrd[i].bytes = 0;34 }35 25 } 36 26 … … 133 123 134 124 // Send the desired file 135 int ret = answer_sendfile( this.pipe, fd, ans_fd, count , this.stats.sendfile);125 int ret = answer_sendfile( this.pipe, fd, ans_fd, count); 136 126 if( ret == -ECONNRESET ) break REQUEST; 137 127 -
benchmark/io/http/worker.hfa
rd672350 rdbe2533 11 11 //============================================================================================= 12 12 13 extern const size_t zipf_sizes[];14 enum { zipf_cnts = 36, };15 16 struct sendfile_stats_t {17 volatile uint64_t calls;18 volatile uint64_t tries;19 volatile uint64_t header;20 volatile uint64_t splcin;21 volatile uint64_t splcot;22 struct {23 volatile uint64_t calls;24 volatile uint64_t bytes;25 } avgrd[zipf_cnts];26 };27 28 13 thread Worker { 29 14 int pipe[2]; … … 33 18 int flags; 34 19 volatile bool done; 35 struct {36 sendfile_stats_t sendfile;37 } stats;38 20 }; 39 21 void ?{}( Worker & this);
Note:
See TracChangeset
for help on using the changeset viewer.