source: benchmark/io/http/protocol.cfa @ 93829cb

ADTast-experimentalenumpthread-emulationqualifiedEnum
Last change on this file since 93829cb was 93829cb, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Added helper function to zero out sqes.
Not done in allocate since so it can be inlined and redudant writes eliminated.

  • Property mode set to 100644
File size: 14.9 KB
Line 
1#include "protocol.hfa"
2
3#define _GNU_SOURCE
4extern "C" {
5        #include <fcntl.h>
6}
7
8#define xstr(s) str(s)
9#define str(s) #s
10
11#include <fstream.hfa>
12#include <iofwd.hfa>
13#include <io/types.hfa>
14#include <mutex_stmt.hfa>
15
16#include <assert.h>
17// #include <stdio.h> // Don't use stdio.h, too slow to compile
18extern "C" {
19      int snprintf ( char * s, size_t n, const char * format, ... );
20        // #include <linux/io_uring.h>
21}
22#include <string.h>
23#include <errno.h>
24
25#include "options.hfa"
26
27#define PLAINTEXT_1WRITE
28#define PLAINTEXT_MEMCPY
29#define PLAINTEXT_NOCOPY
30#define LINKED_IO
31
32struct https_msg_str {
33        char msg[512];
34        size_t len;
35};
36
37const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };
38
39_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
40
41const int http_codes[KNOWN_CODES] = {
42        200,
43        200,
44        400,
45        404,
46        405,
47        408,
48        413,
49        414,
50};
51
52_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
53
54int code_val(HttpCode code) {
55        return http_codes[code];
56}
57
58static inline int answer( int fd, const char * it, int len ) {
59        while(len > 0) {
60                // Call write
61                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
62                if( ret < 0 ) {
63                        if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
64                        if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
65
66                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
67                }
68
69                // update it/len
70                it  += ret;
71                len -= ret;
72        }
73        return 0;
74}
75
76int answer_error( int fd, HttpCode code ) {
77        /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
78        int idx = (int)code;
79        return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
80}
81
82static int fill_header(char * it, size_t size) {
83        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
84        it += http_msgs[OK200]->len;
85        int len = http_msgs[OK200]->len;
86        len += snprintf(it, 512 - len, "%d \n\n", size);
87        return len;
88}
89
90static int answer_header( int fd, size_t size ) {
91        char buffer[512];
92        int len = fill_header(buffer, size);
93        return answer( fd, buffer, len );
94}
95
96#if defined(PLAINTEXT_NOCOPY)
97int answer_plaintext( int fd ) {
98        return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
99}
100#elif defined(PLAINTEXT_MEMCPY)
101#define TEXTSIZE 15
102int answer_plaintext( int fd ) {
103        char text[] = "Hello, World!\n\n";
104        char ts[] = xstr(TEXTSIZE) " \n\n";
105        _Static_assert(sizeof(text) - 1 == TEXTSIZE);
106        char buffer[512 + TEXTSIZE];
107        char * it = buffer;
108        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
109        it += http_msgs[OK200]->len;
110        int len = http_msgs[OK200]->len;
111        memcpy(it, ts, sizeof(ts) - 1);
112        it += sizeof(ts) - 1;
113        len += sizeof(ts) - 1;
114        memcpy(it, text, TEXTSIZE);
115        return answer(fd, buffer, len + TEXTSIZE);
116}
117#elif defined(PLAINTEXT_1WRITE)
118int answer_plaintext( int fd ) {
119        char text[] = "Hello, World!\n\n";
120        char buffer[512 + sizeof(text)];
121        char * it = buffer;
122        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
123        it += http_msgs[OK200]->len;
124        int len = http_msgs[OK200]->len;
125        int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
126        it += r;
127        len += r;
128        memcpy(it, text, sizeof(text));
129        return answer(fd, buffer, len + sizeof(text));
130}
131#else
132int answer_plaintext( int fd ) {
133        char text[] = "Hello, World!\n\n";
134        int ret = answer_header(fd, sizeof(text));
135        if( ret < 0 ) return ret;
136        return answer(fd, text, sizeof(text));
137}
138#endif
139
140int answer_empty( int fd ) {
141        return answer_header(fd, 0);
142}
143
144static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
145        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
146        off_t offset = 0;
147        ssize_t ret;
148        SPLICE1: while(count > 0) {
149                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
150                if( ret < 0 ) {
151                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
152                        if( errno == ECONNRESET ) return -ECONNRESET;
153                        if( errno == EPIPE ) return -EPIPE;
154                        abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
155                }
156
157                count -= ret;
158                offset += ret;
159                size_t in_pipe = ret;
160                SPLICE2: while(in_pipe > 0) {
161                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
162                        if( ret < 0 ) {
163                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
164                                if( errno == ECONNRESET ) return -ECONNRESET;
165                                if( errno == EPIPE ) return -EPIPE;
166                                abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
167                        }
168                        in_pipe -= ret;
169                }
170
171        }
172        return count;
173}
174
175enum FSM_STATE {
176        Initial,
177        Retry,
178        Error,
179        Done,
180};
181
182struct FSM_Result {
183        FSM_STATE state;
184        int error;
185};
186
187static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
188static inline bool is_error(FSM_Result & this) { return Error == this.state; }
189static inline bool is_done(FSM_Result & this) { return Done == this.state; }
190
191static inline int error(FSM_Result & this, int error) {
192        this.error = error;
193        this.state = Error;
194        return error;
195}
196
197static inline int done(FSM_Result & this) {
198        this.state = Done;
199        return 0;
200}
201
202static inline int retry(FSM_Result & this) {
203        this.state = Retry;
204        return 0;
205}
206
207static inline int need(FSM_Result & this) {
208        switch(this.state) {
209                case Initial:
210                case Retry:
211                        return 1;
212                case Error:
213                        if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
214                case Done:
215                        return 0;
216        }
217}
218
219// Generator that handles sending the header
220generator header_g {
221        io_future_t f;
222        const char * next;
223        int fd; size_t len;
224        FSM_Result res;
225};
226
227static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
228        this.next = it;
229        this.fd = fd;
230        this.len = len;
231}
232
233static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
234        zero_sqe(sqe);
235        sqe->opcode = IORING_OP_SEND;
236        sqe->user_data = (uintptr_t)&this.f;
237        sqe->flags = IOSQE_IO_LINK;
238        sqe->fd = this.fd;
239        sqe->addr = (uintptr_t)this.next;
240        sqe->len = this.len;
241}
242
243static inline int error(header_g & this, int error) {
244        int ret = close(this.fd);
245        if( ret != 0 ) {
246                mutex(serr) serr | "Failed to close fd" | errno;
247        }
248        return error(this.res, error);
249}
250
251static inline int wait_and_process(header_g & this) {
252        wait(this.f);
253
254        // Did something crazy happen?
255        if(this.f.result > this.len) {
256                mutex(serr) serr | "HEADER sent too much!";
257                return error(this, -ERANGE);
258        }
259
260        // Something failed?
261        if(this.f.result < 0) {
262                int error = -this.f.result;
263                if( error == ECONNRESET ) return error(this, -ECONNRESET);
264                if( error == EPIPE ) return error(this, -EPIPE);
265                if( error == ECANCELED ) {
266                        mutex(serr) serr | "HEADER was cancelled, WTF!";
267                        return error(this, -ECONNRESET);
268                }
269                if( error == EAGAIN || error == EWOULDBLOCK) {
270                        mutex(serr) serr | "HEADER got eagain, WTF!";
271                        return error(this, -ECONNRESET);
272                }
273        }
274
275        // Done?
276        if(this.f.result == this.len) {
277                return done(this.res);
278        }
279
280        // It must be a Short read
281        this.len  -= this.f.result;
282        this.next += this.f.result;
283        reset(this.f);
284        return retry(this.res);
285}
286
287// Generator that handles splicing in a file
288struct splice_in_t {
289        io_future_t f;
290        int fd; int pipe; size_t len; off_t off;
291        FSM_Result res;
292};
293
294static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
295        this.fd = fd;
296        this.pipe = pipe;
297        this.len = len;
298        this.off = 0;
299}
300
301static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
302        zero_sqe(sqe);
303        sqe->opcode = IORING_OP_SPLICE;
304        sqe->user_data = (uintptr_t)&this.f;
305        sqe->flags = 0;
306        sqe->splice_fd_in = this.fd;
307        sqe->splice_off_in = this.off;
308        sqe->fd = this.pipe;
309        sqe->off = (__u64)-1;
310        sqe->len = this.len;
311        sqe->splice_flags = SPLICE_F_MOVE;
312}
313
314static inline int wait_and_process(splice_in_t & this) {
315        wait(this.f);
316
317        // Something failed?
318        if(this.f.result < 0) {
319                int error = -this.f.result;
320                if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
321                if( error == EPIPE ) return error(this.res, -EPIPE);
322                if( error == ECANCELED ) {
323                        mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
324                        return error(this.res, -ECONNRESET);
325                }
326                if( error == EAGAIN || error == EWOULDBLOCK) {
327                        mutex(serr) serr | "SPLICE IN got eagain, WTF!";
328                        return error(this.res, -ECONNRESET);
329                }
330        }
331
332        // Did something crazy happen?
333        if(this.f.result > this.len) {
334                mutex(serr) serr | "SPLICE IN spliced too much!";
335                return error(this.res, -ERANGE);
336        }
337
338        // Done?
339        if(this.f.result == this.len) {
340                return done(this.res);
341        }
342
343        // It must be a Short read
344        this.len -= this.f.result;
345        this.off += this.f.result;
346        reset(this.f);
347        return retry(this.res);
348}
349
350generator splice_out_g {
351        io_future_t f;
352        int pipe; int fd; size_t len;
353        FSM_Result res;
354};
355
356static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
357        this.pipe = pipe;
358        this.fd = fd;
359        this.len = len;
360}
361
362static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
363        zero_sqe(sqe);
364        sqe->opcode = IORING_OP_SPLICE;
365        sqe->user_data = (uintptr_t)&this.f;
366        sqe->flags = 0;
367        sqe->splice_fd_in = this.pipe;
368        sqe->splice_off_in = (__u64)-1;
369        sqe->fd = this.fd;
370        sqe->off = (__u64)-1;
371        sqe->len = this.len;
372        sqe->splice_flags = SPLICE_F_MOVE;
373}
374
375static inline int error(splice_out_g & this, int error) {
376        int ret = close(this.fd);
377        if( ret != 0 ) {
378                mutex(serr) serr | "Failed to close fd" | errno;
379        }
380        return error(this.res, error);
381}
382
383static inline void wait_and_process(splice_out_g & this) {
384        wait(this.f);
385
386        // Something failed?
387        if(this.f.result < 0) {
388                int error = -this.f.result;
389                if( error == ECONNRESET ) return error(this, -ECONNRESET);
390                if( error == EPIPE ) return error(this, -EPIPE);
391                if( error == ECANCELED ) {
392                        this.f.result = 0;
393                        goto SHORT_WRITE;
394                }
395                if( error == EAGAIN || error == EWOULDBLOCK) {
396                        mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
397                        return error(this, -ECONNRESET);
398                }
399        }
400
401        // Did something crazy happen?
402        if(this.f.result > this.len) {
403                mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
404                return error(this.res, -ERANGE);
405        }
406
407        // Done?
408        if(this.f.result == this.len) {
409                return done(this.res);
410        }
411
412SHORT_WRITE:
413        // It must be a Short Write
414        this.len -= this.f.result;
415        reset(this.f);
416        return retry(this.res);
417}
418
419int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) {
420        #if defined(LINKED_IO)
421                char buffer[512];
422                int len = fill_header(buffer, fsize);
423                header_g header = { fd, buffer, len };
424                splice_in_t splice_in = { ans_fd, pipe[1], fsize };
425                splice_out_g splice_out = { pipe[0], fd, fsize };
426
427                RETRY_LOOP: for() {
428                        int have = need(header.res) + need(splice_in.res) + 1;
429                        int idx = 0;
430                        struct io_uring_sqe * sqes[3];
431                        __u32 idxs[3];
432                        struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
433
434                        if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
435                        if(need(   header.res)) { fill(header   , sqes[idx++]); }
436                        fill(splice_out, sqes[idx]);
437
438                        // Submit everything
439                        asm volatile("": : :"memory");
440                        cfa_io_submit( ctx, idxs, have, false );
441
442                        // wait for the results
443                        // Always wait for splice-in to complete as
444                        // we may need to kill the connection if it fails
445                        // If it already completed, this is a no-op
446                        wait_and_process(splice_in);
447
448                        if(is_error(splice_in.res)) {
449                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
450                                close(fd);
451                        }
452
453                        // Process the other 2
454                        wait_and_process(header);
455                        wait_and_process(splice_out);
456
457                        if(is_done(splice_out.res)) {
458                                break RETRY_LOOP;
459                        }
460
461                        // We need to wait for the completion if
462                        // - both completed
463                        // - the header failed
464                        // -
465
466                        if(  is_error(header.res)
467                          || is_error(splice_in.res)
468                          || is_error(splice_out.res)) {
469                                return -ECONNRESET;
470                        }
471                }
472
473                return len + fsize;
474        #else
475                int ret = answer_header(fd, fsize);
476                if( ret < 0 ) { close(fd); return ret; }
477                return sendfile(pipe, fd, ans_fd, fsize);
478        #endif
479}
480
481[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
482        char * it = buffer;
483        size_t count = len - 1;
484        int rlen = 0;
485        READ:
486        for() {
487                int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
488                // int ret = read(fd, (void*)it, count);
489                if(ret == 0 ) return [OK200, true, 0, 0];
490                if(ret < 0 ) {
491                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
492                        if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
493                        if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
494                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
495                }
496                it[ret + 1] = '\0';
497                rlen += ret;
498
499                if( strstr( it, "\r\n\r\n" ) ) break;
500
501                it += ret;
502                count -= ret;
503
504                if( count < 1 ) return [E414, false, 0, 0];
505        }
506
507        if( options.log ) {
508                write(sout, buffer, rlen);
509                sout | nl;
510        }
511
512        it = buffer;
513        int ret = memcmp(it, "GET /", 5);
514        if( ret != 0 ) return [E400, false, 0, 0];
515        it += 5;
516
517        char * end = strstr( it, " " );
518        return [OK200, false, it, end - it];
519}
520
521//=============================================================================================
522
523#include <clock.hfa>
524#include <time.hfa>
525#include <thread.hfa>
526
527const char * original_http_msgs[] = {
528        "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
529        "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n",
530        "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
531        "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
532        "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
533        "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
534        "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
535        "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
536};
537
538struct date_buffer {
539        https_msg_str strs[KNOWN_CODES];
540};
541
542thread DateFormater {
543        int idx;
544        date_buffer buffers[2];
545};
546
547void ?{}( DateFormater & this ) {
548        ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
549        this.idx = 0;
550        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
551        memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
552}
553
554void main(DateFormater & this) {
555        LOOP: for() {
556                waitfor( ^?{} : this) {
557                        break LOOP;
558                }
559                or else {}
560
561
562                char buff[100];
563                Time now = timeHiRes();
564                strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
565                // if( options.log ) sout | "Updated date to '" | buff | "'";
566
567                for(i; KNOWN_CODES) {
568                        size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
569                        this.buffers[this.idx].strs[i].len = len;
570                }
571
572                for(i; KNOWN_CODES) {
573                        https_msg_str * next = &this.buffers[this.idx].strs[i];
574                        __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
575                }
576                this.idx = (this.idx + 1) % 2;
577
578                // if( options.log ) sout | "Date thread sleeping";
579
580                sleep(1`s);
581        }
582}
583
584//=============================================================================================
585DateFormater * the_date_formatter;
586
587void init_protocol(void) {
588        the_date_formatter = alloc();
589        (*the_date_formatter){};
590}
591
592void deinit_protocol(void) {
593        ^(*the_date_formatter){};
594        free( the_date_formatter );
595}
Note: See TracBrowser for help on using the repository browser.