source: benchmark/io/http/worker.cfa @ bbf61838

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since bbf61838 was 7f0ac12, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

First draft at acceptor thread webserver

  • Property mode set to 100644
File size: 6.0 KB
Line 
1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <unistd.h>
7
8#include <fstream.hfa>
9#include <iofwd.hfa>
10#include <mutex_stmt.hfa>
11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
16void ?{}( sendfile_stats_t & this ) {
17        this.calls = 0;
18        this.tries = 0;
19        this.header = 0;
20        this.splcin = 0;
21        this.splcot = 0;
22        for(i; zipf_cnts) {
23                this.avgrd[i].calls = 0;
24                this.avgrd[i].bytes = 0;
25        }
26}
27
28//=============================================================================================
29// Generic connection handling
30//=============================================================================================
31static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f ) {
32        REQUEST:
33        for() {
34                bool closed;
35                HttpCode code;
36                const char * file;
37                size_t name_size;
38
39                // Read the http request
40                if( options.log ) sout | "=== Reading request ===";
41                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
42                f = 0p;
43
44                // if we are done, break out of the loop
45                if( closed ) break REQUEST;
46
47                // If this wasn't a request retrun 400
48                if( code != OK200 ) {
49                        sout | "=== Invalid Request :" | code_val(code) | "===";
50                        answer_error(fd, code);
51                        continue REQUEST;
52                }
53
54                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
55                        if( options.log ) sout | "=== Request for /plaintext ===";
56
57                        int ret = answer_plaintext(fd);
58                        if( ret == -ECONNRESET ) break REQUEST;
59
60                        if( options.log ) sout | "=== Answer sent ===";
61                        continue REQUEST;
62                }
63
64                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
65                        if( options.log ) sout | "=== Request for /ping ===";
66
67                        // Send the header
68                        int ret = answer_empty(fd);
69                        if( ret == -ECONNRESET ) break REQUEST;
70
71                        if( options.log ) sout | "=== Answer sent ===";
72                        continue REQUEST;
73                }
74
75                if( options.log ) {
76                        sout | "=== Request for file " | nonl;
77                        write(sout, file, name_size);
78                        sout | " ===";
79                }
80
81                if( !options.file_cache.path ) {
82                        if( options.log ) {
83                                sout | "=== File Not Found (" | nonl;
84                                write(sout, file, name_size);
85                                sout | ") ===";
86                        }
87                        answer_error(fd, E405);
88                        continue REQUEST;
89                }
90
91                // Get the fd from the file cache
92                int ans_fd;
93                size_t count;
94                [ans_fd, count] = get_file( file, name_size );
95
96                // If we can't find the file, return 404
97                if( ans_fd < 0 ) {
98                        if( options.log ) {
99                                sout | "=== File Not Found (" | nonl;
100                                write(sout, file, name_size);
101                                sout | ") ===";
102                        }
103                        answer_error(fd, E404);
104                        continue REQUEST;
105                }
106
107                // Send the desired file
108                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
109                if( ret == -ECONNRESET ) break REQUEST;
110
111                if( options.log ) sout | "=== Answer sent ===";
112        }
113}
114
115//=============================================================================================
116// Self Accepting Worker Thread
117//=============================================================================================
118void ?{}( AcceptWorker & this ) {
119        size_t cli = rand() % options.clopts.cltr_cnt;
120        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
121        options.clopts.thrd_cnt[cli]++;
122        this.done = false;
123}
124
125void main( AcceptWorker & this ) {
126        park();
127        /* paranoid */ assert( this.conn.pipe[0] != -1 );
128        /* paranoid */ assert( this.conn.pipe[1] != -1 );
129        for() {
130                if( options.log ) sout | "=== Accepting connection ===";
131                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
132                if(fd < 0) {
133                        if( errno == ECONNABORTED ) break;
134                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
135                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
136                }
137                if(this.done) break;
138
139                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
140                size_t len = options.socket.buflen;
141                char buffer[len];
142                handle_connection( this.conn, fd, buffer, len, 0p );
143
144                if( options.log ) sout | "=== Connection closed ===";
145        }
146}
147
148
149//=============================================================================================
150// Channel Worker Thread
151//=============================================================================================
152void ?{}( ChannelWorker & this ) {
153        size_t cli = rand() % options.clopts.cltr_cnt;
154        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
155        options.clopts.thrd_cnt[cli]++;
156        this.done = false;
157}
158
159void main( ChannelWorker & this ) {
160        park();
161        /* paranoid */ assert( this.conn.pipe[0] != -1 );
162        /* paranoid */ assert( this.conn.pipe[1] != -1 );
163        for() {
164                size_t len = options.socket.buflen;
165                char buffer[len];
166                PendingRead p;
167                p.in.buf = (void*)buffer;
168                p.in.len = len;
169                push(*this.queue, &p);
170
171                if( options.log ) sout | "=== Waiting new connection ===";
172                handle_connection( this.conn, p.out.fd, buffer, len, &p.f );
173
174                if( options.log ) sout | "=== Connection closed ===";
175                if(this.done) break;
176        }
177}
178
179extern "C" {
180extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
181}
182
183void ?{}( Acceptor & this ) {
184        size_t cli = rand() % options.clopts.cltr_cnt;
185        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
186        options.clopts.thrd_cnt[cli]++;
187        this.done = false;
188}
189
190void main( Acceptor & this ) {
191        park();
192        if( options.log ) sout | "=== Accepting connection ===";
193        for() {
194                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
195                if(fd < 0) {
196                        if( errno == EWOULDBLOCK) {
197                                yield();
198                                continue;
199                        }
200                        if( errno == ECONNABORTED ) break;
201                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
202                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
203                }
204                if(this.done) return;
205
206                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
207
208                if(fd) {
209                        PendingRead * p = 0p;
210                        for() {
211                                if(this.done) return;
212                                p = pop(*this.queue);
213                                if(p) break;
214                                yield();
215                        };
216
217                        p->out.fd = fd;
218                        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
219                }
220
221                if( options.log ) sout | "=== Accepting connection ===";
222        }
223}
Note: See TracBrowser for help on using the repository browser.