source: benchmark/io/http/main.cfa@ b787cad

Last change on this file since b787cad was ef3c383, checked in by Thierry Delisle <tdelisle@…>, 4 years ago:
"Added statistics about sendfile in the webserver"

#define _GNU_SOURCE

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
extern "C" {
	#include <sched.h>
	#include <signal.h>
	#include <sys/socket.h>
	#include <netinet/in.h>
}

#include <fstream.hfa>
#include <kernel.hfa>
#include <iofwd.hfa>
#include <stats.hfa>
#include <time.hfa>
#include <thread.hfa>

#include "filecache.hfa"
#include "options.hfa"
#include "worker.hfa"

extern void register_fixed_files( cluster &, int *, unsigned count );

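// A zero duration disables timer-based preemption for this program; the
// workers block in I/O rather than being time-sliced.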
Duration default_preemption() {
	return 0;
}

//=============================================================================================
// Stats Printer
//=============================================================================================

thread StatsPrinter {
	Worker * workers;
	int worker_cnt;
};

void ?{}( StatsPrinter & this, cluster & cl ) {
	((thread&)this){ "Stats Printer Thread", cl };
	this.workers = 0p; // workers are attached later, once created
	this.worker_cnt = 0;
}

void ^?{}( StatsPrinter & mutex this ) {}

#define eng3(X) (ws(3, 3, unit(eng( X ))))

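// The printer loops forever: each iteration first polls for the destructor
// call (the empty 'or else' makes the waitfor non-blocking), then sleeps 10s
// and dumps cluster and per-worker sendfile statistics.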
void main(StatsPrinter & this) {
	LOOP: for() {
		waitfor( ^?{} : this) {
			break LOOP;
		}
		or else {}

		sleep(10`s);

		print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
		if(this.worker_cnt != 0) {
			uint64_t tries = 0;
			uint64_t calls = 0;
			uint64_t header = 0;
			uint64_t splcin = 0;
			uint64_t splcot = 0;
			struct {
				volatile uint64_t calls;
				volatile uint64_t bytes;
			} avgrd[zipf_cnts];
			memset(avgrd, 0, sizeof(avgrd));

			for(i; this.worker_cnt) {
				tries += this.workers[i].stats.sendfile.tries;
				calls += this.workers[i].stats.sendfile.calls;
				header += this.workers[i].stats.sendfile.header;
				splcin += this.workers[i].stats.sendfile.splcin;
				splcot += this.workers[i].stats.sendfile.splcot;
				for(j; zipf_cnts) {
					avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
					avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
				}
			}

			double ratio = calls > 0 ? ((double)tries) / calls : 0.; // avoid NaN before any call completes

			sout | "----- Worker Stats -----";
			sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
			sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";
			sout | " - zipf sizes:";
			for(i; zipf_cnts) {
				double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
				sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
			}
		}
		else {
			sout | "No Workers!";
		}
	}
}

//=============================================================================================
// Globals
//=============================================================================================
struct ServerCluster {
	cluster self;
	processor * procs;
	// io_context * ctxs;
	StatsPrinter * prnt;
};

void ?{}( ServerCluster & this ) {
	(this.self){ "Server Cluster", options.clopts.params };

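	// Query which CPUs this process may run on; the processors created below
	// are pinned round-robin across that set.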
	cpu_set_t fullset;
	CPU_ZERO(&fullset);
	int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
	if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
	int cnt = CPU_COUNT(&fullset);

	this.procs = alloc(options.clopts.nprocs);
	for(i; options.clopts.nprocs) {
		(this.procs[i]){ "Benchmark Processor", this.self };

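		// Pick the (1 + i mod cnt)-th CPU set in the affinity mask and pin
		// this processor's kernel thread to it.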
		int c = 0;
		int n = 1 + (i % cnt);
		for(int j = 0; j < CPU_SETSIZE; j++) {
			if(CPU_ISSET(j, &fullset)) n--;
			if(n == 0) {
				c = j;
				break;
			}
		}
		cpu_set_t localset;
		CPU_ZERO(&localset);
		CPU_SET(c, &localset);
		ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
		if( ret != 0 ) abort | "pthread_setaffinity_np failed with" | ret | strerror( ret );

		#if !defined(__CFA_NO_STATISTICS__)
			if( options.clopts.procstats ) {
				print_stats_at_exit( this.procs[i], this.self.print_stats );
			}
			if( options.clopts.viewhalts ) {
				print_halts( this.procs[i] );
			}
		#endif
	}

	if(options.stats) {
		this.prnt = alloc();
		(*this.prnt){ this.self };
	} else {
		this.prnt = 0p;
	}

	#if !defined(__CFA_NO_STATISTICS__)
		print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
	#endif

	options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
	options.clopts.cltr_cnt++;
}

void ^?{}( ServerCluster & this ) {
	delete(this.prnt);

	for(i; options.clopts.nprocs) {
		^(this.procs[i]){};
	}
	free(this.procs);

	^(this.self){};
}

extern void init_protocol(void);
extern void deinit_protocol(void);

//=============================================================================================
// Main
//=============================================================================================
int main( int argc, char * argv[] ) {
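	// Ignore SIGPIPE so writes to closed connections fail with EPIPE instead
	// of killing the server; 1p is the pointer literal matching SIG_IGN's
	// value of (sighandler) 1.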
	__sighandler_t s = 1p;
	signal(SIGPIPE, s);

	//===================
	// Parse args
	parse_options(argc, argv);

	//===================
	// Open Files
	if( options.file_cache.path ) {
		sout | "Filling cache from" | options.file_cache.path;
		fill_cache( options.file_cache.path );
	}

	//===================
	// Open Socket
	sout | getpid() | ": Listening on port" | options.socket.port;
	int server_fd = socket(AF_INET, SOCK_STREAM, 0);
	if(server_fd < 0) {
		abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
	}

	int ret = 0;
	struct sockaddr_in address;
	int addrlen = sizeof(address);
	memset( &address, '\0', sizeof(address) );
	address.sin_family = AF_INET;
	address.sin_addr.s_addr = htonl(INADDR_ANY);
	address.sin_port = htons( options.socket.port );

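	// Bind, retrying every second while the port is still in use (e.g. a
	// previous run lingering in TIME_WAIT); non-interactive runs abort instead
	// of waiting.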
	int waited = 0;
	for() {
		int sockfd = server_fd;
		__CONST_SOCKADDR_ARG addr;
		addr.__sockaddr__ = (struct sockaddr *)&address;
		socklen_t addrlen = sizeof(address);
		ret = bind( sockfd, addr, addrlen );
		if(ret < 0) {
			if(errno == EADDRINUSE) {
				if(waited == 0) {
					if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
					sout | "Waiting for port";
				} else {
					sout | "\r" | waited | nonl;
					flush( sout );
				}
				waited ++;
				sleep( 1`s );
				continue;
			}
			abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		break;
	}

	ret = listen( server_fd, options.socket.backlog );
	if(ret < 0) {
		abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
	}

	//===================
	// Run Server Cluster
	{
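		// Allocate one pipe (two fds) per worker; the workers use these to
		// splice file data from the cache to the client socket.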
		int pipe_cnt = options.clopts.nworkers * 2;
		int pipe_off;
		int * fds;
		[fds, pipe_off] = filefds( pipe_cnt );
		for(i; 0 ~ pipe_cnt ~ 2) {
			int ret = pipe(&fds[pipe_off + i]);
			if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
		}

		// if(options.file_cache.path && options.file_cache.fixed_fds) {
		// 	register_fixed_files(cl, fds, pipe_off);
		// }

		{
			ServerCluster cl[options.clopts.nclusters];

			init_protocol();
			{
				Worker * workers = anew(options.clopts.nworkers);
				// Guard: the stats printer only exists when stats are enabled.
				if( cl[0].prnt ) {
					cl[0].prnt->workers = workers;
					cl[0].prnt->worker_cnt = options.clopts.nworkers;
				}
				for(i; options.clopts.nworkers) {
					// if( options.file_cache.fixed_fds ) {
					// 	workers[i].pipe[0] = pipe_off + (i * 2) + 0;
					// 	workers[i].pipe[1] = pipe_off + (i * 2) + 1;
					// }
					// else
					{
						workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
						workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
						workers[i].sockfd = server_fd;
						workers[i].addr = (struct sockaddr *)&address;
						workers[i].addrlen = (socklen_t*)&addrlen;
						workers[i].flags = 0;
					}
					unpark( workers[i] );
				}
				sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
				for(i; options.clopts.nclusters) {
					sout | options.clopts.thrd_cnt[i] | nonl;
				}
				sout | nl;
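				// In non-interactive mode the main thread simply parks; the
				// server then runs until the process is killed externally.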
				if(!options.interactive) park();
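				// Interactive mode: echo stdin back until EOF (read returns 0),
				// which starts the shutdown sequence.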
				{
					char buffer[128];
					for() {
						int ret = cfa_read(0, buffer, 128, 0);
						if(ret == 0) break;
						if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
						sout | "User wrote '" | "" | nonl;
						write(sout, buffer, ret - 1);
						sout | "'";
					}

					sout | "Shutdown received";
				}

				sout | "Notifying connections..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					workers[i].done = true;
				}
				sout | "done";

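				// Shutting down the read side of the listening socket forces
				// workers blocked in accept to return so they can see 'done'.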
				sout | "Shutting down socket..." | nonl; flush( sout );
				int ret = shutdown( server_fd, SHUT_RD );
				if( ret < 0 ) {
					abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				sout | "done";

				//===================
				// Close Socket
				sout | "Closing Socket..." | nonl; flush( sout );
				ret = close( server_fd );
				if(ret < 0) {
					abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				sout | "done";

				sout | "Stopping connection threads..." | nonl; flush( sout );
				adelete(workers);
			}
			sout | "done";

			sout | "Stopping protocol threads..." | nonl; flush( sout );
			deinit_protocol();
			sout | "done";

			sout | "Stopping processors/clusters..." | nonl; flush( sout );
		}
		sout | "done";

		sout | "Closing splice fds..." | nonl; flush( sout );
		for(i; pipe_cnt) {
			ret = close( fds[pipe_off + i] );
			if(ret < 0) {
				abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
			}
		}
		free(fds);
		sout | "done";

		sout | "Stopping processors..." | nonl; flush( sout );
	}
	sout | "done";

	//===================
	// Close Files
	if( options.file_cache.path ) {
		sout | "Closing open files..." | nonl; flush( sout );
		close_cache();
		sout | "done";
	}
}
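
// File sizes used by the zipf-distributed request workload; the array must
// stay in sync with zipf_cnts, which is defined elsewhere in the benchmark.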
const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));