source: benchmark/io/http/main.cfa@ a38bbbc

ADT ast-experimental enum pthread-emulation qualifiedEnum
Last change on this file since a38bbbc was 6117fc0, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Httpforall now pins kernel threads

  • Property mode set to 100644
File size: 7.8 KB
RevLine 
[1e8b4b49]1#define _GNU_SOURCE
[0aec496]2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7extern "C" {
[6117fc0]8 #include <sched.h>
[c2df3031]9 #include <signal.h>
[0aec496]10 #include <sys/socket.h>
11 #include <netinet/in.h>
12}
13
[8c43d05]14#include <fstream.hfa>
[0aec496]15#include <kernel.hfa>
[153dc387]16#include <iofwd.hfa>
[0aec496]17#include <stats.hfa>
[d11d6eb]18#include <time.hfa>
[0aec496]19#include <thread.hfa>
20
21#include "filecache.hfa"
22#include "options.hfa"
23#include "worker.hfa"
24
[d11d6eb]25extern void register_fixed_files( cluster &, int *, unsigned count );
26
// Preemption interval for this program: always 0.
// NOTE(review): not called anywhere in this file; presumably a hook the Cforall
// runtime looks up, with 0 meaning "no time-slice preemption" -- confirm.
Duration default_preemption() {
	return 0;
}
30
[153dc387]31//=============================================================================================
32// Stats Printer
33//=============================================================================================
34
// Thread that prints the ready-queue and I/O statistics of its cluster every 10 seconds
// until its destructor is called.
thread StatsPrinter {};

// Constructs the printer as a thread named "Stats Printer Thread" running on cluster `cl`.
void ?{}( StatsPrinter & this, cluster & cl ) {
	((thread&)this){ "Stats Printer Thread", cl };
}

// Empty mutex destructor; main(StatsPrinter &) accepts it through waitfor to learn it must stop.
void ^?{}( StatsPrinter & mutex this ) {}

