source: benchmark/io/http/main.cfa@ 5f53cc3

ADT ast-experimental enum pthread-emulation qualifiedEnum
Last change on this file since 5f53cc3 was ef3c383, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added statistics about sendfile in the webserver

  • Property mode set to 100644
File size: 9.6 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7extern "C" {
8 #include <sched.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <netinet/in.h>
12}
13
14#include <fstream.hfa>
15#include <kernel.hfa>
16#include <iofwd.hfa>
17#include <stats.hfa>
18#include <time.hfa>
19#include <thread.hfa>
20
21#include "filecache.hfa"
22#include "options.hfa"
23#include "worker.hfa"
24
25extern void register_fixed_files( cluster &, int *, unsigned count );
26
27Duration default_preemption() {
28 return 0;
29}
30
31//=============================================================================================
32// Stats Printer
33//============================================================================================='
34
35thread StatsPrinter {
36 Worker * workers;
37 int worker_cnt;
38};
39
40void ?{}( StatsPrinter & this, cluster & cl ) {
41 ((thread&)this){ "Stats Printer Thread", cl };
42 this.worker_cnt = 0;
43}
44
45void ^?{}( StatsPrinter & mutex this ) {}
46
47#define eng3(X) (ws(3, 3, unit(eng( X ))))
48
49void main(StatsPrinter & this) {
50 LOOP: for() {
51 waitfor( ^?{} : this) {
52 break LOOP;
53 }
54 or else {}
55
56 sleep(10`s);
57
58 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
59 if(this.worker_cnt != 0) {
60 uint64_t tries = 0;
61 uint64_t calls = 0;
62 uint64_t header = 0;
63 uint64_t splcin = 0;
64 uint64_t splcot = 0;
65 struct {
66 volatile uint64_t calls;
67 volatile uint64_t bytes;
68 } avgrd[zipf_cnts];
69 memset(avgrd, 0, sizeof(avgrd));
70
71 for(i; this.worker_cnt) {
72 tries += this.workers[i].stats.sendfile.tries;
73 calls += this.workers[i].stats.sendfile.calls;
74 header += this.workers[i].stats.sendfile.header;
75 splcin += this.workers[i].stats.sendfile.splcin;
76 splcot += this.workers[i].stats.sendfile.splcot;
77 for(j; zipf_cnts) {
78 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
79 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
80 }
81 }
82
83 double ratio = ((double)tries) / calls;
84
85 sout | "----- Worker Stats -----";
86 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
87 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";
88 sout | " - zipf sizes:";
89 for(i; zipf_cnts) {
90 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
91 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
92 }
93 }
94 else {
95 sout | "No Workers!";
96 }
97 }
98}
99
100//=============================================================================================
101// Globals
102//=============================================================================================
103struct ServerCluster {
104 cluster self;
105 processor * procs;
106 // io_context * ctxs;
107 StatsPrinter * prnt;
108
109};
110
111void ?{}( ServerCluster & this ) {
112 (this.self){ "Server Cluster", options.clopts.params };
113
114 cpu_set_t fullset;
115 CPU_ZERO(&fullset);
116 int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
117 if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
118 int cnt = CPU_COUNT(&fullset);
119
120 this.procs = alloc(options.clopts.nprocs);
121 for(i; options.clopts.nprocs) {
122 (this.procs[i]){ "Benchmark Processor", this.self };
123
124 int c = 0;
125 int n = 1 + (i % cnt);
126 for(int j = 0; j < CPU_SETSIZE; j++) {
127 if(CPU_ISSET(j, &fullset)) n--;
128 if(n == 0) {
129 c = j;
130 break;
131 }
132 }
133 cpu_set_t localset;
134 CPU_ZERO(&localset);
135 CPU_SET(c, &localset);
136 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
137 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
138
139 #if !defined(__CFA_NO_STATISTICS__)
140 if( options.clopts.procstats ) {
141 print_stats_at_exit( *this.procs, this.self.print_stats );
142 }
143 if( options.clopts.viewhalts ) {
144 print_halts( *this.procs );
145 }
146 #endif
147 }
148
149 if(options.stats) {
150 this.prnt = alloc();
151 (*this.prnt){ this.self };
152 } else {
153 this.prnt = 0p;
154 }
155
156 #if !defined(__CFA_NO_STATISTICS__)
157 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
158 #endif
159
160 options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
161 options.clopts.cltr_cnt++;
162}
163
164void ^?{}( ServerCluster & this ) {
165 delete(this.prnt);
166
167 for(i; options.clopts.nprocs) {
168 ^(this.procs[i]){};
169 }
170 free(this.procs);
171
172 ^(this.self){};
173}
174
175extern void init_protocol(void);
176extern void deinit_protocol(void);
177
178//=============================================================================================
179// Main
180//============================================================================================='
181int main( int argc, char * argv[] ) {
182 __sighandler_t s = 1p;
183 signal(SIGPIPE, s);
184
185 //===================
186 // Parse args
187 parse_options(argc, argv);
188
189 //===================
190 // Open Files
191 if( options.file_cache.path ) {
192 sout | "Filling cache from" | options.file_cache.path;
193 fill_cache( options.file_cache.path );
194 }
195
196 //===================
197 // Open Socket
198 sout | getpid() | ": Listening on port" | options.socket.port;
199 int server_fd = socket(AF_INET, SOCK_STREAM, 0);
200 if(server_fd < 0) {
201 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
202 }
203
204 int ret = 0;
205 struct sockaddr_in address;
206 int addrlen = sizeof(address);
207 memset( (char *)&address, '\0' );
208 address.sin_family = AF_INET;
209 address.sin_addr.s_addr = htonl(INADDR_ANY);
210 address.sin_port = htons( options.socket.port );
211
212 int waited = 0;
213 for() {
214 int sockfd = server_fd;
215 __CONST_SOCKADDR_ARG addr;
216 addr.__sockaddr__ = (struct sockaddr *)&address;
217 socklen_t addrlen = sizeof(address);
218 ret = bind( sockfd, addr, addrlen );
219 if(ret < 0) {
220 if(errno == EADDRINUSE) {
221 if(waited == 0) {
222 if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
223 sout | "Waiting for port";
224 } else {
225 sout | "\r" | waited | nonl;
226 flush( sout );
227 }
228 waited ++;
229 sleep( 1`s );
230 continue;
231 }
232 abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
233 }
234 break;
235 }
236
237 ret = listen( server_fd, options.socket.backlog );
238 if(ret < 0) {
239 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
240 }
241
242 //===================
243 // Run Server Cluster
244 {
245 int pipe_cnt = options.clopts.nworkers * 2;
246 int pipe_off;
247 int * fds;
248 [fds, pipe_off] = filefds( pipe_cnt );
249 for(i; 0 ~ pipe_cnt ~ 2) {
250 int ret = pipe(&fds[pipe_off + i]);
251 if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
252 }
253
254 // if(options.file_cache.path && options.file_cache.fixed_fds) {
255 // register_fixed_files(cl, fds, pipe_off);
256 // }
257
258 {
259 ServerCluster cl[options.clopts.nclusters];
260
261 init_protocol();
262 {
263 Worker * workers = anew(options.clopts.nworkers);
264 cl[0].prnt->workers = workers;
265 cl[0].prnt->worker_cnt = options.clopts.nworkers;
266 for(i; options.clopts.nworkers) {
267 // if( options.file_cache.fixed_fds ) {
268 // workers[i].pipe[0] = pipe_off + (i * 2) + 0;
269 // workers[i].pipe[1] = pipe_off + (i * 2) + 1;
270 // }
271 // else
272 {
273 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
274 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
275 workers[i].sockfd = server_fd;
276 workers[i].addr = (struct sockaddr *)&address;
277 workers[i].addrlen = (socklen_t*)&addrlen;
278 workers[i].flags = 0;
279 }
280 unpark( workers[i] );
281 }
282 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
283 for(i; options.clopts.nclusters) {
284 sout | options.clopts.thrd_cnt[i] | nonl;
285 }
286 sout | nl;
287 if(!options.interactive) park();
288 {
289 char buffer[128];
290 for() {
291 int ret = cfa_read(0, buffer, 128, 0);
292 if(ret == 0) break;
293 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
294 sout | "User wrote '" | "" | nonl;
295 write(sout, buffer, ret - 1);
296 sout | "'";
297 }
298
299 sout | "Shutdown received";
300 }
301
302 sout | "Notifying connections..." | nonl; flush( sout );
303 for(i; options.clopts.nworkers) {
304 workers[i].done = true;
305 }
306 sout | "done";
307
308 sout | "Shutting down socket..." | nonl; flush( sout );
309 int ret = shutdown( server_fd, SHUT_RD );
310 if( ret < 0 ) {
311 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
312 }
313 sout | "done";
314
315 //===================
316 // Close Socket
317 sout | "Closing Socket..." | nonl; flush( sout );
318 ret = close( server_fd );
319 if(ret < 0) {
320 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
321 }
322 sout | "done";
323
324 sout | "Stopping connection threads..." | nonl; flush( sout );
325 adelete(workers);
326 }
327 sout | "done";
328
329 sout | "Stopping protocol threads..." | nonl; flush( sout );
330 deinit_protocol();
331 sout | "done";
332
333 sout | "Stopping processors/clusters..." | nonl; flush( sout );
334 }
335 sout | "done";
336
337 sout | "Closing splice fds..." | nonl; flush( sout );
338 for(i; pipe_cnt) {
339 ret = close( fds[pipe_off + i] );
340 if(ret < 0) {
341 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
342 }
343 }
344 free(fds);
345 sout | "done";
346
347 sout | "Stopping processors..." | nonl; flush( sout );
348 }
349 sout | "done";
350
351 //===================
352 // Close Files
353 if( options.file_cache.path ) {
354 sout | "Closing open files..." | nonl; flush( sout );
355 close_cache();
356 sout | "done";
357 }
358}
359
360const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
361static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.