source: benchmark/io/http/main.cfa@ 6e2b04e

ADT ast-experimental pthread-emulation qualifiedEnum
Last change on this file since 6e2b04e was 86c12d65, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Checkpoint of the broken version of reuseport

  • Property mode set to 100644
File size: 11.0 KB
RevLine 
[1e8b4b49]1#define _GNU_SOURCE
[0aec496]2
3#include <errno.h>
[153570d]4#include <signal.h>
[0aec496]5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
[6117fc0]9 #include <sched.h>
[c2df3031]10 #include <signal.h>
[153570d]11 #include <sys/eventfd.h>
[0aec496]12 #include <sys/socket.h>
13 #include <netinet/in.h>
14}
15
[8c43d05]16#include <fstream.hfa>
[0aec496]17#include <kernel.hfa>
[ce98816]18#include <locks.hfa>
[153dc387]19#include <iofwd.hfa>
[0aec496]20#include <stats.hfa>
[d11d6eb]21#include <time.hfa>
[0aec496]22#include <thread.hfa>
23
24#include "filecache.hfa"
25#include "options.hfa"
[c4b10e2]26#include "socket.hfa"
[0aec496]27#include "worker.hfa"
28
[d11d6eb]29extern void register_fixed_files( cluster &, int *, unsigned count );
30
// Runtime hook: returning a zero Duration disables timer-based preemption
// for this program (threads run until they block or yield).
Duration default_preemption() {
	return 0;
}
34
[153dc387]35//=============================================================================================
36// Stats Printer
//=============================================================================================
38
// Background thread that periodically dumps cluster and per-worker
// statistics until it is destroyed (see main(StatsPrinter&) below).
thread StatsPrinter {
	Worker * workers;                         // worker array to aggregate sendfile stats from; set externally after construction
	int worker_cnt;                           // number of entries in 'workers'; 0 means none registered yet
	condition_variable(fast_block_lock) var;  // timed-wait handle; notify_one() wakes the printer early for shutdown
};
[153dc387]44
// Construct the printer thread on cluster 'cl'. The worker list starts
// empty; 'workers' is assigned later by the program's main().
// NOTE(review): this.workers is deliberately left uninitialized here —
// it is only read when worker_cnt != 0; confirm all callers set it first.
void ?{}( StatsPrinter & this, cluster & cl ) {
	((thread&)this){ "Stats Printer Thread", cl };
	this.worker_cnt = 0;
}
49
// Empty mutex destructor: exists so the printer's main loop can accept it
// via waitfor as the termination signal.
void ^?{}( StatsPrinter & mutex this ) {}

