source: benchmark/io/http/main.cfa @ bf7c7ea

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since bf7c7ea was 137974ae, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Moved stats printer to it's own file and now using push-stats rather than pull

  • Property mode set to 100644
File size: 10.9 KB
RevLine 
[1e8b4b49]1#define _GNU_SOURCE
[0aec496]2
3#include <errno.h>
[153570d]4#include <signal.h>
[0aec496]5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
[6117fc0]9        #include <sched.h>
[c2df3031]10        #include <signal.h>
[153570d]11        #include <sys/eventfd.h>
[0aec496]12        #include <sys/socket.h>
13        #include <netinet/in.h>
14}
15
[8c43d05]16#include <fstream.hfa>
[0aec496]17#include <kernel.hfa>
[ce98816]18#include <locks.hfa>
[153dc387]19#include <iofwd.hfa>
[0aec496]20#include <stats.hfa>
[d11d6eb]21#include <time.hfa>
[0aec496]22#include <thread.hfa>
23
24#include "filecache.hfa"
25#include "options.hfa"
[c4b10e2]26#include "socket.hfa"
[137974ae]27#include "printer.hfa"
[0aec496]28#include "worker.hfa"
29
[d11d6eb]30extern void register_fixed_files( cluster &, int *, unsigned count );
31
32Duration default_preemption() {
33        return 0;
34}
35
[348f81d5]36//=============================================================================================
37// Globals
38//=============================================================================================
39struct ServerCluster {
40        cluster self;
41        processor    * procs;
42
43};
44
45void ?{}( ServerCluster & this ) {
46        (this.self){ "Server Cluster", options.clopts.params };
47
[6117fc0]48        cpu_set_t fullset;
49        CPU_ZERO(&fullset);
50        int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
51        if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
52        int cnt = CPU_COUNT(&fullset);
53
[348f81d5]54        this.procs = alloc(options.clopts.nprocs);
55        for(i; options.clopts.nprocs) {
56                (this.procs[i]){ "Benchmark Processor", this.self };
57
[6117fc0]58                int c = 0;
59                int n = 1 + (i % cnt);
60                for(int j = 0; j < CPU_SETSIZE; j++) {
61                        if(CPU_ISSET(j, &fullset)) n--;
62                        if(n == 0) {
63                                c = j;
64                                break;
65                        }
66                }
67                cpu_set_t localset;
68                CPU_ZERO(&localset);
69                CPU_SET(c, &localset);
70                ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
71                if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
72
[348f81d5]73                #if !defined(__CFA_NO_STATISTICS__)
74                        if( options.clopts.procstats ) {
75                                print_stats_at_exit( *this.procs, this.self.print_stats );
76                        }
77                        if( options.clopts.viewhalts ) {
78                                print_halts( *this.procs );
79                        }
80                #endif
81        }
82
83        #if !defined(__CFA_NO_STATISTICS__)
84                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
85        #endif
86
[8c58e73]87        options.clopts.instance = &this.self;
[153dc387]88}
89
[348f81d5]90void ^?{}( ServerCluster & this ) {
91        for(i; options.clopts.nprocs) {
92                ^(this.procs[i]){};
93        }
94        free(this.procs);
95
96        ^(this.self){};
97}
98
99extern void init_protocol(void);
100extern void deinit_protocol(void);
101
[7f0ac12]102//=============================================================================================
103// REUSEPORT
104//=============================================================================================
105
106size_t sockarr_size;
107struct __attribute__((aligned(128))) Q {
108        mpsc_queue(PendingRead) q;
109};
110
[153570d]111//=============================================================================================
112// Termination
113//=============================================================================================
114
115int closefd;
116void cleanstop(int) {
117        eventfd_t buffer = 1;
118        char * buffer_s = (char*)&buffer;
119        int ret = write(closefd, buffer_s, sizeof(buffer));
120        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
121        return;
122}
123
[0aec496]124//=============================================================================================
125// Main
126//============================================================================================='
127int main( int argc, char * argv[] ) {
[c4b10e2]128        int ret;
[c2df3031]129        __sighandler_t s = 1p;
130        signal(SIGPIPE, s);
131
[0aec496]132        //===================
133        // Parse args
[b57db73]134        parse_options(argc, argv);
[0aec496]135
[153570d]136        //===================
137        // Setup non-interactive termination
138        if(!options.interactive) {
139                closefd = eventfd(0, 0);
140                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
141
142                sighandler_t prev = signal(SIGTERM, cleanstop);
143                intptr_t prev_workaround = (intptr_t) prev;
144                // can't use SIG_ERR it crashes the compiler
145                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
146
147                sout | "Signal termination ready";
148        }
149
[0aec496]150        //===================
151        // Open Files
[b57db73]152        if( options.file_cache.path ) {
153                sout | "Filling cache from" | options.file_cache.path;
154                fill_cache( options.file_cache.path );
155        }
[0aec496]156
157        //===================
158        // Open Socket
[8c43d05]159        sout | getpid() | ": Listening on port" | options.socket.port;
[0aec496]160
161        struct sockaddr_in address;
[c4b10e2]162        int addrlen = prepaddr(address);
[0aec496]163
[86c12d65]164        int server_fd;
[0aec496]165
166        //===================
167        // Run Server Cluster
168        {
[d9c2284]169                int pipe_cnt = options.clopts.nworkers * 2;
170                int pipe_off;
171                int * fds;
172                [fds, pipe_off] = filefds( pipe_cnt );
173                for(i; 0 ~ pipe_cnt ~ 2) {
174                        int ret = pipe(&fds[pipe_off + i]);
175                        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
176                }
177
[4f762d3]178                // if(options.file_cache.path && options.file_cache.fixed_fds) {
179                //      register_fixed_files(cl, fds, pipe_off);
180                // }
[d11d6eb]181
[0aec496]182                {
[153570d]183                        // Stats printer makes a copy so this needs to persist longer than normal
[7f0ac12]184                        connection ** conns;
185                        AcceptWorker  * aworkers = 0p;
186                        ChannelWorker * cworkers = 0p;
187                        Acceptor * acceptors = 0p;
188                        Q * queues = 0p;
[8c58e73]189                        ServerCluster cl;
[ece0e80]190
[137974ae]191                        if(options.stats) {
192                                stats_thrd = alloc();
193                                (*stats_thrd){ cl.self };
194                        } else {
195                                stats_thrd = 0p;
196                        }
197
[ece0e80]198                        init_protocol();
[0aec496]199                        {
[7f0ac12]200                                conns = alloc(options.clopts.nworkers);
201                                if(options.socket.reuseport) {
202                                        queues = alloc(options.clopts.nprocs);
203                                        acceptors = anew(options.clopts.nprocs);
204                                        for(i; options.clopts.nprocs) {
205                                                (queues[i]){};
206                                                {
207                                                        acceptors[i].sockfd  = listener(address, addrlen);
208                                                        acceptors[i].addr    = (struct sockaddr *)&address;
209                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
210                                                        acceptors[i].flags   = 0;
211                                                        acceptors[i].queue   = &queues[i].q;
212                                                }
213                                                unpark( acceptors[i] );
214                                        }
215
216                                        cworkers = anew(options.clopts.nworkers);
217                                        for(i; options.clopts.nworkers) {
218                                                {
219                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
220                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
221                                                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
222                                                        conns[i] = &cworkers[i].conn;
223                                                }
224                                                unpark( cworkers[i] );
225                                        }
226                                }
227                                else {
228                                        server_fd = listener(address, addrlen);
229                                        aworkers = anew(options.clopts.nworkers);
230                                        for(i; options.clopts.nworkers) {
231                                                // if( options.file_cache.fixed_fds ) {
232                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
233                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
234                                                // }
235                                                // else
236                                                {
237                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
238                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
239                                                        aworkers[i].sockfd = server_fd;
240                                                        aworkers[i].addr    = (struct sockaddr *)&address;
241                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
242                                                        aworkers[i].flags   = 0;
243                                                        conns[i] = &aworkers[i].conn;
244                                                }
245                                                unpark( aworkers[i] );
[d9c2284]246                                        }
247                                }
[137974ae]248
[8c58e73]249                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
[348f81d5]250                                sout | nl;
[0aec496]251                                {
[153570d]252                                        if(options.interactive) {
253                                                char buffer[128];
254                                                for() {
255                                                        int ret = cfa_read(0, buffer, 128, 0);
256                                                        if(ret == 0) break;
257                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
258                                                        sout | "User wrote '" | "" | nonl;
259                                                        write(sout, buffer, ret - 1);
260                                                        sout | "'";
261                                                }
262                                        }
263                                        else {
264                                                char buffer[sizeof(eventfd_t)];
265                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
[153dc387]266                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
[e95a117]267                                        }
268
[8c43d05]269                                        sout | "Shutdown received";
[0aec496]270                                }
[ece0e80]271
[7f0ac12]272                                //===================
273                                // Close Socket and join
274                                if(options.socket.reuseport) {
275                                        sout | "Notifying connections..." | nonl; flush( sout );
276                                        for(i; options.clopts.nprocs) {
277                                                acceptors[i].done = true;
278                                        }
279                                        for(i; options.clopts.nworkers) {
280                                                cworkers[i].done = true;
281                                        }
282                                        sout | "done";
283
284                                        sout | "Shutting down Socket..." | nonl; flush( sout );
285                                        for(i; options.clopts.nprocs) {
286                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
287                                                if( ret < 0 ) {
288                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
289                                                }
290                                        }
291                                        sout | "done";
[ece0e80]292
[7f0ac12]293                                        sout | "Closing Socket..." | nonl; flush( sout );
294                                        for(i; options.clopts.nprocs) {
295                                                ret = close( acceptors[i].sockfd );
296                                                if( ret < 0) {
297                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
298                                                }
299                                        }
300                                        sout | "done";
301
302                                        sout | "Stopping accept threads..." | nonl; flush( sout );
303                                        for(i; options.clopts.nprocs) {
304                                                join(acceptors[i]);
305                                        }
306                                        sout | "done";
307
308                                        sout | "Draining worker queues..." | nonl; flush( sout );
309                                        for(i; options.clopts.nprocs) {
310                                                PendingRead * p = 0p;
311                                                while(p = pop(queues[i].q)) {
312                                                        fulfil(p->f, -ECONNRESET);
313                                                }
314                                        }
315                                        sout | "done";
316
317                                        sout | "Stopping worker threads..." | nonl; flush( sout );
[86c12d65]318                                        for(i; options.clopts.nworkers) {
[7f0ac12]319                                                for(j; 2) {
320                                                        ret = close(cworkers[i].conn.pipe[j]);
321                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
322                                                }
323                                                join(cworkers[i]);
[86c12d65]324                                        }
325                                }
326                                else {
[7f0ac12]327                                        sout | "Notifying connections..." | nonl; flush( sout );
328                                        for(i; options.clopts.nworkers) {
329                                                aworkers[i].done = true;
330                                        }
331                                        sout | "done";
332
333                                        sout | "Shutting down Socket..." | nonl; flush( sout );
[86c12d65]334                                        ret = shutdown( server_fd, SHUT_RD );
335                                        if( ret < 0 ) {
[7f0ac12]336                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
[86c12d65]337                                        }
[7f0ac12]338                                        sout | "done";
[ece0e80]339
[7f0ac12]340                                        sout | "Closing Socket..." | nonl; flush( sout );
[86c12d65]341                                        ret = close( server_fd );
342                                        if(ret < 0) {
343                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
344                                        }
[7f0ac12]345                                        sout | "done";
[0197418]346
[7f0ac12]347                                        sout | "Stopping connection threads..." | nonl; flush( sout );
348                                        for(i; options.clopts.nworkers) {
349                                                for(j; 2) {
350                                                        ret = close(aworkers[i].conn.pipe[j]);
351                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
352                                                }
353                                                join(aworkers[i]);
[c4b10e2]354                                        }
[153570d]355                                }
[0aec496]356                        }
[8c43d05]357                        sout | "done";
[ece0e80]358
[b57db73]359                        sout | "Stopping protocol threads..." | nonl; flush( sout );
[ece0e80]360                        deinit_protocol();
[8c43d05]361                        sout | "done";
362
[153570d]363                        sout | "Stopping printer threads..." | nonl; flush( sout );
[137974ae]364                        if(stats_thrd) {
365                                notify_one(stats_thrd->var);
[153570d]366                        }
[137974ae]367                        delete(stats_thrd);
[153570d]368                        sout | "done";
369
370                        // Now that the stats printer is stopped, we can reclaim this
[7f0ac12]371                        adelete(aworkers);
372                        adelete(cworkers);
373                        adelete(acceptors);
374                        adelete(queues);
375                        free(conns);
[153570d]376
[348f81d5]377                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
[0aec496]378                }
[8c43d05]379                sout | "done";
[d9c2284]380
381                free(fds);
[c3ee5f3]382
[b57db73]383                sout | "Stopping processors..." | nonl; flush( sout );
[0aec496]384        }
[8c43d05]385        sout | "done";
[0aec496]386
387        //===================
388        // Close Files
[b57db73]389        if( options.file_cache.path ) {
390                sout | "Closing open files..." | nonl; flush( sout );
391                close_cache();
392                sout | "done";
393        }
[ef3c383]394}
395
396const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
397static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.