// Thread body: check for a destructor call, sleep, print, repeat.
void main(StatsPrinter & this) {
	REPORT: for() {
		// Leave the loop once the destructor has been accepted. The empty `or else`
		// presumably makes this check non-blocking -- confirm against waitfor semantics.
		waitfor( ^?{} : this) {
			break REPORT;
		}
		or else {}

		sleep(10`s);

		// Statistics are printed for whichever cluster this thread is currently running on.
		print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
	}
}
55
56//=============================================================================================
57// Globals
58//=============================================================================================
// Everything owned by one server cluster; built and torn down by its constructor/destructor.
struct ServerCluster {
	cluster self;           // the cluster itself
	processor * procs;      // array of options.clopts.nprocs processors, allocated by the constructor
	StatsPrinter * prnt;    // periodic stats printer, or 0p when options.stats is off
};
66
// Builds one server cluster:
//   1. constructs the cluster from options.clopts.params,
//   2. allocates options.clopts.nprocs processors and pins each one's kernel thread to a single
//      CPU, cycling through the CPUs present in the process's affinity mask,
//   3. optionally enables per-processor statistics / halt printing,
//   4. optionally starts a StatsPrinter thread on the cluster,
//   5. registers the cluster in options.clopts.instance and bumps options.clopts.cltr_cnt.
// Aborts if the affinity mask cannot be read or a processor cannot be pinned.
void ?{}( ServerCluster & this ) {
	(this.self){ "Server Cluster", options.clopts.params };

	// CPUs this process may run on; processors are only ever pinned to members of this set.
	cpu_set_t fullset;
	CPU_ZERO(&fullset);
	int ret = sched_getaffinity(getpid(), sizeof(fullset), &fullset);
	if( ret != 0 ) abort | "sched_getaffinity failed with" | errno | strerror( errno );
	int cnt = CPU_COUNT(&fullset);

	this.procs = alloc(options.clopts.nprocs);
	for(i; options.clopts.nprocs) {
		(this.procs[i]){ "Benchmark Processor", this.self };

		// Pick the (1 + i % cnt)-th CPU that is set in the affinity mask.
		int c = 0;
		int n = 1 + (i % cnt);
		for(int j = 0; j < CPU_SETSIZE; j++) {
			if(CPU_ISSET(j, &fullset)) n--;
			if(n == 0) {
				c = j;
				break;
			}
		}

		// Restrict this processor's kernel thread to that single CPU.
		cpu_set_t localset;
		CPU_ZERO(&localset);
		CPU_SET(c, &localset);
		ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
		// pthread_setaffinity_np returns the error number directly (it does not set errno).
		if( ret != 0 ) abort | "pthread_setaffinity_np failed with" | ret | strerror( ret );

		#if !defined(__CFA_NO_STATISTICS__)
			// Apply the per-processor options to the processor just created,
			// not only to the first element of the array.
			if( options.clopts.procstats ) {
				print_stats_at_exit( this.procs[i], this.self.print_stats );
			}
			if( options.clopts.viewhalts ) {
				print_halts( this.procs[i] );
			}
		#endif
	}

	// The stats printer only exists when periodic stats were requested.
	if(options.stats) {
		this.prnt = alloc();
		(*this.prnt){ this.self };
	} else {
		this.prnt = 0p;
	}

	#if !defined(__CFA_NO_STATISTICS__)
		print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
	#endif

	// Publish this cluster in the global options table.
	options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
	options.clopts.cltr_cnt++;
}
119
// Tears a server cluster down in order: stats printer, every processor, the processor
// array, and finally the cluster itself.
void ^?{}( ServerCluster & this ) {
	// prnt is 0p when the constructor did not create a stats printer.
	delete(this.prnt);

	for(idx; options.clopts.nprocs) {
		^(this.procs[idx]){};
	}
	free(this.procs);

	^(this.self){};
}
130
131extern void init_protocol(void);
132extern void deinit_protocol(void);
133
[0aec496]134//=============================================================================================
135// Main
136//=============================================================================================
// Program entry point for the HTTP benchmark server.
//
// Sequence:
//   1. install a SIGPIPE disposition and parse the command line into `options`,
//   2. optionally fill the file cache,
//   3. create, bind (retrying while the port is in use, interactive mode only) and listen on
//      the server socket,
//   4. create one pipe per worker, build the server clusters, start the protocol and the workers,
//   5. wait for shutdown: park forever in non-interactive mode, otherwise echo stdin until EOF,
//   6. tear everything down in reverse order, printing progress to sout.
// Any failing system call aborts the program with the errno text.
int main( int argc, char * argv[] ) {
	// 1p is the pointer value 1; presumably this matches SIG_IGN so SIGPIPE is ignored -- confirm.
	__sighandler_t s = 1p;
	signal(SIGPIPE, s);

	//===================
	// Parse args
	parse_options(argc, argv);

	//===================
	// Open Files
	if( options.file_cache.path ) {
		sout | "Filling cache from" | options.file_cache.path;
		fill_cache( options.file_cache.path );
	}

	//===================
	// Open Socket
	sout | getpid() | ": Listening on port" | options.socket.port;
	int server_fd = socket(AF_INET, SOCK_STREAM, 0);
	if(server_fd < 0) {
		abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
	}

	int ret = 0;
	struct sockaddr_in address;
	int addrlen = sizeof(address);
	// Clear the whole structure with an explicit length. The previous two-argument call on a
	// char * presumably resolved to a sized overload that clears only sizeof(char) bytes.
	memset( (char *)&address, '\0', sizeof(address) );
	address.sin_family = AF_INET;
	address.sin_addr.s_addr = htonl(INADDR_ANY);
	address.sin_port = htons( options.socket.port );

	// Bind, retrying once per second while the port is still in use (interactive mode only).
	int waited = 0;
	for() {
		int sockfd = server_fd;
		__CONST_SOCKADDR_ARG addr;
		addr.__sockaddr__ = (struct sockaddr *)&address;
		socklen_t addrlen = sizeof(address);
		ret = bind( sockfd, addr, addrlen );
		if(ret < 0) {
			if(errno == EADDRINUSE) {
				if(waited == 0) {
					if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
					sout | "Waiting for port";
				} else {
					// Overwrite the previous counter on the same terminal line.
					sout | "\r" | waited | nonl;
					flush( sout );
				}
				waited ++;
				sleep( 1`s );
				continue;
			}
			abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		break;
	}

	ret = listen( server_fd, options.socket.backlog );
	if(ret < 0) {
		abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
	}

	//===================
	// Run Server Cluster
	{
		// One pipe (two fds) per worker, stored in fds starting at index pipe_off.
		int pipe_cnt = options.clopts.nworkers * 2;
		int pipe_off;
		int * fds;
		[fds, pipe_off] = filefds( pipe_cnt );
		for(i; 0 ~ pipe_cnt ~ 2) {
			int ret = pipe(&fds[pipe_off + i]);
			if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
		}

		// if(options.file_cache.path && options.file_cache.fixed_fds) {
		// 	register_fixed_files(cl, fds, pipe_off);
		// }

		{
			// Clusters are destroyed when this scope ends.
			ServerCluster cl[options.clopts.nclusters];

			init_protocol();
			{
				Worker * workers = anew(options.clopts.nworkers);
				for(i; options.clopts.nworkers) {
					// if( options.file_cache.fixed_fds ) {
					// 	workers[i].pipe[0] = pipe_off + (i * 2) + 0;
					// 	workers[i].pipe[1] = pipe_off + (i * 2) + 1;
					// }
					// else
					{
						workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
						workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
						workers[i].sockfd  = server_fd;
						workers[i].addr    = (struct sockaddr *)&address;
						workers[i].addrlen = (socklen_t*)&addrlen;
						workers[i].flags   = 0;
					}
					unpark( workers[i] );
				}
				sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
				for(i; options.clopts.nclusters) {
					sout | options.clopts.thrd_cnt[i] | nonl;
				}
				sout | nl;

				// Non-interactive mode never reads stdin: the main thread parks here.
				if(!options.interactive) park();
				{
					// Interactive mode: echo each read from fd 0 until end-of-file.
					char buffer[128];
					for() {
						int ret = cfa_read(0, buffer, 128, 0);
						if(ret == 0) break;
						if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
						sout | "User wrote '" | "" | nonl;
						// The last byte read is not echoed (presumably the trailing newline).
						write(sout, buffer, ret - 1);
						sout | "'";
					}

					sout | "Shutdown received";
				}

				sout | "Notifying connections..." | nonl; flush( sout );
				for(i; options.clopts.nworkers) {
					workers[i].done = true;
				}
				sout | "done";

				sout | "Shutting down socket..." | nonl; flush( sout );
				int ret = shutdown( server_fd, SHUT_RD );
				if( ret < 0 ) {
					abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				sout | "done";

				//===================
				// Close Socket
				sout | "Closing Socket..." | nonl; flush( sout );
				ret = close( server_fd );
				if(ret < 0) {
					abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				sout | "done";

				sout | "Stopping connection threads..." | nonl; flush( sout );
				adelete(workers);
			}
			sout | "done";

			sout | "Stopping protocol threads..." | nonl; flush( sout );
			deinit_protocol();
			sout | "done";

			sout | "Stopping processors/clusters..." | nonl; flush( sout );
		}
		sout | "done";

		sout | "Closing splice fds..." | nonl; flush( sout );
		for(i; pipe_cnt) {
			ret = close( fds[pipe_off + i] );
			if(ret < 0) {
				abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
			}
		}
		free(fds);
		sout | "done";

		sout | "Stopping processors..." | nonl; flush( sout );
	}
	sout | "done";

	//===================
	// Close Files
	if( options.file_cache.path ) {
		sout | "Closing open files..." | nonl; flush( sout );
		close_cache();
		sout | "done";
	}
}
Note: See TracBrowser for help on using the repository browser.