source: benchmark/io/http/worker.cfa@ 6b00c53

ADT ast-experimental pthread-emulation
Last change on this file since 6b00c53 was 3f95dab, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Committing hopefully last version of the webserver

  • Property mode set to 100644
File size: 10.2 KB
Line 
1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7
8#include <fstream.hfa>
9#include <iofwd.hfa>
10#include <mutex_stmt.hfa>
11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
// Minimum gap, measured in rdtscl() ticks, between two pushes of a thread's
// local statistics to the stats thread (see the `next > last + period` checks).
static const unsigned long long period = 50_000_000;
17
18//=============================================================================================
19// Generic connection handling
20//=============================================================================================
// Exact comparison between a request name and a route literal.
// The request name is length-delimited (name_size bytes) and is not assumed to
// be NUL-terminated, so lengths are compared first and memcmp is used for the bytes.
static inline bool matches_route( const char * name, size_t name_size, const char * route, size_t route_len ) {
	return name_size == route_len && 0 == memcmp( name, route, route_len );
}

// Serve HTTP requests on one connection until it is closed or reset.
//   this   : per-worker connection state (splice pipe and sendfile statistics)
//   fd     : socket of the connection, passed by reference to http_read
//   buffer : scratch buffer handed to http_read
//   len    : size of buffer in bytes
//   f      : optional future handed to the FIRST http_read only; every later
//            read passes 0p
//   last   : rdtscl() timestamp of the previous statistics push; updated when
//            this call manages to push statistics
static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
	REQUEST:
	for() {
		bool closed;
		HttpCode code;
		const char * file;
		size_t name_size;

		// Read the http request
		if( options.log ) mutex(sout) sout | "=== Reading request ===";
		[code, closed, file, name_size] = http_read(fd, buffer, len, f);
		f = 0p; // the future only applies to the first read of the connection

		// if we are done, break out of the loop
		if( closed ) break REQUEST;

		// If this wasn't a valid request, answer with the error code http_read reported
		if( code != OK200 ) {
			// Always reported (not gated on options.log); locked so it cannot
			// interleave with output from other threads.
			mutex(sout) sout | "=== Invalid Request :" | code_val(code) | "===";
			answer_error(fd, code);
			continue REQUEST;
		}

		// Built-in routes: the name must match exactly. A prefix comparison here
		// would let an empty or truncated name (e.g. "" or "p") hit these routes.
		if( matches_route( file, name_size, "plaintext", sizeof("plaintext") - 1 ) ) {
			if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";

			int ret = answer_plaintext(fd);
			if( ret == -ECONNRESET ) break REQUEST;

			if( options.log ) mutex(sout) sout | "=== Answer sent ===";
			continue REQUEST;
		}

		if( matches_route( file, name_size, "ping", sizeof("ping") - 1 ) ) {
			if( options.log ) mutex(sout) sout | "=== Request for /ping ===";

			// Send the header
			int ret = answer_empty(fd);
			if( ret == -ECONNRESET ) break REQUEST;

			if( options.log ) mutex(sout) sout | "=== Answer sent ===";
			continue REQUEST;
		}

		// Multi-statement log lines hold the sout lock for the whole sequence so
		// the pieces of one message stay together.
		if( options.log ) mutex(sout) {
			sout | "=== Request for file " | nonl;
			write(sout, file, name_size);
			sout | " ===";
		}

		// No file cache configured: files cannot be served at all
		if( !options.file_cache.path ) {
			if( options.log ) mutex(sout) {
				sout | "=== File Not Found (" | nonl;
				write(sout, file, name_size);
				sout | ") ===";
			}
			answer_error(fd, E405);
			continue REQUEST;
		}

		// Get the fd from the file cache
		int ans_fd;
		size_t count;
		[ans_fd, count] = get_file( file, name_size );

		// If we can't find the file, return 404
		if( ans_fd < 0 ) {
			if( options.log ) mutex(sout) {
				sout | "=== File Not Found (" | nonl;
				write(sout, file, name_size);
				sout | ") ===";
			}
			answer_error(fd, E404);
			continue REQUEST;
		}

		// Send the desired file
		int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
		if(ret < 0) {
			// The peer going away is not an error for the server: drop the connection
			if( ret == -ECONNABORTED ) break REQUEST;
			if( ret == -ECONNRESET ) break REQUEST;
			if( ret == -EPIPE ) break REQUEST;
			abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
		}

		if( options.log ) mutex(sout) mutex(sout) sout | "=== Answer sent ===";
	}

	// Opportunistically publish the sendfile statistics: only when a stats thread
	// exists, at least `period` ticks elapsed, and the lock is free right now.
	if (stats_thrd) {
		unsigned long long next = rdtscl();
		if(next > (last + period)) {
			if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
				push(this.stats.sendfile, stats_thrd->stats.send);
				unlock(stats_thrd->stats.lock);
				last = next;
			}
		}
	}
}
120
121//=============================================================================================
122// Self Accepting Worker Thread
123//=============================================================================================
// Construct a self-accepting worker on a randomly chosen cluster and count it
// in that cluster's thread tally.
void ?{}( AcceptWorker & this ) {
	// Pick one of the configured clusters at random
	const size_t cluster = rand() % options.clopts.cltr_cnt;

	// Underlying thread: name, hosting cluster, and 64000
	// NOTE(review): 64000 is presumably the stack size in bytes - confirm against the thread constructor
	((thread&)this){ "Server Worker Thread", *options.clopts.instance[cluster], 64000 };

	options.clopts.thrd_cnt[cluster]++;
	this.done = false;
}
130
// Thread body of a self-accepting worker: accept a connection, serve it to
// completion with handle_connection, update the statistics, repeat.
// The loop ends on ECONNABORTED, on EINVAL/EBADF once `done` is set, or when
// `done` is observed right after a successful accept.
void main( AcceptWorker & this ) {
	// NOTE(review): presumably blocks until whoever created the worker unparks it - confirm
	park();
	unsigned long long last = rdtscl();
	/* paranoid */ assert( this.conn.pipe[0] != -1 );
	/* paranoid */ assert( this.conn.pipe[1] != -1 );

	for() {
		if( options.log ) mutex(sout) sout | "=== Accepting connection ===";

		int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
		if( fd < 0 ) {
			// Expected ways for accept to fail: leave the loop quietly
			if( errno == ECONNABORTED || (this.done && (errno == EINVAL || errno == EBADF)) ) break;
			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		if( this.done ) break;

		if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";

		// Per-connection request buffer, sized from the options
		size_t buflen = options.socket.buflen;
		char buffer[buflen];
		handle_connection( this.conn, fd, buffer, buflen, 0p, last );

		// Book-keeping once the connection has been fully served
		this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);
		this.conn.stats.sendfile.close++;

		if( options.log ) mutex(sout) sout | "=== Connection closed ===";
	}
}
156
157
158//=============================================================================================
159// Channel Worker Thread
160//=============================================================================================
// Construct a channel worker on a randomly chosen cluster and count it in that
// cluster's thread tally.
void ?{}( ChannelWorker & this ) {
	// Pick one of the configured clusters at random
	const size_t cluster = rand() % options.clopts.cltr_cnt;

	// Underlying thread: name, hosting cluster, and 64000
	// NOTE(review): 64000 is presumably the stack size in bytes - confirm against the thread constructor
	((thread&)this){ "Server Worker Thread", *options.clopts.instance[cluster], 64000 };

	options.clopts.thrd_cnt[cluster]++;
	this.done = false;
}
167
// Thread body of a channel worker.
// Each iteration publishes a PendingRead (describing this iteration's buffer)
// on this.queue, then serves the connection through handle_connection using
// p.out.fd as the socket and p.f as the future for the first read. In this
// file those two fields are filled in by push_connection on the acceptor side.
// On exit the worker's sendfile statistics are pushed one last time.
void main( ChannelWorker & this ) {
	// NOTE(review): presumably blocks until whoever created the worker unparks it - confirm
	park();
	unsigned long long last = rdtscl();
	/* paranoid */ assert( this.conn.pipe[0] != -1 );
	/* paranoid */ assert( this.conn.pipe[1] != -1 );

	// Seed the highest-fd statistic with the worker's own pipe descriptors
	this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
	for() {
		size_t len = options.socket.buflen;
		char buffer[len];

		// Offer this worker's buffer to the acceptor
		PendingRead p;
		p.next = 0p;
		p.in.buf = (void*)buffer;
		p.in.len = len;
		push(*this.queue, &p);

		if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
		handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
		if(this.done) break;
		this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
		this.conn.stats.sendfile.close++;

		if( options.log ) mutex(sout) sout | "=== Connection closed ===";
	}

	// Final statistics push. stats_thrd is checked for null before every
	// periodic push in this file, so it must be checked here too instead of
	// being dereferenced unconditionally.
	if( stats_thrd ) {
		lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
		push(this.conn.stats.sendfile, stats_thrd->stats.send);
		unlock(stats_thrd->stats.lock);
	}
}
196
// Hand-written prototype for accept4 with C linkage; it is called directly by
// the ACCEPT_SPIN variant of the acceptor thread.
extern "C" {
	int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
}
200
// Construct an acceptor thread on the cluster with index `cli` and count it in
// that cluster's thread tally.
void ?{}( Acceptor & this, int cli ) {
	// Underlying thread: name, hosting cluster, and 64000
	// NOTE(review): 64000 is presumably the stack size in bytes - confirm against the thread constructor
	((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };

	options.clopts.thrd_cnt[cli]++;
	this.done = false;
}
206
// Translate the result stored in a future into the usual syscall convention:
// a non-negative result is returned unchanged; a negative result is negated
// into errno and -1 is returned.
static inline __s32 get_res( io_future_t & this ) {
	if( this.result >= 0 ) return this.result;

	errno = -this.result;
	return -1;
}
214
// Hand a freshly accepted socket to a waiting worker.
// Pops a PendingRead from this.queue (yielding and retrying while the queue is
// empty), stores the socket in it and starts an asynchronous recv into the
// buffer the PendingRead describes. Returns without doing anything further if
// `done` is set while still looking for a PendingRead.
static inline void push_connection( Acceptor & this, int fd ) {
	this.stats.accepts++;

	PendingRead * pending = 0p;
	while( !pending ) {
		if( this.done ) return;

		pending = pop(*this.queue);
		if( !pending ) {
			// Nothing queued yet: let other threads run, then try again
			yield();
			this.stats.creates++;
		}
	}

	pending->out.fd = fd;
	async_recv(pending->f, pending->out.fd, pending->in.buf, pending->in.len, 0, CFA_IO_LAZY);
}
229
// Selects the accept algorithm compiled into main(Acceptor &); exactly one of
// these should be defined:
//   ACCEPT_SPIN : call accept4 directly, yielding on EWOULDBLOCK
//   ACCEPT_ONE  : one cfa_accept4 call at a time
//   ACCEPT_MANY : keep `nacc` asynchronous accepts in flight at once
// #define ACCEPT_SPIN
#define ACCEPT_ONE
// #define ACCEPT_MANY

// Thread body of the acceptor: accepts connections on this.sockfd and hands
// each one to a worker through push_connection, periodically pushing its own
// statistics to the stats thread (when one exists).
// Leaving an accept loop via `break` falls through to a final statistics push;
// the `return` paths taken when `done` is observed skip it.
void main( Acceptor & this ) {
	// NOTE(review): presumably blocks until whoever created the acceptor unparks it - confirm
	park();
	unsigned long long last = rdtscl();

#if defined(ACCEPT_SPIN)
	if( options.log ) sout | "=== Accepting connection ===";
	for() {
		int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
		if(fd < 0) {
			// Nothing to accept right now: count it, let other threads run, retry
			if( errno == EWOULDBLOCK) {
				this.stats.eagains++;
				yield();
				continue;
			}
			if( errno == ECONNABORTED ) break;
			if( this.done && (errno == EINVAL || errno == EBADF) ) break;
			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
		}

		if(this.done) return;

		if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";

		// NOTE(review): a descriptor equal to 0 is not forwarded to a worker
		if(fd) push_connection(this, fd);

		if (stats_thrd) {
			unsigned long long next = rdtscl();
			if(next > (last + period)) {
				if(try_lock(stats_thrd->stats.lock)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					last = next;
				}
			}
		}

		if( options.log ) sout | "=== Accepting connection ===";
	}

#elif defined(ACCEPT_ONE)
	if( options.log ) sout | "=== Accepting connection ===";
	for() {
		int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
		if(fd < 0) {
			if( errno == ECONNABORTED ) break;
			if( this.done && (errno == EINVAL || errno == EBADF) ) break;
			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
		}

		if(this.done) return;

		if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";

		// NOTE(review): a descriptor equal to 0 is not forwarded to a worker
		if(fd) push_connection(this, fd);

		if (stats_thrd) {
			unsigned long long next = rdtscl();
			if(next > (last + period)) {
				if(try_lock(stats_thrd->stats.lock)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					last = next;
				}
			}
		}

		if( options.log ) sout | "=== Accepting connection ===";
	}

#elif defined(ACCEPT_MANY)
	const int nacc = 10;
	io_future_t results[nacc];

	// Arm every accept slot once before entering the service loop
	for(i; nacc) {
		io_future_t & res = results[i];
		reset(res);
		/* paranoid */ assert(!available(res));
		if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
		async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
	}

	for() {
		if (stats_thrd) {
			unsigned long long next = rdtscl();
			if(next > (last + period)) {
				if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					last = next;
				}
			}
		}

		// Harvest every completed accept, forward the socket, then re-arm the slot
		for(i; nacc) {
			io_future_t & res = results[i];
			if(available(res)) {
				if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
				int fd = get_res(res);
				reset(res);
				if(fd < 0) {
					// NOTE(review): these `continue`s leave the slot un-armed
					if( errno == ECONNABORTED ) continue;
					if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
					abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				push_connection( this, fd );

				/* paranoid */ assert(!available(res));
				if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
				async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
			}
		}
		if(this.done) return;

		if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
		this.stats.eagains++;
		wait_any(results, nacc);

		if( options.log ) mutex(sout) {
			sout | "=== Acceptor wake-up ===";
			for(i; nacc) {
				io_future_t & res = results[i];
				sout | i | "available:" | available(res);
			}
		}

	}

	for(i; nacc) {
		wait(results[i]);
	}
#else
#error no accept algorithm specified
#endif
	// Final statistics push. stats_thrd is checked for null before every
	// periodic push above, so it must be checked here too instead of being
	// dereferenced unconditionally.
	if( stats_thrd ) {
		lock(stats_thrd->stats.lock);
		push(this.stats, stats_thrd->stats.accpt);
		unlock(stats_thrd->stats.lock);
	}
}
Note: See TracBrowser for help on using the repository browser.