source: benchmark/io/http/worker.cfa @ 8bee858

ADTast-experimentalpthread-emulation
Last change on this file since 8bee858 was 3f95dab, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Committing hopefully last version of the webserver

  • Property mode set to 100644
File size: 10.2 KB
Line 
1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7
8#include <fstream.hfa>
9#include <iofwd.hfa>
10#include <mutex_stmt.hfa>
11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
16static const unsigned long long period = 50_000_000;
17
18//=============================================================================================
19// Generic connection handling
20//=============================================================================================
21static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
22        REQUEST:
23        for() {
24                bool closed;
25                HttpCode code;
26                const char * file;
27                size_t name_size;
28
29                // Read the http request
30                if( options.log ) mutex(sout) sout | "=== Reading request ===";
31                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
32                f = 0p;
33
34                // if we are done, break out of the loop
35                if( closed ) break REQUEST;
36
37                // If this wasn't a request retrun 400
38                if( code != OK200 ) {
39                        sout | "=== Invalid Request :" | code_val(code) | "===";
40                        answer_error(fd, code);
41                        continue REQUEST;
42                }
43
44                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
45                        if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
46
47                        int ret = answer_plaintext(fd);
48                        if( ret == -ECONNRESET ) break REQUEST;
49
50                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
51                        continue REQUEST;
52                }
53
54                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
55                        if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
56
57                        // Send the header
58                        int ret = answer_empty(fd);
59                        if( ret == -ECONNRESET ) break REQUEST;
60
61                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
62                        continue REQUEST;
63                }
64
65                if( options.log ) {
66                        sout | "=== Request for file " | nonl;
67                        write(sout, file, name_size);
68                        sout | " ===";
69                }
70
71                if( !options.file_cache.path ) {
72                        if( options.log ) {
73                                sout | "=== File Not Found (" | nonl;
74                                write(sout, file, name_size);
75                                sout | ") ===";
76                        }
77                        answer_error(fd, E405);
78                        continue REQUEST;
79                }
80
81                // Get the fd from the file cache
82                int ans_fd;
83                size_t count;
84                [ans_fd, count] = get_file( file, name_size );
85
86                // If we can't find the file, return 404
87                if( ans_fd < 0 ) {
88                        if( options.log ) {
89                                sout | "=== File Not Found (" | nonl;
90                                write(sout, file, name_size);
91                                sout | ") ===";
92                        }
93                        answer_error(fd, E404);
94                        continue REQUEST;
95                }
96
97                // Send the desired file
98                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
99                if(ret < 0) {
100                        if( ret == -ECONNABORTED ) break REQUEST;
101                        if( ret == -ECONNRESET ) break REQUEST;
102                        if( ret == -EPIPE ) break REQUEST;
103                        abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
104                }
105
106                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
107        }
108
109        if (stats_thrd) {
110                unsigned long long next = rdtscl();
111                if(next > (last + period)) {
112                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
113                                push(this.stats.sendfile, stats_thrd->stats.send);
114                                unlock(stats_thrd->stats.lock);
115                                last = next;
116                        }
117                }
118        }
119}
120
121//=============================================================================================
122// Self Accepting Worker Thread
123//=============================================================================================
124void ?{}( AcceptWorker & this ) {
125        size_t cli = rand() % options.clopts.cltr_cnt;
126        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
127        options.clopts.thrd_cnt[cli]++;
128        this.done = false;
129}
130
131void main( AcceptWorker & this ) {
132        park();
133        unsigned long long last = rdtscl();
134        /* paranoid */ assert( this.conn.pipe[0] != -1 );
135        /* paranoid */ assert( this.conn.pipe[1] != -1 );
136        for() {
137                if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
138                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
139                if(fd < 0) {
140                        if( errno == ECONNABORTED ) break;
141                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
142                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
143                }
144                if(this.done) break;
145
146                if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
147                size_t len = options.socket.buflen;
148                char buffer[len];
149                handle_connection( this.conn, fd, buffer, len, 0p, last );
150                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);
151                this.conn.stats.sendfile.close++;
152
153                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
154        }
155}
156
157
158//=============================================================================================
159// Channel Worker Thread
160//=============================================================================================
161void ?{}( ChannelWorker & this ) {
162        size_t cli = rand() % options.clopts.cltr_cnt;
163        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
164        options.clopts.thrd_cnt[cli]++;
165        this.done = false;
166}
167
168void main( ChannelWorker & this ) {
169        park();
170        unsigned long long last = rdtscl();
171        /* paranoid */ assert( this.conn.pipe[0] != -1 );
172        /* paranoid */ assert( this.conn.pipe[1] != -1 );
173        this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
174        for() {
175                size_t len = options.socket.buflen;
176                char buffer[len];
177                PendingRead p;
178                p.next = 0p;
179                p.in.buf = (void*)buffer;
180                p.in.len = len;
181                push(*this.queue, &p);
182
183                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
184                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
185                if(this.done) break;
186                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
187                this.conn.stats.sendfile.close++;
188
189                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
190        }
191
192        lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
193        push(this.conn.stats.sendfile, stats_thrd->stats.send);
194        unlock(stats_thrd->stats.lock);
195}
196
197extern "C" {
198extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
199}
200
201void ?{}( Acceptor & this, int cli ) {
202        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
203        options.clopts.thrd_cnt[cli]++;
204        this.done = false;
205}
206
207static inline __s32 get_res( io_future_t & this ) {
208        if( this.result < 0 ) {{
209                errno = -this.result;
210                return -1;
211        }}
212        return this.result;
213}
214
215static inline void push_connection( Acceptor & this, int fd ) {
216        this.stats.accepts++;
217        PendingRead * p = 0p;
218        for() {
219                if(this.done) return;
220                p = pop(*this.queue);
221                if(p) break;
222                yield();
223                this.stats.creates++;
224        };
225
226        p->out.fd = fd;
227        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
228}
229
230// #define ACCEPT_SPIN
231#define ACCEPT_ONE
232// #define ACCEPT_MANY
233
234void main( Acceptor & this ) {
235        park();
236        unsigned long long last = rdtscl();
237
238#if defined(ACCEPT_SPIN)
239        if( options.log ) sout | "=== Accepting connection ===";
240        for() {
241                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
242                if(fd < 0) {
243                        if( errno == EWOULDBLOCK) {
244                                this.stats.eagains++;
245                                yield();
246                                continue;
247                        }
248                        if( errno == ECONNABORTED ) break;
249                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
250                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
251                }
252
253                if(this.done) return;
254
255                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
256
257                if(fd) push_connection(this, fd);
258
259                if (stats_thrd) {
260                        unsigned long long next = rdtscl();
261                        if(next > (last + period)) {
262                                if(try_lock(stats_thrd->stats.lock)) {
263                                        push(this.stats, stats_thrd->stats.accpt);
264                                        unlock(stats_thrd->stats.lock);
265                                        last = next;
266                                }
267                        }
268                }
269
270                if( options.log ) sout | "=== Accepting connection ===";
271        }
272
273#elif defined(ACCEPT_ONE)
274        if( options.log ) sout | "=== Accepting connection ===";
275        for() {
276                int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
277                if(fd < 0) {
278                        if( errno == ECONNABORTED ) break;
279                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
280                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
281                }
282
283                if(this.done) return;
284
285                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
286
287                if(fd) push_connection(this, fd);
288
289                if (stats_thrd) {
290                        unsigned long long next = rdtscl();
291                        if(next > (last + period)) {
292                                if(try_lock(stats_thrd->stats.lock)) {
293                                        push(this.stats, stats_thrd->stats.accpt);
294                                        unlock(stats_thrd->stats.lock);
295                                        last = next;
296                                }
297                        }
298                }
299
300                if( options.log ) sout | "=== Accepting connection ===";
301        }
302
303#elif defined(ACCEPT_MANY)
304        const int nacc = 10;
305        io_future_t results[nacc];
306
307        for(i; nacc) {
308                io_future_t & res = results[i];
309                reset(res);
310                /* paranoid */ assert(!available(res));
311                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
312                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
313        }
314
315        for() {
316                if (stats_thrd) {
317                        unsigned long long next = rdtscl();
318                        if(next > (last + period)) {
319                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
320                                        push(this.stats, stats_thrd->stats.accpt);
321                                        unlock(stats_thrd->stats.lock);
322                                        last = next;
323                                }
324                        }
325                }
326
327                for(i; nacc) {
328                        io_future_t & res = results[i];
329                        if(available(res)) {
330                                if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
331                                int fd = get_res(res);
332                                reset(res);
333                                if(fd < 0) {
334                                        if( errno == ECONNABORTED ) continue;
335                                        if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
336                                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
337                                }
338                                push_connection( this, fd );
339
340                                /* paranoid */ assert(!available(res));
341                                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
342                                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
343                        }
344                }
345                if(this.done) return;
346
347                if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
348                this.stats.eagains++;
349                wait_any(results, nacc);
350
351                if( options.log ) mutex(sout) {
352                        sout | "=== Acceptor wake-up ===";
353                        for(i; nacc) {
354                                io_future_t & res = results[i];
355                                sout | i | "available:" | available(res);
356                        }
357                }
358
359        }
360
361        for(i; nacc) {
362                wait(results[i]);
363        }
364#else
365#error no accept algorithm specified
366#endif
367        lock(stats_thrd->stats.lock);
368        push(this.stats, stats_thrd->stats.accpt);
369        unlock(stats_thrd->stats.lock);
370}
Note: See TracBrowser for help on using the repository browser.