source: benchmark/io/http/protocol.cfa @ e76fa30

ADTast-experimentalpthread-emulation
Last change on this file since e76fa30 was 3f95dab, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Committing hopefully last version of the webserver

  • Property mode set to 100644
File size: 16.2 KB
Line 
1#include "protocol.hfa"
2
3#define _GNU_SOURCE
4extern "C" {
5        #include <fcntl.h>
6}
7
8#define xstr(s) str(s)
9#define str(s) #s
10
11#include <fstream.hfa>
12#include <iofwd.hfa>
13#include <io/types.hfa>
14#include <mutex_stmt.hfa>
15
16#include <assert.h>
17// #include <stdio.h> // Don't use stdio.h, too slow to compile
18extern "C" {
19      int snprintf ( char * s, size_t n, const char * format, ... );
20        // #include <linux/io_uring.h>
21}
22#include <string.h>
23#include <errno.h>
24
25#include "options.hfa"
26#include "worker.hfa"
27
28#define PLAINTEXT_1WRITE
29#define PLAINTEXT_MEMCPY
30#define PLAINTEXT_NOCOPY
31// #define LINKED_IO
32
33static inline __s32 wait_res( io_future_t & this ) {
34        wait( this );
35        if( this.result < 0 ) {{
36                errno = -this.result;
37                return -1;
38        }}
39        return this.result;
40}
41
42struct https_msg_str {
43        char msg[512];
44        size_t len;
45};
46
47const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };
48
49_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
50
51const int http_codes[KNOWN_CODES] = {
52        200,
53        200,
54        400,
55        404,
56        405,
57        408,
58        413,
59        414,
60};
61
62_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
63
64int code_val(HttpCode code) {
65        return http_codes[code];
66}
67
68static inline int answer( int fd, const char * it, int len ) {
69        while(len > 0) {
70                // Call write
71                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
72                if( ret < 0 ) {
73                        if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
74
75                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
76                }
77
78                // update it/len
79                it  += ret;
80                len -= ret;
81        }
82        return 0;
83}
84
85int answer_error( int fd, HttpCode code ) {
86        /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
87        int idx = (int)code;
88        return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
89}
90
91static int fill_header(char * it, size_t size) {
92        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
93        it += http_msgs[OK200]->len;
94        int len = http_msgs[OK200]->len;
95        len += snprintf(it, 512 - len, "%d \n\n", size);
96        return len;
97}
98
99static int answer_header( int fd, size_t size ) {
100        char buffer[512];
101        int len = fill_header(buffer, size);
102        return answer( fd, buffer, len );
103}
104
105#if defined(PLAINTEXT_NOCOPY)
106int answer_plaintext( int fd ) {
107        return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
108}
109#elif defined(PLAINTEXT_MEMCPY)
110#define TEXTSIZE 15
111int answer_plaintext( int fd ) {
112        char text[] = "Hello, World!\n\n";
113        char ts[] = xstr(TEXTSIZE) " \n\n";
114        _Static_assert(sizeof(text) - 1 == TEXTSIZE);
115        char buffer[512 + TEXTSIZE];
116        char * it = buffer;
117        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
118        it += http_msgs[OK200]->len;
119        int len = http_msgs[OK200]->len;
120        memcpy(it, ts, sizeof(ts) - 1);
121        it += sizeof(ts) - 1;
122        len += sizeof(ts) - 1;
123        memcpy(it, text, TEXTSIZE);
124        return answer(fd, buffer, len + TEXTSIZE);
125}
126#elif defined(PLAINTEXT_1WRITE)
127int answer_plaintext( int fd ) {
128        char text[] = "Hello, World!\n\n";
129        char buffer[512 + sizeof(text)];
130        char * it = buffer;
131        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
132        it += http_msgs[OK200]->len;
133        int len = http_msgs[OK200]->len;
134        int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
135        it += r;
136        len += r;
137        memcpy(it, text, sizeof(text));
138        return answer(fd, buffer, len + sizeof(text));
139}
140#else
141int answer_plaintext( int fd ) {
142        char text[] = "Hello, World!\n\n";
143        int ret = answer_header(fd, sizeof(text));
144        if( ret < 0 ) return ret;
145        return answer(fd, text, sizeof(text));
146}
147#endif
148
149int answer_empty( int fd ) {
150        return answer_header(fd, 0);
151}
152
153static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) {
154        int zipf_idx = -1;
155        STATS: for(i; zipf_cnts) {
156                if(count <= zipf_sizes[i]) {
157                        zipf_idx = i;
158                        break STATS;
159                }
160        }
161        if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file";
162
163
164        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
165        off_t offset = 0;
166        ssize_t ret;
167        SPLICE1: while(count > 0) {
168                stats.tries++;
169                // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
170                ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
171                if( ret <= 0 ) {
172                        if( errno == ECONNRESET ) return -ECONNRESET;
173                        if( errno == EPIPE ) return -EPIPE;
174                        abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
175                }
176                count -= ret;
177                stats.splcin++;
178                if(count > 0) stats.avgrd[zipf_idx].calls++;
179                stats.avgrd[zipf_idx].bytes += ret;
180
181                size_t in_pipe = ret;
182                SPLICE2: while(in_pipe > 0) {
183                        // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
184                        ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
185                        if( ret <= 0 ) {
186                                if( errno == ECONNRESET ) return -ECONNRESET;
187                                if( errno == EPIPE ) return -EPIPE;
188                                abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
189                        }
190                        stats.splcot++;
191                        in_pipe -= ret;
192                }
193
194        }
195        return count;
196}
197
198enum FSM_STATE {
199        Initial,
200        Retry,
201        Error,
202        Done,
203};
204
205struct FSM_Result {
206        FSM_STATE state;
207        int error;
208};
209
210static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
211static inline bool is_error(FSM_Result & this) { return Error == this.state; }
212static inline bool is_done(FSM_Result & this) { return Done == this.state; }
213
214static inline int error(FSM_Result & this, int error) {
215        this.error = error;
216        this.state = Error;
217        return error;
218}
219
220static inline int done(FSM_Result & this) {
221        this.state = Done;
222        return 0;
223}
224
225static inline int retry(FSM_Result & this) {
226        this.state = Retry;
227        return 0;
228}
229
230static inline int need(FSM_Result & this) {
231        switch(this.state) {
232                case Initial:
233                case Retry:
234                        return 1;
235                case Error:
236                        if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
237                case Done:
238                        return 0;
239        }
240}
241
242// Generator that handles sending the header
243generator header_g {
244        io_future_t f;
245        const char * next;
246        int fd; size_t len;
247        FSM_Result res;
248};
249
250static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
251        this.next = it;
252        this.fd = fd;
253        this.len = len;
254}
255
256static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
257        zero_sqe(sqe);
258        sqe->opcode = IORING_OP_SEND;
259        sqe->user_data = (uintptr_t)&this.f;
260        sqe->flags = IOSQE_IO_LINK;
261        sqe->fd = this.fd;
262        sqe->addr = (uintptr_t)this.next;
263        sqe->len = this.len;
264}
265
266static inline int error(header_g & this, int error) {
267        int ret = close(this.fd);
268        if( ret != 0 ) {
269                mutex(serr) serr | "Failed to close fd" | errno;
270        }
271        return error(this.res, error);
272}
273
274static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {
275        wait(this.f);
276
277        // Did something crazy happen?
278        if(this.f.result > this.len) {
279                mutex(serr) serr | "HEADER sent too much!";
280                return error(this, -ERANGE);
281        }
282
283        // Something failed?
284        if(this.f.result < 0) {
285                int error = -this.f.result;
286                if( error == ECONNRESET ) return error(this, -ECONNRESET);
287                if( error == EPIPE ) return error(this, -EPIPE);
288                if( error == ECANCELED ) {
289                        mutex(serr) serr | "HEADER was cancelled, WTF!";
290                        return error(this, -ECONNRESET);
291                }
292                if( error == EAGAIN || error == EWOULDBLOCK) {
293                        mutex(serr) serr | "HEADER got eagain, WTF!";
294                        return error(this, -ECONNRESET);
295                }
296        }
297
298        // Done?
299        if(this.f.result == this.len) {
300                return done(this.res);
301        }
302
303        stats.header++;
304
305        // It must be a Short read
306        this.len  -= this.f.result;
307        this.next += this.f.result;
308        reset(this.f);
309        return retry(this.res);
310}
311
312// Generator that handles splicing in a file
313struct splice_in_t {
314        io_future_t f;
315        int fd; int pipe; size_t len; off_t off;
316        short zipf_idx;
317        FSM_Result res;
318};
319
320static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
321        this.fd = fd;
322        this.pipe = pipe;
323        this.len = len;
324        this.off = 0;
325        this.zipf_idx = -1;
326        STATS: for(i; zipf_cnts) {
327                if(len <= zipf_sizes[i]) {
328                        this.zipf_idx = i;
329                        break STATS;
330                }
331        }
332        if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";
333}
334
335static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
336        zero_sqe(sqe);
337        sqe->opcode = IORING_OP_SPLICE;
338        sqe->user_data = (uintptr_t)&this.f;
339        sqe->flags = 0;
340        sqe->splice_fd_in = this.fd;
341        sqe->splice_off_in = this.off;
342        sqe->fd = this.pipe;
343        sqe->off = (__u64)-1;
344        sqe->len = this.len;
345        sqe->splice_flags = SPLICE_F_MOVE;
346}
347
348static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {
349        wait(this.f);
350
351        // Something failed?
352        if(this.f.result < 0) {
353                int error = -this.f.result;
354                if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
355                if( error == EPIPE ) return error(this.res, -EPIPE);
356                if( error == ECANCELED ) {
357                        mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
358                        return error(this.res, -ECONNRESET);
359                }
360                if( error == EAGAIN || error == EWOULDBLOCK) {
361                        mutex(serr) serr | "SPLICE IN got eagain, WTF!";
362                        return error(this.res, -ECONNRESET);
363                }
364                mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";
365                return error(this.res, -ECONNRESET);
366        }
367
368        // Did something crazy happen?
369        if(this.f.result > this.len) {
370                mutex(serr) serr | "SPLICE IN spliced too much!";
371                return error(this.res, -ERANGE);
372        }
373
374        // Done?
375        if(this.f.result == this.len) {
376                return done(this.res);
377        }
378
379        stats.splcin++;
380        stats.avgrd[this.zipf_idx].calls++;
381        stats.avgrd[this.zipf_idx].bytes += this.f.result;
382
383        // It must be a Short read
384        this.len -= this.f.result;
385        this.off += this.f.result;
386        reset(this.f);
387        return retry(this.res);
388}
389
390generator splice_out_g {
391        io_future_t f;
392        int pipe; int fd; size_t len;
393        FSM_Result res;
394};
395
396static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
397        this.pipe = pipe;
398        this.fd = fd;
399        this.len = len;
400}
401
402static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
403        zero_sqe(sqe);
404        sqe->opcode = IORING_OP_SPLICE;
405        sqe->user_data = (uintptr_t)&this.f;
406        sqe->flags = 0;
407        sqe->splice_fd_in = this.pipe;
408        sqe->splice_off_in = (__u64)-1;
409        sqe->fd = this.fd;
410        sqe->off = (__u64)-1;
411        sqe->len = this.len;
412        sqe->splice_flags = SPLICE_F_MOVE;
413}
414
415static inline int error(splice_out_g & this, int error) {
416        int ret = close(this.fd);
417        if( ret != 0 ) {
418                mutex(serr) serr | "Failed to close fd" | errno;
419        }
420        return error(this.res, error);
421}
422
423static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {
424        wait(this.f);
425
426        // Something failed?
427        if(this.f.result < 0) {
428                int error = -this.f.result;
429                if( error == ECONNRESET ) return error(this, -ECONNRESET);
430                if( error == EPIPE ) return error(this, -EPIPE);
431                if( error == ECANCELED ) {
432                        this.f.result = 0;
433                        goto SHORT_WRITE;
434                }
435                if( error == EAGAIN || error == EWOULDBLOCK) {
436                        mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
437                        return error(this, -ECONNRESET);
438                }
439                mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";
440                return error(this, -ECONNRESET);
441        }
442
443        // Did something crazy happen?
444        if(this.f.result > this.len) {
445                mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
446                return error(this.res, -ERANGE);
447        }
448
449        // Done?
450        if(this.f.result == this.len) {
451                return done(this.res);
452        }
453
454SHORT_WRITE:
455        stats.splcot++;
456
457        // It must be a Short Write
458        this.len -= this.f.result;
459        reset(this.f);
460        return retry(this.res);
461}
462
463int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) {
464        stats.calls++;
465        #if defined(LINKED_IO)
466                char buffer[512];
467                int len = fill_header(buffer, fsize);
468                header_g header = { fd, buffer, len };
469                splice_in_t splice_in = { ans_fd, pipe[1], fsize };
470                splice_out_g splice_out = { pipe[0], fd, fsize };
471
472                RETRY_LOOP: for() {
473                        stats.tries++;
474                        int have = need(header.res) + need(splice_in.res) + 1;
475                        int idx = 0;
476                        struct io_uring_sqe * sqes[3];
477                        __u32 idxs[3];
478                        struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
479
480                        if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
481                        if(need(   header.res)) { fill(header   , sqes[idx++]); }
482                        fill(splice_out, sqes[idx]);
483
484                        // Submit everything
485                        asm volatile("": : :"memory");
486                        cfa_io_submit( ctx, idxs, have, false );
487
488                        // wait for the results
489                        // Always wait for splice-in to complete as
490                        // we may need to kill the connection if it fails
491                        // If it already completed, this is a no-op
492                        wait_and_process(splice_in, stats);
493
494                        if(is_error(splice_in.res)) {
495                                if(splice_in.res.error == -EPIPE) return -ECONNRESET;
496                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
497                                close(fd);
498                        }
499
500                        // Process the other 2
501                        wait_and_process(header, stats);
502                        wait_and_process(splice_out, stats);
503
504                        if(is_done(splice_out.res)) {
505                                break RETRY_LOOP;
506                        }
507
508                        // We need to wait for the completion if
509                        // - both completed
510                        // - the header failed
511                        // -
512
513                        if(  is_error(header.res)
514                          || is_error(splice_in.res)
515                          || is_error(splice_out.res)) {
516                                return -ECONNRESET;
517                        }
518                }
519
520                return len + fsize;
521        #else
522                int ret = answer_header(fd, fsize);
523                if( ret < 0 ) { close(fd); return ret; }
524                return sendfile(pipe, fd, ans_fd, fsize, stats);
525        #endif
526}
527
528[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
529        char * it = buffer;
530        size_t count = len - 1;
531        int rlen = 0;
532        READ:
533        for() {
534                int ret;
535                if( f ) {
536                        ret = wait_res(*f);
537                        reset(*f);
538                        f = 0p;
539                } else {
540                        ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
541                }
542                // int ret = read(fd, (void*)it, count);
543                if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; }
544                if(ret < 0 ) {
545                        if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
546                        if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
547                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
548                }
549                it[ret + 1] = '\0';
550                rlen += ret;
551
552                if( strstr( it, "\r\n\r\n" ) ) break;
553
554                it += ret;
555                count -= ret;
556
557                if( count < 1 ) return [E414, false, 0, 0];
558        }
559
560        if( options.log ) {
561                write(sout, buffer, rlen);
562                sout | nl;
563        }
564
565        it = buffer;
566        int ret = memcmp(it, "GET /", 5);
567        if( ret != 0 ) return [E400, false, 0, 0];
568        it += 5;
569
570        char * end = strstr( it, " " );
571        return [OK200, false, it, end - it];
572}
573
574//=============================================================================================
575
576#include <clock.hfa>
577#include <time.hfa>
578#include <thread.hfa>
579
580const char * original_http_msgs[] = {
581        "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
582        "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n",
583        "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
584        "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
585        "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
586        "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
587        "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
588        "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
589};
590
591struct date_buffer {
592        https_msg_str strs[KNOWN_CODES];
593};
594
595thread DateFormater {
596        int idx;
597        date_buffer buffers[2];
598};
599
600void ?{}( DateFormater & this ) {
601        ((thread&)this){ "Server Date Thread" };
602        this.idx = 0;
603        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
604        memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
605}
606
607void main(DateFormater & this) {
608        LOOP: for() {
609                waitfor( ^?{} : this) {
610                        break LOOP;
611                }
612                or else {}
613
614
615                char buff[100];
616                Time now = timeHiRes();
617                strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
618                // if( options.log ) sout | "Updated date to '" | buff | "'";
619
620                for(i; KNOWN_CODES) {
621                        size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
622                        this.buffers[this.idx].strs[i].len = len;
623                }
624
625                for(i; KNOWN_CODES) {
626                        https_msg_str * next = &this.buffers[this.idx].strs[i];
627                        __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
628                }
629                this.idx = (this.idx + 1) % 2;
630
631                // if( options.log ) sout | "Date thread sleeping";
632
633                sleep(1`s);
634        }
635}
636
637//=============================================================================================
638DateFormater * the_date_formatter;
639
640void init_protocol(void) {
641        the_date_formatter = alloc();
642        (*the_date_formatter){};
643}
644
645void deinit_protocol(void) {
646        ^(*the_date_formatter){};
647        free( the_date_formatter );
648}
Note: See TracBrowser for help on using the repository browser.