source: benchmark/io/http/main.cfa @ c4b10e2

pthread-emulationqualifiedEnum
Last change on this file since c4b10e2 was c4b10e2, checked in by Thierry Delisle <tdelisle@…>, 6 months ago

Moved socket creation to a different file.
Makefile now has debug symbol.
Pipes are now closed earlier for a cleaner exit.

  • Property mode set to 100644
File size: 10.3 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <signal.h>
5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
9        #include <sched.h>
10        #include <signal.h>
11        #include <sys/eventfd.h>
12        #include <sys/socket.h>
13        #include <netinet/in.h>
14}
15
16#include <fstream.hfa>
17#include <kernel.hfa>
18#include <iofwd.hfa>
19#include <stats.hfa>
20#include <time.hfa>
21#include <thread.hfa>
22
23#include "filecache.hfa"
24#include "options.hfa"
25#include "socket.hfa"
26#include "worker.hfa"
27
28extern void register_fixed_files( cluster &, int *, unsigned count );
29
30Duration default_preemption() {
31        return 0;
32}
33
34//=============================================================================================
35// Stats Printer
36//============================================================================================='
37
38thread StatsPrinter {
39        Worker * workers;
40        int worker_cnt;
41};
42
43void ?{}( StatsPrinter & this, cluster & cl ) {
44        ((thread&)this){ "Stats Printer Thread", cl };
45        this.worker_cnt = 0;
46}
47
48void ^?{}( StatsPrinter & mutex this ) {}
49
50#define eng3(X) (ws(3, 3, unit(eng( X ))))
51
52void main(StatsPrinter & this) {
53        LOOP: for() {
54                waitfor( ^?{} : this) {
55                        break LOOP;
56                }
57                or else {}
58
59                sleep(10`s);
60
61                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
62                if(this.worker_cnt != 0) {
63                        uint64_t tries = 0;
64                        uint64_t calls = 0;
65                        uint64_t header = 0;
66                        uint64_t splcin = 0;
67                        uint64_t splcot = 0;
68                        struct {
69                                volatile uint64_t calls;
70                                volatile uint64_t bytes;
71                        } avgrd[zipf_cnts];
72                        memset(avgrd, 0, sizeof(avgrd));
73
74                        for(i; this.worker_cnt) {
75                                tries += this.workers[i].stats.sendfile.tries;
76                                calls += this.workers[i].stats.sendfile.calls;
77                                header += this.workers[i].stats.sendfile.header;
78                                splcin += this.workers[i].stats.sendfile.splcin;
79                                splcot += this.workers[i].stats.sendfile.splcot;
80                                for(j; zipf_cnts) {
81                                        avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
82                                        avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
83                                }
84                        }
85
86                        double ratio = ((double)tries) / calls;
87
88                        sout | "----- Worker Stats -----";
89                        sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
90                        sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
91                        sout | " - zipf sizes:";
92                        for(i; zipf_cnts) {
93                                double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
94                                sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
95                        }
96                }
97                else {
98                        sout | "No Workers!";
99                }
100        }
101}
102
103//=============================================================================================
104// Globals
105//=============================================================================================
106struct ServerCluster {
107        cluster self;
108        processor    * procs;
109        // io_context   * ctxs;
110        StatsPrinter * prnt;
111
112};
113
114void ?{}( ServerCluster & this ) {
115        (this.self){ "Server Cluster", options.clopts.params };
116
117        cpu_set_t fullset;
118        CPU_ZERO(&fullset);
119        int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
120        if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
121        int cnt = CPU_COUNT(&fullset);
122
123        this.procs = alloc(options.clopts.nprocs);
124        for(i; options.clopts.nprocs) {
125                (this.procs[i]){ "Benchmark Processor", this.self };
126
127                int c = 0;
128                int n = 1 + (i % cnt);
129                for(int j = 0; j < CPU_SETSIZE; j++) {
130                        if(CPU_ISSET(j, &fullset)) n--;
131                        if(n == 0) {
132                                c = j;
133                                break;
134                        }
135                }
136                cpu_set_t localset;
137                CPU_ZERO(&localset);
138                CPU_SET(c, &localset);
139                ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
140                if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
141
142                #if !defined(__CFA_NO_STATISTICS__)
143                        if( options.clopts.procstats ) {
144                                print_stats_at_exit( *this.procs, this.self.print_stats );
145                        }
146                        if( options.clopts.viewhalts ) {
147                                print_halts( *this.procs );
148                        }
149                #endif
150        }
151
152        if(options.stats) {
153                this.prnt = alloc();
154                (*this.prnt){ this.self };
155        } else {
156                this.prnt = 0p;
157        }
158
159        #if !defined(__CFA_NO_STATISTICS__)
160                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
161        #endif
162
163        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
164        options.clopts.cltr_cnt++;
165}
166
167void ^?{}( ServerCluster & this ) {
168        delete(this.prnt);
169
170        for(i; options.clopts.nprocs) {
171                ^(this.procs[i]){};
172        }
173        free(this.procs);
174
175        ^(this.self){};
176}
177
178extern void init_protocol(void);
179extern void deinit_protocol(void);
180
181//=============================================================================================
182// Termination
183//=============================================================================================
184
185int closefd;
186void cleanstop(int) {
187        eventfd_t buffer = 1;
188        char * buffer_s = (char*)&buffer;
189        int ret = write(closefd, buffer_s, sizeof(buffer));
190        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
191        return;
192}
193
194//=============================================================================================
195// Main
196//============================================================================================='
197int main( int argc, char * argv[] ) {
198        int ret;
199        __sighandler_t s = 1p;
200        signal(SIGPIPE, s);
201
202        //===================
203        // Parse args
204        parse_options(argc, argv);
205
206        //===================
207        // Setup non-interactive termination
208        if(!options.interactive) {
209                closefd = eventfd(0, 0);
210                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
211
212                sighandler_t prev = signal(SIGTERM, cleanstop);
213                intptr_t prev_workaround = (intptr_t) prev;
214                // can't use SIG_ERR it crashes the compiler
215                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
216
217                sout | "Signal termination ready";
218        }
219
220        //===================
221        // Open Files
222        if( options.file_cache.path ) {
223                sout | "Filling cache from" | options.file_cache.path;
224                fill_cache( options.file_cache.path );
225        }
226
227        //===================
228        // Open Socket
229        sout | getpid() | ": Listening on port" | options.socket.port;
230
231        struct sockaddr_in address;
232        int addrlen = prepaddr(address);
233
234        int server_fd = listener(address, addrlen);
235
236        //===================
237        // Run Server Cluster
238        {
239                int pipe_cnt = options.clopts.nworkers * 2;
240                int pipe_off;
241                int * fds;
242                [fds, pipe_off] = filefds( pipe_cnt );
243                for(i; 0 ~ pipe_cnt ~ 2) {
244                        int ret = pipe(&fds[pipe_off + i]);
245                        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
246                }
247
248                // if(options.file_cache.path && options.file_cache.fixed_fds) {
249                //      register_fixed_files(cl, fds, pipe_off);
250                // }
251
252                {
253                        // Stats printer makes a copy so this needs to persist longer than normal
254                        Worker * workers;
255                        ServerCluster cl[options.clopts.nclusters];
256
257                        init_protocol();
258                        {
259                                workers = anew(options.clopts.nworkers);
260                                cl[0].prnt->workers = workers;
261                                cl[0].prnt->worker_cnt = options.clopts.nworkers;
262                                for(i; options.clopts.nworkers) {
263                                        // if( options.file_cache.fixed_fds ) {
264                                        //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
265                                        //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
266                                        // }
267                                        // else
268                                        {
269                                                workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
270                                                workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
271                                                workers[i].sockfd  = server_fd;
272                                                workers[i].addr    = (struct sockaddr *)&address;
273                                                workers[i].addrlen = (socklen_t*)&addrlen;
274                                                workers[i].flags   = 0;
275                                        }
276                                        unpark( workers[i] );
277                                }
278                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
279                                for(i; options.clopts.nclusters) {
280                                        sout | options.clopts.thrd_cnt[i] | nonl;
281                                }
282                                sout | nl;
283                                {
284                                        if(options.interactive) {
285                                                char buffer[128];
286                                                for() {
287                                                        int ret = cfa_read(0, buffer, 128, 0);
288                                                        if(ret == 0) break;
289                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
290                                                        sout | "User wrote '" | "" | nonl;
291                                                        write(sout, buffer, ret - 1);
292                                                        sout | "'";
293                                                }
294                                        }
295                                        else {
296                                                char buffer[sizeof(eventfd_t)];
297                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
298                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
299                                        }
300
301                                        sout | "Shutdown received";
302                                }
303
304                                sout | "Notifying connections..." | nonl; flush( sout );
305                                for(i; options.clopts.nworkers) {
306                                        workers[i].done = true;
307                                }
308                                sout | "done";
309
310                                sout | "Shutting down socket..." | nonl; flush( sout );
311                                int ret = shutdown( server_fd, SHUT_RD );
312                                if( ret < 0 ) {
313                                        abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
314                                }
315                                sout | "done";
316
317                                //===================
318                                // Close Socket
319                                sout | "Closing Socket..." | nonl; flush( sout );
320                                ret = close( server_fd );
321                                if(ret < 0) {
322                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
323                                }
324                                sout | "done";
325
326                                sout | "Stopping connection threads..." | nonl; flush( sout );
327                                for(i; options.clopts.nworkers) {
328                                        for(j; 2) {
329                                                ret = close(workers[i].pipe[j]);
330                                                if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
331                                        }
332                                        join(workers[i]);
333                                }
334                        }
335                        sout | "done";
336
337                        sout | "Stopping protocol threads..." | nonl; flush( sout );
338                        deinit_protocol();
339                        sout | "done";
340
341                        sout | "Stopping printer threads..." | nonl; flush( sout );
342                        for(i; options.clopts.nclusters) {
343                                StatsPrinter * p = cl[i].prnt;
344                                if(p) join(*p);
345                        }
346                        sout | "done";
347
348                        // Now that the stats printer is stopped, we can reclaim this
349                        adelete(workers);
350
351                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
352                }
353                sout | "done";
354
355                // sout | "Closing splice fds..." | nonl; flush( sout );
356                // for(i; pipe_cnt) {
357                //      ret = close( fds[pipe_off + i] );
358                //      if(ret < 0) {
359                //              abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
360                //      }
361                // }
362                free(fds);
363                sout | "done";
364
365                sout | "Stopping processors..." | nonl; flush( sout );
366        }
367        sout | "done";
368
369        //===================
370        // Close Files
371        if( options.file_cache.path ) {
372                sout | "Closing open files..." | nonl; flush( sout );
373                close_cache();
374                sout | "done";
375        }
376}
377
378const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
379static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.