source: benchmark/io/http/main.cfa@ a8ef59e

ADT ast-experimental enum pthread-emulation qualifiedEnum
Last change on this file since a8ef59e was 6117fc0, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Httpforall now pins kernel threads

  • Property mode set to 100644
File size: 7.8 KB
Line 
1#define _GNU_SOURCE
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7extern "C" {
8 #include <sched.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <netinet/in.h>
12}
13
14#include <fstream.hfa>
15#include <kernel.hfa>
16#include <iofwd.hfa>
17#include <stats.hfa>
18#include <time.hfa>
19#include <thread.hfa>
20
21#include "filecache.hfa"
22#include "options.hfa"
23#include "worker.hfa"
24
25extern void register_fixed_files( cluster &, int *, unsigned count );
26
27Duration default_preemption() {
28 return 0;
29}
30
31//=============================================================================================
32// Stats Printer
33//============================================================================================='
34
35thread StatsPrinter {};
36
37void ?{}( StatsPrinter & this, cluster & cl ) {
38 ((thread&)this){ "Stats Printer Thread", cl };
39}
40
41void ^?{}( StatsPrinter & mutex this ) {}
42
43void main(StatsPrinter & this) {
44 LOOP: for() {
45 waitfor( ^?{} : this) {
46 break LOOP;
47 }
48 or else {}
49
50 sleep(10`s);
51
52 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
53 }
54}
55
56//=============================================================================================
57// Globals
58//=============================================================================================
59struct ServerCluster {
60 cluster self;
61 processor * procs;
62 // io_context * ctxs;
63 StatsPrinter * prnt;
64
65};
66
67void ?{}( ServerCluster & this ) {
68 (this.self){ "Server Cluster", options.clopts.params };
69
70 cpu_set_t fullset;
71 CPU_ZERO(&fullset);
72 int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
73 if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
74 int cnt = CPU_COUNT(&fullset);
75
76 this.procs = alloc(options.clopts.nprocs);
77 for(i; options.clopts.nprocs) {
78 (this.procs[i]){ "Benchmark Processor", this.self };
79
80 int c = 0;
81 int n = 1 + (i % cnt);
82 for(int j = 0; j < CPU_SETSIZE; j++) {
83 if(CPU_ISSET(j, &fullset)) n--;
84 if(n == 0) {
85 c = j;
86 break;
87 }
88 }
89 cpu_set_t localset;
90 CPU_ZERO(&localset);
91 CPU_SET(c, &localset);
92 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
93 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
94
95 #if !defined(__CFA_NO_STATISTICS__)
96 if( options.clopts.procstats ) {
97 print_stats_at_exit( *this.procs, this.self.print_stats );
98 }
99 if( options.clopts.viewhalts ) {
100 print_halts( *this.procs );
101 }
102 #endif
103 }
104
105 if(options.stats) {
106 this.prnt = alloc();
107 (*this.prnt){ this.self };
108 } else {
109 this.prnt = 0p;
110 }
111
112 #if !defined(__CFA_NO_STATISTICS__)
113 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
114 #endif
115
116 options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
117 options.clopts.cltr_cnt++;
118}
119
120void ^?{}( ServerCluster & this ) {
121 delete(this.prnt);
122
123 for(i; options.clopts.nprocs) {
124 ^(this.procs[i]){};
125 }
126 free(this.procs);
127
128 ^(this.self){};
129}
130
131extern void init_protocol(void);
132extern void deinit_protocol(void);
133
134//=============================================================================================
135// Main
136//============================================================================================='
137int main( int argc, char * argv[] ) {
138 __sighandler_t s = 1p;
139 signal(SIGPIPE, s);
140
141 //===================
142 // Parse args
143 parse_options(argc, argv);
144
145 //===================
146 // Open Files
147 if( options.file_cache.path ) {
148 sout | "Filling cache from" | options.file_cache.path;
149 fill_cache( options.file_cache.path );
150 }
151
152 //===================
153 // Open Socket
154 sout | getpid() | ": Listening on port" | options.socket.port;
155 int server_fd = socket(AF_INET, SOCK_STREAM, 0);
156 if(server_fd < 0) {
157 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
158 }
159
160 int ret = 0;
161 struct sockaddr_in address;
162 int addrlen = sizeof(address);
163 memset( (char *)&address, '\0' );
164 address.sin_family = AF_INET;
165 address.sin_addr.s_addr = htonl(INADDR_ANY);
166 address.sin_port = htons( options.socket.port );
167
168 int waited = 0;
169 for() {
170 int sockfd = server_fd;
171 __CONST_SOCKADDR_ARG addr;
172 addr.__sockaddr__ = (struct sockaddr *)&address;
173 socklen_t addrlen = sizeof(address);
174 ret = bind( sockfd, addr, addrlen );
175 if(ret < 0) {
176 if(errno == EADDRINUSE) {
177 if(waited == 0) {
178 if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
179 sout | "Waiting for port";
180 } else {
181 sout | "\r" | waited | nonl;
182 flush( sout );
183 }
184 waited ++;
185 sleep( 1`s );
186 continue;
187 }
188 abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
189 }
190 break;
191 }
192
193 ret = listen( server_fd, options.socket.backlog );
194 if(ret < 0) {
195 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
196 }
197
198 //===================
199 // Run Server Cluster
200 {
201 int pipe_cnt = options.clopts.nworkers * 2;
202 int pipe_off;
203 int * fds;
204 [fds, pipe_off] = filefds( pipe_cnt );
205 for(i; 0 ~ pipe_cnt ~ 2) {
206 int ret = pipe(&fds[pipe_off + i]);
207 if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
208 }
209
210 // if(options.file_cache.path && options.file_cache.fixed_fds) {
211 // register_fixed_files(cl, fds, pipe_off);
212 // }
213
214 {
215 ServerCluster cl[options.clopts.nclusters];
216
217 init_protocol();
218 {
219 Worker * workers = anew(options.clopts.nworkers);
220 for(i; options.clopts.nworkers) {
221 // if( options.file_cache.fixed_fds ) {
222 // workers[i].pipe[0] = pipe_off + (i * 2) + 0;
223 // workers[i].pipe[1] = pipe_off + (i * 2) + 1;
224 // }
225 // else
226 {
227 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
228 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
229 workers[i].sockfd = server_fd;
230 workers[i].addr = (struct sockaddr *)&address;
231 workers[i].addrlen = (socklen_t*)&addrlen;
232 workers[i].flags = 0;
233 }
234 unpark( workers[i] );
235 }
236 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
237 for(i; options.clopts.nclusters) {
238 sout | options.clopts.thrd_cnt[i] | nonl;
239 }
240 sout | nl;
241 if(!options.interactive) park();
242 {
243 char buffer[128];
244 for() {
245 int ret = cfa_read(0, buffer, 128, 0);
246 if(ret == 0) break;
247 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
248 sout | "User wrote '" | "" | nonl;
249 write(sout, buffer, ret - 1);
250 sout | "'";
251 }
252
253 sout | "Shutdown received";
254 }
255
256 sout | "Notifying connections..." | nonl; flush( sout );
257 for(i; options.clopts.nworkers) {
258 workers[i].done = true;
259 }
260 sout | "done";
261
262 sout | "Shutting down socket..." | nonl; flush( sout );
263 int ret = shutdown( server_fd, SHUT_RD );
264 if( ret < 0 ) {
265 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
266 }
267 sout | "done";
268
269 //===================
270 // Close Socket
271 sout | "Closing Socket..." | nonl; flush( sout );
272 ret = close( server_fd );
273 if(ret < 0) {
274 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
275 }
276 sout | "done";
277
278 sout | "Stopping connection threads..." | nonl; flush( sout );
279 adelete(workers);
280 }
281 sout | "done";
282
283 sout | "Stopping protocol threads..." | nonl; flush( sout );
284 deinit_protocol();
285 sout | "done";
286
287 sout | "Stopping processors/clusters..." | nonl; flush( sout );
288 }
289 sout | "done";
290
291 sout | "Closing splice fds..." | nonl; flush( sout );
292 for(i; pipe_cnt) {
293 ret = close( fds[pipe_off + i] );
294 if(ret < 0) {
295 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
296 }
297 }
298 free(fds);
299 sout | "done";
300
301 sout | "Stopping processors..." | nonl; flush( sout );
302 }
303 sout | "done";
304
305 //===================
306 // Close Files
307 if( options.file_cache.path ) {
308 sout | "Closing open files..." | nonl; flush( sout );
309 close_cache();
310 sout | "done";
311 }
312}
Note: See TracBrowser for help on using the repository browser.