source: benchmark/io/http/protocol.cfa@ c4072d8e

ADT ast-experimental pthread-emulation qualifiedEnum
Last change on this file since c4072d8e was bf7c7ea, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Moved printer and protocol thread to the main cluster

  • Property mode set to 100644
File size: 15.9 KB
Line 
1#include "protocol.hfa"
2
3#define _GNU_SOURCE
4extern "C" {
5 #include <fcntl.h>
6}
7
8#define xstr(s) str(s)
9#define str(s) #s
10
11#include <fstream.hfa>
12#include <iofwd.hfa>
13#include <io/types.hfa>
14#include <mutex_stmt.hfa>
15
16#include <assert.h>
17// #include <stdio.h> // Don't use stdio.h, too slow to compile
18extern "C" {
19 int snprintf ( char * s, size_t n, const char * format, ... );
20 // #include <linux/io_uring.h>
21}
22#include <string.h>
23#include <errno.h>
24
25#include "options.hfa"
26#include "worker.hfa"
27
28#define PLAINTEXT_1WRITE
29#define PLAINTEXT_MEMCPY
30#define PLAINTEXT_NOCOPY
31#define LINKED_IO
32
// Block until the future completes, then report its result using the
// syscall convention: a negative result is stored (negated) in errno and
// -1 is returned; any other result is returned unchanged.
static inline __s32 wait_res( io_future_t & this ) {
	wait( this );
	if( this.result >= 0 ) return this.result;

	errno = -this.result;
	return -1;
}
41
// One formatted HTTP message: fixed-size storage plus the number of
// bytes of that storage that make up the message.
struct https_msg_str {
	char msg[512];  // message bytes
	size_t len;     // how many bytes of msg are meaningful
};
46
// Currently published message for every known code. All entries start
// out null; the DateFormater thread swaps fresh pointers in atomically.
const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };

// The table must have exactly one slot per known code.
_Static_assert( (sizeof(http_msgs) / sizeof(http_msgs[0])) == KNOWN_CODES );
50
// Numeric HTTP status for each known code, indexed by the code's value.
// The first two slots are both 200 (presumably OK200 and OK200_PlainText
// — confirm against the HttpCode enum).
const int http_codes[KNOWN_CODES] = {
	200, 200,
	400, 404, 405, 408,
	413, 414,
};

