source: benchmark/io/http/worker.cfa @ df56e25

Last change on this file since df56e25 was 0f1336c, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Minor error handling improvement.

  • Property mode set to 100644
File size: 10.8 KB
Line 
1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7
8#include <fstream.hfa>
9#include <iofwd.hfa>
10#include <mutex_stmt.hfa>
11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
16static const unsigned long long period = 5_000_000;
17
18//=============================================================================================
19// Generic connection handling
20//=============================================================================================
21static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
22        REQUEST:
23        for() {
24                bool closed;
25                HttpCode code;
26                const char * file;
27                size_t name_size;
28
29                // Read the http request
30                if( options.log ) mutex(sout) sout | "=== Reading request ===";
31                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
32                f = 0p;
33
34                // if we are done, break out of the loop
35                if( closed ) {
36                        if( code != OK200 ) this.stats.sendfile.error++;
37                        break REQUEST;
38                }
39
40                // If this wasn't a request retrun 400
41                if( code != OK200 ) {
42                        abort | "=== Invalid Request :" | code_val(code) | "===";
43                        // answer_error(fd, code);
44                        // continue REQUEST;
45                }
46
47                // if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
48                //      if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
49
50                //      int ret = answer_plaintext(fd);
51                //      if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
52
53                //      if( options.log ) mutex(sout) sout | "=== Answer sent ===";
54                //      continue REQUEST;
55                // }
56
57                // if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
58                //      if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
59
60                //      // Send the header
61                //      int ret = answer_empty(fd);
62                //      if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
63
64                //      if( options.log ) mutex(sout) sout | "=== Answer sent ===";
65                //      continue REQUEST;
66                // }
67
68                if( options.log ) {
69                        sout | "=== Request for file " | nonl;
70                        write(sout, file, name_size);
71                        sout | " ===";
72                }
73
74                if( !options.file_cache.path ) {
75                        // if( options.log ) {
76                                serr | "=== File Not Found (" | nonl;
77                                write(serr, file, name_size);
78                                serr | ") ===";
79                                abort();
80                        // }
81                        // answer_error(fd, E405);
82                        // continue REQUEST;
83                }
84
85                // Get the fd from the file cache
86                int ans_fd;
87                size_t count;
88                [ans_fd, count] = get_file( file, name_size );
89
90                // If we can't find the file, return 404
91                if( ans_fd < 0 ) {
92                        // if( options.log ) {
93                                serr | "=== File Not Found 2 (" | nonl;
94                                write(serr, file, name_size);
95                                serr | ") ===";
96                                abort();
97                        // }
98                        // answer_error(fd, E404);
99                        // continue REQUEST;
100                }
101
102                // Send the desired file
103                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
104                if(ret < 0) {
105                        if( ret == -ECONNABORTED ) { this.stats.sendfile.error++; break REQUEST; }
106                        if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
107                        if( ret == -EPIPE ) { this.stats.sendfile.error++; break REQUEST; }
108                        if( ret == -EBADF ) { this.stats.sendfile.error++; break REQUEST; }
109                        abort( "answer sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
110                }
111
112                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
113        }
114
115        this.stats.sendfile.close++;
116
117        if (stats_thrd) {
118                // unsigned long long next = rdtscl();
119                // if(next > (last + period)) {
120                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
121                                push(this.stats.sendfile, stats_thrd->stats.send);
122                                unlock(stats_thrd->stats.lock);
123                                // last = next;
124                        }
125                // }
126        }
127}
128
129//=============================================================================================
130// Self Accepting Worker Thread
131//=============================================================================================
132void ?{}( AcceptWorker & this ) {
133        size_t cli = rand() % options.clopts.cltr_cnt;
134        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
135        options.clopts.thrd_cnt[cli]++;
136        this.done = false;
137}
138
139void main( AcceptWorker & this ) {
140        park();
141        unsigned long long last = rdtscl();
142        /* paranoid */ assert( this.conn.pipe[0] != -1 );
143        /* paranoid */ assert( this.conn.pipe[1] != -1 );
144        for() {
145                if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
146                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
147                if(fd <= 0) {
148                        if( errno == ECONNABORTED ) break;
149                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
150                        abort( "accept error %d: (%d) %s\n", fd, (int)errno, strerror(errno) );
151                }
152                if(this.done) break;
153
154                this.stats.accepts++;
155                if (stats_thrd && try_lock(stats_thrd->stats.lock)) {
156                        push(this.stats, stats_thrd->stats.accpt);
157                        unlock(stats_thrd->stats.lock);
158                }
159
160                if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
161                size_t len = options.socket.buflen;
162                char buffer[len];
163                handle_connection( this.conn, fd, buffer, len, 0p, last );
164                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);
165
166                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
167        }
168}
169
170
171//=============================================================================================
172// Channel Worker Thread
173//=============================================================================================
174void ?{}( ChannelWorker & this ) {
175        size_t cli = rand() % options.clopts.cltr_cnt;
176        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
177        options.clopts.thrd_cnt[cli]++;
178        this.done = false;
179}
180
181void main( ChannelWorker & this ) {
182        park();
183        unsigned long long last = rdtscl();
184        /* paranoid */ assert( this.conn.pipe[0] != -1 );
185        /* paranoid */ assert( this.conn.pipe[1] != -1 );
186        this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
187        // this.conn.stats.sendfile.maxfd = 0;
188        for() {
189                size_t len = options.socket.buflen;
190                char buffer[len];
191                PendingRead p;
192                p.next = 0p;
193                p.in.buf = (void*)buffer;
194                p.in.len = len;
195                push(*this.queue, &p);
196
197                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
198                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
199                if(this.done) break;
200                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
201                this.conn.stats.sendfile.close++;
202
203                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
204        }
205
206        lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
207        push(this.conn.stats.sendfile, stats_thrd->stats.send);
208        unlock(stats_thrd->stats.lock);
209}
210
211extern "C" {
212extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
213}
214
215void ?{}( Acceptor & this, int cli ) {
216        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
217        options.clopts.thrd_cnt[cli]++;
218        this.done = false;
219}
220
221static inline __s32 get_res( io_future_t & this ) {
222        if( this.result < 0 ) {{
223                errno = -this.result;
224                return -1;
225        }}
226        return this.result;
227}
228
229static inline void push_connection( Acceptor & this, int fd ) {
230        this.stats.accepts++;
231        PendingRead * p = 0p;
232        for() {
233                if(this.done) return;
234                p = pop(*this.queue);
235                if(p) break;
236                // abort( "Too few threads" );
237                yield();
238                this.stats.creates++;
239        };
240
241        p->out.fd = fd;
242        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
243}
244
245// #define ACCEPT_SPIN
246#define ACCEPT_ONE
247// #define ACCEPT_MANY
248
249void main( Acceptor & this ) {
250        park();
251        unsigned long long last = rdtscl();
252
253#if defined(ACCEPT_SPIN)
254        if( options.log ) sout | "=== Accepting connection ===";
255        for() {
256                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
257                if(fd < 0) {
258                        if( errno == EWOULDBLOCK) {
259                                this.stats.eagains++;
260                                yield();
261                                continue;
262                        }
263                        if( errno == ECONNABORTED ) break;
264                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
265                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
266                }
267
268                if(this.done) return;
269
270                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
271
272                if(fd) push_connection(this, fd);
273
274                if (stats_thrd) {
275                        unsigned long long next = rdtscl();
276                        if(next > (last + period)) {
277                                if(try_lock(stats_thrd->stats.lock)) {
278                                        push(this.stats, stats_thrd->stats.accpt);
279                                        unlock(stats_thrd->stats.lock);
280                                        last = next;
281                                }
282                        }
283                }
284
285                if( options.log ) sout | "=== Accepting connection ===";
286        }
287
288#elif defined(ACCEPT_ONE)
289        if( options.log ) sout | "=== Accepting connection ===";
290        for() {
291                int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
292                if(fd < 0) {
293                        if( errno == ECONNABORTED ) break;
294                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
295                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
296                }
297
298                if(this.done) return;
299
300                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
301
302                if(fd) push_connection(this, fd);
303
304                if (stats_thrd) {
305                        // unsigned long long next = rdtscl();
306                        // if(next > (last + period)) {
307                                if(try_lock(stats_thrd->stats.lock)) {
308                                        push(this.stats, stats_thrd->stats.accpt);
309                                        unlock(stats_thrd->stats.lock);
310                                        // last = next;
311                                }
312                        // }
313                }
314
315                if( options.log ) sout | "=== Accepting connection ===";
316        }
317
318#elif defined(ACCEPT_MANY)
319        const int nacc = 10;
320        io_future_t results[nacc];
321
322        for(i; nacc) {
323                io_future_t & res = results[i];
324                reset(res);
325                /* paranoid */ assert(!available(res));
326                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
327                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
328        }
329
330        for() {
331                if (stats_thrd) {
332                        unsigned long long next = rdtscl();
333                        if(next > (last + period)) {
334                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
335                                        push(this.stats, stats_thrd->stats.accpt);
336                                        unlock(stats_thrd->stats.lock);
337                                        last = next;
338                                }
339                        }
340                }
341
342                for(i; nacc) {
343                        io_future_t & res = results[i];
344                        if(available(res)) {
345                                if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
346                                int fd = get_res(res);
347                                reset(res);
348                                if(fd < 0) {
349                                        if( errno == ECONNABORTED ) continue;
350                                        if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
351                                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
352                                }
353                                push_connection( this, fd );
354
355                                /* paranoid */ assert(!available(res));
356                                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
357                                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
358                        }
359                }
360                if(this.done) return;
361
362                if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
363                this.stats.eagains++;
364                wait_any(results, nacc);
365
366                if( options.log ) mutex(sout) {
367                        sout | "=== Acceptor wake-up ===";
368                        for(i; nacc) {
369                                io_future_t & res = results[i];
370                                sout | i | "available:" | available(res);
371                        }
372                }
373
374        }
375
376        for(i; nacc) {
377                wait(results[i]);
378        }
379#else
380#error no accept algorithm specified
381#endif
382        lock(stats_thrd->stats.lock);
383        push(this.stats, stats_thrd->stats.accpt);
384        unlock(stats_thrd->stats.lock);
385}
Note: See TracBrowser for help on using the repository browser.