// Format helper: engineering notation, width 3 / 3 decimals, with unit suffix.
#define eng3(X) (ws(3, 3, unit(eng( X ))))
53
// Printer thread body: every iteration either accepts the destructor call
// (and exits) or sleeps up to 10s (interruptible via notify_one on 'var'),
// then prints cluster stats and, if workers are registered, an aggregate
// of their sendfile counters.
void main(StatsPrinter & this) {
	LOOP: for() {
		// Non-blocking check for a pending destructor call: 'or else {}'
		// makes the waitfor fall through immediately when none is pending.
		waitfor( ^?{} : this) {
			break LOOP;
		}
		or else {}

		// Sleep until the next reporting period, or until notified for shutdown.
		wait(this.var, 10`s);

		print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
		if(this.worker_cnt != 0) {
			// Accumulators for the per-worker sendfile statistics.
			uint64_t tries = 0;
			uint64_t calls = 0;
			uint64_t header = 0;
			uint64_t splcin = 0;
			uint64_t splcot = 0;
			struct {
				volatile uint64_t calls;
				volatile uint64_t bytes;
			} avgrd[zipf_cnts];
			memset(avgrd, 0, sizeof(avgrd));

			// Sum counters across all workers (workers update them concurrently;
			// this is a best-effort snapshot, not a consistent one).
			for(i; this.worker_cnt) {
				tries += this.workers[i].stats.sendfile.tries;
				calls += this.workers[i].stats.sendfile.calls;
				header += this.workers[i].stats.sendfile.header;
				splcin += this.workers[i].stats.sendfile.splcin;
				splcot += this.workers[i].stats.sendfile.splcot;
				for(j; zipf_cnts) {
					avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
					avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
				}
			}

			// NOTE(review): if calls == 0 this is 0.0/0 (NaN) — harmless for
			// printing but confirm that is acceptable output.
			double ratio = ((double)tries) / calls;

			sout | "----- Worker Stats -----";
			sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
			sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
			sout | " - zipf sizes:";
			for(i; zipf_cnts) {
				double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
				sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
			}
		}
		else {
			sout | "No Workers!";
		}
	}
}
104
105//=============================================================================================
106// Globals
107//=============================================================================================
// One server cluster instance: the CFA cluster itself, its pinned
// processors, and an optional stats-printer thread (0p when stats are off).
struct ServerCluster {
	cluster self;
	processor * procs;     // array of options.clopts.nprocs processors, each pinned to one CPU
	// io_context * ctxs;
	StatsPrinter * prnt;   // heap-allocated printer thread, or 0p when !options.stats

};
115
// Construct a server cluster: create options.clopts.nprocs processors,
// pin each round-robin onto the CPUs in the process affinity mask, start
// the optional stats printer, and register the cluster in the global
// options table. Aborts on any affinity-related failure.
void ?{}( ServerCluster & this ) {
	(this.self){ "Server Cluster", options.clopts.params };

	// Snapshot the set of CPUs this process is allowed to run on.
	cpu_set_t fullset;
	CPU_ZERO(&fullset);
	int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
	if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
	int cnt = CPU_COUNT(&fullset);

	this.procs = alloc(options.clopts.nprocs);
	for(i; options.clopts.nprocs) {
		(this.procs[i]){ "Benchmark Processor", this.self };

		// Find the (i mod cnt)-th set bit of the affinity mask: processors
		// are distributed round-robin over the available CPUs.
		int c = 0;
		int n = 1 + (i % cnt);
		for(int j = 0; j < CPU_SETSIZE; j++) {
			if(CPU_ISSET(j, &fullset)) n--;
			if(n == 0) {
				c = j;
				break;
			}
		}
		cpu_set_t localset;
		CPU_ZERO(&localset);
		CPU_SET(c, &localset);

		// Pin this processor's kernel thread to the single chosen CPU.
		// pthread_setaffinity_np returns the error code directly (not via errno).
		ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
		// FIX: error message previously blamed sched_getaffinity.
		if( ret != 0 ) abort | "pthread_setaffinity_np failed with" | ret | strerror( ret );

		#if !defined(__CFA_NO_STATISTICS__)
			if( options.clopts.procstats ) {
				// FIX: register the i-th processor; '*this.procs' always
				// registered procs[0] on every loop iteration.
				print_stats_at_exit( this.procs[i], this.self.print_stats );
			}
			if( options.clopts.viewhalts ) {
				print_halts( this.procs[i] );
			}
		#endif
	}

	if(options.stats) {
		this.prnt = alloc();
		(*this.prnt){ this.self };
	} else {
		this.prnt = 0p;
	}

	#if !defined(__CFA_NO_STATISTICS__)
		print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
	#endif

	// Publish this cluster in the global registry used by the options module.
	options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
	options.clopts.cltr_cnt++;
}
168
// Tear down a server cluster. Order matters: the printer thread is joined
// and freed first, then every processor is destroyed before its backing
// array is freed, and the cluster object itself goes last.
void ^?{}( ServerCluster & this ) {
	delete(this.prnt);   // delete handles the 0p (no-stats) case

	for(i; options.clopts.nprocs) {
		^(this.procs[i]){};
	}
	free(this.procs);

	^(this.self){};
}
179
180extern void init_protocol(void);
181extern void deinit_protocol(void);
182
[153570d]183//=============================================================================================
184// Termination
185//=============================================================================================
186
// eventfd used to tell the main thread a termination request arrived.
int closefd;

// SIGTERM handler: bump the shutdown eventfd so the main thread, blocked
// reading it, wakes up and begins an orderly shutdown.
void cleanstop(int) {
	eventfd_t inc = 1;
	int rc = write(closefd, (char*)&inc, sizeof(inc));
	if(rc < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
}
195
[0aec496]196//=============================================================================================
197// Main
//=============================================================================================
// Program entry: parse options, set up termination signalling, warm the
// file cache, open the listening socket(s), start clusters + workers, wait
// for shutdown (stdin EOF or SIGTERM via eventfd), then tear everything
// down in reverse order. The nested scopes are load-bearing: they control
// destruction order of the cluster VLA relative to the worker array.
int main( int argc, char * argv[] ) {
	int ret;
	// 1p is CFA's spelling of (pointer)1, i.e. glibc's SIG_IGN — presumably
	// used to ignore SIGPIPE on closed connections; confirm against signal.h.
	__sighandler_t s = 1p;
	signal(SIGPIPE, s);

	//===================
	// Parse args
	parse_options(argc, argv);

	//===================
	// Setup non-interactive termination
	if(!options.interactive) {
		closefd = eventfd(0, 0);
		if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );

		sighandler_t prev = signal(SIGTERM, cleanstop);
		intptr_t prev_workaround = (intptr_t) prev;
		// can't use SIG_ERR it crashes the compiler
		if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );

		sout | "Signal termination ready";
	}

	//===================
	// Open Files
	if( options.file_cache.path ) {
		sout | "Filling cache from" | options.file_cache.path;
		fill_cache( options.file_cache.path );
	}

	//===================
	// Open Socket
	sout | getpid() | ": Listening on port" | options.socket.port;

	struct sockaddr_in address;
	int addrlen = prepaddr(address);

	// With manyreuse (SO_REUSEPORT-style), each worker opens its own
	// listener below; otherwise a single shared fd is opened here.
	int server_fd;
	if(!options.socket.manyreuse) {
		server_fd = listener(address, addrlen);
	}

	//===================
	// Run Server Cluster
	{
		// Two pipe fds per worker, for splice-based sendfile.
		int pipe_cnt = options.clopts.nworkers * 2;
		int pipe_off;
		int * fds;
		[fds, pipe_off] = filefds( pipe_cnt );
		for(i; 0 ~ pipe_cnt ~ 2) {
			int ret = pipe(&fds[pipe_off + i]);
			if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
		}

		// if(options.file_cache.path && options.file_cache.fixed_fds) {
		// 	register_fixed_files(cl, fds, pipe_off);
		// }

		{
			// Stats printer makes a copy so this needs to persist longer than normal
			Worker * workers;
			ServerCluster cl[options.clopts.nclusters];

			init_protocol();
			{
				workers = anew(options.clopts.nworkers);
				// NOTE(review): cl[0].prnt is 0p when !options.stats — this
				// dereference looks like a null-pointer bug in that mode; confirm.
				cl[0].prnt->workers = workers;
				cl[0].prnt->worker_cnt = options.clopts.nworkers;
				for(i; options.clopts.nworkers) {
					// if( options.file_cache.fixed_fds ) {
					// 	workers[i].pipe[0] = pipe_off + (i * 2) + 0;
					// 	workers[i].pipe[1] = pipe_off + (i * 2) + 1;
					// }
					// else
					{
						workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
						workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
						// manyreuse: per-worker listening socket; otherwise all share server_fd.
						workers[i].sockfd = options.socket.manyreuse ? listener(address, addrlen) : server_fd;
						workers[i].addr = (struct sockaddr *)&address;
						workers[i].addrlen = (socklen_t*)&addrlen;
						workers[i].flags = 0;
					}
					unpark( workers[i] );
				}
				sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
				for(i; options.clopts.nclusters) {
					sout | options.clopts.thrd_cnt[i] | nonl;
				}
				sout | nl;
				{
					// Block until shutdown is requested: interactively on stdin
					// EOF, otherwise on the eventfd written by cleanstop().
					if(options.interactive) {
						char buffer[128];
						for() {
							int ret = cfa_read(0, buffer, 128, 0);
							if(ret == 0) break;
							if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
							sout | "User wrote '" | "" | nonl;
							write(sout, buffer, ret - 1);
							sout | "'";
						}
					}
					else {
						char buffer[sizeof(eventfd_t)];
						int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
						if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
					}

					sout | "Shutdown received";
				}

				// Phase 1: flag workers so their accept loops stop retrying.
				sout | "Notifying connections..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					workers[i].done = true;
				}
				sout | "done";

				// Phase 2: shutdown(SHUT_RD) unblocks workers parked in accept.
				sout | "Shutting down socket..." | nonl; flush( sout );
				if(options.socket.manyreuse) {
					for(i; options.clopts.nworkers) {
						ret = shutdown( workers[i].sockfd, SHUT_RD );
						if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
					}
				}
				else {
					ret = shutdown( server_fd, SHUT_RD );
					if( ret < 0 ) {
						abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
					}
				}
				sout | "done";

				//===================
				// Close Socket
				sout | "Closing Socket..." | nonl; flush( sout );
				if(options.socket.manyreuse) {
					for(i; options.clopts.nworkers) {
						ret = close(workers[i].sockfd);
						if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
					}
				}
				else {
					ret = close( server_fd );
					if(ret < 0) {
						abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
					}
				}
				sout | "done";

				// Phase 3: close each worker's splice pipes, then join it.
				sout | "Stopping connection threads..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					for(j; 2) {
						ret = close(workers[i].pipe[j]);
						if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
					}
					join(workers[i]);
				}
			}
			sout | "done";

			sout | "Stopping protocol threads..." | nonl; flush( sout );
			deinit_protocol();
			sout | "done";

			// Wake each printer from its timed wait so its join returns promptly.
			sout | "Stopping printer threads..." | nonl; flush( sout );
			for(i; options.clopts.nclusters) {
				StatsPrinter * p = cl[i].prnt;
				if(p) {
					notify_one(p->var);
					join(*p);
				}
			}
			sout | "done";

			// Now that the stats printer is stopped, we can reclaim this
			adelete(workers);

			sout | "Stopping processors/clusters..." | nonl; flush( sout );
		}
		sout | "done";

		// sout | "Closing splice fds..." | nonl; flush( sout );
		// for(i; pipe_cnt) {
		// 	ret = close( fds[pipe_off + i] );
		// 	if(ret < 0) {
		// 		abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
		// 	}
		// }
		free(fds);
		sout | "done";

		sout | "Stopping processors..." | nonl; flush( sout );
	}
	sout | "done";

	//===================
	// Close Files
	if( options.file_cache.path ) {
		sout | "Closing open files..." | nonl; flush( sout );
		close_cache();
		sout | "done";
	}
}
401
// Response-size buckets (bytes) used to histogram sendfile transfers in the
// stats printer; the static_assert keeps this table in sync with zipf_cnts
// declared elsewhere in the project.
const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.