source: benchmark/io/http/worker.cfa @ bf7c7ea

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since bf7c7ea was c25338d, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Added accept 10 method (it doesn't really work).

  • Property mode set to 100644
File size: 8.2 KB
RevLine 
[0aec496]1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
[c82af9f]6#include <unistd.h>
[0aec496]7
[8c43d05]8#include <fstream.hfa>
[0aec496]9#include <iofwd.hfa>
[7f0ac12]10#include <mutex_stmt.hfa>
[0aec496]11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
16//=============================================================================================
[7f0ac12]17// Generic connection handling
[0aec496]18//=============================================================================================
[137974ae]19static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
[7f0ac12]20        REQUEST:
21        for() {
22                bool closed;
23                HttpCode code;
24                const char * file;
25                size_t name_size;
26
27                // Read the http request
[10ba012]28                if( options.log ) mutex(sout) sout | "=== Reading request ===";
[7f0ac12]29                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
30                f = 0p;
31
32                // if we are done, break out of the loop
33                if( closed ) break REQUEST;
34
35                // If this wasn't a request retrun 400
36                if( code != OK200 ) {
37                        sout | "=== Invalid Request :" | code_val(code) | "===";
38                        answer_error(fd, code);
39                        continue REQUEST;
40                }
[ef3c383]41
[7f0ac12]42                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
[10ba012]43                        if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
[7f0ac12]44
45                        int ret = answer_plaintext(fd);
46                        if( ret == -ECONNRESET ) break REQUEST;
47
[10ba012]48                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[7f0ac12]49                        continue REQUEST;
50                }
51
52                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
[10ba012]53                        if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
[7f0ac12]54
55                        // Send the header
56                        int ret = answer_empty(fd);
57                        if( ret == -ECONNRESET ) break REQUEST;
58
[10ba012]59                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[7f0ac12]60                        continue REQUEST;
61                }
62
63                if( options.log ) {
64                        sout | "=== Request for file " | nonl;
65                        write(sout, file, name_size);
66                        sout | " ===";
67                }
68
69                if( !options.file_cache.path ) {
70                        if( options.log ) {
71                                sout | "=== File Not Found (" | nonl;
72                                write(sout, file, name_size);
73                                sout | ") ===";
74                        }
75                        answer_error(fd, E405);
76                        continue REQUEST;
77                }
78
79                // Get the fd from the file cache
80                int ans_fd;
81                size_t count;
82                [ans_fd, count] = get_file( file, name_size );
83
84                // If we can't find the file, return 404
85                if( ans_fd < 0 ) {
86                        if( options.log ) {
87                                sout | "=== File Not Found (" | nonl;
88                                write(sout, file, name_size);
89                                sout | ") ===";
90                        }
91                        answer_error(fd, E404);
92                        continue REQUEST;
93                }
94
95                // Send the desired file
96                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
97                if( ret == -ECONNRESET ) break REQUEST;
98
[10ba012]99                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[ef3c383]100        }
[137974ae]101
102        if (stats_thrd) {
103                unsigned long long next = rdtscl();
104                if(next > (last + 500000000)) {
[10ba012]105                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
[137974ae]106                                push(this.stats.sendfile, stats_thrd->stats.send);
107                                unlock(stats_thrd->stats.lock);
108                                last = next;
109                        }
110                }
111        }
[481ee28]112}
113
[7f0ac12]114//=============================================================================================
115// Self Accepting Worker Thread
116//=============================================================================================
117void ?{}( AcceptWorker & this ) {
[8c58e73]118        ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
119        options.clopts.thrd_cnt++;
[7f0ac12]120        this.done = false;
[0aec496]121}
122
[7f0ac12]123void main( AcceptWorker & this ) {
[e235429]124        park();
[137974ae]125        unsigned long long last = rdtscl();
[7f0ac12]126        /* paranoid */ assert( this.conn.pipe[0] != -1 );
127        /* paranoid */ assert( this.conn.pipe[1] != -1 );
[e95a117]128        for() {
[10ba012]129                if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
[7f0ac12]130                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
[8e3034d]131                if(fd < 0) {
132                        if( errno == ECONNABORTED ) break;
[ee59ede]133                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
[8e3034d]134                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
135                }
[4f762d3]136                if(this.done) break;
[e95a117]137
[10ba012]138                if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[7f0ac12]139                size_t len = options.socket.buflen;
140                char buffer[len];
[137974ae]141                handle_connection( this.conn, fd, buffer, len, 0p, last );
[0aec496]142
[10ba012]143                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]144        }
145}
[0aec496]146
[561dd26]147
[7f0ac12]148//=============================================================================================
149// Channel Worker Thread
150//=============================================================================================
151void ?{}( ChannelWorker & this ) {
[8c58e73]152        ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
153        options.clopts.thrd_cnt++;
[7f0ac12]154        this.done = false;
155}
[561dd26]156
[7f0ac12]157void main( ChannelWorker & this ) {
158        park();
[137974ae]159        unsigned long long last = rdtscl();
[7f0ac12]160        /* paranoid */ assert( this.conn.pipe[0] != -1 );
161        /* paranoid */ assert( this.conn.pipe[1] != -1 );
162        for() {
163                size_t len = options.socket.buflen;
164                char buffer[len];
165                PendingRead p;
[10ba012]166                p.next = 0p;
[7f0ac12]167                p.in.buf = (void*)buffer;
168                p.in.len = len;
169                push(*this.queue, &p);
[561dd26]170
[10ba012]171                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
[137974ae]172                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
[561dd26]173
[10ba012]174                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]175                if(this.done) break;
176        }
177}
[0aec496]178
[7f0ac12]179extern "C" {
180extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
181}
[97748ee]182
[7f0ac12]183void ?{}( Acceptor & this ) {
[10ba012]184        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance, 64000 };
[8c58e73]185        options.clopts.thrd_cnt++;
[7f0ac12]186        this.done = false;
187}
[b57db73]188
[c25338d]189#define ACCEPT_SPIN
190
[7f0ac12]191void main( Acceptor & this ) {
192        park();
[137974ae]193        unsigned long long last = rdtscl();
[c25338d]194
195#if defined(ACCEPT_SPIN)
[7f0ac12]196        if( options.log ) sout | "=== Accepting connection ===";
197        for() {
198                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
199                if(fd < 0) {
200                        if( errno == EWOULDBLOCK) {
[137974ae]201                                this.stats.eagains++;
[7f0ac12]202                                yield();
203                                continue;
[97748ee]204                        }
[7f0ac12]205                        if( errno == ECONNABORTED ) break;
206                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
207                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
208                }
[137974ae]209                this.stats.accepts++;
210
[7f0ac12]211                if(this.done) return;
[97748ee]212
[7f0ac12]213                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[97748ee]214
[7f0ac12]215                if(fd) {
216                        PendingRead * p = 0p;
217                        for() {
218                                if(this.done) return;
219                                p = pop(*this.queue);
220                                if(p) break;
221                                yield();
[137974ae]222                                this.stats.creates++;
[7f0ac12]223                        };
224
225                        p->out.fd = fd;
226                        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
[0aec496]227                }
[ee59ede]228
[137974ae]229                if (stats_thrd) {
230                        unsigned long long next = rdtscl();
231                        if(next > (last + 500000000)) {
232                                if(try_lock(stats_thrd->stats.lock)) {
233                                        push(this.stats, stats_thrd->stats.accpt);
234                                        unlock(stats_thrd->stats.lock);
235                                        last = next;
236                                }
237                        }
238                }
239
[7f0ac12]240                if( options.log ) sout | "=== Accepting connection ===";
[0aec496]241        }
[c25338d]242
243#elif define(ACCEPT_MANY)
244        const int nacc = 10;
245        io_future_t results[nacc];
246
247        for(i; nacc) {
248                io_future_t & res = results[i];
249                reset(res);
250                /* paranoid */ assert(!available(res));
251                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
252                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
253        }
254
255        for() {
256                if (stats_thrd) {
257                        unsigned long long next = rdtscl();
258                        if(next > (last + 500000000)) {
259                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
260                                        push(this.stats, stats_thrd->stats.accpt);
261                                        unlock(stats_thrd->stats.lock);
262                                        last = next;
263                                }
264                        }
265                }
266
267                for(i; nacc) {
268                        io_future_t & res = results[i];
269                        if(available(res)) {
270                                if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
271                                int fd = get_res(res);
272                                reset(res);
273                                this.stats.accepts++;
274                                if(fd < 0) {
275                                        if( errno == ECONNABORTED ) continue;
276                                        if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
277                                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
278                                }
279                                push_connection( this, fd );
280
281                                /* paranoid */ assert(!available(res));
282                                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
283                                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
284                        }
285                }
286                if(this.done) return;
287
288                if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
289                this.stats.eagains++;
290                wait_any(results, nacc);
291
292                if( options.log ) mutex(sout) {
293                        sout | "=== Acceptor wake-up ===";
294                        for(i; nacc) {
295                                io_future_t & res = results[i];
296                                sout | i | "available:" | available(res);
297                        }
298                }
299
300        }
301
302        for(i; nacc) {
303                wait(results[i]);
304        }
305#else
306#error no accept algorithm specified
307#endif
[7f0ac12]308}
Note: See TracBrowser for help on using the repository browser.