- Timestamp:
- Mar 21, 2022, 1:43:58 PM (3 years ago)
- Branches:
- ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children:
- d672350
- Parents:
- 3a40df6
- Location:
- benchmark/io/http
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
r3a40df6 ref3c383 33 33 //=============================================================================================' 34 34 35 thread StatsPrinter {}; 35 thread StatsPrinter { 36 Worker * workers; 37 int worker_cnt; 38 }; 36 39 37 40 void ?{}( StatsPrinter & this, cluster & cl ) { 38 41 ((thread&)this){ "Stats Printer Thread", cl }; 42 this.worker_cnt = 0; 39 43 } 40 44 41 45 void ^?{}( StatsPrinter & mutex this ) {} 46 47 #define eng3(X) (ws(3, 3, unit(eng( X )))) 42 48 43 49 void main(StatsPrinter & this) { … … 51 57 52 58 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 59 if(this.worker_cnt != 0) { 60 uint64_t tries = 0; 61 uint64_t calls = 0; 62 uint64_t header = 0; 63 uint64_t splcin = 0; 64 uint64_t splcot = 0; 65 struct { 66 volatile uint64_t calls; 67 volatile uint64_t bytes; 68 } avgrd[zipf_cnts]; 69 memset(avgrd, 0, sizeof(avgrd)); 70 71 for(i; this.worker_cnt) { 72 tries += this.workers[i].stats.sendfile.tries; 73 calls += this.workers[i].stats.sendfile.calls; 74 header += this.workers[i].stats.sendfile.header; 75 splcin += this.workers[i].stats.sendfile.splcin; 76 splcot += this.workers[i].stats.sendfile.splcot; 77 for(j; zipf_cnts) { 78 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls; 79 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes; 80 } 81 } 82 83 double ratio = ((double)tries) / calls; 84 85 sout | "----- Worker Stats -----"; 86 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)"; 87 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out"; 88 sout | " - zipf sizes:"; 89 for(i; zipf_cnts) { 90 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0; 91 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written"; 92 } 93 } 94 else { 95 sout | "No Workers!"; 96 } 53 97 } 54 98 } … … 218 262 { 219 263 Worker * workers = anew(options.clopts.nworkers); 264 cl[0].prnt->workers = workers; 265 cl[0].prnt->worker_cnt = options.clopts.nworkers; 220 266 for(i; options.clopts.nworkers) { 221 267 // if( options.file_cache.fixed_fds ) { … … 311 357 } 312 358 } 359 360 const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 }; 361 static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0])); -
benchmark/io/http/parhttperf
r3a40df6 ref3c383 6 6 7 7 mkdir -p out 8 rm -v out/* 8 rm out/* 9 echo "httperf --client [0-$(($NTHREADS - 1))]/$NTHREADS $@ > out/result.[0-$(($NTHREADS - 1))].out" 9 10 for ((i=0; i<$NTHREADS; i++)) 10 11 do 11 # echo "httperf --client $i/$NTHREADS $@ > out/result.$i.out"12 12 httperf --client $i/$NTHREADS $@ > out/result.$i.out & 13 13 done -
benchmark/io/http/protocol.cfa
r3a40df6 ref3c383 24 24 25 25 #include "options.hfa" 26 #include "worker.hfa" 26 27 27 28 #define PLAINTEXT_1WRITE … … 156 157 157 158 count -= ret; 158 offset += ret;159 159 size_t in_pipe = ret; 160 160 SPLICE2: while(in_pipe > 0) { … … 266 266 } 267 267 268 static inline int wait_and_process(header_g & this ) {268 static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) { 269 269 wait(this.f); 270 270 … … 295 295 } 296 296 297 stats.header++; 298 297 299 // It must be a Short read 298 300 this.len -= this.f.result; … … 306 308 io_future_t f; 307 309 int fd; int pipe; size_t len; off_t off; 310 short zipf_idx; 308 311 FSM_Result res; 309 312 }; … … 314 317 this.len = len; 315 318 this.off = 0; 319 this.zipf_idx = -1; 320 STATS: for(i; zipf_cnts) { 321 if(len <= zipf_sizes[i]) { 322 this.zipf_idx = i; 323 break STATS; 324 } 325 } 326 if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file"; 316 327 } 317 328 … … 329 340 } 330 341 331 static inline int wait_and_process(splice_in_t & this ) {342 static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) { 332 343 wait(this.f); 333 344 … … 345 356 return error(this.res, -ECONNRESET); 346 357 } 358 mutex(serr) serr | "SPLICE IN got" | error | ", WTF!"; 359 return error(this.res, -ECONNRESET); 347 360 } 348 361 … … 357 370 return done(this.res); 358 371 } 372 373 stats.splcin++; 374 stats.avgrd[this.zipf_idx].calls++; 375 stats.avgrd[this.zipf_idx].bytes += this.f.result; 359 376 360 377 // It must be a Short read … … 398 415 } 399 416 400 static inline void wait_and_process(splice_out_g & this ) {417 static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) { 401 418 wait(this.f); 402 419 … … 414 431 return error(this, -ECONNRESET); 415 432 } 433 mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!"; 434 return error(this, -ECONNRESET); 416 435 } 417 436 … … 428 447 429 448 SHORT_WRITE: 449 stats.splcot++; 450 430 451 // It must be a Short Write 431 452 this.len -= this.f.result; … … 434 455 } 435 456 436 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) { 457 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) { 458 stats.calls++; 437 459 #if defined(LINKED_IO) 438 460 char buffer[512]; … … 443 465 444 466 RETRY_LOOP: for() { 467 stats.tries++; 445 468 int have = need(header.res) + need(splice_in.res) + 1; 446 469 int idx = 0; … … 461 484 // we may need to kill the connection if it fails 462 485 // If it already completed, this is a no-op 463 wait_and_process(splice_in );486 wait_and_process(splice_in, stats); 464 487 465 488 if(is_error(splice_in.res)) { … … 469 492 470 493 // Process the other 2 471 wait_and_process(header );472 wait_and_process(splice_out );494 wait_and_process(header, stats); 495 wait_and_process(splice_out, stats); 473 496 474 497 if(is_done(splice_out.res)) { … … 490 513 return len + fsize; 491 514 #else 515 stats.tries++; 492 516 int ret = answer_header(fd, fsize); 493 517 if( ret < 0 ) { close(fd); return ret; } -
benchmark/io/http/protocol.hfa
r3a40df6 ref3c383 1 1 #pragma once 2 3 struct sendfile_stats_t; 2 4 3 5 enum HttpCode { … … 18 20 int answer_plaintext( int fd ); 19 21 int answer_empty( int fd ); 20 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count );22 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & ); 21 23 22 24 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); -
benchmark/io/http/worker.cfa
r3a40df6 ref3c383 23 23 this.pipe[1] = -1; 24 24 this.done = false; 25 26 this.stats.sendfile.calls = 0; 27 this.stats.sendfile.tries = 0; 28 this.stats.sendfile.header = 0; 29 this.stats.sendfile.splcin = 0; 30 this.stats.sendfile.splcot = 0; 31 for(i; zipf_cnts) { 32 this.stats.sendfile.avgrd[i].calls = 0; 33 this.stats.sendfile.avgrd[i].bytes = 0; 34 } 25 35 } 26 36 … … 123 133 124 134 // Send the desired file 125 int ret = answer_sendfile( this.pipe, fd, ans_fd, count );135 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 126 136 if( ret == -ECONNRESET ) break REQUEST; 127 137 -
benchmark/io/http/worker.hfa
r3a40df6 ref3c383 11 11 //============================================================================================= 12 12 13 extern const size_t zipf_sizes[]; 14 enum { zipf_cnts = 36, }; 15 16 struct sendfile_stats_t { 17 volatile uint64_t calls; 18 volatile uint64_t tries; 19 volatile uint64_t header; 20 volatile uint64_t splcin; 21 volatile uint64_t splcot; 22 struct { 23 volatile uint64_t calls; 24 volatile uint64_t bytes; 25 } avgrd[zipf_cnts]; 26 }; 27 13 28 thread Worker { 14 29 int pipe[2]; … … 18 33 int flags; 19 34 volatile bool done; 35 struct { 36 sendfile_stats_t sendfile; 37 } stats; 20 38 }; 21 39 void ?{}( Worker & this);
Note: See TracChangeset
for help on using the changeset viewer.