source: benchmark/io/http/main.cfa @ 6e2b04e

Last change on this file since 6e2b04e was 86c12d65, checked in by Thierry Delisle <tdelisle@…>, 4 years ago:
Checkpoint of the broken version of reuseport
#define _GNU_SOURCE

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
extern "C" {
	#include <sched.h>
	#include <signal.h>
	#include <sys/eventfd.h>
	#include <sys/socket.h>
	#include <netinet/in.h>
}

#include <fstream.hfa>
#include <kernel.hfa>
#include <locks.hfa>
#include <iofwd.hfa>
#include <stats.hfa>
#include <time.hfa>
#include <thread.hfa>

#include "filecache.hfa"
#include "options.hfa"
#include "socket.hfa"
#include "worker.hfa"

extern void register_fixed_files( cluster &, int *, unsigned count );

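// A zero Duration disables preemptive time-slicing for this benchmark's clusters.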
Duration default_preemption() {
	return 0;
}

//=============================================================================================
// Stats Printer
//=============================================================================================

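// Dedicated thread that periodically prints cluster and per-worker statistics
// until its destructor is accepted.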
thread StatsPrinter {
	Worker * workers;
	int worker_cnt;
	condition_variable(fast_block_lock) var;
};

void ?{}( StatsPrinter & this, cluster & cl ) {
	((thread&)this){ "Stats Printer Thread", cl };
	this.worker_cnt = 0;
}

void ^?{}( StatsPrinter & mutex this ) {}

#define eng3(X) (ws(3, 3, unit(eng( X ))))

void main(StatsPrinter & this) {
	LOOP: for() {
		// Non-blocking check: if the destructor has been called, stop printing.
		waitfor( ^?{} : this) {
			break LOOP;
		}
		or else {}

		// Sleep for up to 10 seconds, or until the shutdown path notifies us.
		wait(this.var, 10`s);

		print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
		if(this.worker_cnt != 0) {
			uint64_t tries  = 0;
			uint64_t calls  = 0;
			uint64_t header = 0;
			uint64_t splcin = 0;
			uint64_t splcot = 0;
			struct {
				volatile uint64_t calls;
				volatile uint64_t bytes;
			} avgrd[zipf_cnts];
			memset(avgrd, 0, sizeof(avgrd));

			// Sum the per-worker sendfile statistics.
			for(i; this.worker_cnt) {
				tries  += this.workers[i].stats.sendfile.tries;
				calls  += this.workers[i].stats.sendfile.calls;
				header += this.workers[i].stats.sendfile.header;
				splcin += this.workers[i].stats.sendfile.splcin;
				splcot += this.workers[i].stats.sendfile.splcot;
				for(j; zipf_cnts) {
					avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
					avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
				}
			}

			// Guard against division by zero when no calls have completed yet.
			double ratio = calls > 0 ? ((double)tries) / calls : 0;

			sout | "----- Worker Stats -----";
			sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
			sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";
			sout | " - zipf sizes:";
			for(i; zipf_cnts) {
				double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
				sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
			}
		}
		else {
			sout | "No Workers!";
		}
	}
}

//=============================================================================================
// Globals
//=============================================================================================
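// A cluster together with its pool of processors (each pinned to a distinct CPU)
// and an optional stats-printer thread.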
struct ServerCluster {
	cluster self;
	processor * procs;
	// io_context * ctxs;
	StatsPrinter * prnt;
};

void ?{}( ServerCluster & this ) {
	(this.self){ "Server Cluster", options.clopts.params };

	// Query which CPUs this process may run on; processors are pinned round-robin below.
	cpu_set_t fullset;
	CPU_ZERO(&fullset);
	int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
	if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
	int cnt = CPU_COUNT(&fullset);

	this.procs = alloc(options.clopts.nprocs);
	for(i; options.clopts.nprocs) {
		(this.procs[i]){ "Benchmark Processor", this.self };

		// Pick the (i mod cnt)-th set CPU from the affinity mask.
		int c = 0;
		int n = 1 + (i % cnt);
		for(int j = 0; j < CPU_SETSIZE; j++) {
			if(CPU_ISSET(j, &fullset)) n--;
			if(n == 0) {
				c = j;
				break;
			}
		}
		cpu_set_t localset;
		CPU_ZERO(&localset);
		CPU_SET(c, &localset);
		ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
		if( ret != 0 ) abort | "pthread_setaffinity_np failed with" | ret | strerror( ret );

		#if !defined(__CFA_NO_STATISTICS__)
			// Register each processor individually, not just procs[0].
			if( options.clopts.procstats ) {
				print_stats_at_exit( this.procs[i], this.self.print_stats );
			}
			if( options.clopts.viewhalts ) {
				print_halts( this.procs[i] );
			}
		#endif
	}

	if(options.stats) {
		this.prnt = alloc();
		(*this.prnt){ this.self };
	} else {
		this.prnt = 0p;
	}

	#if !defined(__CFA_NO_STATISTICS__)
		print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
	#endif

	options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
	options.clopts.cltr_cnt++;
}

void ^?{}( ServerCluster & this ) {
	delete(this.prnt);

	for(i; options.clopts.nprocs) {
		^(this.procs[i]){};
	}
	free(this.procs);

	^(this.self){};
}

extern void init_protocol(void);
extern void deinit_protocol(void);

//=============================================================================================
// Termination
//=============================================================================================

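// SIGTERM handler: wake the main thread, which blocks on this eventfd in
// non-interactive mode, so shutdown can proceed outside signal context.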
int closefd;
void cleanstop(int) {
	eventfd_t buffer = 1;
	char * buffer_s = (char*)&buffer;
	int ret = write(closefd, buffer_s, sizeof(buffer));
	if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
	return;
}

//=============================================================================================
// Main
//=============================================================================================
int main( int argc, char * argv[] ) {
	int ret;
	// 1p is the value of SIG_IGN: ignore SIGPIPE so writes to closed
	// connections fail with EPIPE instead of killing the process.
	__sighandler_t s = 1p;
	signal(SIGPIPE, s);

	//===================
	// Parse args
	parse_options(argc, argv);

	//===================
	// Setup non-interactive termination
	if(!options.interactive) {
		closefd = eventfd(0, 0);
		if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );

		sighandler_t prev = signal(SIGTERM, cleanstop);
		intptr_t prev_workaround = (intptr_t) prev;
		// Cannot compare against SIG_ERR directly: the macro crashes the compiler.
		if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );

		sout | "Signal termination ready";
	}

	//===================
	// Open Files
	if( options.file_cache.path ) {
		sout | "Filling cache from" | options.file_cache.path;
		fill_cache( options.file_cache.path );
	}

	//===================
	// Open Socket
	sout | getpid() | ": Listening on port" | options.socket.port;

	struct sockaddr_in address;
	int addrlen = prepaddr(address);

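	// With "manyreuse" every worker opens its own listening socket below,
	// presumably via SO_REUSEPORT inside listener(); otherwise this single
	// socket is shared by all workers.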
	int server_fd;
	if(!options.socket.manyreuse) {
		server_fd = listener(address, addrlen);
	}

	//===================
	// Run Server Cluster
	{
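		// Allocate one pipe (two fds) per worker; judging by the splice in/out
		// statistics above, workers use these to splice file data to the sockets.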
		int pipe_cnt = options.clopts.nworkers * 2;
		int pipe_off;
		int * fds;
		[fds, pipe_off] = filefds( pipe_cnt );
		for(i; 0 ~ pipe_cnt ~ 2) {
			int ret = pipe(&fds[pipe_off + i]);
			if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
		}

		// if(options.file_cache.path && options.file_cache.fixed_fds) {
		// 	register_fixed_files(cl, fds, pipe_off);
		// }

		{
			// Stats printer makes a copy so this needs to persist longer than normal
			Worker * workers;
			ServerCluster cl[options.clopts.nclusters];

			init_protocol();
			{
				workers = anew(options.clopts.nworkers);
				// The printer only exists when stats are enabled; guard the handoff.
				if(cl[0].prnt) {
					cl[0].prnt->workers = workers;
					cl[0].prnt->worker_cnt = options.clopts.nworkers;
				}
				for(i; options.clopts.nworkers) {
					// if( options.file_cache.fixed_fds ) {
					// 	workers[i].pipe[0] = pipe_off + (i * 2) + 0;
					// 	workers[i].pipe[1] = pipe_off + (i * 2) + 1;
					// }
					// else
					{
						workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
						workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
						workers[i].sockfd = options.socket.manyreuse ? listener(address, addrlen) : server_fd;
						workers[i].addr = (struct sockaddr *)&address;
						workers[i].addrlen = (socklen_t*)&addrlen;
						workers[i].flags = 0;
					}
					unpark( workers[i] );
				}
				sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
				for(i; options.clopts.nclusters) {
					sout | options.clopts.thrd_cnt[i] | nonl;
				}
				sout | nl;
				{
					if(options.interactive) {
						// Interactive mode: run until the user closes stdin (EOF).
						char buffer[128];
						for() {
							int ret = cfa_read(0, buffer, 128, 0);
							if(ret == 0) break;
							if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
							sout | "User wrote '" | "" | nonl;
							write(sout, buffer, ret - 1);
							sout | "'";
						}
					}
					else {
						// Non-interactive mode: block on the eventfd until cleanstop fires.
						char buffer[sizeof(eventfd_t)];
						int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
						if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
					}

					sout | "Shutdown received";
				}

				sout | "Notifying connections..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					workers[i].done = true;
				}
				sout | "done";

				sout | "Shutting down socket..." | nonl; flush( sout );
				if(options.socket.manyreuse) {
					for(i; options.clopts.nworkers) {
						ret = shutdown( workers[i].sockfd, SHUT_RD );
						if(ret < 0) abort( "shutdown socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
					}
				}
				else {
					ret = shutdown( server_fd, SHUT_RD );
					if( ret < 0 ) {
						abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
					}
				}
				sout | "done";

				//===================
				// Close Socket
				sout | "Closing Socket..." | nonl; flush( sout );
				if(options.socket.manyreuse) {
					for(i; options.clopts.nworkers) {
						ret = close(workers[i].sockfd);
						if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
					}
				}
				else {
					ret = close( server_fd );
					if(ret < 0) {
						abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
					}
				}
				sout | "done";

				sout | "Stopping connection threads..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					// Close the splice pipes to unblock the workers before joining them.
					for(j; 2) {
						ret = close(workers[i].pipe[j]);
						if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
					}
					join(workers[i]);
				}
			}
			sout | "done";

			sout | "Stopping protocol threads..." | nonl; flush( sout );
			deinit_protocol();
			sout | "done";

			sout | "Stopping printer threads..." | nonl; flush( sout );
			for(i; options.clopts.nclusters) {
				StatsPrinter * p = cl[i].prnt;
				if(p) {
					notify_one(p->var);
					join(*p);
				}
			}
			sout | "done";

			// Now that the stats printer is stopped, we can reclaim this
			adelete(workers);

			sout | "Stopping processors/clusters..." | nonl; flush( sout );
		}
		sout | "done";

		// sout | "Closing splice fds..." | nonl; flush( sout );
		// for(i; pipe_cnt) {
		// 	ret = close( fds[pipe_off + i] );
		// 	if(ret < 0) {
		// 		abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
		// 	}
		// }
		free(fds);
		sout | "done";

		sout | "Stopping processors..." | nonl; flush( sout );
	}
	sout | "done";

	//===================
	// Close Files
	if( options.file_cache.path ) {
		sout | "Closing open files..." | nonl; flush( sout );
		close_cache();
		sout | "done";
	}
}

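// Size buckets used to classify sendfile transfers in the worker statistics;
// the naming suggests the benchmark's file sizes follow a Zipf-like distribution.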
const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));