source: benchmark/io/http/worker.cfa@ c4072d8e

ADT ast-experimental pthread-emulation qualifiedEnum
Last change on this file since c4072d8e was 07997cd, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Fixed errors with the accept many version

  • Property mode set to 100644
File size: 8.6 KB
RevLine 
[0aec496]1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
[c82af9f]6#include <unistd.h>
[0aec496]7
[8c43d05]8#include <fstream.hfa>
[0aec496]9#include <iofwd.hfa>
[7f0ac12]10#include <mutex_stmt.hfa>
[0aec496]11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
16//=============================================================================================
[7f0ac12]17// Generic connection handling
[0aec496]18//=============================================================================================
[137974ae]19static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
[7f0ac12]20 REQUEST:
21 for() {
22 bool closed;
23 HttpCode code;
24 const char * file;
25 size_t name_size;
26
27 // Read the http request
[10ba012]28 if( options.log ) mutex(sout) sout | "=== Reading request ===";
[7f0ac12]29 [code, closed, file, name_size] = http_read(fd, buffer, len, f);
30 f = 0p;
31
32 // if we are done, break out of the loop
33 if( closed ) break REQUEST;
34
35 // If this wasn't a request retrun 400
36 if( code != OK200 ) {
37 sout | "=== Invalid Request :" | code_val(code) | "===";
38 answer_error(fd, code);
39 continue REQUEST;
40 }
[ef3c383]41
[7f0ac12]42 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
[10ba012]43 if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
[7f0ac12]44
45 int ret = answer_plaintext(fd);
46 if( ret == -ECONNRESET ) break REQUEST;
47
[10ba012]48 if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[7f0ac12]49 continue REQUEST;
50 }
51
52 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
[10ba012]53 if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
[7f0ac12]54
55 // Send the header
56 int ret = answer_empty(fd);
57 if( ret == -ECONNRESET ) break REQUEST;
58
[10ba012]59 if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[7f0ac12]60 continue REQUEST;
61 }
62
63 if( options.log ) {
64 sout | "=== Request for file " | nonl;
65 write(sout, file, name_size);
66 sout | " ===";
67 }
68
69 if( !options.file_cache.path ) {
70 if( options.log ) {
71 sout | "=== File Not Found (" | nonl;
72 write(sout, file, name_size);
73 sout | ") ===";
74 }
75 answer_error(fd, E405);
76 continue REQUEST;
77 }
78
79 // Get the fd from the file cache
80 int ans_fd;
81 size_t count;
82 [ans_fd, count] = get_file( file, name_size );
83
84 // If we can't find the file, return 404
85 if( ans_fd < 0 ) {
86 if( options.log ) {
87 sout | "=== File Not Found (" | nonl;
88 write(sout, file, name_size);
89 sout | ") ===";
90 }
91 answer_error(fd, E404);
92 continue REQUEST;
93 }
94
95 // Send the desired file
96 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
97 if( ret == -ECONNRESET ) break REQUEST;
98
[10ba012]99 if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[ef3c383]100 }
[137974ae]101
102 if (stats_thrd) {
103 unsigned long long next = rdtscl();
104 if(next > (last + 500000000)) {
[10ba012]105 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
[137974ae]106 push(this.stats.sendfile, stats_thrd->stats.send);
107 unlock(stats_thrd->stats.lock);
108 last = next;
109 }
110 }
111 }
[481ee28]112}
113
[7f0ac12]114//=============================================================================================
115// Self Accepting Worker Thread
116//=============================================================================================
117void ?{}( AcceptWorker & this ) {
[329e26a]118 size_t cli = rand() % options.clopts.cltr_cnt;
119 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
120 options.clopts.thrd_cnt[cli]++;
[7f0ac12]121 this.done = false;
[0aec496]122}
123
[7f0ac12]124void main( AcceptWorker & this ) {
[e235429]125 park();
[137974ae]126 unsigned long long last = rdtscl();
[7f0ac12]127 /* paranoid */ assert( this.conn.pipe[0] != -1 );
128 /* paranoid */ assert( this.conn.pipe[1] != -1 );
[e95a117]129 for() {
[10ba012]130 if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
[7f0ac12]131 int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
[8e3034d]132 if(fd < 0) {
133 if( errno == ECONNABORTED ) break;
[ee59ede]134 if( this.done && (errno == EINVAL || errno == EBADF) ) break;
[8e3034d]135 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
136 }
[4f762d3]137 if(this.done) break;
[e95a117]138
[10ba012]139 if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[7f0ac12]140 size_t len = options.socket.buflen;
141 char buffer[len];
[137974ae]142 handle_connection( this.conn, fd, buffer, len, 0p, last );
[0aec496]143
[10ba012]144 if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]145 }
146}
[0aec496]147
[561dd26]148
[7f0ac12]149//=============================================================================================
150// Channel Worker Thread
151//=============================================================================================
152void ?{}( ChannelWorker & this ) {
[329e26a]153 size_t cli = rand() % options.clopts.cltr_cnt;
154 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
155 options.clopts.thrd_cnt[cli]++;
[7f0ac12]156 this.done = false;
157}
[561dd26]158
[7f0ac12]159void main( ChannelWorker & this ) {
160 park();
[137974ae]161 unsigned long long last = rdtscl();
[7f0ac12]162 /* paranoid */ assert( this.conn.pipe[0] != -1 );
163 /* paranoid */ assert( this.conn.pipe[1] != -1 );
164 for() {
165 size_t len = options.socket.buflen;
166 char buffer[len];
167 PendingRead p;
[10ba012]168 p.next = 0p;
[7f0ac12]169 p.in.buf = (void*)buffer;
170 p.in.len = len;
171 push(*this.queue, &p);
[561dd26]172
[10ba012]173 if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
[137974ae]174 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
[561dd26]175
[10ba012]176 if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]177 if(this.done) break;
178 }
179}
[0aec496]180
[7f0ac12]181extern "C" {
182extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
183}
[97748ee]184
[329e26a]185void ?{}( Acceptor & this, int cli ) {
186 ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
187 options.clopts.thrd_cnt[cli]++;
[7f0ac12]188 this.done = false;
189}
[b57db73]190
[07997cd]191static inline __s32 get_res( io_future_t & this ) {
192 if( this.result < 0 ) {{
193 errno = -this.result;
194 return -1;
195 }}
196 return this.result;
197}
198
199static inline void push_connection( Acceptor & this, int fd ) {
200 PendingRead * p = 0p;
201 for() {
202 if(this.done) return;
203 p = pop(*this.queue);
204 if(p) break;
205 yield();
206 this.stats.creates++;
207 };
208
209 p->out.fd = fd;
210 async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
211}
212
213// #define ACCEPT_SPIN
214#define ACCEPT_MANY
[c25338d]215
[7f0ac12]216void main( Acceptor & this ) {
217 park();
[137974ae]218 unsigned long long last = rdtscl();
[c25338d]219
220#if defined(ACCEPT_SPIN)
[7f0ac12]221 if( options.log ) sout | "=== Accepting connection ===";
222 for() {
223 int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
224 if(fd < 0) {
225 if( errno == EWOULDBLOCK) {
[137974ae]226 this.stats.eagains++;
[7f0ac12]227 yield();
228 continue;
[97748ee]229 }
[7f0ac12]230 if( errno == ECONNABORTED ) break;
231 if( this.done && (errno == EINVAL || errno == EBADF) ) break;
232 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
233 }
[137974ae]234 this.stats.accepts++;
235
[7f0ac12]236 if(this.done) return;
[97748ee]237
[7f0ac12]238 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[97748ee]239
[07997cd]240 if(fd) push_connection(this, fd);
[ee59ede]241
[137974ae]242 if (stats_thrd) {
243 unsigned long long next = rdtscl();
244 if(next > (last + 500000000)) {
245 if(try_lock(stats_thrd->stats.lock)) {
246 push(this.stats, stats_thrd->stats.accpt);
247 unlock(stats_thrd->stats.lock);
248 last = next;
249 }
250 }
251 }
252
[7f0ac12]253 if( options.log ) sout | "=== Accepting connection ===";
[0aec496]254 }
[c25338d]255
[07997cd]256#elif defined(ACCEPT_MANY)
[c25338d]257 const int nacc = 10;
258 io_future_t results[nacc];
259
260 for(i; nacc) {
261 io_future_t & res = results[i];
262 reset(res);
263 /* paranoid */ assert(!available(res));
264 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
265 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
266 }
267
268 for() {
269 if (stats_thrd) {
270 unsigned long long next = rdtscl();
271 if(next > (last + 500000000)) {
272 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
273 push(this.stats, stats_thrd->stats.accpt);
274 unlock(stats_thrd->stats.lock);
275 last = next;
276 }
277 }
278 }
279
280 for(i; nacc) {
281 io_future_t & res = results[i];
282 if(available(res)) {
283 if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
284 int fd = get_res(res);
285 reset(res);
286 this.stats.accepts++;
287 if(fd < 0) {
288 if( errno == ECONNABORTED ) continue;
289 if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
290 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
291 }
292 push_connection( this, fd );
293
294 /* paranoid */ assert(!available(res));
295 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
296 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
297 }
298 }
299 if(this.done) return;
300
301 if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
302 this.stats.eagains++;
303 wait_any(results, nacc);
304
305 if( options.log ) mutex(sout) {
306 sout | "=== Acceptor wake-up ===";
307 for(i; nacc) {
308 io_future_t & res = results[i];
309 sout | i | "available:" | available(res);
310 }
311 }
312
313 }
314
315 for(i; nacc) {
316 wait(results[i]);
317 }
318#else
319#error no accept algorithm specified
320#endif
[7f0ac12]321}
Note: See TracBrowser for help on using the repository browser.