source: benchmark/io/http/worker.cfa@ 46ab782

ADT ast-experimental
Last change on this file since 46ab782 was 0f1336c, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Minor error handling improvement.

  • Property mode set to 100644
File size: 10.8 KB
RevLine 
[0aec496]1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
[c82af9f]6#include <unistd.h>
[0aec496]7
[8c43d05]8#include <fstream.hfa>
[0aec496]9#include <iofwd.hfa>
[7f0ac12]10#include <mutex_stmt.hfa>
[0aec496]11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
[32d1383]16static const unsigned long long period = 5_000_000;
[3f95dab]17
[0aec496]18//=============================================================================================
[7f0ac12]19// Generic connection handling
[0aec496]20//=============================================================================================
[137974ae]21static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
[7f0ac12]22 REQUEST:
23 for() {
24 bool closed;
25 HttpCode code;
26 const char * file;
27 size_t name_size;
28
29 // Read the http request
[10ba012]30 if( options.log ) mutex(sout) sout | "=== Reading request ===";
[7f0ac12]31 [code, closed, file, name_size] = http_read(fd, buffer, len, f);
32 f = 0p;
33
34 // if we are done, break out of the loop
[32d1383]35 if( closed ) {
36 if( code != OK200 ) this.stats.sendfile.error++;
37 break REQUEST;
38 }
[7f0ac12]39
40 // If this wasn't a request retrun 400
41 if( code != OK200 ) {
[32d1383]42 abort | "=== Invalid Request :" | code_val(code) | "===";
43 // answer_error(fd, code);
44 // continue REQUEST;
[7f0ac12]45 }
[ef3c383]46
[32d1383]47 // if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
48 // if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
[7f0ac12]49
[32d1383]50 // int ret = answer_plaintext(fd);
51 // if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
[7f0ac12]52
[32d1383]53 // if( options.log ) mutex(sout) sout | "=== Answer sent ===";
54 // continue REQUEST;
55 // }
[7f0ac12]56
[32d1383]57 // if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
58 // if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
[7f0ac12]59
[32d1383]60 // // Send the header
61 // int ret = answer_empty(fd);
62 // if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
[7f0ac12]63
[32d1383]64 // if( options.log ) mutex(sout) sout | "=== Answer sent ===";
65 // continue REQUEST;
66 // }
[7f0ac12]67
68 if( options.log ) {
69 sout | "=== Request for file " | nonl;
70 write(sout, file, name_size);
71 sout | " ===";
72 }
73
74 if( !options.file_cache.path ) {
[32d1383]75 // if( options.log ) {
76 serr | "=== File Not Found (" | nonl;
77 write(serr, file, name_size);
78 serr | ") ===";
79 abort();
80 // }
81 // answer_error(fd, E405);
82 // continue REQUEST;
[7f0ac12]83 }
84
85 // Get the fd from the file cache
86 int ans_fd;
87 size_t count;
88 [ans_fd, count] = get_file( file, name_size );
89
90 // If we can't find the file, return 404
91 if( ans_fd < 0 ) {
[32d1383]92 // if( options.log ) {
93 serr | "=== File Not Found 2 (" | nonl;
94 write(serr, file, name_size);
95 serr | ") ===";
96 abort();
97 // }
98 // answer_error(fd, E404);
99 // continue REQUEST;
[7f0ac12]100 }
101
102 // Send the desired file
103 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
[3f95dab]104 if(ret < 0) {
[32d1383]105 if( ret == -ECONNABORTED ) { this.stats.sendfile.error++; break REQUEST; }
106 if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
107 if( ret == -EPIPE ) { this.stats.sendfile.error++; break REQUEST; }
108 if( ret == -EBADF ) { this.stats.sendfile.error++; break REQUEST; }
109 abort( "answer sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
[3f95dab]110 }
[7f0ac12]111
[10ba012]112 if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[ef3c383]113 }
[137974ae]114
[32d1383]115 this.stats.sendfile.close++;
116
[137974ae]117 if (stats_thrd) {
[32d1383]118 // unsigned long long next = rdtscl();
119 // if(next > (last + period)) {
[10ba012]120 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
[137974ae]121 push(this.stats.sendfile, stats_thrd->stats.send);
122 unlock(stats_thrd->stats.lock);
[32d1383]123 // last = next;
[137974ae]124 }
[32d1383]125 // }
[137974ae]126 }
[481ee28]127}
128
[7f0ac12]129//=============================================================================================
130// Self Accepting Worker Thread
131//=============================================================================================
132void ?{}( AcceptWorker & this ) {
[329e26a]133 size_t cli = rand() % options.clopts.cltr_cnt;
134 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
135 options.clopts.thrd_cnt[cli]++;
[7f0ac12]136 this.done = false;
[0aec496]137}
138
[7f0ac12]139void main( AcceptWorker & this ) {
[e235429]140 park();
[137974ae]141 unsigned long long last = rdtscl();
[7f0ac12]142 /* paranoid */ assert( this.conn.pipe[0] != -1 );
143 /* paranoid */ assert( this.conn.pipe[1] != -1 );
[e95a117]144 for() {
[10ba012]145 if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
[7f0ac12]146 int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
[0f1336c]147 if(fd <= 0) {
[8e3034d]148 if( errno == ECONNABORTED ) break;
[ee59ede]149 if( this.done && (errno == EINVAL || errno == EBADF) ) break;
[0f1336c]150 abort( "accept error %d: (%d) %s\n", fd, (int)errno, strerror(errno) );
[8e3034d]151 }
[4f762d3]152 if(this.done) break;
[e95a117]153
[32d1383]154 this.stats.accepts++;
155 if (stats_thrd && try_lock(stats_thrd->stats.lock)) {
156 push(this.stats, stats_thrd->stats.accpt);
157 unlock(stats_thrd->stats.lock);
158 }
159
[10ba012]160 if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[7f0ac12]161 size_t len = options.socket.buflen;
162 char buffer[len];
[137974ae]163 handle_connection( this.conn, fd, buffer, len, 0p, last );
[3f95dab]164 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);
[0aec496]165
[10ba012]166 if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]167 }
168}
[0aec496]169
[561dd26]170
[7f0ac12]171//=============================================================================================
172// Channel Worker Thread
173//=============================================================================================
174void ?{}( ChannelWorker & this ) {
[329e26a]175 size_t cli = rand() % options.clopts.cltr_cnt;
176 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
177 options.clopts.thrd_cnt[cli]++;
[7f0ac12]178 this.done = false;
179}
[561dd26]180
[7f0ac12]181void main( ChannelWorker & this ) {
182 park();
[137974ae]183 unsigned long long last = rdtscl();
[7f0ac12]184 /* paranoid */ assert( this.conn.pipe[0] != -1 );
185 /* paranoid */ assert( this.conn.pipe[1] != -1 );
[3f95dab]186 this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
[32d1383]187 // this.conn.stats.sendfile.maxfd = 0;
[7f0ac12]188 for() {
189 size_t len = options.socket.buflen;
190 char buffer[len];
191 PendingRead p;
[10ba012]192 p.next = 0p;
[7f0ac12]193 p.in.buf = (void*)buffer;
194 p.in.len = len;
195 push(*this.queue, &p);
[561dd26]196
[10ba012]197 if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
[137974ae]198 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
[3f95dab]199 if(this.done) break;
200 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
201 this.conn.stats.sendfile.close++;
[561dd26]202
[10ba012]203 if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]204 }
[3f95dab]205
206 lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
207 push(this.conn.stats.sendfile, stats_thrd->stats.send);
208 unlock(stats_thrd->stats.lock);
[7f0ac12]209}
[0aec496]210
[7f0ac12]211extern "C" {
212extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
213}
[97748ee]214
[329e26a]215void ?{}( Acceptor & this, int cli ) {
216 ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
217 options.clopts.thrd_cnt[cli]++;
[7f0ac12]218 this.done = false;
219}
[b57db73]220
[07997cd]221static inline __s32 get_res( io_future_t & this ) {
222 if( this.result < 0 ) {{
223 errno = -this.result;
224 return -1;
225 }}
226 return this.result;
227}
228
229static inline void push_connection( Acceptor & this, int fd ) {
[3f95dab]230 this.stats.accepts++;
[07997cd]231 PendingRead * p = 0p;
232 for() {
233 if(this.done) return;
234 p = pop(*this.queue);
235 if(p) break;
[32d1383]236 // abort( "Too few threads" );
[07997cd]237 yield();
238 this.stats.creates++;
239 };
240
241 p->out.fd = fd;
242 async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
243}
244
245// #define ACCEPT_SPIN
[3f95dab]246#define ACCEPT_ONE
247// #define ACCEPT_MANY
[c25338d]248
[7f0ac12]249void main( Acceptor & this ) {
250 park();
[137974ae]251 unsigned long long last = rdtscl();
[c25338d]252
253#if defined(ACCEPT_SPIN)
[7f0ac12]254 if( options.log ) sout | "=== Accepting connection ===";
255 for() {
256 int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
257 if(fd < 0) {
258 if( errno == EWOULDBLOCK) {
[137974ae]259 this.stats.eagains++;
[7f0ac12]260 yield();
261 continue;
[97748ee]262 }
[7f0ac12]263 if( errno == ECONNABORTED ) break;
264 if( this.done && (errno == EINVAL || errno == EBADF) ) break;
265 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
266 }
[137974ae]267
[7f0ac12]268 if(this.done) return;
[97748ee]269
[7f0ac12]270 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[97748ee]271
[07997cd]272 if(fd) push_connection(this, fd);
[ee59ede]273
[137974ae]274 if (stats_thrd) {
275 unsigned long long next = rdtscl();
[3f95dab]276 if(next > (last + period)) {
277 if(try_lock(stats_thrd->stats.lock)) {
278 push(this.stats, stats_thrd->stats.accpt);
279 unlock(stats_thrd->stats.lock);
280 last = next;
281 }
282 }
283 }
284
285 if( options.log ) sout | "=== Accepting connection ===";
286 }
287
288#elif defined(ACCEPT_ONE)
289 if( options.log ) sout | "=== Accepting connection ===";
290 for() {
291 int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
292 if(fd < 0) {
293 if( errno == ECONNABORTED ) break;
294 if( this.done && (errno == EINVAL || errno == EBADF) ) break;
295 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
296 }
297
298 if(this.done) return;
299
300 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
301
302 if(fd) push_connection(this, fd);
303
304 if (stats_thrd) {
[32d1383]305 // unsigned long long next = rdtscl();
306 // if(next > (last + period)) {
[137974ae]307 if(try_lock(stats_thrd->stats.lock)) {
308 push(this.stats, stats_thrd->stats.accpt);
309 unlock(stats_thrd->stats.lock);
[32d1383]310 // last = next;
[137974ae]311 }
[32d1383]312 // }
[137974ae]313 }
314
[7f0ac12]315 if( options.log ) sout | "=== Accepting connection ===";
[0aec496]316 }
[c25338d]317
[07997cd]318#elif defined(ACCEPT_MANY)
[c25338d]319 const int nacc = 10;
320 io_future_t results[nacc];
321
322 for(i; nacc) {
323 io_future_t & res = results[i];
324 reset(res);
325 /* paranoid */ assert(!available(res));
326 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
327 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
328 }
329
330 for() {
331 if (stats_thrd) {
332 unsigned long long next = rdtscl();
[3f95dab]333 if(next > (last + period)) {
[c25338d]334 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
335 push(this.stats, stats_thrd->stats.accpt);
336 unlock(stats_thrd->stats.lock);
337 last = next;
338 }
339 }
340 }
341
342 for(i; nacc) {
343 io_future_t & res = results[i];
344 if(available(res)) {
345 if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
346 int fd = get_res(res);
347 reset(res);
348 if(fd < 0) {
349 if( errno == ECONNABORTED ) continue;
350 if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
351 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
352 }
353 push_connection( this, fd );
354
355 /* paranoid */ assert(!available(res));
356 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
357 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
358 }
359 }
360 if(this.done) return;
361
362 if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
363 this.stats.eagains++;
364 wait_any(results, nacc);
365
366 if( options.log ) mutex(sout) {
367 sout | "=== Acceptor wake-up ===";
368 for(i; nacc) {
369 io_future_t & res = results[i];
370 sout | i | "available:" | available(res);
371 }
372 }
373
374 }
375
376 for(i; nacc) {
377 wait(results[i]);
378 }
379#else
380#error no accept algorithm specified
381#endif
[3f95dab]382 lock(stats_thrd->stats.lock);
383 push(this.stats, stats_thrd->stats.accpt);
384 unlock(stats_thrd->stats.lock);
[7f0ac12]385}
Note: See TracBrowser for help on using the repository browser.