source: benchmark/io/http/protocol.cfa @ 52d2545

enumforall-pointer-decaypthread-emulation
Last change on this file since 52d2545 was 52d2545, checked in by Thierry Delisle <tdelisle@…>, 9 months ago

"Fix" problems with persistent connection using HTTP1.0

  • Property mode set to 100644
File size: 15.2 KB
Line 
1#include "protocol.hfa"
2
3#define _GNU_SOURCE
4extern "C" {
5        #include <fcntl.h>
6}
7
8#define xstr(s) str(s)
9#define str(s) #s
10
11#include <fstream.hfa>
12#include <iofwd.hfa>
13#include <io/types.hfa>
14#include <mutex_stmt.hfa>
15
16#include <assert.h>
17// #include <stdio.h> // Don't use stdio.h, too slow to compile
18extern "C" {
19      int snprintf ( char * s, size_t n, const char * format, ... );
20        // #include <linux/io_uring.h>
21}
22#include <string.h>
23#include <errno.h>
24
25#include "options.hfa"
26
27#define PLAINTEXT_1WRITE
28#define PLAINTEXT_MEMCPY
29#define PLAINTEXT_NOCOPY
30#define LINKED_IO
31
32struct https_msg_str {
33        char msg[512];
34        size_t len;
35};
36
37const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };
38
39_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
40
41const int http_codes[KNOWN_CODES] = {
42        200,
43        200,
44        400,
45        404,
46        405,
47        408,
48        413,
49        414,
50};
51
52_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
53
54int code_val(HttpCode code) {
55        return http_codes[code];
56}
57
58static inline int answer( int fd, const char * it, int len ) {
59        while(len > 0) {
60                // Call write
61                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
62                if( ret < 0 ) {
63                        if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
64                        if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
65
66                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
67                }
68
69                // update it/len
70                it  += ret;
71                len -= ret;
72        }
73        return 0;
74}
75
76int answer_error( int fd, HttpCode code ) {
77        /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
78        int idx = (int)code;
79        return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
80}
81
82static int fill_header(char * it, size_t size) {
83        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
84        it += http_msgs[OK200]->len;
85        int len = http_msgs[OK200]->len;
86        len += snprintf(it, 512 - len, "%d \n\n", size);
87        return len;
88}
89
90static int answer_header( int fd, size_t size ) {
91        char buffer[512];
92        int len = fill_header(buffer, size);
93        return answer( fd, buffer, len );
94}
95
96#if defined(PLAINTEXT_NOCOPY)
97int answer_plaintext( int fd ) {
98        return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
99}
100#elif defined(PLAINTEXT_MEMCPY)
101#define TEXTSIZE 15
102int answer_plaintext( int fd ) {
103        char text[] = "Hello, World!\n\n";
104        char ts[] = xstr(TEXTSIZE) " \n\n";
105        _Static_assert(sizeof(text) - 1 == TEXTSIZE);
106        char buffer[512 + TEXTSIZE];
107        char * it = buffer;
108        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
109        it += http_msgs[OK200]->len;
110        int len = http_msgs[OK200]->len;
111        memcpy(it, ts, sizeof(ts) - 1);
112        it += sizeof(ts) - 1;
113        len += sizeof(ts) - 1;
114        memcpy(it, text, TEXTSIZE);
115        return answer(fd, buffer, len + TEXTSIZE);
116}
117#elif defined(PLAINTEXT_1WRITE)
118int answer_plaintext( int fd ) {
119        char text[] = "Hello, World!\n\n";
120        char buffer[512 + sizeof(text)];
121        char * it = buffer;
122        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
123        it += http_msgs[OK200]->len;
124        int len = http_msgs[OK200]->len;
125        int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
126        it += r;
127        len += r;
128        memcpy(it, text, sizeof(text));
129        return answer(fd, buffer, len + sizeof(text));
130}
131#else
132int answer_plaintext( int fd ) {
133        char text[] = "Hello, World!\n\n";
134        int ret = answer_header(fd, sizeof(text));
135        if( ret < 0 ) return ret;
136        return answer(fd, text, sizeof(text));
137}
138#endif
139
140int answer_empty( int fd ) {
141        return answer_header(fd, 0);
142}
143
144static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
145        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
146        off_t offset = 0;
147        ssize_t ret;
148        SPLICE1: while(count > 0) {
149                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
150                if( ret < 0 ) {
151                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
152                        if( errno == ECONNRESET ) return -ECONNRESET;
153                        if( errno == EPIPE ) return -EPIPE;
154                        abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
155                }
156
157                count -= ret;
158                offset += ret;
159                size_t in_pipe = ret;
160                SPLICE2: while(in_pipe > 0) {
161                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
162                        if( ret < 0 ) {
163                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
164                                if( errno == ECONNRESET ) return -ECONNRESET;
165                                if( errno == EPIPE ) return -EPIPE;
166                                abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
167                        }
168                        in_pipe -= ret;
169                }
170
171        }
172        return count;
173}
174
175static void zero_sqe(struct io_uring_sqe * sqe) {
176        sqe->flags = 0;
177        sqe->ioprio = 0;
178        sqe->fd = 0;
179        sqe->off = 0;
180        sqe->addr = 0;
181        sqe->len = 0;
182        sqe->fsync_flags = 0;
183        sqe->__pad2[0] = 0;
184        sqe->__pad2[1] = 0;
185        sqe->__pad2[2] = 0;
186        sqe->fd = 0;
187        sqe->off = 0;
188        sqe->addr = 0;
189        sqe->len = 0;
190}
191
192enum FSM_STATE {
193        Initial,
194        Retry,
195        Error,
196        Done,
197};
198
199struct FSM_Result {
200        FSM_STATE state;
201        int error;
202};
203
204static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
205static inline bool is_error(FSM_Result & this) { return Error == this.state; }
206static inline bool is_done(FSM_Result & this) { return Done == this.state; }
207
208static inline int error(FSM_Result & this, int error) {
209        this.error = error;
210        this.state = Error;
211        return error;
212}
213
214static inline int done(FSM_Result & this) {
215        this.state = Done;
216        return 0;
217}
218
219static inline int retry(FSM_Result & this) {
220        this.state = Retry;
221        return 0;
222}
223
224static inline int need(FSM_Result & this) {
225        switch(this.state) {
226                case Initial:
227                case Retry:
228                        return 1;
229                case Error:
230                        if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
231                case Done:
232                        return 0;
233        }
234}
235
236// Generator that handles sending the header
237generator header_g {
238        io_future_t f;
239        const char * next;
240        int fd; size_t len;
241        FSM_Result res;
242};
243
244static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
245        this.next = it;
246        this.fd = fd;
247        this.len = len;
248}
249
250static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
251        zero_sqe(sqe);
252        sqe->opcode = IORING_OP_SEND;
253        sqe->user_data = (uintptr_t)&this.f;
254        sqe->flags = IOSQE_IO_LINK;
255        sqe->fd = this.fd;
256        sqe->addr = (uintptr_t)this.next;
257        sqe->len = this.len;
258}
259
260static inline int error(header_g & this, int error) {
261        int ret = close(this.fd);
262        if( ret != 0 ) {
263                mutex(serr) serr | "Failed to close fd" | errno;
264        }
265        return error(this.res, error);
266}
267
268static inline int wait_and_process(header_g & this) {
269        wait(this.f);
270
271        // Did something crazy happen?
272        if(this.f.result > this.len) {
273                mutex(serr) serr | "HEADER sent too much!";
274                return error(this, -ERANGE);
275        }
276
277        // Something failed?
278        if(this.f.result < 0) {
279                int error = -this.f.result;
280                if( error == ECONNRESET ) return error(this, -ECONNRESET);
281                if( error == EPIPE ) return error(this, -EPIPE);
282                if( error == ECANCELED ) {
283                        mutex(serr) serr | "HEADER was cancelled, WTF!";
284                        return error(this, -ECONNRESET);
285                }
286                if( error == EAGAIN || error == EWOULDBLOCK) {
287                        mutex(serr) serr | "HEADER got eagain, WTF!";
288                        return error(this, -ECONNRESET);
289                }
290        }
291
292        // Done?
293        if(this.f.result == this.len) {
294                return done(this.res);
295        }
296
297        // It must be a Short read
298        this.len  -= this.f.result;
299        this.next += this.f.result;
300        reset(this.f);
301        return retry(this.res);
302}
303
304// Generator that handles splicing in a file
305struct splice_in_t {
306        io_future_t f;
307        int fd; int pipe; size_t len; off_t off;
308        FSM_Result res;
309};
310
311static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
312        this.fd = fd;
313        this.pipe = pipe;
314        this.len = len;
315        this.off = 0;
316}
317
318static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
319        zero_sqe(sqe);
320        sqe->opcode = IORING_OP_SPLICE;
321        sqe->user_data = (uintptr_t)&this.f;
322        sqe->flags = 0;
323        sqe->splice_fd_in = this.fd;
324        sqe->splice_off_in = this.off;
325        sqe->fd = this.pipe;
326        sqe->off = (__u64)-1;
327        sqe->len = this.len;
328        sqe->splice_flags = SPLICE_F_MOVE;
329}
330
331static inline int wait_and_process(splice_in_t & this) {
332        wait(this.f);
333
334        // Did something crazy happen?
335        if(this.f.result > this.len) {
336                mutex(serr) serr | "SPLICE IN spliced too much!";
337                return error(this.res, -ERANGE);
338        }
339
340        // Something failed?
341        if(this.f.result < 0) {
342                int error = -this.f.result;
343                if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
344                if( error == EPIPE ) return error(this.res, -EPIPE);
345                if( error == ECANCELED ) {
346                        mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
347                        return error(this.res, -ECONNRESET);
348                }
349                if( error == EAGAIN || error == EWOULDBLOCK) {
350                        mutex(serr) serr | "SPLICE IN got eagain, WTF!";
351                        return error(this.res, -ECONNRESET);
352                }
353        }
354
355        // Done?
356        if(this.f.result == this.len) {
357                return done(this.res);
358        }
359
360        // It must be a Short read
361        this.len -= this.f.result;
362        this.off += this.f.result;
363        reset(this.f);
364        return retry(this.res);
365}
366
367generator splice_out_g {
368        io_future_t f;
369        int pipe; int fd; size_t len;
370        FSM_Result res;
371};
372
373static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
374        this.pipe = pipe;
375        this.fd = fd;
376        this.len = len;
377}
378
379static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
380        zero_sqe(sqe);
381        sqe->opcode = IORING_OP_SPLICE;
382        sqe->user_data = (uintptr_t)&this.f;
383        sqe->flags = 0;
384        sqe->splice_fd_in = this.pipe;
385        sqe->splice_off_in = (__u64)-1;
386        sqe->fd = this.fd;
387        sqe->off = (__u64)-1;
388        sqe->len = this.len;
389        sqe->splice_flags = SPLICE_F_MOVE;
390}
391
392static inline int error(splice_out_g & this, int error) {
393        int ret = close(this.fd);
394        if( ret != 0 ) {
395                mutex(serr) serr | "Failed to close fd" | errno;
396        }
397        return error(this.res, error);
398}
399
400static inline void wait_and_process(splice_out_g & this) {
401        wait(this.f);
402
403        // Did something crazy happen?
404        if(this.f.result > this.len) {
405                mutex(serr) serr | "SPLICE OUT spliced too much!";
406                return error(this.res, -ERANGE);
407        }
408
409        // Something failed?
410        if(this.f.result < 0) {
411                int error = -this.f.result;
412                if( error == ECONNRESET ) return error(this, -ECONNRESET);
413                if( error == EPIPE ) return error(this, -EPIPE);
414                if( error == ECANCELED ) {
415                        this.f.result = 0;
416                        goto SHORT_WRITE;
417                }
418                if( error == EAGAIN || error == EWOULDBLOCK) {
419                        mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
420                        return error(this, -ECONNRESET);
421                }
422        }
423
424        // Done?
425        if(this.f.result == this.len) {
426                return done(this.res);
427        }
428
429SHORT_WRITE:
430        // It must be a Short Write
431        this.len -= this.f.result;
432        reset(this.f);
433        return retry(this.res);
434}
435
436int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) {
437        #if defined(LINKED_IO)
438                char buffer[512];
439                int len = fill_header(buffer, fsize);
440                header_g header = { fd, buffer, len };
441                splice_in_t splice_in = { ans_fd, pipe[1], fsize };
442                splice_out_g splice_out = { pipe[0], fd, fsize };
443
444                RETRY_LOOP: for() {
445                        int have = need(header.res) + need(splice_in.res) + 1;
446                        int idx = 0;
447                        struct io_uring_sqe * sqes[3];
448                        __u32 idxs[3];
449                        struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
450
451                        if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
452                        if(need(   header.res)) { fill(header   , sqes[idx++]); }
453                        fill(splice_out, sqes[idx]);
454
455                        // Submit everything
456                        asm volatile("": : :"memory");
457                        cfa_io_submit( ctx, idxs, have, false );
458
459                        // wait for the results
460                        // Always wait for splice-in to complete as
461                        // we may need to kill the connection if it fails
462                        // If it already completed, this is a no-op
463                        wait_and_process(splice_in);
464
465                        if(is_error(splice_in.res)) {
466                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
467                                close(fd);
468                        }
469
470                        // Process the other 2
471                        wait_and_process(header);
472                        wait_and_process(splice_out);
473
474                        if(is_done(splice_out.res)) {
475                                break RETRY_LOOP;
476                        }
477
478                        // We need to wait for the completion if
479                        // - both completed
480                        // - the header failed
481                        // -
482
483                        if(  is_error(header.res)
484                          || is_error(splice_in.res)
485                          || is_error(splice_out.res)) {
486                                return -ECONNRESET;
487                        }
488                }
489
490                return len + fsize;
491        #else
492                int ret = answer_header(fd, fsize);
493                if( ret < 0 ) { close(fd); return ret; }
494                return sendfile(pipe, fd, ans_fd, fsize);
495        #endif
496}
497
498[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
499        char * it = buffer;
500        size_t count = len - 1;
501        int rlen = 0;
502        READ:
503        for() {
504                int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
505                // int ret = read(fd, (void*)it, count);
506                if(ret == 0 ) return [OK200, true, 0, 0];
507                if(ret < 0 ) {
508                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
509                        if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
510                        if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
511                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
512                }
513                it[ret + 1] = '\0';
514                rlen += ret;
515
516                if( strstr( it, "\r\n\r\n" ) ) break;
517
518                it += ret;
519                count -= ret;
520
521                if( count < 1 ) return [E414, false, 0, 0];
522        }
523
524        if( options.log ) {
525                write(sout, buffer, rlen);
526                sout | nl;
527        }
528
529        it = buffer;
530        int ret = memcmp(it, "GET /", 5);
531        if( ret != 0 ) return [E400, false, 0, 0];
532        it += 5;
533
534        char * end = strstr( it, " " );
535        return [OK200, false, it, end - it];
536}
537
538//=============================================================================================
539
540#include <clock.hfa>
541#include <time.hfa>
542#include <thread.hfa>
543
544const char * original_http_msgs[] = {
545        "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
546        "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n",
547        "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
548        "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
549        "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
550        "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
551        "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
552        "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
553};
554
555struct date_buffer {
556        https_msg_str strs[KNOWN_CODES];
557};
558
559thread DateFormater {
560        int idx;
561        date_buffer buffers[2];
562};
563
564void ?{}( DateFormater & this ) {
565        ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
566        this.idx = 0;
567        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
568        memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
569}
570
571void main(DateFormater & this) {
572        LOOP: for() {
573                waitfor( ^?{} : this) {
574                        break LOOP;
575                }
576                or else {}
577
578
579                char buff[100];
580                Time now = timeHiRes();
581                strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
582                // if( options.log ) sout | "Updated date to '" | buff | "'";
583
584                for(i; KNOWN_CODES) {
585                        size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
586                        this.buffers[this.idx].strs[i].len = len;
587                }
588
589                for(i; KNOWN_CODES) {
590                        https_msg_str * next = &this.buffers[this.idx].strs[i];
591                        __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
592                }
593                this.idx = (this.idx + 1) % 2;
594
595                // if( options.log ) sout | "Date thread sleeping";
596
597                sleep(1`s);
598        }
599}
600
601//=============================================================================================
602DateFormater * the_date_formatter;
603
604void init_protocol(void) {
605        the_date_formatter = alloc();
606        (*the_date_formatter){};
607}
608
609void deinit_protocol(void) {
610        ^(*the_date_formatter){};
611        free( the_date_formatter );
612}
Note: See TracBrowser for help on using the repository browser.