source: benchmark/io/http/main.cfa @ 153570d

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since 153570d was 153570d, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Server now cleanly closes in non interactive mode

  • Property mode set to 100644
File size: 11.1 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <signal.h>
5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
9        #include <sched.h>
10        #include <signal.h>
11        #include <sys/eventfd.h>
12        #include <sys/socket.h>
13        #include <netinet/in.h>
14}
15
16#include <fstream.hfa>
17#include <kernel.hfa>
18#include <iofwd.hfa>
19#include <stats.hfa>
20#include <time.hfa>
21#include <thread.hfa>
22
23#include "filecache.hfa"
24#include "options.hfa"
25#include "worker.hfa"
26
27extern void register_fixed_files( cluster &, int *, unsigned count );
28
29Duration default_preemption() {
30        return 0;
31}
32
33//=============================================================================================
34// Stats Printer
35//============================================================================================='
36
37thread StatsPrinter {
38        Worker * workers;
39        int worker_cnt;
40};
41
42void ?{}( StatsPrinter & this, cluster & cl ) {
43        ((thread&)this){ "Stats Printer Thread", cl };
44        this.worker_cnt = 0;
45}
46
47void ^?{}( StatsPrinter & mutex this ) {}
48
49#define eng3(X) (ws(3, 3, unit(eng( X ))))
50
51void main(StatsPrinter & this) {
52        LOOP: for() {
53                waitfor( ^?{} : this) {
54                        break LOOP;
55                }
56                or else {}
57
58                sleep(10`s);
59
60                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
61                if(this.worker_cnt != 0) {
62                        uint64_t tries = 0;
63                        uint64_t calls = 0;
64                        uint64_t header = 0;
65                        uint64_t splcin = 0;
66                        uint64_t splcot = 0;
67                        struct {
68                                volatile uint64_t calls;
69                                volatile uint64_t bytes;
70                        } avgrd[zipf_cnts];
71                        memset(avgrd, 0, sizeof(avgrd));
72
73                        for(i; this.worker_cnt) {
74                                tries += this.workers[i].stats.sendfile.tries;
75                                calls += this.workers[i].stats.sendfile.calls;
76                                header += this.workers[i].stats.sendfile.header;
77                                splcin += this.workers[i].stats.sendfile.splcin;
78                                splcot += this.workers[i].stats.sendfile.splcot;
79                                for(j; zipf_cnts) {
80                                        avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
81                                        avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
82                                }
83                        }
84
85                        double ratio = ((double)tries) / calls;
86
87                        sout | "----- Worker Stats -----";
88                        sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
89                        sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
90                        sout | " - zipf sizes:";
91                        for(i; zipf_cnts) {
92                                double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
93                                sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
94                        }
95                }
96                else {
97                        sout | "No Workers!";
98                }
99        }
100}
101
102//=============================================================================================
103// Globals
104//=============================================================================================
105struct ServerCluster {
106        cluster self;
107        processor    * procs;
108        // io_context   * ctxs;
109        StatsPrinter * prnt;
110
111};
112
113void ?{}( ServerCluster & this ) {
114        (this.self){ "Server Cluster", options.clopts.params };
115
116        cpu_set_t fullset;
117        CPU_ZERO(&fullset);
118        int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
119        if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
120        int cnt = CPU_COUNT(&fullset);
121
122        this.procs = alloc(options.clopts.nprocs);
123        for(i; options.clopts.nprocs) {
124                (this.procs[i]){ "Benchmark Processor", this.self };
125
126                int c = 0;
127                int n = 1 + (i % cnt);
128                for(int j = 0; j < CPU_SETSIZE; j++) {
129                        if(CPU_ISSET(j, &fullset)) n--;
130                        if(n == 0) {
131                                c = j;
132                                break;
133                        }
134                }
135                cpu_set_t localset;
136                CPU_ZERO(&localset);
137                CPU_SET(c, &localset);
138                ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
139                if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
140
141                #if !defined(__CFA_NO_STATISTICS__)
142                        if( options.clopts.procstats ) {
143                                print_stats_at_exit( *this.procs, this.self.print_stats );
144                        }
145                        if( options.clopts.viewhalts ) {
146                                print_halts( *this.procs );
147                        }
148                #endif
149        }
150
151        if(options.stats) {
152                this.prnt = alloc();
153                (*this.prnt){ this.self };
154        } else {
155                this.prnt = 0p;
156        }
157
158        #if !defined(__CFA_NO_STATISTICS__)
159                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
160        #endif
161
162        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
163        options.clopts.cltr_cnt++;
164}
165
166void ^?{}( ServerCluster & this ) {
167        delete(this.prnt);
168
169        for(i; options.clopts.nprocs) {
170                ^(this.procs[i]){};
171        }
172        free(this.procs);
173
174        ^(this.self){};
175}
176
177extern void init_protocol(void);
178extern void deinit_protocol(void);
179
180//=============================================================================================
181// Termination
182//=============================================================================================
183
184int closefd;
185void cleanstop(int) {
186        eventfd_t buffer = 1;
187        char * buffer_s = (char*)&buffer;
188        int ret = write(closefd, buffer_s, sizeof(buffer));
189        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
190        return;
191}
192
193//=============================================================================================
194// Main
195//============================================================================================='
196int main( int argc, char * argv[] ) {
197        __sighandler_t s = 1p;
198        signal(SIGPIPE, s);
199
200        //===================
201        // Parse args
202        parse_options(argc, argv);
203
204        //===================
205        // Setup non-interactive termination
206        if(!options.interactive) {
207                closefd = eventfd(0, 0);
208                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
209
210                sighandler_t prev = signal(SIGTERM, cleanstop);
211                intptr_t prev_workaround = (intptr_t) prev;
212                // can't use SIG_ERR it crashes the compiler
213                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
214
215                sout | "Signal termination ready";
216        }
217
218        //===================
219        // Open Files
220        if( options.file_cache.path ) {
221                sout | "Filling cache from" | options.file_cache.path;
222                fill_cache( options.file_cache.path );
223        }
224
225        //===================
226        // Open Socket
227        sout | getpid() | ": Listening on port" | options.socket.port;
228        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
229        if(server_fd < 0) {
230                abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
231        }
232
233        int ret = 0;
234        struct sockaddr_in address;
235        int addrlen = sizeof(address);
236        memset( (char *)&address, '\0' );
237        address.sin_family = AF_INET;
238        address.sin_addr.s_addr = htonl(INADDR_ANY);
239        address.sin_port = htons( options.socket.port );
240
241        int waited = 0;
242        for() {
243                int sockfd = server_fd;
244                __CONST_SOCKADDR_ARG addr;
245                addr.__sockaddr__ = (struct sockaddr *)&address;
246                socklen_t addrlen = sizeof(address);
247                ret = bind( sockfd, addr, addrlen );
248                if(ret < 0) {
249                        if(errno == EADDRINUSE) {
250                                if(waited == 0) {
251                                        if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
252                                        sout | "Waiting for port";
253                                } else {
254                                        sout | "\r" | waited | nonl;
255                                        flush( sout );
256                                }
257                                waited ++;
258                                sleep( 1`s );
259                                continue;
260                        }
261                        abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
262                }
263                break;
264        }
265
266        ret = listen( server_fd, options.socket.backlog );
267        if(ret < 0) {
268                abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
269        }
270
271        //===================
272        // Run Server Cluster
273        {
274                int pipe_cnt = options.clopts.nworkers * 2;
275                int pipe_off;
276                int * fds;
277                [fds, pipe_off] = filefds( pipe_cnt );
278                for(i; 0 ~ pipe_cnt ~ 2) {
279                        int ret = pipe(&fds[pipe_off + i]);
280                        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
281                }
282
283                // if(options.file_cache.path && options.file_cache.fixed_fds) {
284                //      register_fixed_files(cl, fds, pipe_off);
285                // }
286
287                {
288                        // Stats printer makes a copy so this needs to persist longer than normal
289                        Worker * workers;
290                        ServerCluster cl[options.clopts.nclusters];
291
292                        init_protocol();
293                        {
294                                workers = anew(options.clopts.nworkers);
295                                cl[0].prnt->workers = workers;
296                                cl[0].prnt->worker_cnt = options.clopts.nworkers;
297                                for(i; options.clopts.nworkers) {
298                                        // if( options.file_cache.fixed_fds ) {
299                                        //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
300                                        //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
301                                        // }
302                                        // else
303                                        {
304                                                workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
305                                                workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
306                                                workers[i].sockfd  = server_fd;
307                                                workers[i].addr    = (struct sockaddr *)&address;
308                                                workers[i].addrlen = (socklen_t*)&addrlen;
309                                                workers[i].flags   = 0;
310                                        }
311                                        unpark( workers[i] );
312                                }
313                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
314                                for(i; options.clopts.nclusters) {
315                                        sout | options.clopts.thrd_cnt[i] | nonl;
316                                }
317                                sout | nl;
318                                {
319                                        if(options.interactive) {
320                                                char buffer[128];
321                                                for() {
322                                                        int ret = cfa_read(0, buffer, 128, 0);
323                                                        if(ret == 0) break;
324                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
325                                                        sout | "User wrote '" | "" | nonl;
326                                                        write(sout, buffer, ret - 1);
327                                                        sout | "'";
328                                                }
329                                        }
330                                        else {
331                                                char buffer[sizeof(eventfd_t)];
332                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
333                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
334                                        }
335
336                                        sout | "Shutdown received";
337                                }
338
339                                sout | "Notifying connections..." | nonl; flush( sout );
340                                for(i; options.clopts.nworkers) {
341                                        workers[i].done = true;
342                                }
343                                sout | "done";
344
345                                sout | "Shutting down socket..." | nonl; flush( sout );
346                                int ret = shutdown( server_fd, SHUT_RD );
347                                if( ret < 0 ) {
348                                        abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
349                                }
350                                sout | "done";
351
352                                //===================
353                                // Close Socket
354                                sout | "Closing Socket..." | nonl; flush( sout );
355                                ret = close( server_fd );
356                                if(ret < 0) {
357                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
358                                }
359                                sout | "done";
360
361                                sout | "Stopping connection threads..." | nonl; flush( sout );
362                                for(i; options.clopts.nworkers) {
363                                        join(workers[i]);
364                                }
365                        }
366                        sout | "done";
367
368                        sout | "Stopping protocol threads..." | nonl; flush( sout );
369                        deinit_protocol();
370                        sout | "done";
371
372                        sout | "Stopping printer threads..." | nonl; flush( sout );
373                        for(i; options.clopts.nclusters) {
374                                StatsPrinter * p = cl[i].prnt;
375                                if(p) join(*p);
376                        }
377                        sout | "done";
378
379                        // Now that the stats printer is stopped, we can reclaim this
380                        adelete(workers);
381
382                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
383                }
384                sout | "done";
385
386                sout | "Closing splice fds..." | nonl; flush( sout );
387                for(i; pipe_cnt) {
388                        ret = close( fds[pipe_off + i] );
389                        if(ret < 0) {
390                                abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
391                        }
392                }
393                free(fds);
394                sout | "done";
395
396                sout | "Stopping processors..." | nonl; flush( sout );
397        }
398        sout | "done";
399
400        //===================
401        // Close Files
402        if( options.file_cache.path ) {
403                sout | "Closing open files..." | nonl; flush( sout );
404                close_cache();
405                sout | "done";
406        }
407}
408
409const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
410static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.