source: benchmark/io/http/main.cfa@ c4b10e2

ADT ast-experimental pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since c4b10e2 was c4b10e2, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Moved socket creation to a different file.
Makefile now has debug symbol.
Pipes are now closed earlier for a cleaner exit.

  • Property mode set to 100644
File size: 10.3 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <signal.h>
5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
9 #include <sched.h>
10 #include <signal.h>
11 #include <sys/eventfd.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14}
15
16#include <fstream.hfa>
17#include <kernel.hfa>
18#include <iofwd.hfa>
19#include <stats.hfa>
20#include <time.hfa>
21#include <thread.hfa>
22
23#include "filecache.hfa"
24#include "options.hfa"
25#include "socket.hfa"
26#include "worker.hfa"
27
28extern void register_fixed_files( cluster &, int *, unsigned count );
29
// Kernel hook: the preemption time-slice for this program.
// Returning 0 disables time-slice preemption entirely for the benchmark.
Duration default_preemption() {
	return 0;
}
33
34//=============================================================================================
35// Stats Printer
//=============================================================================================
37
// Monitor thread that periodically prints cluster statistics and aggregated
// per-worker sendfile statistics.
thread StatsPrinter {
	Worker * workers;   // array of workers to aggregate from; wired up by the program main
	int worker_cnt;     // number of entries in workers; 0 until wired up
};
42
// Construct the stats-printer thread on cluster cl.
// workers/worker_cnt are assigned later by the program main; initialize both
// here so the thread main never observes an uninitialized pointer.
void ?{}( StatsPrinter & this, cluster & cl ) {
	((thread&)this){ "Stats Printer Thread", cl };
	this.workers = 0p;   // BUG FIX: was left uninitialized
	this.worker_cnt = 0;
}
47
48void ^?{}( StatsPrinter & mutex this ) {}
49
50#define eng3(X) (ws(3, 3, unit(eng( X ))))
51
// Thread main: every 10 seconds print ready-queue/IO stats for the current
// cluster, plus aggregated per-worker sendfile stats, until the destructor's
// rendezvous breaks the loop.
void main(StatsPrinter & this) {
	LOOP: for() {
		// Accepting the destructor means the printer is being torn down.
		waitfor( ^?{} : this) {
			break LOOP;
		}
		or else {}

		sleep(10`s);

		print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
		if(this.worker_cnt != 0) {
			// Aggregate sendfile counters across all workers.
			uint64_t tries = 0;
			uint64_t calls = 0;
			uint64_t header = 0;
			uint64_t splcin = 0;
			uint64_t splcot = 0;
			struct {
				volatile uint64_t calls;
				volatile uint64_t bytes;
			} avgrd[zipf_cnts];
			memset(avgrd, 0, sizeof(avgrd));

			for(i; this.worker_cnt) {
				tries += this.workers[i].stats.sendfile.tries;
				calls += this.workers[i].stats.sendfile.calls;
				header += this.workers[i].stats.sendfile.header;
				splcin += this.workers[i].stats.sendfile.splcin;
				splcot += this.workers[i].stats.sendfile.splcot;
				for(j; zipf_cnts) {
					avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
					avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
				}
			}

			// NOTE(review): ratio is NaN/inf while calls == 0 (before any traffic).
			double ratio = ((double)tries) / calls;

			sout | "----- Worker Stats -----";
			sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
			sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";
			sout | " - zipf sizes:";
			for(i; zipf_cnts) {
				double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
				sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
			}
		}
		else {
			sout | "No Workers!";
		}
	}
}
102
103//=============================================================================================
104// Globals
105//=============================================================================================
// One benchmark cluster: the cluster object, its processors (CPU-pinned in
// the constructor), and an optional stats-printer thread.
struct ServerCluster {
	cluster self;
	processor * procs;
	// io_context * ctxs;
	StatsPrinter * prnt;   // 0p when options.stats is off

};
113
// Construct a benchmark cluster: create the cluster, allocate nprocs
// processors, pin each processor round-robin onto a distinct CPU from this
// process's affinity mask, optionally start a stats printer, and register
// the cluster in the global options table.
void ?{}( ServerCluster & this ) {
	(this.self){ "Server Cluster", options.clopts.params };

	cpu_set_t fullset;
	CPU_ZERO(&fullset);
	int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
	if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
	int cnt = CPU_COUNT(&fullset);

	this.procs = alloc(options.clopts.nprocs);
	for(i; options.clopts.nprocs) {
		(this.procs[i]){ "Benchmark Processor", this.self };

		// Find the (1 + i mod cnt)-th set bit of the affinity mask: the CPU
		// this processor gets pinned to.
		int c = 0;
		int n = 1 + (i % cnt);
		for(int j = 0; j < CPU_SETSIZE; j++) {
			if(CPU_ISSET(j, &fullset)) n--;
			if(n == 0) {
				c = j;
				break;
			}
		}
		cpu_set_t localset;
		CPU_ZERO(&localset);
		CPU_SET(c, &localset);
		ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
		// BUG FIX: message previously said "sched_getaffinity".
		// pthread_setaffinity_np returns the error number directly (not via errno).
		if( ret != 0 ) abort | "pthread_setaffinity_np failed with" | ret | strerror( ret );

		#if !defined(__CFA_NO_STATISTICS__)
			if( options.clopts.procstats ) {
				// BUG FIX: was *this.procs, which always registered processor 0;
				// register the processor created in this iteration.
				print_stats_at_exit( this.procs[i], this.self.print_stats );
			}
			if( options.clopts.viewhalts ) {
				print_halts( this.procs[i] );
			}
		#endif
	}

	if(options.stats) {
		this.prnt = alloc();
		(*this.prnt){ this.self };
	} else {
		this.prnt = 0p;
	}

	#if !defined(__CFA_NO_STATISTICS__)
		print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
	#endif

	options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
	options.clopts.cltr_cnt++;
}
166
// Tear down in reverse construction order: stats printer first, then each
// processor, then the processor array, and finally the cluster itself.
void ^?{}( ServerCluster & this ) {
	delete(this.prnt);

	for(i; options.clopts.nprocs) {
		^(this.procs[i]){};
	}
	free(this.procs);

	^(this.self){};
}
177
178extern void init_protocol(void);
179extern void deinit_protocol(void);
180
181//=============================================================================================
182// Termination
183//=============================================================================================
184
// eventfd the SIGTERM handler pokes to wake the main thread for shutdown.
int closefd;

// SIGTERM handler: post the value 1 to the shutdown eventfd so the blocked
// read in the program main returns and the server begins its shutdown.
void cleanstop(int) {
	eventfd_t one = 1;
	int ret = write(closefd, (char *)&one, sizeof(one));
	if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
}
193
194//=============================================================================================
195// Main
//=============================================================================================
// Program entry point: install signal handling, parse options, warm the file
// cache, open the listening socket, spin up clusters and workers, then block
// until shutdown is requested (stdin EOF when interactive, otherwise the
// eventfd written by cleanstop on SIGTERM) and tear everything down in order.
int main( int argc, char * argv[] ) {
	int ret;
	// Ignore SIGPIPE so writes to closed connections fail with EPIPE instead
	// of killing the process. NOTE(review): 1p is presumably SIG_IGN's value,
	// used because the SIG_IGN macro trips the compiler — confirm.
	__sighandler_t s = 1p;
	signal(SIGPIPE, s);

	//===================
	// Parse args
	parse_options(argc, argv);

	//===================
	// Setup non-interactive termination
	if(!options.interactive) {
		closefd = eventfd(0, 0);
		if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );

		sighandler_t prev = signal(SIGTERM, cleanstop);
		intptr_t prev_workaround = (intptr_t) prev;
		// can't use SIG_ERR it crashes the compiler
		if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );

		sout | "Signal termination ready";
	}

	//===================
	// Open Files
	if( options.file_cache.path ) {
		sout | "Filling cache from" | options.file_cache.path;
		fill_cache( options.file_cache.path );
	}

	//===================
	// Open Socket
	sout | getpid() | ": Listening on port" | options.socket.port;

	struct sockaddr_in address;
	int addrlen = prepaddr(address);

	int server_fd = listener(address, addrlen);

	//===================
	// Run Server Cluster
	{
		// One pipe pair per worker, used for splice-based sendfile.
		int pipe_cnt = options.clopts.nworkers * 2;
		int pipe_off;
		int * fds;
		[fds, pipe_off] = filefds( pipe_cnt );
		for(i; 0 ~ pipe_cnt ~ 2) {
			int ret = pipe(&fds[pipe_off + i]);
			if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
		}

		// if(options.file_cache.path && options.file_cache.fixed_fds) {
		// 	register_fixed_files(cl, fds, pipe_off);
		// }

		{
			// Stats printer makes a copy so this needs to persist longer than normal
			Worker * workers;
			ServerCluster cl[options.clopts.nclusters];

			init_protocol();
			{
				workers = anew(options.clopts.nworkers);
				// NOTE(review): cl[0].prnt is 0p when options.stats is off —
				// this looks like a null dereference in that case; confirm.
				cl[0].prnt->workers = workers;
				cl[0].prnt->worker_cnt = options.clopts.nworkers;
				for(i; options.clopts.nworkers) {
					// if( options.file_cache.fixed_fds ) {
					// 	workers[i].pipe[0] = pipe_off + (i * 2) + 0;
					// 	workers[i].pipe[1] = pipe_off + (i * 2) + 1;
					// }
					// else
					{
						workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
						workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
						workers[i].sockfd = server_fd;
						workers[i].addr = (struct sockaddr *)&address;
						workers[i].addrlen = (socklen_t*)&addrlen;
						workers[i].flags = 0;
					}
					unpark( workers[i] );
				}
				sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
				for(i; options.clopts.nclusters) {
					sout | options.clopts.thrd_cnt[i] | nonl;
				}
				sout | nl;
				{
					// Block until shutdown is requested: stdin EOF when
					// interactive, otherwise the eventfd written by cleanstop.
					if(options.interactive) {
						char buffer[128];
						for() {
							int ret = cfa_read(0, buffer, 128, 0);
							if(ret == 0) break;
							if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
							sout | "User wrote '" | "" | nonl;
							write(sout, buffer, ret - 1);
							sout | "'";
						}
					}
					else {
						char buffer[sizeof(eventfd_t)];
						int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
						if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
					}

					sout | "Shutdown received";
				}

				// Tell every worker to stop accepting new work.
				sout | "Notifying connections..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					workers[i].done = true;
				}
				sout | "done";

				// Stop accepting new connections; pending accepts fail.
				sout | "Shutting down socket..." | nonl; flush( sout );
				int ret = shutdown( server_fd, SHUT_RD );
				if( ret < 0 ) {
					abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				sout | "done";

				//===================
				// Close Socket
				sout | "Closing Socket..." | nonl; flush( sout );
				ret = close( server_fd );
				if(ret < 0) {
					abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				sout | "done";

				// Close each worker's splice pipes (unblocks them), then join.
				sout | "Stopping connection threads..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					for(j; 2) {
						ret = close(workers[i].pipe[j]);
						if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
					}
					join(workers[i]);
				}
			}
			sout | "done";

			sout | "Stopping protocol threads..." | nonl; flush( sout );
			deinit_protocol();
			sout | "done";

			sout | "Stopping printer threads..." | nonl; flush( sout );
			for(i; options.clopts.nclusters) {
				StatsPrinter * p = cl[i].prnt;
				if(p) join(*p);
			}
			sout | "done";

			// Now that the stats printer is stopped, we can reclaim this
			adelete(workers);

			// Leaving this scope destroys the ServerCluster array (cl).
			sout | "Stopping processors/clusters..." | nonl; flush( sout );
		}
		sout | "done";

		// sout | "Closing splice fds..." | nonl; flush( sout );
		// for(i; pipe_cnt) {
		// 	ret = close( fds[pipe_off + i] );
		// 	if(ret < 0) {
		// 		abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
		// 	}
		// }
		free(fds);
		sout | "done";

		sout | "Stopping processors..." | nonl; flush( sout );
	}
	sout | "done";

	//===================
	// Close Files
	if( options.file_cache.path ) {
		sout | "Closing open files..." | nonl; flush( sout );
		close_cache();
		sout | "done";
	}
}
377
// Response-size buckets (bytes) used to histogram sendfile results in the
// stats printer; must stay in sync with zipf_cnts (declared elsewhere),
// which the static_assert below enforces.
const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.