source: benchmark/io/http/main.cfa @ 17c6edeb

ADTast-experimentalpthread-emulation
Last change on this file since 17c6edeb was 32d1383, checked in by Thierry Delisle <tdelisle@…>, 21 months ago

Committing http server when I know it works

  • Property mode set to 100644
File size: 11.4 KB
RevLine 
[1e8b4b49]1#define _GNU_SOURCE
[0aec496]2
3#include <errno.h>
[153570d]4#include <signal.h>
[0aec496]5#include <stdio.h>
6#include <string.h>
7#include <unistd.h>
8extern "C" {
[6117fc0]9        #include <sched.h>
[c2df3031]10        #include <signal.h>
[153570d]11        #include <sys/eventfd.h>
[0aec496]12        #include <sys/socket.h>
13        #include <netinet/in.h>
14}
15
[8c43d05]16#include <fstream.hfa>
[0aec496]17#include <kernel.hfa>
[ce98816]18#include <locks.hfa>
[153dc387]19#include <iofwd.hfa>
[0aec496]20#include <stats.hfa>
[d11d6eb]21#include <time.hfa>
[0aec496]22#include <thread.hfa>
23
24#include "filecache.hfa"
25#include "options.hfa"
[c4b10e2]26#include "socket.hfa"
[137974a]27#include "printer.hfa"
[0aec496]28#include "worker.hfa"
29
[d11d6eb]30extern void register_fixed_files( cluster &, int *, unsigned count );
31
32Duration default_preemption() {
33        return 0;
34}
35
[348f81d]36//=============================================================================================
37// Globals
38//=============================================================================================
39void ?{}( ServerCluster & this ) {
40        (this.self){ "Server Cluster", options.clopts.params };
41
[6117fc0]42        cpu_set_t fullset;
43        CPU_ZERO(&fullset);
44        int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
45        if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
46        int cnt = CPU_COUNT(&fullset);
47
[348f81d]48        this.procs = alloc(options.clopts.nprocs);
49        for(i; options.clopts.nprocs) {
50                (this.procs[i]){ "Benchmark Processor", this.self };
51
[329e26a]52                // int c = 0;
53                // int n = 1 + (i % cnt);
54                // for(int j = 0; j < CPU_SETSIZE; j++) {
55                //      if(CPU_ISSET(j, &fullset)) n--;
56                //      if(n == 0) {
57                //              c = j;
58                //              break;
59                //      }
60                // }
61                // cpu_set_t localset;
62                // CPU_ZERO(&localset);
63                // CPU_SET(c, &localset);
64                // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
65                // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
[6117fc0]66
[348f81d]67                #if !defined(__CFA_NO_STATISTICS__)
68                        if( options.clopts.procstats ) {
69                                print_stats_at_exit( *this.procs, this.self.print_stats );
70                        }
71                        if( options.clopts.viewhalts ) {
72                                print_halts( *this.procs );
73                        }
74                #endif
75        }
76
77        #if !defined(__CFA_NO_STATISTICS__)
78                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
79        #endif
80
[329e26a]81        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
82        options.clopts.cltr_cnt++;
[153dc387]83}
84
[348f81d]85void ^?{}( ServerCluster & this ) {
86        for(i; options.clopts.nprocs) {
87                ^(this.procs[i]){};
88        }
89        free(this.procs);
90
91        ^(this.self){};
92}
93
94extern void init_protocol(void);
95extern void deinit_protocol(void);
96
[7f0ac12]97//=============================================================================================
98// REUSEPORT
99//=============================================================================================
100
101size_t sockarr_size;
102struct __attribute__((aligned(128))) Q {
103        mpsc_queue(PendingRead) q;
104};
105
[153570d]106//=============================================================================================
107// Termination
108//=============================================================================================
109
110int closefd;
111void cleanstop(int) {
112        eventfd_t buffer = 1;
113        char * buffer_s = (char*)&buffer;
114        int ret = write(closefd, buffer_s, sizeof(buffer));
115        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
116        return;
117}
118
[0aec496]119//=============================================================================================
120// Main
121//============================================================================================='
122int main( int argc, char * argv[] ) {
[c4b10e2]123        int ret;
[c2df3031]124        __sighandler_t s = 1p;
125        signal(SIGPIPE, s);
126
[0aec496]127        //===================
128        // Parse args
[b57db73]129        parse_options(argc, argv);
[0aec496]130
[153570d]131        //===================
132        // Setup non-interactive termination
133        if(!options.interactive) {
134                closefd = eventfd(0, 0);
135                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
136
137                sighandler_t prev = signal(SIGTERM, cleanstop);
138                intptr_t prev_workaround = (intptr_t) prev;
139                // can't use SIG_ERR it crashes the compiler
140                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
141
142                sout | "Signal termination ready";
143        }
144
[0aec496]145        //===================
146        // Open Files
[b57db73]147        if( options.file_cache.path ) {
148                sout | "Filling cache from" | options.file_cache.path;
149                fill_cache( options.file_cache.path );
150        }
[0aec496]151
152        //===================
153        // Open Socket
[8c43d05]154        sout | getpid() | ": Listening on port" | options.socket.port;
[0aec496]155
156        struct sockaddr_in address;
[c4b10e2]157        int addrlen = prepaddr(address);
[0aec496]158
[86c12d65]159        int server_fd;
[0aec496]160
161        //===================
162        // Run Server Cluster
163        {
[d9c2284]164                int pipe_cnt = options.clopts.nworkers * 2;
[32d1383]165                // int pipe_cnt = 0;
[d9c2284]166                int pipe_off;
167                int * fds;
168                [fds, pipe_off] = filefds( pipe_cnt );
169                for(i; 0 ~ pipe_cnt ~ 2) {
170                        int ret = pipe(&fds[pipe_off + i]);
171                        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
172                }
173
[4f762d3]174                // if(options.file_cache.path && options.file_cache.fixed_fds) {
175                //      register_fixed_files(cl, fds, pipe_off);
176                // }
[d11d6eb]177
[0aec496]178                {
[153570d]179                        // Stats printer makes a copy so this needs to persist longer than normal
[7f0ac12]180                        connection ** conns;
181                        AcceptWorker  * aworkers = 0p;
182                        ChannelWorker * cworkers = 0p;
183                        Acceptor * acceptors = 0p;
184                        Q * queues = 0p;
[329e26a]185                        ServerCluster cl[options.clopts.nclusters];
[ece0e80]186
[137974a]187                        if(options.stats) {
188                                stats_thrd = alloc();
[329e26a]189                                (*stats_thrd){ cl };
[137974a]190                        } else {
191                                stats_thrd = 0p;
192                        }
193
[ece0e80]194                        init_protocol();
[0aec496]195                        {
[329e26a]196                                int nacceptors = options.clopts.nprocs * options.clopts.nclusters;
[7f0ac12]197                                conns = alloc(options.clopts.nworkers);
198                                if(options.socket.reuseport) {
[329e26a]199                                        queues = alloc(nacceptors);
200                                        acceptors = alloc(nacceptors);
201                                        sout | "Creating" | nacceptors | "Acceptors";
202                                        for(i; nacceptors) {
203                                                (acceptors[i]){ i % options.clopts.nclusters };
204                                        }
205                                        for(i; nacceptors) {
[7f0ac12]206                                                (queues[i]){};
207                                                {
208                                                        acceptors[i].sockfd  = listener(address, addrlen);
209                                                        acceptors[i].addr    = (struct sockaddr *)&address;
210                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
211                                                        acceptors[i].flags   = 0;
212                                                        acceptors[i].queue   = &queues[i].q;
213                                                }
214                                                unpark( acceptors[i] );
215                                        }
216
217                                        cworkers = anew(options.clopts.nworkers);
218                                        for(i; options.clopts.nworkers) {
219                                                {
220                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
221                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
[329e26a]222                                                        cworkers[i].queue = &queues[i % nacceptors].q;
[7f0ac12]223                                                        conns[i] = &cworkers[i].conn;
224                                                }
225                                                unpark( cworkers[i] );
226                                        }
227                                }
228                                else {
229                                        server_fd = listener(address, addrlen);
230                                        aworkers = anew(options.clopts.nworkers);
231                                        for(i; options.clopts.nworkers) {
232                                                // if( options.file_cache.fixed_fds ) {
233                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
234                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
235                                                // }
236                                                // else
237                                                {
238                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
239                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
240                                                        aworkers[i].sockfd = server_fd;
241                                                        aworkers[i].addr    = (struct sockaddr *)&address;
242                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
243                                                        aworkers[i].flags   = 0;
244                                                        conns[i] = &aworkers[i].conn;
245                                                }
246                                                unpark( aworkers[i] );
[d9c2284]247                                        }
248                                }
[329e26a]249                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
250                                for(i; options.clopts.nclusters) {
251                                        sout | options.clopts.thrd_cnt[i] | nonl;
252                                }
[348f81d]253                                sout | nl;
[0aec496]254                                {
[153570d]255                                        if(options.interactive) {
256                                                char buffer[128];
257                                                for() {
258                                                        int ret = cfa_read(0, buffer, 128, 0);
259                                                        if(ret == 0) break;
260                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
261                                                        sout | "User wrote '" | "" | nonl;
262                                                        write(sout, buffer, ret - 1);
263                                                        sout | "'";
264                                                }
265                                        }
266                                        else {
267                                                char buffer[sizeof(eventfd_t)];
268                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
[153dc387]269                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
[e95a117]270                                        }
271
[8c43d05]272                                        sout | "Shutdown received";
[0aec496]273                                }
[ece0e80]274
[7f0ac12]275                                //===================
276                                // Close Socket and join
277                                if(options.socket.reuseport) {
278                                        sout | "Notifying connections..." | nonl; flush( sout );
[329e26a]279                                        for(i; nacceptors) {
[7f0ac12]280                                                acceptors[i].done = true;
281                                        }
282                                        for(i; options.clopts.nworkers) {
283                                                cworkers[i].done = true;
284                                        }
285                                        sout | "done";
286
287                                        sout | "Shutting down Socket..." | nonl; flush( sout );
[329e26a]288                                        for(i; nacceptors) {
[7f0ac12]289                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
290                                                if( ret < 0 ) {
291                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
292                                                }
293                                        }
294                                        sout | "done";
[ece0e80]295
[7f0ac12]296                                        sout | "Closing Socket..." | nonl; flush( sout );
[329e26a]297                                        for(i; nacceptors) {
[7f0ac12]298                                                ret = close( acceptors[i].sockfd );
299                                                if( ret < 0) {
300                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
301                                                }
302                                        }
303                                        sout | "done";
304
[3f95dab]305                                        //===================
306                                        // Close Files
307                                        if( options.file_cache.path ) {
308                                                sout | "Closing open files..." | nonl; flush( sout );
309                                                close_cache();
310                                                sout | "done";
311                                        }
312
[7f0ac12]313                                        sout | "Stopping accept threads..." | nonl; flush( sout );
[329e26a]314                                        for(i; nacceptors) {
[7f0ac12]315                                                join(acceptors[i]);
316                                        }
317                                        sout | "done";
318
319                                        sout | "Draining worker queues..." | nonl; flush( sout );
[329e26a]320                                        for(i; nacceptors) {
[7f0ac12]321                                                PendingRead * p = 0p;
322                                                while(p = pop(queues[i].q)) {
323                                                        fulfil(p->f, -ECONNRESET);
324                                                }
325                                        }
326                                        sout | "done";
327
328                                        sout | "Stopping worker threads..." | nonl; flush( sout );
[86c12d65]329                                        for(i; options.clopts.nworkers) {
[7f0ac12]330                                                for(j; 2) {
331                                                        ret = close(cworkers[i].conn.pipe[j]);
332                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
333                                                }
334                                                join(cworkers[i]);
[86c12d65]335                                        }
336                                }
337                                else {
[7f0ac12]338                                        sout | "Notifying connections..." | nonl; flush( sout );
339                                        for(i; options.clopts.nworkers) {
340                                                aworkers[i].done = true;
341                                        }
342                                        sout | "done";
343
344                                        sout | "Shutting down Socket..." | nonl; flush( sout );
[86c12d65]345                                        ret = shutdown( server_fd, SHUT_RD );
346                                        if( ret < 0 ) {
[7f0ac12]347                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
[86c12d65]348                                        }
[7f0ac12]349                                        sout | "done";
[ece0e80]350
[7f0ac12]351                                        sout | "Closing Socket..." | nonl; flush( sout );
[86c12d65]352                                        ret = close( server_fd );
353                                        if(ret < 0) {
354                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
355                                        }
[7f0ac12]356                                        sout | "done";
[0197418]357
[3f95dab]358                                        //===================
359                                        // Close Files
360                                        if( options.file_cache.path ) {
361                                                sout | "Closing open files..." | nonl; flush( sout );
362                                                close_cache();
363                                                sout | "done";
364                                        }
365
[7f0ac12]366                                        sout | "Stopping connection threads..." | nonl; flush( sout );
367                                        for(i; options.clopts.nworkers) {
368                                                for(j; 2) {
369                                                        ret = close(aworkers[i].conn.pipe[j]);
370                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
371                                                }
372                                                join(aworkers[i]);
[c4b10e2]373                                        }
[153570d]374                                }
[0aec496]375                        }
[8c43d05]376                        sout | "done";
[ece0e80]377
[b57db73]378                        sout | "Stopping protocol threads..." | nonl; flush( sout );
[ece0e80]379                        deinit_protocol();
[8c43d05]380                        sout | "done";
381
[153570d]382                        sout | "Stopping printer threads..." | nonl; flush( sout );
[137974a]383                        if(stats_thrd) {
384                                notify_one(stats_thrd->var);
[153570d]385                        }
[137974a]386                        delete(stats_thrd);
[153570d]387                        sout | "done";
388
389                        // Now that the stats printer is stopped, we can reclaim this
[7f0ac12]390                        adelete(aworkers);
391                        adelete(cworkers);
392                        adelete(acceptors);
393                        adelete(queues);
394                        free(conns);
[153570d]395
[348f81d]396                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
[0aec496]397                }
[8c43d05]398                sout | "done";
[d9c2284]399
400                free(fds);
[c3ee5f3]401
[b57db73]402                sout | "Stopping processors..." | nonl; flush( sout );
[0aec496]403        }
[8c43d05]404        sout | "done";
[ef3c383]405}
406
407const size_t zipf_sizes[] = { 102, 204, 307, 409, 512, 614, 716, 819, 921, 1024, 2048, 3072, 4096, 5120, 6144, 7168, 8192, 9216, 10240, 20480, 30720, 40960, 51200, 61440, 71680, 81920, 92160, 102400, 204800, 307200, 409600, 512000, 614400, 716800, 819200, 921600 };
408static_assert(zipf_cnts == sizeof(zipf_sizes) / sizeof(zipf_sizes[0]));
Note: See TracBrowser for help on using the repository browser.