source: benchmark/io/http/worker.cfa @ 5ff721a

Last change on this file since 5ff721a was 0f1336c, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Minor error handling improvement.

  • Property mode set to 100644
File size: 10.8 KB
RevLine 
[0aec496]1#include "worker.hfa"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
[c82af9f]6#include <unistd.h>
[0aec496]7
[8c43d05]8#include <fstream.hfa>
[0aec496]9#include <iofwd.hfa>
[7f0ac12]10#include <mutex_stmt.hfa>
[0aec496]11
12#include "options.hfa"
13#include "protocol.hfa"
14#include "filecache.hfa"
15
[32d1383]16static const unsigned long long period = 5_000_000;
[3f95dab]17
[0aec496]18//=============================================================================================
[7f0ac12]19// Generic connection handling
[0aec496]20//=============================================================================================
[137974ae]21static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
[7f0ac12]22        REQUEST:
23        for() {
24                bool closed;
25                HttpCode code;
26                const char * file;
27                size_t name_size;
28
29                // Read the http request
[10ba012]30                if( options.log ) mutex(sout) sout | "=== Reading request ===";
[7f0ac12]31                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
32                f = 0p;
33
34                // if we are done, break out of the loop
[32d1383]35                if( closed ) {
36                        if( code != OK200 ) this.stats.sendfile.error++;
37                        break REQUEST;
38                }
[7f0ac12]39
40                // If this wasn't a request retrun 400
41                if( code != OK200 ) {
[32d1383]42                        abort | "=== Invalid Request :" | code_val(code) | "===";
43                        // answer_error(fd, code);
44                        // continue REQUEST;
[7f0ac12]45                }
[ef3c383]46
[32d1383]47                // if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
48                //      if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
[7f0ac12]49
[32d1383]50                //      int ret = answer_plaintext(fd);
51                //      if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
[7f0ac12]52
[32d1383]53                //      if( options.log ) mutex(sout) sout | "=== Answer sent ===";
54                //      continue REQUEST;
55                // }
[7f0ac12]56
[32d1383]57                // if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
58                //      if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
[7f0ac12]59
[32d1383]60                //      // Send the header
61                //      int ret = answer_empty(fd);
62                //      if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
[7f0ac12]63
[32d1383]64                //      if( options.log ) mutex(sout) sout | "=== Answer sent ===";
65                //      continue REQUEST;
66                // }
[7f0ac12]67
68                if( options.log ) {
69                        sout | "=== Request for file " | nonl;
70                        write(sout, file, name_size);
71                        sout | " ===";
72                }
73
74                if( !options.file_cache.path ) {
[32d1383]75                        // if( options.log ) {
76                                serr | "=== File Not Found (" | nonl;
77                                write(serr, file, name_size);
78                                serr | ") ===";
79                                abort();
80                        // }
81                        // answer_error(fd, E405);
82                        // continue REQUEST;
[7f0ac12]83                }
84
85                // Get the fd from the file cache
86                int ans_fd;
87                size_t count;
88                [ans_fd, count] = get_file( file, name_size );
89
90                // If we can't find the file, return 404
91                if( ans_fd < 0 ) {
[32d1383]92                        // if( options.log ) {
93                                serr | "=== File Not Found 2 (" | nonl;
94                                write(serr, file, name_size);
95                                serr | ") ===";
96                                abort();
97                        // }
98                        // answer_error(fd, E404);
99                        // continue REQUEST;
[7f0ac12]100                }
101
102                // Send the desired file
103                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
[3f95dab]104                if(ret < 0) {
[32d1383]105                        if( ret == -ECONNABORTED ) { this.stats.sendfile.error++; break REQUEST; }
106                        if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }
107                        if( ret == -EPIPE ) { this.stats.sendfile.error++; break REQUEST; }
108                        if( ret == -EBADF ) { this.stats.sendfile.error++; break REQUEST; }
109                        abort( "answer sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
[3f95dab]110                }
[7f0ac12]111
[10ba012]112                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
[ef3c383]113        }
[137974ae]114
[32d1383]115        this.stats.sendfile.close++;
116
[137974ae]117        if (stats_thrd) {
[32d1383]118                // unsigned long long next = rdtscl();
119                // if(next > (last + period)) {
[10ba012]120                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
[137974ae]121                                push(this.stats.sendfile, stats_thrd->stats.send);
122                                unlock(stats_thrd->stats.lock);
[32d1383]123                                // last = next;
[137974ae]124                        }
[32d1383]125                // }
[137974ae]126        }
[481ee28]127}
128
[7f0ac12]129//=============================================================================================
130// Self Accepting Worker Thread
131//=============================================================================================
132void ?{}( AcceptWorker & this ) {
[329e26a]133        size_t cli = rand() % options.clopts.cltr_cnt;
134        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
135        options.clopts.thrd_cnt[cli]++;
[7f0ac12]136        this.done = false;
[0aec496]137}
138
[7f0ac12]139void main( AcceptWorker & this ) {
[e235429]140        park();
[137974ae]141        unsigned long long last = rdtscl();
[7f0ac12]142        /* paranoid */ assert( this.conn.pipe[0] != -1 );
143        /* paranoid */ assert( this.conn.pipe[1] != -1 );
[e95a117]144        for() {
[10ba012]145                if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
[7f0ac12]146                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
[0f1336c]147                if(fd <= 0) {
[8e3034d]148                        if( errno == ECONNABORTED ) break;
[ee59ede]149                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
[0f1336c]150                        abort( "accept error %d: (%d) %s\n", fd, (int)errno, strerror(errno) );
[8e3034d]151                }
[4f762d3]152                if(this.done) break;
[e95a117]153
[32d1383]154                this.stats.accepts++;
155                if (stats_thrd && try_lock(stats_thrd->stats.lock)) {
156                        push(this.stats, stats_thrd->stats.accpt);
157                        unlock(stats_thrd->stats.lock);
158                }
159
[10ba012]160                if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[7f0ac12]161                size_t len = options.socket.buflen;
162                char buffer[len];
[137974ae]163                handle_connection( this.conn, fd, buffer, len, 0p, last );
[3f95dab]164                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);
[0aec496]165
[10ba012]166                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]167        }
168}
[0aec496]169
[561dd26]170
[7f0ac12]171//=============================================================================================
172// Channel Worker Thread
173//=============================================================================================
174void ?{}( ChannelWorker & this ) {
[329e26a]175        size_t cli = rand() % options.clopts.cltr_cnt;
176        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
177        options.clopts.thrd_cnt[cli]++;
[7f0ac12]178        this.done = false;
179}
[561dd26]180
[7f0ac12]181void main( ChannelWorker & this ) {
182        park();
[137974ae]183        unsigned long long last = rdtscl();
[7f0ac12]184        /* paranoid */ assert( this.conn.pipe[0] != -1 );
185        /* paranoid */ assert( this.conn.pipe[1] != -1 );
[3f95dab]186        this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
[32d1383]187        // this.conn.stats.sendfile.maxfd = 0;
[7f0ac12]188        for() {
189                size_t len = options.socket.buflen;
190                char buffer[len];
191                PendingRead p;
[10ba012]192                p.next = 0p;
[7f0ac12]193                p.in.buf = (void*)buffer;
194                p.in.len = len;
195                push(*this.queue, &p);
[561dd26]196
[10ba012]197                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
[137974ae]198                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
[3f95dab]199                if(this.done) break;
200                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
201                this.conn.stats.sendfile.close++;
[561dd26]202
[10ba012]203                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
[7f0ac12]204        }
[3f95dab]205
206        lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
207        push(this.conn.stats.sendfile, stats_thrd->stats.send);
208        unlock(stats_thrd->stats.lock);
[7f0ac12]209}
[0aec496]210
[7f0ac12]211extern "C" {
212extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
213}
[97748ee]214
[329e26a]215void ?{}( Acceptor & this, int cli ) {
216        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
217        options.clopts.thrd_cnt[cli]++;
[7f0ac12]218        this.done = false;
219}
[b57db73]220
[07997cd]221static inline __s32 get_res( io_future_t & this ) {
222        if( this.result < 0 ) {{
223                errno = -this.result;
224                return -1;
225        }}
226        return this.result;
227}
228
229static inline void push_connection( Acceptor & this, int fd ) {
[3f95dab]230        this.stats.accepts++;
[07997cd]231        PendingRead * p = 0p;
232        for() {
233                if(this.done) return;
234                p = pop(*this.queue);
235                if(p) break;
[32d1383]236                // abort( "Too few threads" );
[07997cd]237                yield();
238                this.stats.creates++;
239        };
240
241        p->out.fd = fd;
242        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
243}
244
245// #define ACCEPT_SPIN
[3f95dab]246#define ACCEPT_ONE
247// #define ACCEPT_MANY
[c25338d]248
[7f0ac12]249void main( Acceptor & this ) {
250        park();
[137974ae]251        unsigned long long last = rdtscl();
[c25338d]252
253#if defined(ACCEPT_SPIN)
[7f0ac12]254        if( options.log ) sout | "=== Accepting connection ===";
255        for() {
256                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
257                if(fd < 0) {
258                        if( errno == EWOULDBLOCK) {
[137974ae]259                                this.stats.eagains++;
[7f0ac12]260                                yield();
261                                continue;
[97748ee]262                        }
[7f0ac12]263                        if( errno == ECONNABORTED ) break;
264                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
265                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
266                }
[137974ae]267
[7f0ac12]268                if(this.done) return;
[97748ee]269
[7f0ac12]270                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
[97748ee]271
[07997cd]272                if(fd) push_connection(this, fd);
[ee59ede]273
[137974ae]274                if (stats_thrd) {
275                        unsigned long long next = rdtscl();
[3f95dab]276                        if(next > (last + period)) {
277                                if(try_lock(stats_thrd->stats.lock)) {
278                                        push(this.stats, stats_thrd->stats.accpt);
279                                        unlock(stats_thrd->stats.lock);
280                                        last = next;
281                                }
282                        }
283                }
284
285                if( options.log ) sout | "=== Accepting connection ===";
286        }
287
288#elif defined(ACCEPT_ONE)
289        if( options.log ) sout | "=== Accepting connection ===";
290        for() {
291                int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
292                if(fd < 0) {
293                        if( errno == ECONNABORTED ) break;
294                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
295                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
296                }
297
298                if(this.done) return;
299
300                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
301
302                if(fd) push_connection(this, fd);
303
304                if (stats_thrd) {
[32d1383]305                        // unsigned long long next = rdtscl();
306                        // if(next > (last + period)) {
[137974ae]307                                if(try_lock(stats_thrd->stats.lock)) {
308                                        push(this.stats, stats_thrd->stats.accpt);
309                                        unlock(stats_thrd->stats.lock);
[32d1383]310                                        // last = next;
[137974ae]311                                }
[32d1383]312                        // }
[137974ae]313                }
314
[7f0ac12]315                if( options.log ) sout | "=== Accepting connection ===";
[0aec496]316        }
[c25338d]317
[07997cd]318#elif defined(ACCEPT_MANY)
[c25338d]319        const int nacc = 10;
320        io_future_t results[nacc];
321
322        for(i; nacc) {
323                io_future_t & res = results[i];
324                reset(res);
325                /* paranoid */ assert(!available(res));
326                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
327                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
328        }
329
330        for() {
331                if (stats_thrd) {
332                        unsigned long long next = rdtscl();
[3f95dab]333                        if(next > (last + period)) {
[c25338d]334                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
335                                        push(this.stats, stats_thrd->stats.accpt);
336                                        unlock(stats_thrd->stats.lock);
337                                        last = next;
338                                }
339                        }
340                }
341
342                for(i; nacc) {
343                        io_future_t & res = results[i];
344                        if(available(res)) {
345                                if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
346                                int fd = get_res(res);
347                                reset(res);
348                                if(fd < 0) {
349                                        if( errno == ECONNABORTED ) continue;
350                                        if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
351                                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
352                                }
353                                push_connection( this, fd );
354
355                                /* paranoid */ assert(!available(res));
356                                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
357                                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
358                        }
359                }
360                if(this.done) return;
361
362                if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
363                this.stats.eagains++;
364                wait_any(results, nacc);
365
366                if( options.log ) mutex(sout) {
367                        sout | "=== Acceptor wake-up ===";
368                        for(i; nacc) {
369                                io_future_t & res = results[i];
370                                sout | i | "available:" | available(res);
371                        }
372                }
373
374        }
375
376        for(i; nacc) {
377                wait(results[i]);
378        }
379#else
380#error no accept algorithm specified
381#endif
[3f95dab]382        lock(stats_thrd->stats.lock);
383        push(this.stats, stats_thrd->stats.accpt);
384        unlock(stats_thrd->stats.lock);
[7f0ac12]385}
Note: See TracBrowser for help on using the repository browser.