source: benchmark/io/http/main.cfa @ bbf61838

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since bbf61838 was 7f0ac12, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

First draft at acceptor thread webserver

  • Property mode set to 100644
File size: 13.2 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <signal.h>
5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
9        #include <sched.h>
10        #include <signal.h>
11        #include <sys/eventfd.h>
12        #include <sys/socket.h>
13        #include <netinet/in.h>
14}
15
16#include <fstream.hfa>
17#include <kernel.hfa>
18#include <locks.hfa>
19#include <iofwd.hfa>
20#include <stats.hfa>
21#include <time.hfa>
22#include <thread.hfa>
23
24#include "filecache.hfa"
25#include "options.hfa"
26#include "socket.hfa"
27#include "worker.hfa"
28
29extern void register_fixed_files( cluster &, int *, unsigned count );
30
31Duration default_preemption() {
32        return 0;
33}
34
35//=============================================================================================
36// Stats Printer
37//============================================================================================='
38
39thread StatsPrinter {
40        connection ** conns;
41        volatile int conn_cnt;
42        condition_variable(fast_block_lock) var;
43};
44
45void ?{}( StatsPrinter & this, cluster & cl ) {
46        ((thread&)this){ "Stats Printer Thread", cl };
47        this.conn_cnt = 0;
48}
49
50void ^?{}( StatsPrinter & mutex this ) {}
51
52#define eng3(X) (ws(3, 3, unit(eng( X ))))
53
54void main(StatsPrinter & this) {
55        LOOP: for() {
56                waitfor( ^?{} : this) {
57                        break LOOP;
58                }
59                or else {}
60
61                wait(this.var, 10`s);
62
63                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
64                if(this.conn_cnt != 0) {
65                        uint64_t tries = 0;
66                        uint64_t calls = 0;
67                        uint64_t header = 0;
68                        uint64_t splcin = 0;
69                        uint64_t splcot = 0;
70                        struct {
71                                volatile uint64_t calls;
72                                volatile uint64_t bytes;
73                        } avgrd[zipf_cnts];
74                        memset(avgrd, 0, sizeof(avgrd));
75
76                        for(i; this.conn_cnt) {
77                                tries += this.conns[i]->stats.sendfile.tries;
78                                calls += this.conns[i]->stats.sendfile.calls;
79                                header += this.conns[i]->stats.sendfile.header;
80                                splcin += this.conns[i]->stats.sendfile.splcin;
81                                splcot += this.conns[i]->stats.sendfile.splcot;
82                                for(j; zipf_cnts) {
83                                        avgrd[j].calls += this.conns[i]->stats.sendfile.avgrd[j].calls;
84                                        avgrd[j].bytes += this.conns[i]->stats.sendfile.avgrd[j].bytes;
85                                }
86                        }
87
88                        double ratio = ((double)tries) / calls;
89
90                        sout | "----- Connection Stats -----";
91                        sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
92                        sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
93                        sout | " - zipf sizes:";
94                        for(i; zipf_cnts) {
95                                double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
96                                sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
97                        }
98                }
99                else {
100                        sout | "No Connections!";
101                }
102        }
103}
104
105//=============================================================================================
106// Globals
107//=============================================================================================
108struct ServerCluster {
109        cluster self;
110        processor    * procs;
111        // io_context   * ctxs;
112        StatsPrinter * prnt;
113
114};
115
116void ?{}( ServerCluster & this ) {
117        (this.self){ "Server Cluster", options.clopts.params };
118
119        cpu_set_t fullset;
120        CPU_ZERO(&fullset);
121        int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
122        if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
123        int cnt = CPU_COUNT(&fullset);
124
125        this.procs = alloc(options.clopts.nprocs);
126        for(i; options.clopts.nprocs) {
127                (this.procs[i]){ "Benchmark Processor", this.self };
128
129                int c = 0;
130                int n = 1 + (i % cnt);
131                for(int j = 0; j < CPU_SETSIZE; j++) {
132                        if(CPU_ISSET(j, &fullset)) n--;
133                        if(n == 0) {
134                                c = j;
135                                break;
136                        }
137                }
138                cpu_set_t localset;
139                CPU_ZERO(&localset);
140                CPU_SET(c, &localset);
141                ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
142                if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
143
144                #if !defined(__CFA_NO_STATISTICS__)
145                        if( options.clopts.procstats ) {
146                                print_stats_at_exit( *this.procs, this.self.print_stats );
147                        }
148                        if( options.clopts.viewhalts ) {
149                                print_halts( *this.procs );
150                        }
151                #endif
152        }
153
154        if(options.stats) {
155                this.prnt = alloc();
156                (*this.prnt){ this.self };
157        } else {
158                this.prnt = 0p;
159        }
160
161        #if !defined(__CFA_NO_STATISTICS__)
162                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
163        #endif
164
165        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
166        options.clopts.cltr_cnt++;
167}
168
169void ^?{}( ServerCluster & this ) {
170        delete(this.prnt);
171
172        for(i; options.clopts.nprocs) {
173                ^(this.procs[i]){};
174        }
175        free(this.procs);
176
177        ^(this.self){};
178}
179
180extern void init_protocol(void);
181extern void deinit_protocol(void);
182
183//=============================================================================================
184// REUSEPORT
185//=============================================================================================
186
187size_t sockarr_size;
188struct __attribute__((aligned(128))) Q {
189        mpsc_queue(PendingRead) q;
190};
191
192//=============================================================================================
193// Termination
194//=============================================================================================
195
196int closefd;
197void cleanstop(int) {
198        eventfd_t buffer = 1;
199        char * buffer_s = (char*)&buffer;
200        int ret = write(closefd, buffer_s, sizeof(buffer));
201        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
202        return;
203}
204
205//=============================================================================================
206// Main
207//============================================================================================='
208int main( int argc, char * argv[] ) {
209        int ret;
210        __sighandler_t s = 1p;
211        signal(SIGPIPE, s);
212
213        //===================
214        // Parse args
215        parse_options(argc, argv);
216
217        //===================
218        // Setup non-interactive termination
219        if(!options.interactive) {
220                closefd = eventfd(0, 0);
221                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
222
223                sighandler_t prev = signal(SIGTERM, cleanstop);
224                intptr_t prev_workaround = (intptr_t) prev;
225                // can't use SIG_ERR it crashes the compiler
226                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
227
228                sout | "Signal termination ready";
229        }
230
231        //===================
232        // Open Files
233        if( options.file_cache.path ) {
234                sout | "Filling cache from" | options.file_cache.path;
235                fill_cache( options.file_cache.path );
236        }
237
238        //===================
239        // Open Socket
240        sout | getpid() | ": Listening on port" | options.socket.port;
241
242        struct sockaddr_in address;
243        int addrlen = prepaddr(address);
244
245        int server_fd;
246
247        //===================
248        // Run Server Cluster
249        {
250                int pipe_cnt = options.clopts.nworkers * 2;
251                int pipe_off;
252                int * fds;
253                [fds, pipe_off] = filefds( pipe_cnt );
254                for(i; 0 ~ pipe_cnt ~ 2) {
255                        int ret = pipe(&fds[pipe_off + i]);
256                        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
257                }
258
259                // if(options.file_cache.path && options.file_cache.fixed_fds) {
260                //      register_fixed_files(cl, fds, pipe_off);
261                // }
262
263                {
264                        // Stats printer makes a copy so this needs to persist longer than normal
265                        connection ** conns;
266                        AcceptWorker  * aworkers = 0p;
267                        ChannelWorker * cworkers = 0p;
268                        Acceptor * acceptors = 0p;
269                        Q * queues = 0p;
270                        ServerCluster cl[options.clopts.nclusters];
271
272                        init_protocol();
273                        {
274                                conns = alloc(options.clopts.nworkers);
275                                if(options.socket.reuseport) {
276                                        queues = alloc(options.clopts.nprocs);
277                                        acceptors = anew(options.clopts.nprocs);
278                                        for(i; options.clopts.nprocs) {
279                                                (queues[i]){};
280                                                {
281                                                        acceptors[i].sockfd  = listener(address, addrlen);
282                                                        acceptors[i].addr    = (struct sockaddr *)&address;
283                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
284                                                        acceptors[i].flags   = 0;
285                                                        acceptors[i].queue   = &queues[i].q;
286                                                }
287                                                unpark( acceptors[i] );
288                                        }
289
290                                        cworkers = anew(options.clopts.nworkers);
291                                        for(i; options.clopts.nworkers) {
292                                                {
293                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
294                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
295                                                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
296                                                        conns[i] = &cworkers[i].conn;
297                                                }
298                                                unpark( cworkers[i] );
299                                        }
300                                }
301                                else {
302                                        server_fd = listener(address, addrlen);
303                                        aworkers = anew(options.clopts.nworkers);
304                                        for(i; options.clopts.nworkers) {
305                                                // if( options.file_cache.fixed_fds ) {
306                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
307                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
308                                                // }
309                                                // else
310                                                {
311                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
312                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
313                                                        aworkers[i].sockfd = server_fd;
314                                                        aworkers[i].addr    = (struct sockaddr *)&address;
315                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
316                                                        aworkers[i].flags   = 0;
317                                                        conns[i] = &aworkers[i].conn;
318                                                }
319                                                unpark( aworkers[i] );
320                                        }
321                                }
322                                cl[0].prnt->conns = conns;
323                                cl[0].prnt->conn_cnt = options.clopts.nworkers;
324                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
325                                for(i; options.clopts.nclusters) {
326                                        sout | options.clopts.thrd_cnt[i] | nonl;
327                                }
328                                sout | nl;
329                                {
330                                        if(options.interactive) {
331                                                char buffer[128];
332                                                for() {
333                                                        int ret = cfa_read(0, buffer, 128, 0);
334                                                        if(ret == 0) break;
335                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
336                                                        sout | "User wrote '" | "" | nonl;
337                                                        write(sout, buffer, ret - 1);
338                                                        sout | "'";
339                                                }
340                                        }
341                                        else {
342                                                char buffer[sizeof(eventfd_t)];
343                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
344                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
345                                        }
346
347                                        sout | "Shutdown received";
348                                }
349
350                                //===================
351                                // Close Socket and join
352                                if(options.socket.reuseport) {
353                                        sout | "Notifying connections..." | nonl; flush( sout );
354                                        for(i; options.clopts.nprocs) {
355                                                acceptors[i].done = true;
356                                        }
357                                        for(i; options.clopts.nworkers) {
358                                                cworkers[i].done = true;
359                                        }
360                                        sout | "done";
361
362                                        sout | "Shutting down Socket..." | nonl; flush( sout );
363                                        for(i; options.clopts.nprocs) {
364                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
365                                                if( ret < 0 ) {
366                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
367                                                }
368                                        }
369                                        sout | "done";
370
371                                        sout | "Closing Socket..." | nonl; flush( sout );
372                                        for(i; options.clopts.nprocs) {
373                                                ret = close( acceptors[i].sockfd );
374                                                if( ret < 0) {
375                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
376                                                }
377                                        }
378                                        sout | "done";
379
380                                        sout | "Stopping accept threads..." | nonl; flush( sout );
381                                        for(i; options.clopts.nprocs) {
382                                                join(acceptors[i]);
383                                        }
384                                        sout | "done";
385
386                                        sout | "Draining worker queues..." | nonl; flush( sout );
387                                        for(i; options.clopts.nprocs) {
388                                                PendingRead * p = 0p;
389                                                while(p = pop(queues[i].q)) {
390                                                        fulfil(p->f, -ECONNRESET);
391                                                }
392                                        }
393                                        sout | "done";
394
395                                        sout | "Stopping worker threads..." | nonl; flush( sout );
396                                        for(i; options.clopts.nworkers) {
397                                                for(j; 2) {
398                                                        ret = close(cworkers[i].conn.pipe[j]);
399                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
400                                                }
401                                                join(cworkers[i]);
402                                        }
403                                }
404                                else {
405                                        sout | "Notifying connections..." | nonl; flush( sout );
406                                        for(i; options.clopts.nworkers) {
407                                                aworkers[i].done = true;
408                                        }
409                                        sout | "done";
410
411                                        sout | "Shutting down Socket..." | nonl; flush( sout );
412                                        ret = shutdown( server_fd, SHUT_RD );
413                                        if( ret < 0 ) {
414                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
415                                        }
416                                        sout | "done";
417
418                                        sout | "Closing Socket..." | nonl; flush( sout );
419                                        ret = close( server_fd );
420                                        if(ret < 0) {
421                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
422                                        }
423                                        sout | "done";
424
425                                        sout | "Stopping connection threads..." | nonl; flush( sout );
426                                        for(i; options.clopts.nworkers) {
427                                                for(j; 2) {
428                                                        ret = close(aworkers[i].conn.pipe[j]);
429                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
430                                                }
431                                                join(aworkers[i]);
432                                        }
433                                }
434                        }
435                        sout | "done";
436
437                        sout | "Stopping protocol threads..." | nonl; flush( sout );
438                        deinit_protocol();
439                        sout | "done";
440
441                        sout | "Stopping printer threads..." | nonl; flush( sout );
442                        for(i; options.clopts.nclusters) {
443                                StatsPrinter * p = cl[i].prnt;
444                                if(p) {
445                                        notify_one(p->var);
446                                        join(*p);
447                                }
448                        }
449                        sout | "done";
450
451                        // Now that the stats printer is stopped, we can reclaim this
452                        adelete(aworkers);
453                        adelete(cworkers);
454                        adelete(acceptors);
455                        adelete(queues);
456                        free(conns);
457
458                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
459                }
460                sout | "done";
461
462                free(fds);
463
464                sout | "Stopping processors..." | nonl; flush( sout );
465        }
466        sout | "done";
467
468        //===================
469        // Close Files
470        if( options.file_cache.path ) {
471                sout | "Closing open files..." | nonl; flush( sout );
472                close_cache();
473                sout | "done";
474        }
475}
476
477const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
478static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.