source: benchmark/io/http/main.cfa @ 6117fc0

enumforall-pointer-decay
Last change on this file since 6117fc0 was 6117fc0, checked in by Thierry Delisle <tdelisle@…>, 5 months ago

Httpforall now pins kernel threads

  • Property mode set to 100644
File size: 7.8 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7extern "C" {
8        #include <sched.h>
9        #include <signal.h>
10        #include <sys/socket.h>
11        #include <netinet/in.h>
12}
13
14#include <fstream.hfa>
15#include <kernel.hfa>
16#include <iofwd.hfa>
17#include <stats.hfa>
18#include <time.hfa>
19#include <thread.hfa>
20
21#include "filecache.hfa"
22#include "options.hfa"
23#include "worker.hfa"
24
25extern void register_fixed_files( cluster &, int *, unsigned count );
26
27Duration default_preemption() {
28        return 0;
29}
30
31//=============================================================================================
32// Stats Printer
33//============================================================================================='
34
35thread StatsPrinter {};
36
37void ?{}( StatsPrinter & this, cluster & cl ) {
38        ((thread&)this){ "Stats Printer Thread", cl };
39}
40
41void ^?{}( StatsPrinter & mutex this ) {}
42
43void main(StatsPrinter & this) {
44        LOOP: for() {
45                waitfor( ^?{} : this) {
46                        break LOOP;
47                }
48                or else {}
49
50                sleep(10`s);
51
52                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
53        }
54}
55
56//=============================================================================================
57// Globals
58//=============================================================================================
59struct ServerCluster {
60        cluster self;
61        processor    * procs;
62        // io_context   * ctxs;
63        StatsPrinter * prnt;
64
65};
66
67void ?{}( ServerCluster & this ) {
68        (this.self){ "Server Cluster", options.clopts.params };
69
70        cpu_set_t fullset;
71        CPU_ZERO(&fullset);
72        int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
73        if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
74        int cnt = CPU_COUNT(&fullset);
75
76        this.procs = alloc(options.clopts.nprocs);
77        for(i; options.clopts.nprocs) {
78                (this.procs[i]){ "Benchmark Processor", this.self };
79
80                int c = 0;
81                int n = 1 + (i % cnt);
82                for(int j = 0; j < CPU_SETSIZE; j++) {
83                        if(CPU_ISSET(j, &fullset)) n--;
84                        if(n == 0) {
85                                c = j;
86                                break;
87                        }
88                }
89                cpu_set_t localset;
90                CPU_ZERO(&localset);
91                CPU_SET(c, &localset);
92                ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
93                if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
94
95                #if !defined(__CFA_NO_STATISTICS__)
96                        if( options.clopts.procstats ) {
97                                print_stats_at_exit( *this.procs, this.self.print_stats );
98                        }
99                        if( options.clopts.viewhalts ) {
100                                print_halts( *this.procs );
101                        }
102                #endif
103        }
104
105        if(options.stats) {
106                this.prnt = alloc();
107                (*this.prnt){ this.self };
108        } else {
109                this.prnt = 0p;
110        }
111
112        #if !defined(__CFA_NO_STATISTICS__)
113                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
114        #endif
115
116        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
117        options.clopts.cltr_cnt++;
118}
119
120void ^?{}( ServerCluster & this ) {
121        delete(this.prnt);
122
123        for(i; options.clopts.nprocs) {
124                ^(this.procs[i]){};
125        }
126        free(this.procs);
127
128        ^(this.self){};
129}
130
131extern void init_protocol(void);
132extern void deinit_protocol(void);
133
134//=============================================================================================
135// Main
136//============================================================================================='
137int main( int argc, char * argv[] ) {
138        __sighandler_t s = 1p;
139        signal(SIGPIPE, s);
140
141        //===================
142        // Parse args
143        parse_options(argc, argv);
144
145        //===================
146        // Open Files
147        if( options.file_cache.path ) {
148                sout | "Filling cache from" | options.file_cache.path;
149                fill_cache( options.file_cache.path );
150        }
151
152        //===================
153        // Open Socket
154        sout | getpid() | ": Listening on port" | options.socket.port;
155        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
156        if(server_fd < 0) {
157                abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
158        }
159
160        int ret = 0;
161        struct sockaddr_in address;
162        int addrlen = sizeof(address);
163        memset( (char *)&address, '\0' );
164        address.sin_family = AF_INET;
165        address.sin_addr.s_addr = htonl(INADDR_ANY);
166        address.sin_port = htons( options.socket.port );
167
168        int waited = 0;
169        for() {
170                int sockfd = server_fd;
171                __CONST_SOCKADDR_ARG addr;
172                addr.__sockaddr__ = (struct sockaddr *)&address;
173                socklen_t addrlen = sizeof(address);
174                ret = bind( sockfd, addr, addrlen );
175                if(ret < 0) {
176                        if(errno == EADDRINUSE) {
177                                if(waited == 0) {
178                                        if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
179                                        sout | "Waiting for port";
180                                } else {
181                                        sout | "\r" | waited | nonl;
182                                        flush( sout );
183                                }
184                                waited ++;
185                                sleep( 1`s );
186                                continue;
187                        }
188                        abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
189                }
190                break;
191        }
192
193        ret = listen( server_fd, options.socket.backlog );
194        if(ret < 0) {
195                abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
196        }
197
198        //===================
199        // Run Server Cluster
200        {
201                int pipe_cnt = options.clopts.nworkers * 2;
202                int pipe_off;
203                int * fds;
204                [fds, pipe_off] = filefds( pipe_cnt );
205                for(i; 0 ~ pipe_cnt ~ 2) {
206                        int ret = pipe(&fds[pipe_off + i]);
207                        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
208                }
209
210                // if(options.file_cache.path && options.file_cache.fixed_fds) {
211                //      register_fixed_files(cl, fds, pipe_off);
212                // }
213
214                {
215                        ServerCluster cl[options.clopts.nclusters];
216
217                        init_protocol();
218                        {
219                                Worker * workers = anew(options.clopts.nworkers);
220                                for(i; options.clopts.nworkers) {
221                                        // if( options.file_cache.fixed_fds ) {
222                                        //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
223                                        //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
224                                        // }
225                                        // else
226                                        {
227                                                workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
228                                                workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
229                                                workers[i].sockfd  = server_fd;
230                                                workers[i].addr    = (struct sockaddr *)&address;
231                                                workers[i].addrlen = (socklen_t*)&addrlen;
232                                                workers[i].flags   = 0;
233                                        }
234                                        unpark( workers[i] );
235                                }
236                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
237                                for(i; options.clopts.nclusters) {
238                                        sout | options.clopts.thrd_cnt[i] | nonl;
239                                }
240                                sout | nl;
241                                if(!options.interactive) park();
242                                {
243                                        char buffer[128];
244                                        for() {
245                                                int ret = cfa_read(0, buffer, 128, 0);
246                                                if(ret == 0) break;
247                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
248                                                sout | "User wrote '" | "" | nonl;
249                                                write(sout, buffer, ret - 1);
250                                                sout | "'";
251                                        }
252
253                                        sout | "Shutdown received";
254                                }
255
256                                sout | "Notifying connections..." | nonl; flush( sout );
257                                for(i; options.clopts.nworkers) {
258                                        workers[i].done = true;
259                                }
260                                sout | "done";
261
262                                sout | "Shutting down socket..." | nonl; flush( sout );
263                                int ret = shutdown( server_fd, SHUT_RD );
264                                if( ret < 0 ) {
265                                        abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
266                                }
267                                sout | "done";
268
269                                //===================
270                                // Close Socket
271                                sout | "Closing Socket..." | nonl; flush( sout );
272                                ret = close( server_fd );
273                                if(ret < 0) {
274                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
275                                }
276                                sout | "done";
277
278                                sout | "Stopping connection threads..." | nonl; flush( sout );
279                                adelete(workers);
280                        }
281                        sout | "done";
282
283                        sout | "Stopping protocol threads..." | nonl; flush( sout );
284                        deinit_protocol();
285                        sout | "done";
286
287                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
288                }
289                sout | "done";
290
291                sout | "Closing splice fds..." | nonl; flush( sout );
292                for(i; pipe_cnt) {
293                        ret = close( fds[pipe_off + i] );
294                        if(ret < 0) {
295                                abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
296                        }
297                }
298                free(fds);
299                sout | "done";
300
301                sout | "Stopping processors..." | nonl; flush( sout );
302        }
303        sout | "done";
304
305        //===================
306        // Close Files
307        if( options.file_cache.path ) {
308                sout | "Closing open files..." | nonl; flush( sout );
309                close_cache();
310                sout | "done";
311        }
312}
Note: See TracBrowser for help on using the repository browser.