// The table must have exactly one status per known code.
_Static_assert( (sizeof(http_codes) / sizeof(http_codes[0])) == KNOWN_CODES );
63
// Translate an HttpCode into its numeric HTTP status via http_codes.
int code_val(HttpCode code) {
	const int status = http_codes[code];
	return status;
}
67
// Send exactly len bytes starting at it over fd, re-issuing the send
// after every short write.
// Returns 0 once everything was sent, -ECONNRESET (after closing fd) when
// the send fails with ECONNRESET or EPIPE, and -EAGAIN when it fails with
// EAGAIN/EWOULDBLOCK. Any other failure aborts the program.
static inline int answer( int fd, const char * it, int len ) {
	while(len > 0) {
		int sent = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
		if( sent >= 0 ) {
			// skip over the part that went out and try the remainder
			it  += sent;
			len -= sent;
			continue;
		}

		if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
		if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;

		abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
	}
	return 0;
}
85
// Send the pre-formatted error response for code on fd.
// code must be a known code other than OK200.
// Returns whatever answer() returns.
int answer_error( int fd, HttpCode code ) {
	/* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
	// Read the published pointer exactly once: the table entry is swapped
	// by another thread, and msg and len must come from the same buffer.
	const https_msg_str * reply = http_msgs[(int)code];
	return answer( fd, reply->msg, reply->len );
}
91
// Write the OK200 response header into it, followed by the decimal
// content length size and the blank line that ends the header.
// The 512 - len bound passed to snprintf assumes it points at a buffer of
// at least 512 bytes.
// Returns the header length as computed from snprintf's return value.
static int fill_header(char * it, size_t size) {
	// Read the published pointer exactly once so that the bytes copied and
	// the length used describe the same buffer.
	const https_msg_str * hdr = http_msgs[OK200];
	memcpy(it, hdr->msg, hdr->len);
	it += hdr->len;
	int len = hdr->len;
	// size is a size_t, so it must be printed with %zu (not %d)
	len += snprintf(it, 512 - len, "%zu \n\n", size);
	return len;
}
99
// Build an OK200 header announcing size body bytes in a local buffer and
// send it on fd. Returns the result of answer().
static int answer_header( int fd, size_t size ) {
	char header[512];
	const int header_len = fill_header(header, size);
	return answer( fd, header, header_len );
}
105
// answer_plaintext: send the "Hello, World!" response on fd.
// Four interchangeable implementations, selected by the PLAINTEXT_* macros;
// each returns the result of answer().
#if defined(PLAINTEXT_NOCOPY)
// Send the fully pre-formatted plaintext response straight from http_msgs.
int answer_plaintext( int fd ) {
	// Read the published pointer once so msg and len describe the same buffer
	const https_msg_str * reply = http_msgs[OK200_PlainText];
	return answer(fd, reply->msg, reply->len);
}
#elif defined(PLAINTEXT_MEMCPY)
#define TEXTSIZE 15
// Assemble header, content length and body with memcpy, then send once.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	char ts[] = xstr(TEXTSIZE) " \n\n";
	_Static_assert(sizeof(text) - 1 == TEXTSIZE);
	char buffer[512 + TEXTSIZE];
	char * it = buffer;
	// Read the published pointer once so msg and len describe the same buffer
	const https_msg_str * hdr = http_msgs[OK200];
	memcpy(it, hdr->msg, hdr->len);
	it += hdr->len;
	int len = hdr->len;
	// sizeof(ts) - 1: copy the length text without its terminating NUL
	memcpy(it, ts, sizeof(ts) - 1);
	it += sizeof(ts) - 1;
	len += sizeof(ts) - 1;
	memcpy(it, text, TEXTSIZE);
	return answer(fd, buffer, len + TEXTSIZE);
}
#elif defined(PLAINTEXT_1WRITE)
// Format header and content length with snprintf, append the body, send once.
// Note: sizeof(text) counts the terminating NUL, which is therefore
// announced and sent as part of the body.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	char buffer[512 + sizeof(text)];
	char * it = buffer;
	// Read the published pointer once so msg and len describe the same buffer
	const https_msg_str * hdr = http_msgs[OK200];
	memcpy(it, hdr->msg, hdr->len);
	it += hdr->len;
	int len = hdr->len;
	// sizeof yields a size_t, so it must be printed with %zu (not %d)
	int r = snprintf(it, 512 - len, "%zu \n\n", sizeof(text));
	it += r;
	len += r;
	memcpy(it, text, sizeof(text));
	return answer(fd, buffer, len + sizeof(text));
}
#else
// Send the header and the body with two separate sends.
// Note: sizeof(text) counts the terminating NUL, which is therefore
// announced and sent as part of the body.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	int ret = answer_header(fd, sizeof(text));
	if( ret < 0 ) return ret;
	return answer(fd, text, sizeof(text));
}
#endif
149
// Send an OK200 header announcing a zero-length body; no body follows.
int answer_empty( int fd ) {
	const size_t no_body = 0;
	return answer_header(fd, no_body);
}
153
// Copy count bytes from ans_fd to fd by splicing through pipe:
// first ans_fd -> pipe[1], then pipe[0] -> fd, until everything is moved.
// Returns the remaining count (0 once the loop finishes), -ECONNRESET or
// -EPIPE when a splice fails with that error; any other failure except
// EAGAIN/EWOULDBLOCK aborts the program.
static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
	unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
	off_t offset = 0;
	ssize_t ret;
	SPLICE1: while(count > 0) {
		ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
		if( ret < 0 ) {
			// Only "would block" is retried; the test used to be inverted,
			// which retried every real error and aborted on EAGAIN.
			if( errno == EAGAIN || errno == EWOULDBLOCK) continue SPLICE1;
			if( errno == ECONNRESET ) return -ECONNRESET;
			if( errno == EPIPE ) return -EPIPE;
			abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
		}

		count -= ret;
		// drain what was just put in the pipe before reading more
		size_t in_pipe = ret;
		SPLICE2: while(in_pipe > 0) {
			ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
			if( ret < 0 ) {
				// same policy as above: retry only on "would block"
				if( errno == EAGAIN || errno == EWOULDBLOCK) continue SPLICE2;
				if( errno == ECONNRESET ) return -ECONNRESET;
				if( errno == EPIPE ) return -EPIPE;
				abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
			}
			in_pipe -= ret;
		}

	}
	return count;
}
183
// Progress of one stage of the linked send.
enum FSM_STATE {
	Initial,  // state given by the FSM_Result constructor
	Retry,    // set by retry()
	Error,    // set by error()
	Done,     // set by done()
};
190
// Outcome of one stage: its state plus the error code recorded by error().
struct FSM_Result {
	FSM_STATE state;  // current state of the stage
	int error;        // code stored by error(); 0 after construction
};
195
// Default construction: no error recorded, state Initial.
static inline void ?{}(FSM_Result & this) { this.error = 0; this.state = Initial; }
// True when the stage is in the Error state.
static inline bool is_error(FSM_Result & this) { return this.state == Error; }
// True when the stage is in the Done state.
static inline bool is_done(FSM_Result & this) { return this.state == Done; }
199
// Put the stage in the Error state, remembering error; returns error.
static inline int error(FSM_Result & this, int error) {
	this.state = Error;
	this.error = error;
	return error;
}
205
// Put the stage in the Done state; always returns 0.
static inline int done(FSM_Result & this) {
	this.state = Done;
	const int ok = 0;
	return ok;
}
210
// Put the stage in the Retry state; always returns 0.
static inline int retry(FSM_Result & this) {
	this.state = Retry;
	const int ok = 0;
	return ok;
}
215
// Tell whether the stage still has work to submit.
// Returns 1 in the Initial and Retry states, 0 in the Error and Done states.
// An Error state whose recorded code is 0 is reported on serr.
static inline int need(FSM_Result & this) {
	switch(this.state) {
	case Initial:
	case Retry:
		return 1;
	case Error:
		if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
		// intentional fall through: an errored stage needs nothing more
	case Done:
		return 0;
	}
	// Not reachable for a valid state; avoids falling off the end of a
	// non-void function if state ever holds an unexpected value.
	return 0;
}
227
// Generator that handles sending the header
generator header_g {
	io_future_t f;      // completion of the send in flight
	const char * next;  // first byte not yet sent
	int fd;             // descriptor the header is sent on
	size_t len;         // bytes still to send
	FSM_Result res;     // progress of this stage
};
235
// Set up a header send of len bytes starting at it on fd.
static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
	this.fd = fd;
	this.next = it;
	this.len = len;
}
241
// Describe the pending header send in sqe: an IORING_OP_SEND of the
// remaining bytes on the header's fd, flagged IOSQE_IO_LINK, whose
// user_data is the address of the header's future.
static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	sqe->opcode    = IORING_OP_SEND;
	sqe->flags     = IOSQE_IO_LINK;
	sqe->fd        = this.fd;
	sqe->addr      = (uintptr_t)this.next;
	sqe->len       = this.len;
	sqe->user_data = (uintptr_t)&this.f;
}
251
// Fail the header stage: close its fd (reporting a failed close on serr),
// then record error in the stage result. Returns error.
static inline int error(header_g & this, int error) {
	if( close(this.fd) != 0 ) {
		mutex(serr) serr | "Failed to close fd" | errno;
	}
	return error(this.res, error);
}
259
// Wait for the header send to complete and update the stage accordingly.
//  - any failure closes the fd and marks the stage as Error
//  - a complete send marks the stage as Done
//  - a short send advances next/len, resets the future and marks Retry
// Returns the value produced by error(), done() or retry().
static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {
	wait(this.f);

	// Something failed?
	// Checked first, like the splice stages do: a negative result must not
	// reach the comparisons against the unsigned len below.
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) return error(this, -ECONNRESET);
		if( error == EPIPE ) return error(this, -EPIPE);
		if( error == ECANCELED ) {
			mutex(serr) serr | "HEADER was cancelled, WTF!";
			return error(this, -ECONNRESET);
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "HEADER got eagain, WTF!";
			return error(this, -ECONNRESET);
		}
		// Any other failure: never fall through to the short-send
		// arithmetic with a negative result.
		mutex(serr) serr | "HEADER got" | error | ", WTF!";
		return error(this, -ECONNRESET);
	}

	// Did something crazy happen?
	if(this.f.result > this.len) {
		mutex(serr) serr | "HEADER sent too much!";
		return error(this, -ERANGE);
	}

	// Done?
	if(this.f.result == this.len) {
		return done(this.res);
	}

	stats.header++;

	// It must be a short send: skip what went out and try again
	this.len -= this.f.result;
	this.next += this.f.result;
	reset(this.f);
	return retry(this.res);
}
297
// Stage that handles splicing in a file
struct splice_in_t {
	io_future_t f;   // completion of the splice in flight
	int fd;          // descriptor spliced from
	int pipe;        // descriptor spliced into
	size_t len;      // bytes still to splice
	off_t off;       // offset in fd of the next byte to splice
	short zipf_idx;  // index into zipf_sizes chosen at construction, -1 if none
	FSM_Result res;  // progress of this stage
};
305
// Set up a splice of len bytes from fd (starting at offset 0) into pipe.
// zipf_idx becomes the first index whose zipf_sizes entry is at least len;
// when there is none it stays -1 and a message is printed on serr.
static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
	this.fd = fd;
	this.pipe = pipe;
	this.len = len;
	this.off = 0;

	this.zipf_idx = -1;
	for(i; zipf_cnts) {
		if(len > zipf_sizes[i]) continue;
		this.zipf_idx = i;
		break;
	}
	if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";
}
320
// Describe the pending splice-in in sqe: an IORING_OP_SPLICE of the
// remaining bytes from the stage's fd at its current offset into its
// pipe, with SPLICE_F_MOVE; user_data is the address of the stage's future.
static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	sqe->opcode        = IORING_OP_SPLICE;
	sqe->flags         = 0;
	sqe->splice_flags  = SPLICE_F_MOVE;
	sqe->splice_fd_in  = this.fd;
	sqe->splice_off_in = this.off;
	sqe->fd            = this.pipe;
	sqe->off           = (__u64)-1;
	sqe->len           = this.len;
	sqe->user_data     = (uintptr_t)&this.f;
}
333
// Wait for the splice-in to complete and update the stage accordingly.
//  - any failure marks the stage as Error (nothing is closed here)
//  - a complete splice marks the stage as Done
//  - a short splice is counted in stats, advances off/len, resets the
//    future and marks Retry
// Returns the value produced by error(), done() or retry().
static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {
	wait(this.f);

	// Something failed?
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
		if( error == EPIPE ) return error(this.res, -EPIPE);
		if( error == ECANCELED ) {
			mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
			return error(this.res, -ECONNRESET);
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "SPLICE IN got eagain, WTF!";
			return error(this.res, -ECONNRESET);
		}
		mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";
		return error(this.res, -ECONNRESET);
	}

	// Did something crazy happen?
	if(this.f.result > this.len) {
		mutex(serr) serr | "SPLICE IN spliced too much!";
		return error(this.res, -ERANGE);
	}

	// Done?
	if(this.f.result == this.len) {
		return done(this.res);
	}

	stats.splcin++;
	// zipf_idx is -1 when the constructor found no matching size bucket;
	// indexing avgrd with it would write before the start of the array.
	if(this.zipf_idx >= 0) {
		stats.avgrd[this.zipf_idx].calls++;
		stats.avgrd[this.zipf_idx].bytes += this.f.result;
	}

	// It must be a Short read
	this.len -= this.f.result;
	this.off += this.f.result;
	reset(this.f);
	return retry(this.res);
}
375
// Generator that handles splicing from the pipe out to a descriptor
generator splice_out_g {
	io_future_t f;   // completion of the splice in flight
	int pipe;        // descriptor spliced from
	int fd;          // descriptor spliced into
	size_t len;      // bytes still to splice
	FSM_Result res;  // progress of this stage
};
381
// Set up a splice of len bytes from pipe into fd.
static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
	this.len = len;
	this.fd = fd;
	this.pipe = pipe;
}
387
// Describe the pending splice-out in sqe: an IORING_OP_SPLICE of the
// remaining bytes from the stage's pipe into its fd, with SPLICE_F_MOVE
// and both offsets set to -1; user_data is the address of the stage's future.
static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	sqe->opcode        = IORING_OP_SPLICE;
	sqe->flags         = 0;
	sqe->splice_flags  = SPLICE_F_MOVE;
	sqe->splice_fd_in  = this.pipe;
	sqe->splice_off_in = (__u64)-1;
	sqe->fd            = this.fd;
	sqe->off           = (__u64)-1;
	sqe->len           = this.len;
	sqe->user_data     = (uintptr_t)&this.f;
}
400
// Fail the splice-out stage: close its fd (reporting a failed close on
// serr), then record error in the stage result. Returns error.
static inline int error(splice_out_g & this, int error) {
	if( close(this.fd) != 0 ) {
		mutex(serr) serr | "Failed to close fd" | errno;
	}
	return error(this.res, error);
}
408
// Wait for the splice-out to complete and update the stage accordingly.
//  - a failure closes the fd and marks the stage as Error, except
//    ECANCELED which is treated as a write of 0 bytes and retried
//  - a complete splice marks the stage as Done
//  - a short splice is counted in stats, reduces len, resets the future
//    and marks Retry
// The outcome is reported only through this.res.
static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {
	wait(this.f);

	// Something failed?
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) { error(this, -ECONNRESET); return; }
		if( error == EPIPE ) { error(this, -EPIPE); return; }
		if( error == ECANCELED ) {
			this.f.result = 0;
			goto SHORT_WRITE;
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
			error(this, -ECONNRESET);
			return;
		}
		mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";
		error(this, -ECONNRESET);
		return;
	}

	// Did something crazy happen?
	if(this.f.result > this.len) {
		mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
		// Fail through the stage-level error() like every other error path
		// here, so the fd is closed instead of being left open.
		error(this, -ERANGE);
		return;
	}

	// Done?
	if(this.f.result == this.len) {
		done(this.res);
		return;
	}

SHORT_WRITE:
	stats.splcot++;

	// It must be a Short Write
	this.len -= this.f.result;
	reset(this.f);
	retry(this.res);
}
448
// Answer a request on fd with an OK200 header followed by fsize bytes of
// ans_fd, moved through pipe.
// With LINKED_IO the header send, the splice into the pipe and the splice
// out of the pipe are submitted together and re-submitted until the
// splice out is done; returns header length + fsize on success and
// -ECONNRESET on failure.
// Without LINKED_IO the header is sent first (fd is closed if that fails)
// and the body is moved with sendfile().
int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) {
	stats.calls++;
	#if defined(LINKED_IO)
		char buffer[512];
		int len = fill_header(buffer, fsize);
		header_g header = { fd, buffer, len };
		splice_in_t splice_in = { ans_fd, pipe[1], fsize };
		splice_out_g splice_out = { pipe[0], fd, fsize };

		RETRY_LOOP: for() {
			stats.tries++;

			// One entry per stage that still needs work, plus one for the
			// splice out which is submitted on every pass
			int nsqes = need(header.res) + need(splice_in.res) + 1;
			int slot = 0;
			struct io_uring_sqe * sqes[3];
			__u32 idxs[3];
			struct $io_context * ctx = cfa_io_allocate(sqes, idxs, nsqes);

			// Order of the entries: splice in, header, splice out
			if(need(splice_in.res)) { fill(splice_in, sqes[slot++]); }
			if(need(   header.res)) { fill(header   , sqes[slot++]); }
			fill(splice_out, sqes[slot]);

			// Make sure the entries are fully written, then submit them all
			asm volatile("": : :"memory");
			cfa_io_submit( ctx, idxs, nsqes, false );

			// Always wait for splice-in to complete as
			// we may need to kill the connection if it fails
			// If it already completed, this is a no-op
			wait_and_process(splice_in, stats);

			if(is_error(splice_in.res)) {
				// NOTE(review): this returns while the header and splice-out
				// futures may still be pending — confirm that is safe.
				if(splice_in.res.error == -EPIPE) return -ECONNRESET;
				mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
				close(fd);
			}

			// Collect the results of the two remaining stages
			wait_and_process(header, stats);
			wait_and_process(splice_out, stats);

			// Everything has left the pipe: the answer is complete
			if(is_done(splice_out.res)) {
				break RETRY_LOOP;
			}

			// Any stage in error ends the attempt
			const bool failed = is_error(header.res)
				|| is_error(splice_in.res)
				|| is_error(splice_out.res);
			if( failed ) {
				return -ECONNRESET;
			}
		}

		return len + fsize;
	#else
		stats.tries++;
		int ret = answer_header(fd, fsize);
		if( ret < 0 ) { close(fd); return ret; }
		return sendfile(pipe, fd, ans_fd, fsize);
	#endif
}
514
// Read one HTTP request from fd into buffer (capacity len) and extract the
// requested file name.
// If f is non-null, the first chunk comes from waiting on that future
// instead of issuing a receive; later chunks are received from fd.
// Returns [code, closed, file, len]:
//  - [OK200, true, 0, 0]   the peer closed the connection (receive returned 0)
//  - [E408, true, 0, 0]    ECONNRESET or EPIPE; fd has been closed
//  - [E414, false, 0, 0]   the buffer filled up before the end of the headers
//  - [E400, false, 0, 0]   the request does not start with "GET /" or has no
//                          space after the file name
//  - [OK200, false, file, len] file points into buffer just after "GET /"
//                          and len is the distance to the next space
// Any other receive error aborts the program.
[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
	char * it = buffer;
	size_t count = len - 1;  // keep one byte for the terminating NUL
	int rlen = 0;
	READ:
	for() {
		int ret;
		if( f ) {
			ret = wait_res(*f);
			reset(*f);
			f = 0p;
		} else {
			ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
		}
		// int ret = read(fd, (void*)it, count);
		if(ret == 0 ) return [OK200, true, 0, 0];
		if(ret < 0 ) {
			if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
			if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
			if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
			abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		// Terminate right after the received bytes. Writing at ret + 1 left
		// one unwritten byte in the string and could land past the buffer.
		it[ret] = '\0';
		rlen += ret;

		// Search from the start of the buffer so that a terminator split
		// across two receives is still found.
		if( strstr( buffer, "\r\n\r\n" ) ) break;

		it += ret;
		count -= ret;

		if( count < 1 ) return [E414, false, 0, 0];
	}

	if( options.log ) {
		write(sout, buffer, rlen);
		sout | nl;
	}

	it = buffer;
	int ret = memcmp(it, "GET /", 5);
	if( ret != 0 ) return [E400, false, 0, 0];
	it += 5;

	char * end = strstr( it, " " );
	// A request line with nothing after the file name is malformed; without
	// this check the length below would be computed from a null pointer.
	if( !end ) return [E400, false, 0, 0];
	return [OK200, false, it, end - it];
}
561
562//=============================================================================================
563
564#include <clock.hfa>
565#include <time.hfa>
566#include <thread.hfa>
567
// printf-style templates for every known code, in the same order as
// http_codes. Each contains a single %s that receives the formatted date.
// The first entry ends right after "Content-Length: " so the length can be
// appended when the header is filled in.
const char * original_http_msgs[] = {
	"HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
	"HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n",
	"HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
};

// The table is indexed by every value below KNOWN_CODES, so it must have
// exactly that many entries (same check as the other per-code tables).
_Static_assert( KNOWN_CODES == (sizeof(original_http_msgs) / sizeof(original_http_msgs[0])));
578
// One complete set of formatted messages: a slot for every known code.
struct date_buffer {
	https_msg_str strs[KNOWN_CODES];  // indexed like http_msgs
};
582
// Thread that periodically re-formats the messages with the current date.
thread DateFormater {
	int idx;                 // which of the two buffers is filled next
	date_buffer buffers[2];  // the two message sets used in alternation
};
587
// Construct the thread under the name "Server Date Thread", starting with
// buffer 0 and both message buffers zeroed.
void ?{}( DateFormater & this ) {
	((thread&)this){ "Server Date Thread" };
	this.idx = 0;
	for(b; 2) {
		memset( &this.buffers[b], 0, sizeof(this.buffers[b]) );
	}
}
594
// Body of the date thread. Until the destructor is called it repeats:
// format the current time, render every message template into the buffer
// selected by idx, publish those messages in http_msgs, switch to the
// other buffer, then sleep for one second.
void main(DateFormater & this) {
	LOOP: for() {
		// Leave as soon as the destructor is called; otherwise carry on
		waitfor( ^?{} : this) {
			break LOOP;
		}
		or else {}


		char buff[100];
		Time now = timeHiRes();
		strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
		// if( options.log ) sout | "Updated date to '" | buff | "'";

		for(i; KNOWN_CODES) {
			int written = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
			// snprintf returns the length the full text would have had (or a
			// negative value on error); the recorded length must never
			// exceed what is actually stored in the 512-byte buffer.
			size_t len = 0;
			if( written >= 512 ) len = 511;
			else if( written > 0 ) len = written;
			this.buffers[this.idx].strs[i].len = len;
		}

		// Publish the freshly written messages
		for(i; KNOWN_CODES) {
			https_msg_str * next = &this.buffers[this.idx].strs[i];
			__atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
		}
		// The next pass writes into the other buffer
		this.idx = (this.idx + 1) % 2;

		// if( options.log ) sout | "Date thread sleeping";

		sleep(1`s);
	}
}
624
625//=============================================================================================
// The date thread; allocated by init_protocol and released by deinit_protocol.
DateFormater * the_date_formatter;

// Allocate the date thread object and run its constructor.
void init_protocol(void) {
	the_date_formatter = alloc();   // storage only
	(*the_date_formatter){};        // construct in place
}
632
// Run the date thread's destructor, then release its storage.
void deinit_protocol(void) {
	^(*the_date_formatter){};    // destroy in place
	free( the_date_formatter );  // give the storage back
}
Note: See TracBrowser for help on using the repository browser.