source: benchmark/io/http/worker.cfa @ 329e26a

pthread-emulationqualifiedEnum
Last change on this file since 329e26a was 329e26a, checked in by Thierry Delisle <tdelisle@…>, 6 months ago

Re-instated the isolate/multi-cluster option.

  • Property mode set to 100644
File size: 8.4 KB
Line 
1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7
8#include <fstream.hfa>
9#include <iofwd.hfa>
10#include <mutex_stmt.hfa>
11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
16//=============================================================================================
17// Generic connection handling
18//=============================================================================================
19static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
20        REQUEST:
21        for() {
22                bool closed;
23                HttpCode code;
24                const char * file;
25                size_t name_size;
26
27                // Read the http request
28                if( options.log ) mutex(sout) sout | "=== Reading request ===";
29                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
30                f = 0p;
31
32                // if we are done, break out of the loop
33                if( closed ) break REQUEST;
34
35                // If this wasn't a request retrun 400
36                if( code != OK200 ) {
37                        sout | "=== Invalid Request :" | code_val(code) | "===";
38                        answer_error(fd, code);
39                        continue REQUEST;
40                }
41
42                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
43                        if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
44
45                        int ret = answer_plaintext(fd);
46                        if( ret == -ECONNRESET ) break REQUEST;
47
48                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
49                        continue REQUEST;
50                }
51
52                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
53                        if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
54
55                        // Send the header
56                        int ret = answer_empty(fd);
57                        if( ret == -ECONNRESET ) break REQUEST;
58
59                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
60                        continue REQUEST;
61                }
62
63                if( options.log ) {
64                        sout | "=== Request for file " | nonl;
65                        write(sout, file, name_size);
66                        sout | " ===";
67                }
68
69                if( !options.file_cache.path ) {
70                        if( options.log ) {
71                                sout | "=== File Not Found (" | nonl;
72                                write(sout, file, name_size);
73                                sout | ") ===";
74                        }
75                        answer_error(fd, E405);
76                        continue REQUEST;
77                }
78
79                // Get the fd from the file cache
80                int ans_fd;
81                size_t count;
82                [ans_fd, count] = get_file( file, name_size );
83
84                // If we can't find the file, return 404
85                if( ans_fd < 0 ) {
86                        if( options.log ) {
87                                sout | "=== File Not Found (" | nonl;
88                                write(sout, file, name_size);
89                                sout | ") ===";
90                        }
91                        answer_error(fd, E404);
92                        continue REQUEST;
93                }
94
95                // Send the desired file
96                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
97                if( ret == -ECONNRESET ) break REQUEST;
98
99                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
100        }
101
102        if (stats_thrd) {
103                unsigned long long next = rdtscl();
104                if(next > (last + 500000000)) {
105                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
106                                push(this.stats.sendfile, stats_thrd->stats.send);
107                                unlock(stats_thrd->stats.lock);
108                                last = next;
109                        }
110                }
111        }
112}
113
114//=============================================================================================
115// Self Accepting Worker Thread
116//=============================================================================================
117void ?{}( AcceptWorker & this ) {
118        size_t cli = rand() % options.clopts.cltr_cnt;
119        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
120        options.clopts.thrd_cnt[cli]++;
121        this.done = false;
122}
123
124void main( AcceptWorker & this ) {
125        park();
126        unsigned long long last = rdtscl();
127        /* paranoid */ assert( this.conn.pipe[0] != -1 );
128        /* paranoid */ assert( this.conn.pipe[1] != -1 );
129        for() {
130                if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
131                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
132                if(fd < 0) {
133                        if( errno == ECONNABORTED ) break;
134                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
135                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
136                }
137                if(this.done) break;
138
139                if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
140                size_t len = options.socket.buflen;
141                char buffer[len];
142                handle_connection( this.conn, fd, buffer, len, 0p, last );
143
144                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
145        }
146}
147
148
149//=============================================================================================
150// Channel Worker Thread
151//=============================================================================================
152void ?{}( ChannelWorker & this ) {
153        size_t cli = rand() % options.clopts.cltr_cnt;
154        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
155        options.clopts.thrd_cnt[cli]++;
156        this.done = false;
157}
158
159void main( ChannelWorker & this ) {
160        park();
161        unsigned long long last = rdtscl();
162        /* paranoid */ assert( this.conn.pipe[0] != -1 );
163        /* paranoid */ assert( this.conn.pipe[1] != -1 );
164        for() {
165                size_t len = options.socket.buflen;
166                char buffer[len];
167                PendingRead p;
168                p.next = 0p;
169                p.in.buf = (void*)buffer;
170                p.in.len = len;
171                push(*this.queue, &p);
172
173                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
174                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
175
176                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
177                if(this.done) break;
178        }
179}
180
181extern "C" {
182extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
183}
184
185void ?{}( Acceptor & this, int cli ) {
186        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
187        options.clopts.thrd_cnt[cli]++;
188        this.done = false;
189}
190
191#define ACCEPT_SPIN
192
193void main( Acceptor & this ) {
194        park();
195        unsigned long long last = rdtscl();
196
197#if defined(ACCEPT_SPIN)
198        if( options.log ) sout | "=== Accepting connection ===";
199        for() {
200                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
201                if(fd < 0) {
202                        if( errno == EWOULDBLOCK) {
203                                this.stats.eagains++;
204                                yield();
205                                continue;
206                        }
207                        if( errno == ECONNABORTED ) break;
208                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
209                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
210                }
211                this.stats.accepts++;
212
213                if(this.done) return;
214
215                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
216
217                if(fd) {
218                        PendingRead * p = 0p;
219                        for() {
220                                if(this.done) return;
221                                p = pop(*this.queue);
222                                if(p) break;
223                                yield();
224                                this.stats.creates++;
225                        };
226
227                        p->out.fd = fd;
228                        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
229                }
230
231                if (stats_thrd) {
232                        unsigned long long next = rdtscl();
233                        if(next > (last + 500000000)) {
234                                if(try_lock(stats_thrd->stats.lock)) {
235                                        push(this.stats, stats_thrd->stats.accpt);
236                                        unlock(stats_thrd->stats.lock);
237                                        last = next;
238                                }
239                        }
240                }
241
242                if( options.log ) sout | "=== Accepting connection ===";
243        }
244
245#elif define(ACCEPT_MANY)
246        const int nacc = 10;
247        io_future_t results[nacc];
248
249        for(i; nacc) {
250                io_future_t & res = results[i];
251                reset(res);
252                /* paranoid */ assert(!available(res));
253                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
254                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
255        }
256
257        for() {
258                if (stats_thrd) {
259                        unsigned long long next = rdtscl();
260                        if(next > (last + 500000000)) {
261                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
262                                        push(this.stats, stats_thrd->stats.accpt);
263                                        unlock(stats_thrd->stats.lock);
264                                        last = next;
265                                }
266                        }
267                }
268
269                for(i; nacc) {
270                        io_future_t & res = results[i];
271                        if(available(res)) {
272                                if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
273                                int fd = get_res(res);
274                                reset(res);
275                                this.stats.accepts++;
276                                if(fd < 0) {
277                                        if( errno == ECONNABORTED ) continue;
278                                        if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
279                                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
280                                }
281                                push_connection( this, fd );
282
283                                /* paranoid */ assert(!available(res));
284                                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
285                                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
286                        }
287                }
288                if(this.done) return;
289
290                if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
291                this.stats.eagains++;
292                wait_any(results, nacc);
293
294                if( options.log ) mutex(sout) {
295                        sout | "=== Acceptor wake-up ===";
296                        for(i; nacc) {
297                                io_future_t & res = results[i];
298                                sout | i | "available:" | available(res);
299                        }
300                }
301
302        }
303
304        for(i; nacc) {
305                wait(results[i]);
306        }
307#else
308#error no accept algorithm specified
309#endif
310}
Note: See TracBrowser for help on using the repository browser.