source: benchmark/io/http/worker.cfa@ c25338d

ADT ast-experimental pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since c25338d was c25338d, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added accept 10 method (it doesn't really work).

  • Property mode set to 100644
File size: 8.2 KB
Line 
1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7
8#include <fstream.hfa>
9#include <iofwd.hfa>
10#include <mutex_stmt.hfa>
11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
16//=============================================================================================
17// Generic connection handling
18//=============================================================================================
// Returns true only when the `len`-byte request name at `name` is exactly the
// NUL-terminated `route`.  Comparing the lengths first prevents a shorter name
// (including an empty one) from matching as a prefix of the route.
static inline bool is_route( const char * name, size_t len, const char * route ) {
	return len == strlen(route) && 0 == strncmp(name, route, len);
}

// Serve HTTP requests arriving on `fd` until http_read reports the connection
// closed or a reply fails with -ECONNRESET.
//   this   : per-worker connection state; this.pipe and this.stats.sendfile are used for file replies
//   fd     : socket the requests are read from and the answers are written to
//   buffer : scratch buffer of `len` bytes handed to http_read
//   f      : optional future passed to the first http_read only; later reads pass 0p
//   last   : rdtscl() timestamp of the last stats push, updated when a push succeeds
static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
	REQUEST:
	for() {
		bool closed;
		HttpCode code;
		const char * file;
		size_t name_size;

		// Read the http request
		if( options.log ) mutex(sout) sout | "=== Reading request ===";
		[code, closed, file, name_size] = http_read(fd, buffer, len, f);
		// The future only applies to the first read on this connection
		f = 0p;

		// if we are done, break out of the loop
		if( closed ) break REQUEST;

		// If this wasn't a valid request, answer with the error code from http_read
		if( code != OK200 ) {
			mutex(sout) sout | "=== Invalid Request :" | code_val(code) | "===";
			answer_error(fd, code);
			continue REQUEST;
		}

		if( is_route(file, name_size, "plaintext") ) {
			if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";

			int ret = answer_plaintext(fd);
			if( ret == -ECONNRESET ) break REQUEST;

			if( options.log ) mutex(sout) sout | "=== Answer sent ===";
			continue REQUEST;
		}

		if( is_route(file, name_size, "ping") ) {
			if( options.log ) mutex(sout) sout | "=== Request for /ping ===";

			// Send the header
			int ret = answer_empty(fd);
			if( ret == -ECONNRESET ) break REQUEST;

			if( options.log ) mutex(sout) sout | "=== Answer sent ===";
			continue REQUEST;
		}

		// Multi-part log lines hold the stream lock so they are not interleaved with other threads
		if( options.log ) mutex(sout) {
			sout | "=== Request for file " | nonl;
			write(sout, file, name_size);
			sout | " ===";
		}

		// Without a file cache path no file can be served: answer E405
		if( !options.file_cache.path ) {
			if( options.log ) mutex(sout) {
				sout | "=== File Not Found (" | nonl;
				write(sout, file, name_size);
				sout | ") ===";
			}
			answer_error(fd, E405);
			continue REQUEST;
		}

		// Get the fd from the file cache
		int ans_fd;
		size_t count;
		[ans_fd, count] = get_file( file, name_size );

		// If we can't find the file, return 404
		if( ans_fd < 0 ) {
			if( options.log ) mutex(sout) {
				sout | "=== File Not Found (" | nonl;
				write(sout, file, name_size);
				sout | ") ===";
			}
			answer_error(fd, E404);
			continue REQUEST;
		}

		// Send the desired file
		int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
		if( ret == -ECONNRESET ) break REQUEST;

		if( options.log ) mutex(sout) sout | "=== Answer sent ===";
	}

	// Connection is over: push the sendfile stats to the stats thread, at most
	// once per 500000000 rdtscl() ticks and only if its lock is free right now.
	if (stats_thrd) {
		unsigned long long next = rdtscl();
		if(next > (last + 500000000)) {
			if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
				push(this.stats.sendfile, stats_thrd->stats.send);
				unlock(stats_thrd->stats.lock);
				last = next;
			}
		}
	}
}
113
114//=============================================================================================
115// Self Accepting Worker Thread
116//=============================================================================================
// Constructor: builds the underlying thread, marks the worker as not done and
// counts it in options.clopts.thrd_cnt.
void ?{}( AcceptWorker & this ) {
	// NOTE(review): arguments look like (name, cluster, stack size) - confirm against the thread constructor
	((thread &)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
	this.done = false;
	options.clopts.thrd_cnt += 1;
}
122
// Thread body of a self-accepting worker: after being unparked it loops,
// accepting one connection at a time with cfa_accept4 and serving it with
// handle_connection, until the accept fails in an expected way or this.done is set.
void main( AcceptWorker & this ) {
	park();
	unsigned long long last = rdtscl();
	/* paranoid */ assert( this.conn.pipe[0] != -1 );
	/* paranoid */ assert( this.conn.pipe[1] != -1 );

	ACCEPT:
	for() {
		if( options.log ) mutex(sout) sout | "=== Accepting connection ===";

		int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
		if( fd < 0 ) {
			const int err = errno;
			// An aborted connection ends the loop
			if( err == ECONNABORTED ) break ACCEPT;
			// EINVAL/EBADF are only tolerated once the worker has been told to stop
			if( this.done && (err == EINVAL || err == EBADF) ) break ACCEPT;
			abort( "accept error: (%d) %s\n", err, strerror(err) );
		}
		if( this.done ) break ACCEPT;

		if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";

		// Per-connection request buffer, sized from the options
		size_t buflen = options.socket.buflen;
		char buffer[buflen];
		handle_connection( this.conn, fd, buffer, buflen, 0p, last );

		if( options.log ) mutex(sout) sout | "=== Connection closed ===";
	}
}
146
147
148//=============================================================================================
149// Channel Worker Thread
150//=============================================================================================
// Constructor: builds the underlying thread, marks the worker as not done and
// counts it in options.clopts.thrd_cnt.
void ?{}( ChannelWorker & this ) {
	// NOTE(review): arguments look like (name, cluster, stack size) - confirm against the thread constructor
	((thread &)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
	this.done = false;
	options.clopts.thrd_cnt += 1;
}
156
// Thread body of a channel worker: after being unparked it repeatedly pushes a
// PendingRead describing its buffer onto this.queue, then serves the
// connection through handle_connection using the fd and future stored in that
// PendingRead.  The loop ends once this.done is set.
void main( ChannelWorker & this ) {
	park();
	unsigned long long last = rdtscl();
	/* paranoid */ assert( this.conn.pipe[0] != -1 );
	/* paranoid */ assert( this.conn.pipe[1] != -1 );

	for() {
		// Per-connection request buffer, sized from the options
		size_t buflen = options.socket.buflen;
		char buffer[buflen];

		// Describe the buffer and publish the request on the queue
		// NOTE(review): presumably an Acceptor pops it and fills out.fd / f - confirm the queue is shared
		PendingRead pending;
		pending.in.len = buflen;
		pending.in.buf = (void*)buffer;
		pending.next = 0p;
		push(*this.queue, &pending);

		if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
		handle_connection( this.conn, pending.out.fd, buffer, buflen, &pending.f, last );

		if( options.log ) mutex(sout) sout | "=== Connection closed ===";
		if( this.done ) break;
	}
}
178
// C-linkage declaration of accept4, which main( Acceptor & ) calls directly.
extern "C" {
	int accept4( int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags );
}
182
// Constructor: builds the underlying thread, marks the acceptor as not done
// and counts it in options.clopts.thrd_cnt.
void ?{}( Acceptor & this ) {
	// NOTE(review): arguments look like (name, cluster, stack size) - confirm against the thread constructor
	((thread &)this){ "Server Acceptor Thread", *options.clopts.instance, 64000 };
	this.done = false;
	options.clopts.thrd_cnt += 1;
}
188
// Selects the accept algorithm compiled into main( Acceptor & ):
//   ACCEPT_SPIN : call accept4 directly, yielding whenever it reports EWOULDBLOCK
//   ACCEPT_MANY : keep `nacc` asynchronous accepts in flight and wait on any of them
#define ACCEPT_SPIN

// Thread body of the acceptor: after being unparked it accepts connections on
// this.sockfd and hands each one to a pending read taken from this.queue.
// Accept statistics are pushed to the stats thread at most once per
// 500000000 rdtscl() ticks, and only if its lock is free at that moment.
void main( Acceptor & this ) {
	park();
	unsigned long long last = rdtscl();

#if defined(ACCEPT_SPIN)
	if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
	for() {
		int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
		if(fd < 0) {
			// Nothing to accept right now: count it, yield and retry
			if( errno == EWOULDBLOCK) {
				this.stats.eagains++;
				yield();
				continue;
			}
			if( errno == ECONNABORTED ) break;
			// EINVAL/EBADF are only tolerated once the acceptor has been told to stop
			if( this.done && (errno == EINVAL || errno == EBADF) ) break;
			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		this.stats.accepts++;

		if(this.done) return;

		if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";

		// NOTE(review): fd is >= 0 here, so this test only skips descriptor 0, which
		// would be dropped without being handed off - confirm whether that is intended.
		if(fd) {
			// Spin (yielding) until a pending read is available on the queue
			PendingRead * p = 0p;
			for() {
				if(this.done) return;
				p = pop(*this.queue);
				if(p) break;
				yield();
				this.stats.creates++;
			}

			// Hand the connection over and start the first receive into the pending read's buffer
			p->out.fd = fd;
			async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
		}

		if (stats_thrd) {
			unsigned long long next = rdtscl();
			if(next > (last + 500000000)) {
				if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					last = next;
				}
			}
		}

		if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
	}

#elif defined(ACCEPT_MANY)
	// NOTE(review): push_connection is not defined in this file - confirm it is
	// declared elsewhere before enabling this branch.
	const int nacc = 10;
	io_future_t results[nacc];

	// Arm every accept slot once before entering the main loop
	for(i; nacc) {
		io_future_t & res = results[i];
		reset(res);
		/* paranoid */ assert(!available(res));
		if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
		async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
	}

	for() {
		if (stats_thrd) {
			unsigned long long next = rdtscl();
			if(next > (last + 500000000)) {
				if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					last = next;
				}
			}
		}

		// Harvest every completed accept, then re-arm its slot
		for(i; nacc) {
			io_future_t & res = results[i];
			if(available(res)) {
				if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
				int fd = get_res(res);
				reset(res);
				this.stats.accepts++;
				if(fd < 0) {
					// NOTE(review): errno is inspected here although the failure came back
					// through get_res - confirm errno is actually set on this path.
					// NOTE(review): both `continue`s leave this slot reset but not re-armed.
					if( errno == ECONNABORTED ) continue;
					if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
					abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				push_connection( this, fd );

				/* paranoid */ assert(!available(res));
				if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
				async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
			}
		}
		if(this.done) return;

		if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
		this.stats.eagains++;
		wait_any(results, nacc);

		if( options.log ) mutex(sout) {
			sout | "=== Acceptor wake-up ===";
			for(i; nacc) {
				io_future_t & res = results[i];
				sout | i | "available:" | available(res);
			}
		}

	}

	// NOTE(review): the loop above only exits through `return`, so this drain
	// appears to be unreachable.
	for(i; nacc) {
		wait(results[i]);
	}
#else
#error no accept algorithm specified
#endif
}
Note: See TracBrowser for help on using the repository browser.