source: benchmark/io/http/main.cfa @ 8c58e73

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since 8c58e73 was 8c58e73, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Removed webserver feature to have multiple clusters (it never actually worked)

  • Property mode set to 100644
File size: 13.0 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <signal.h>
5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
9        #include <sched.h>
10        #include <signal.h>
11        #include <sys/eventfd.h>
12        #include <sys/socket.h>
13        #include <netinet/in.h>
14}
15
16#include <fstream.hfa>
17#include <kernel.hfa>
18#include <locks.hfa>
19#include <iofwd.hfa>
20#include <stats.hfa>
21#include <time.hfa>
22#include <thread.hfa>
23
24#include "filecache.hfa"
25#include "options.hfa"
26#include "socket.hfa"
27#include "worker.hfa"
28
29extern void register_fixed_files( cluster &, int *, unsigned count );
30
31Duration default_preemption() {
32        return 0;
33}
34
35//=============================================================================================
36// Stats Printer
37//============================================================================================='
38
39thread StatsPrinter {
40        connection ** conns;
41        volatile int conn_cnt;
42        condition_variable(fast_block_lock) var;
43};
44
45void ?{}( StatsPrinter & this, cluster & cl ) {
46        ((thread&)this){ "Stats Printer Thread", cl };
47        this.conn_cnt = 0;
48}
49
50void ^?{}( StatsPrinter & mutex this ) {}
51
52#define eng3(X) (ws(3, 3, unit(eng( X ))))
53
54void main(StatsPrinter & this) {
55        LOOP: for() {
56                waitfor( ^?{} : this) {
57                        break LOOP;
58                }
59                or else {}
60
61                wait(this.var, 10`s);
62
63                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
64                if(this.conn_cnt != 0) {
65                        uint64_t tries = 0;
66                        uint64_t calls = 0;
67                        uint64_t header = 0;
68                        uint64_t splcin = 0;
69                        uint64_t splcot = 0;
70                        struct {
71                                volatile uint64_t calls;
72                                volatile uint64_t bytes;
73                        } avgrd[zipf_cnts];
74                        memset(avgrd, 0, sizeof(avgrd));
75
76                        for(i; this.conn_cnt) {
77                                tries += this.conns[i]->stats.sendfile.tries;
78                                calls += this.conns[i]->stats.sendfile.calls;
79                                header += this.conns[i]->stats.sendfile.header;
80                                splcin += this.conns[i]->stats.sendfile.splcin;
81                                splcot += this.conns[i]->stats.sendfile.splcot;
82                                for(j; zipf_cnts) {
83                                        avgrd[j].calls += this.conns[i]->stats.sendfile.avgrd[j].calls;
84                                        avgrd[j].bytes += this.conns[i]->stats.sendfile.avgrd[j].bytes;
85                                }
86                        }
87
88                        double ratio = ((double)tries) / calls;
89
90                        sout | "----- Connection Stats -----";
91                        sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
92                        sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
93                        sout | " - zipf sizes:";
94                        for(i; zipf_cnts) {
95                                double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
96                                sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
97                        }
98                }
99                else {
100                        sout | "No Connections!";
101                }
102        }
103}
104
105//=============================================================================================
106// Globals
107//=============================================================================================
108struct ServerCluster {
109        cluster self;
110        processor    * procs;
111        // io_context   * ctxs;
112        StatsPrinter * prnt;
113
114};
115
116void ?{}( ServerCluster & this ) {
117        (this.self){ "Server Cluster", options.clopts.params };
118
119        cpu_set_t fullset;
120        CPU_ZERO(&fullset);
121        int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
122        if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
123        int cnt = CPU_COUNT(&fullset);
124
125        this.procs = alloc(options.clopts.nprocs);
126        for(i; options.clopts.nprocs) {
127                (this.procs[i]){ "Benchmark Processor", this.self };
128
129                int c = 0;
130                int n = 1 + (i % cnt);
131                for(int j = 0; j < CPU_SETSIZE; j++) {
132                        if(CPU_ISSET(j, &fullset)) n--;
133                        if(n == 0) {
134                                c = j;
135                                break;
136                        }
137                }
138                cpu_set_t localset;
139                CPU_ZERO(&localset);
140                CPU_SET(c, &localset);
141                ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
142                if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
143
144                #if !defined(__CFA_NO_STATISTICS__)
145                        if( options.clopts.procstats ) {
146                                print_stats_at_exit( *this.procs, this.self.print_stats );
147                        }
148                        if( options.clopts.viewhalts ) {
149                                print_halts( *this.procs );
150                        }
151                #endif
152        }
153
154        if(options.stats) {
155                this.prnt = alloc();
156                (*this.prnt){ this.self };
157        } else {
158                this.prnt = 0p;
159        }
160
161        #if !defined(__CFA_NO_STATISTICS__)
162                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
163        #endif
164
165        options.clopts.instance = &this.self;
166}
167
168void ^?{}( ServerCluster & this ) {
169        delete(this.prnt);
170
171        for(i; options.clopts.nprocs) {
172                ^(this.procs[i]){};
173        }
174        free(this.procs);
175
176        ^(this.self){};
177}
178
179extern void init_protocol(void);
180extern void deinit_protocol(void);
181
182//=============================================================================================
183// REUSEPORT
184//=============================================================================================
185
186size_t sockarr_size;
187struct __attribute__((aligned(128))) Q {
188        mpsc_queue(PendingRead) q;
189};
190
191//=============================================================================================
192// Termination
193//=============================================================================================
194
195int closefd;
196void cleanstop(int) {
197        eventfd_t buffer = 1;
198        char * buffer_s = (char*)&buffer;
199        int ret = write(closefd, buffer_s, sizeof(buffer));
200        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
201        return;
202}
203
204//=============================================================================================
205// Main
206//============================================================================================='
207int main( int argc, char * argv[] ) {
208        int ret;
209        __sighandler_t s = 1p;
210        signal(SIGPIPE, s);
211
212        //===================
213        // Parse args
214        parse_options(argc, argv);
215
216        //===================
217        // Setup non-interactive termination
218        if(!options.interactive) {
219                closefd = eventfd(0, 0);
220                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
221
222                sighandler_t prev = signal(SIGTERM, cleanstop);
223                intptr_t prev_workaround = (intptr_t) prev;
224                // can't use SIG_ERR it crashes the compiler
225                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
226
227                sout | "Signal termination ready";
228        }
229
230        //===================
231        // Open Files
232        if( options.file_cache.path ) {
233                sout | "Filling cache from" | options.file_cache.path;
234                fill_cache( options.file_cache.path );
235        }
236
237        //===================
238        // Open Socket
239        sout | getpid() | ": Listening on port" | options.socket.port;
240
241        struct sockaddr_in address;
242        int addrlen = prepaddr(address);
243
244        int server_fd;
245
246        //===================
247        // Run Server Cluster
248        {
249                int pipe_cnt = options.clopts.nworkers * 2;
250                int pipe_off;
251                int * fds;
252                [fds, pipe_off] = filefds( pipe_cnt );
253                for(i; 0 ~ pipe_cnt ~ 2) {
254                        int ret = pipe(&fds[pipe_off + i]);
255                        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
256                }
257
258                // if(options.file_cache.path && options.file_cache.fixed_fds) {
259                //      register_fixed_files(cl, fds, pipe_off);
260                // }
261
262                {
263                        // Stats printer makes a copy so this needs to persist longer than normal
264                        connection ** conns;
265                        AcceptWorker  * aworkers = 0p;
266                        ChannelWorker * cworkers = 0p;
267                        Acceptor * acceptors = 0p;
268                        Q * queues = 0p;
269                        ServerCluster cl;
270
271                        init_protocol();
272                        {
273                                conns = alloc(options.clopts.nworkers);
274                                if(options.socket.reuseport) {
275                                        queues = alloc(options.clopts.nprocs);
276                                        acceptors = anew(options.clopts.nprocs);
277                                        for(i; options.clopts.nprocs) {
278                                                (queues[i]){};
279                                                {
280                                                        acceptors[i].sockfd  = listener(address, addrlen);
281                                                        acceptors[i].addr    = (struct sockaddr *)&address;
282                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
283                                                        acceptors[i].flags   = 0;
284                                                        acceptors[i].queue   = &queues[i].q;
285                                                }
286                                                unpark( acceptors[i] );
287                                        }
288
289                                        cworkers = anew(options.clopts.nworkers);
290                                        for(i; options.clopts.nworkers) {
291                                                {
292                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
293                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
294                                                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
295                                                        conns[i] = &cworkers[i].conn;
296                                                }
297                                                unpark( cworkers[i] );
298                                        }
299                                }
300                                else {
301                                        server_fd = listener(address, addrlen);
302                                        aworkers = anew(options.clopts.nworkers);
303                                        for(i; options.clopts.nworkers) {
304                                                // if( options.file_cache.fixed_fds ) {
305                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
306                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
307                                                // }
308                                                // else
309                                                {
310                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
311                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
312                                                        aworkers[i].sockfd = server_fd;
313                                                        aworkers[i].addr    = (struct sockaddr *)&address;
314                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
315                                                        aworkers[i].flags   = 0;
316                                                        conns[i] = &aworkers[i].conn;
317                                                }
318                                                unpark( aworkers[i] );
319                                        }
320                                }
321                                cl.prnt->conns = conns;
322                                cl.prnt->conn_cnt = options.clopts.nworkers;
323                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
324                                sout | nl;
325                                {
326                                        if(options.interactive) {
327                                                char buffer[128];
328                                                for() {
329                                                        int ret = cfa_read(0, buffer, 128, 0);
330                                                        if(ret == 0) break;
331                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
332                                                        sout | "User wrote '" | "" | nonl;
333                                                        write(sout, buffer, ret - 1);
334                                                        sout | "'";
335                                                }
336                                        }
337                                        else {
338                                                char buffer[sizeof(eventfd_t)];
339                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
340                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
341                                        }
342
343                                        sout | "Shutdown received";
344                                }
345
346                                //===================
347                                // Close Socket and join
348                                if(options.socket.reuseport) {
349                                        sout | "Notifying connections..." | nonl; flush( sout );
350                                        for(i; options.clopts.nprocs) {
351                                                acceptors[i].done = true;
352                                        }
353                                        for(i; options.clopts.nworkers) {
354                                                cworkers[i].done = true;
355                                        }
356                                        sout | "done";
357
358                                        sout | "Shutting down Socket..." | nonl; flush( sout );
359                                        for(i; options.clopts.nprocs) {
360                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
361                                                if( ret < 0 ) {
362                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
363                                                }
364                                        }
365                                        sout | "done";
366
367                                        sout | "Closing Socket..." | nonl; flush( sout );
368                                        for(i; options.clopts.nprocs) {
369                                                ret = close( acceptors[i].sockfd );
370                                                if( ret < 0) {
371                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
372                                                }
373                                        }
374                                        sout | "done";
375
376                                        sout | "Stopping accept threads..." | nonl; flush( sout );
377                                        for(i; options.clopts.nprocs) {
378                                                join(acceptors[i]);
379                                        }
380                                        sout | "done";
381
382                                        sout | "Draining worker queues..." | nonl; flush( sout );
383                                        for(i; options.clopts.nprocs) {
384                                                PendingRead * p = 0p;
385                                                while(p = pop(queues[i].q)) {
386                                                        fulfil(p->f, -ECONNRESET);
387                                                }
388                                        }
389                                        sout | "done";
390
391                                        sout | "Stopping worker threads..." | nonl; flush( sout );
392                                        for(i; options.clopts.nworkers) {
393                                                for(j; 2) {
394                                                        ret = close(cworkers[i].conn.pipe[j]);
395                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
396                                                }
397                                                join(cworkers[i]);
398                                        }
399                                }
400                                else {
401                                        sout | "Notifying connections..." | nonl; flush( sout );
402                                        for(i; options.clopts.nworkers) {
403                                                aworkers[i].done = true;
404                                        }
405                                        sout | "done";
406
407                                        sout | "Shutting down Socket..." | nonl; flush( sout );
408                                        ret = shutdown( server_fd, SHUT_RD );
409                                        if( ret < 0 ) {
410                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
411                                        }
412                                        sout | "done";
413
414                                        sout | "Closing Socket..." | nonl; flush( sout );
415                                        ret = close( server_fd );
416                                        if(ret < 0) {
417                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
418                                        }
419                                        sout | "done";
420
421                                        sout | "Stopping connection threads..." | nonl; flush( sout );
422                                        for(i; options.clopts.nworkers) {
423                                                for(j; 2) {
424                                                        ret = close(aworkers[i].conn.pipe[j]);
425                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
426                                                }
427                                                join(aworkers[i]);
428                                        }
429                                }
430                        }
431                        sout | "done";
432
433                        sout | "Stopping protocol threads..." | nonl; flush( sout );
434                        deinit_protocol();
435                        sout | "done";
436
437                        sout | "Stopping printer threads..." | nonl; flush( sout );
438                        StatsPrinter * p = cl.prnt;
439                        if(p) {
440                                notify_one(p->var);
441                                join(*p);
442                        }
443                        sout | "done";
444
445                        // Now that the stats printer is stopped, we can reclaim this
446                        adelete(aworkers);
447                        adelete(cworkers);
448                        adelete(acceptors);
449                        adelete(queues);
450                        free(conns);
451
452                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
453                }
454                sout | "done";
455
456                free(fds);
457
458                sout | "Stopping processors..." | nonl; flush( sout );
459        }
460        sout | "done";
461
462        //===================
463        // Close Files
464        if( options.file_cache.path ) {
465                sout | "Closing open files..." | nonl; flush( sout );
466                close_cache();
467                sout | "done";
468        }
469}
470
471const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
472static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.