Changeset 137974ae
- Timestamp:
- Jun 9, 2022, 10:56:34 AM (2 years ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children:
- 430ce61
- Parents:
- 8c58e73
- Location:
- benchmark/io/http
- Files:
-
- 2 added
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/Makefile.am
r8c58e73 r137974ae 37 37 options.cfa \ 38 38 options.hfa \ 39 printer.cfa \ 40 printer.hfa \ 39 41 protocol.cfa \ 40 42 protocol.hfa \ -
benchmark/io/http/main.cfa
r8c58e73 r137974ae 25 25 #include "options.hfa" 26 26 #include "socket.hfa" 27 #include "printer.hfa" 27 28 #include "worker.hfa" 28 29 … … 31 32 Duration default_preemption() { 32 33 return 0; 33 }34 35 //=============================================================================================36 // Stats Printer37 //============================================================================================='38 39 thread StatsPrinter {40 connection ** conns;41 volatile int conn_cnt;42 condition_variable(fast_block_lock) var;43 };44 45 void ?{}( StatsPrinter & this, cluster & cl ) {46 ((thread&)this){ "Stats Printer Thread", cl };47 this.conn_cnt = 0;48 }49 50 void ^?{}( StatsPrinter & mutex this ) {}51 52 #define eng3(X) (ws(3, 3, unit(eng( X ))))53 54 void main(StatsPrinter & this) {55 LOOP: for() {56 waitfor( ^?{} : this) {57 break LOOP;58 }59 or else {}60 61 wait(this.var, 10`s);62 63 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );64 if(this.conn_cnt != 0) {65 uint64_t tries = 0;66 uint64_t calls = 0;67 uint64_t header = 0;68 uint64_t splcin = 0;69 uint64_t splcot = 0;70 struct {71 volatile uint64_t calls;72 volatile uint64_t bytes;73 } avgrd[zipf_cnts];74 memset(avgrd, 0, sizeof(avgrd));75 76 for(i; this.conn_cnt) {77 tries += this.conns[i]->stats.sendfile.tries;78 calls += this.conns[i]->stats.sendfile.calls;79 header += this.conns[i]->stats.sendfile.header;80 splcin += this.conns[i]->stats.sendfile.splcin;81 splcot += this.conns[i]->stats.sendfile.splcot;82 for(j; zipf_cnts) {83 avgrd[j].calls += this.conns[i]->stats.sendfile.avgrd[j].calls;84 avgrd[j].bytes += this.conns[i]->stats.sendfile.avgrd[j].bytes;85 }86 }87 88 double ratio = ((double)tries) / calls;89 90 sout | "----- Connection Stats -----";91 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";92 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";93 sout | " - zipf sizes:";94 for(i; zipf_cnts) {95 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;96 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";97 }98 }99 else {100 sout | "No Connections!";101 }102 }103 34 } 104 35 … … 109 40 cluster self; 110 41 processor * procs; 111 // io_context * ctxs;112 StatsPrinter * prnt;113 42 114 43 }; … … 152 81 } 153 82 154 if(options.stats) {155 this.prnt = alloc();156 (*this.prnt){ this.self };157 } else {158 this.prnt = 0p;159 }160 161 83 #if !defined(__CFA_NO_STATISTICS__) 162 84 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); … … 167 89 168 90 void ^?{}( ServerCluster & this ) { 169 delete(this.prnt);170 171 91 for(i; options.clopts.nprocs) { 172 92 ^(this.procs[i]){}; … … 268 188 Q * queues = 0p; 269 189 ServerCluster cl; 190 191 if(options.stats) { 192 stats_thrd = alloc(); 193 (*stats_thrd){ cl.self }; 194 } else { 195 stats_thrd = 0p; 196 } 270 197 271 198 init_protocol(); … … 319 246 } 320 247 } 321 cl.prnt->conns = conns; 322 cl.prnt->conn_cnt = options.clopts.nworkers; 248 323 249 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors"; 324 250 sout | nl; … … 436 362 437 363 sout | "Stopping printer threads..." | nonl; flush( sout ); 438 StatsPrinter * p = cl.prnt; 439 if(p) { 440 notify_one(p->var); 441 join(*p); 442 } 364 if(stats_thrd) { 365 notify_one(stats_thrd->var); 366 } 367 delete(stats_thrd); 443 368 sout | "done"; 444 369 -
benchmark/io/http/protocol.cfa
r8c58e73 r137974ae 587 587 588 588 void ?{}( DateFormater & this ) { 589 ((thread&)this){ "Server Date Thread", *options.clopts.instance [0]};589 ((thread&)this){ "Server Date Thread", *options.clopts.instance }; 590 590 this.idx = 0; 591 591 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) ); -
benchmark/io/http/worker.cfa
r8c58e73 r137974ae 14 14 #include "filecache.hfa" 15 15 16 void ?{}( sendfile_stats_t & this ) {17 this.calls = 0;18 this.tries = 0;19 this.header = 0;20 this.splcin = 0;21 this.splcot = 0;22 for(i; zipf_cnts) {23 this.avgrd[i].calls = 0;24 this.avgrd[i].bytes = 0;25 }26 }27 28 16 //============================================================================================= 29 17 // Generic connection handling 30 18 //============================================================================================= 31 static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f ) {19 static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) { 32 20 REQUEST: 33 21 for() { … … 111 99 if( options.log ) sout | "=== Answer sent ==="; 112 100 } 101 102 if (stats_thrd) { 103 unsigned long long next = rdtscl(); 104 if(next > (last + 500000000)) { 105 if(try_lock(stats_thrd->stats.lock)) { 106 push(this.stats.sendfile, stats_thrd->stats.send); 107 unlock(stats_thrd->stats.lock); 108 last = next; 109 } 110 } 111 } 113 112 } 114 113 … … 124 123 void main( AcceptWorker & this ) { 125 124 park(); 125 unsigned long long last = rdtscl(); 126 126 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 127 127 /* paranoid */ assert( this.conn.pipe[1] != -1 ); … … 139 139 size_t len = options.socket.buflen; 140 140 char buffer[len]; 141 handle_connection( this.conn, fd, buffer, len, 0p );141 handle_connection( this.conn, fd, buffer, len, 0p, last ); 142 142 143 143 if( options.log ) sout | "=== Connection closed ==="; … … 157 157 void main( ChannelWorker & this ) { 158 158 park(); 159 unsigned long long last = rdtscl(); 159 160 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 160 161 /* paranoid */ assert( this.conn.pipe[1] != -1 ); … … 168 169 169 170 if( options.log ) sout | "=== Waiting new connection ==="; 170 handle_connection( this.conn, p.out.fd, buffer, len, &p.f );171 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last ); 171 172 172 173 if( options.log ) sout | "=== Connection closed ==="; … … 187 188 void main( Acceptor & this ) { 188 189 park(); 190 unsigned long long last = rdtscl(); 189 191 if( options.log ) sout | "=== Accepting connection ==="; 190 192 for() { … … 192 194 if(fd < 0) { 193 195 if( errno == EWOULDBLOCK) { 196 this.stats.eagains++; 194 197 yield(); 195 198 continue; … … 199 202 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 200 203 } 204 this.stats.accepts++; 205 201 206 if(this.done) return; 202 207 … … 210 215 if(p) break; 211 216 yield(); 217 this.stats.creates++; 212 218 }; 213 219 … … 216 222 } 217 223 224 if (stats_thrd) { 225 unsigned long long next = rdtscl(); 226 if(next > (last + 500000000)) { 227 if(try_lock(stats_thrd->stats.lock)) { 228 push(this.stats, stats_thrd->stats.accpt); 229 unlock(stats_thrd->stats.lock); 230 last = next; 231 } 232 } 233 } 234 218 235 if( options.log ) sout | "=== Accepting connection ==="; 219 236 } -
benchmark/io/http/worker.hfa
r8c58e73 r137974ae 9 9 } 10 10 11 #include "printer.hfa" 12 11 13 //============================================================================================= 12 14 // Worker Thread 13 15 //============================================================================================= 14 15 extern const size_t zipf_sizes[];16 enum { zipf_cnts = 36, };17 18 struct sendfile_stats_t {19 volatile uint64_t calls;20 volatile uint64_t tries;21 volatile uint64_t header;22 volatile uint64_t splcin;23 volatile uint64_t splcot;24 struct {25 volatile uint64_t calls;26 volatile uint64_t bytes;27 } avgrd[zipf_cnts];28 };29 30 void ?{}( sendfile_stats_t & this );31 16 32 17 struct connection { … … 85 70 int flags; 86 71 volatile bool done; 72 acceptor_stats_t stats; 87 73 }; 88 74 void ?{}( Acceptor & );
Note: See TracChangeset
for help on using the changeset viewer.