source: benchmark/io/http/protocol.cfa@ 2e9b59b

ADT ast-experimental pthread-emulation qualifiedEnum
Last change on this file since 2e9b59b was d672350, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 15.6 KB
Line 
1#include "protocol.hfa"
2
3#define _GNU_SOURCE
4extern "C" {
5 #include <fcntl.h>
6}
7
8#define xstr(s) str(s)
9#define str(s) #s
10
11#include <fstream.hfa>
12#include <iofwd.hfa>
13#include <io/types.hfa>
14#include <mutex_stmt.hfa>
15
16#include <assert.h>
17// #include <stdio.h> // Don't use stdio.h, too slow to compile
18extern "C" {
19 int snprintf ( char * s, size_t n, const char * format, ... );
20 // #include <linux/io_uring.h>
21}
22#include <string.h>
23#include <errno.h>
24
25#include "options.hfa"
26#include "worker.hfa"
27
28#define PLAINTEXT_1WRITE
29#define PLAINTEXT_MEMCPY
30#define PLAINTEXT_NOCOPY
31#define LINKED_IO
32
33struct https_msg_str {
34 char msg[512];
35 size_t len;
36};
37
38const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };
39
40_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
41
42const int http_codes[KNOWN_CODES] = {
43 200,
44 200,
45 400,
46 404,
47 405,
48 408,
49 413,
50 414,
51};
52
53_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
54
55int code_val(HttpCode code) {
56 return http_codes[code];
57}
58
59static inline int answer( int fd, const char * it, int len ) {
60 while(len > 0) {
61 // Call write
62 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
63 if( ret < 0 ) {
64 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
65 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
66
67 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
68 }
69
70 // update it/len
71 it += ret;
72 len -= ret;
73 }
74 return 0;
75}
76
77int answer_error( int fd, HttpCode code ) {
78 /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
79 int idx = (int)code;
80 return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
81}
82
83static int fill_header(char * it, size_t size) {
84 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
85 it += http_msgs[OK200]->len;
86 int len = http_msgs[OK200]->len;
87 len += snprintf(it, 512 - len, "%d \n\n", size);
88 return len;
89}
90
91static int answer_header( int fd, size_t size ) {
92 char buffer[512];
93 int len = fill_header(buffer, size);
94 return answer( fd, buffer, len );
95}
96
97#if defined(PLAINTEXT_NOCOPY)
98int answer_plaintext( int fd ) {
99 return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
100}
101#elif defined(PLAINTEXT_MEMCPY)
102#define TEXTSIZE 15
103int answer_plaintext( int fd ) {
104 char text[] = "Hello, World!\n\n";
105 char ts[] = xstr(TEXTSIZE) " \n\n";
106 _Static_assert(sizeof(text) - 1 == TEXTSIZE);
107 char buffer[512 + TEXTSIZE];
108 char * it = buffer;
109 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
110 it += http_msgs[OK200]->len;
111 int len = http_msgs[OK200]->len;
112 memcpy(it, ts, sizeof(ts) - 1);
113 it += sizeof(ts) - 1;
114 len += sizeof(ts) - 1;
115 memcpy(it, text, TEXTSIZE);
116 return answer(fd, buffer, len + TEXTSIZE);
117}
118#elif defined(PLAINTEXT_1WRITE)
119int answer_plaintext( int fd ) {
120 char text[] = "Hello, World!\n\n";
121 char buffer[512 + sizeof(text)];
122 char * it = buffer;
123 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
124 it += http_msgs[OK200]->len;
125 int len = http_msgs[OK200]->len;
126 int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
127 it += r;
128 len += r;
129 memcpy(it, text, sizeof(text));
130 return answer(fd, buffer, len + sizeof(text));
131}
132#else
133int answer_plaintext( int fd ) {
134 char text[] = "Hello, World!\n\n";
135 int ret = answer_header(fd, sizeof(text));
136 if( ret < 0 ) return ret;
137 return answer(fd, text, sizeof(text));
138}
139#endif
140
141int answer_empty( int fd ) {
142 return answer_header(fd, 0);
143}
144
145static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
146 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
147 off_t offset = 0;
148 ssize_t ret;
149 SPLICE1: while(count > 0) {
150 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
151 if( ret < 0 ) {
152 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
153 if( errno == ECONNRESET ) return -ECONNRESET;
154 if( errno == EPIPE ) return -EPIPE;
155 abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
156 }
157
158 count -= ret;
159 size_t in_pipe = ret;
160 SPLICE2: while(in_pipe > 0) {
161 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
162 if( ret < 0 ) {
163 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
164 if( errno == ECONNRESET ) return -ECONNRESET;
165 if( errno == EPIPE ) return -EPIPE;
166 abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
167 }
168 in_pipe -= ret;
169 }
170
171 }
172 return count;
173}
174
175enum FSM_STATE {
176 Initial,
177 Retry,
178 Error,
179 Done,
180};
181
182struct FSM_Result {
183 FSM_STATE state;
184 int error;
185};
186
187static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
188static inline bool is_error(FSM_Result & this) { return Error == this.state; }
189static inline bool is_done(FSM_Result & this) { return Done == this.state; }
190
191static inline int error(FSM_Result & this, int error) {
192 this.error = error;
193 this.state = Error;
194 return error;
195}
196
197static inline int done(FSM_Result & this) {
198 this.state = Done;
199 return 0;
200}
201
202static inline int retry(FSM_Result & this) {
203 this.state = Retry;
204 return 0;
205}
206
207static inline int need(FSM_Result & this) {
208 switch(this.state) {
209 case Initial:
210 case Retry:
211 return 1;
212 case Error:
213 if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
214 case Done:
215 return 0;
216 }
217}
218
219// Generator that handles sending the header
220generator header_g {
221 io_future_t f;
222 const char * next;
223 int fd; size_t len;
224 FSM_Result res;
225};
226
227static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
228 this.next = it;
229 this.fd = fd;
230 this.len = len;
231}
232
233static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
234 zero_sqe(sqe);
235 sqe->opcode = IORING_OP_SEND;
236 sqe->user_data = (uintptr_t)&this.f;
237 sqe->flags = IOSQE_IO_LINK;
238 sqe->fd = this.fd;
239 sqe->addr = (uintptr_t)this.next;
240 sqe->len = this.len;
241}
242
243static inline int error(header_g & this, int error) {
244 int ret = close(this.fd);
245 if( ret != 0 ) {
246 mutex(serr) serr | "Failed to close fd" | errno;
247 }
248 return error(this.res, error);
249}
250
251static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {
252 wait(this.f);
253
254 // Did something crazy happen?
255 if(this.f.result > this.len) {
256 mutex(serr) serr | "HEADER sent too much!";
257 return error(this, -ERANGE);
258 }
259
260 // Something failed?
261 if(this.f.result < 0) {
262 int error = -this.f.result;
263 if( error == ECONNRESET ) return error(this, -ECONNRESET);
264 if( error == EPIPE ) return error(this, -EPIPE);
265 if( error == ECANCELED ) {
266 mutex(serr) serr | "HEADER was cancelled, WTF!";
267 return error(this, -ECONNRESET);
268 }
269 if( error == EAGAIN || error == EWOULDBLOCK) {
270 mutex(serr) serr | "HEADER got eagain, WTF!";
271 return error(this, -ECONNRESET);
272 }
273 }
274
275 // Done?
276 if(this.f.result == this.len) {
277 return done(this.res);
278 }
279
280 stats.header++;
281
282 // It must be a Short read
283 this.len -= this.f.result;
284 this.next += this.f.result;
285 reset(this.f);
286 return retry(this.res);
287}
288
289// Generator that handles splicing in a file
290struct splice_in_t {
291 io_future_t f;
292 int fd; int pipe; size_t len; off_t off;
293 short zipf_idx;
294 FSM_Result res;
295};
296
297static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
298 this.fd = fd;
299 this.pipe = pipe;
300 this.len = len;
301 this.off = 0;
302 this.zipf_idx = -1;
303 STATS: for(i; zipf_cnts) {
304 if(len <= zipf_sizes[i]) {
305 this.zipf_idx = i;
306 break STATS;
307 }
308 }
309 if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";
310}
311
312static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
313 zero_sqe(sqe);
314 sqe->opcode = IORING_OP_SPLICE;
315 sqe->user_data = (uintptr_t)&this.f;
316 sqe->flags = 0;
317 sqe->splice_fd_in = this.fd;
318 sqe->splice_off_in = this.off;
319 sqe->fd = this.pipe;
320 sqe->off = (__u64)-1;
321 sqe->len = this.len;
322 sqe->splice_flags = SPLICE_F_MOVE;
323}
324
325static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {
326 wait(this.f);
327
328 // Something failed?
329 if(this.f.result < 0) {
330 int error = -this.f.result;
331 if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
332 if( error == EPIPE ) return error(this.res, -EPIPE);
333 if( error == ECANCELED ) {
334 mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
335 return error(this.res, -ECONNRESET);
336 }
337 if( error == EAGAIN || error == EWOULDBLOCK) {
338 mutex(serr) serr | "SPLICE IN got eagain, WTF!";
339 return error(this.res, -ECONNRESET);
340 }
341 mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";
342 return error(this.res, -ECONNRESET);
343 }
344
345 // Did something crazy happen?
346 if(this.f.result > this.len) {
347 mutex(serr) serr | "SPLICE IN spliced too much!";
348 return error(this.res, -ERANGE);
349 }
350
351 // Done?
352 if(this.f.result == this.len) {
353 return done(this.res);
354 }
355
356 stats.splcin++;
357 stats.avgrd[this.zipf_idx].calls++;
358 stats.avgrd[this.zipf_idx].bytes += this.f.result;
359
360 // It must be a Short read
361 this.len -= this.f.result;
362 this.off += this.f.result;
363 reset(this.f);
364 return retry(this.res);
365}
366
367generator splice_out_g {
368 io_future_t f;
369 int pipe; int fd; size_t len;
370 FSM_Result res;
371};
372
373static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
374 this.pipe = pipe;
375 this.fd = fd;
376 this.len = len;
377}
378
379static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
380 zero_sqe(sqe);
381 sqe->opcode = IORING_OP_SPLICE;
382 sqe->user_data = (uintptr_t)&this.f;
383 sqe->flags = 0;
384 sqe->splice_fd_in = this.pipe;
385 sqe->splice_off_in = (__u64)-1;
386 sqe->fd = this.fd;
387 sqe->off = (__u64)-1;
388 sqe->len = this.len;
389 sqe->splice_flags = SPLICE_F_MOVE;
390}
391
392static inline int error(splice_out_g & this, int error) {
393 int ret = close(this.fd);
394 if( ret != 0 ) {
395 mutex(serr) serr | "Failed to close fd" | errno;
396 }
397 return error(this.res, error);
398}
399
400static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {
401 wait(this.f);
402
403 // Something failed?
404 if(this.f.result < 0) {
405 int error = -this.f.result;
406 if( error == ECONNRESET ) return error(this, -ECONNRESET);
407 if( error == EPIPE ) return error(this, -EPIPE);
408 if( error == ECANCELED ) {
409 this.f.result = 0;
410 goto SHORT_WRITE;
411 }
412 if( error == EAGAIN || error == EWOULDBLOCK) {
413 mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
414 return error(this, -ECONNRESET);
415 }
416 mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";
417 return error(this, -ECONNRESET);
418 }
419
420 // Did something crazy happen?
421 if(this.f.result > this.len) {
422 mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
423 return error(this.res, -ERANGE);
424 }
425
426 // Done?
427 if(this.f.result == this.len) {
428 return done(this.res);
429 }
430
431SHORT_WRITE:
432 stats.splcot++;
433
434 // It must be a Short Write
435 this.len -= this.f.result;
436 reset(this.f);
437 return retry(this.res);
438}
439
440int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) {
441 stats.calls++;
442 #if defined(LINKED_IO)
443 char buffer[512];
444 int len = fill_header(buffer, fsize);
445 header_g header = { fd, buffer, len };
446 splice_in_t splice_in = { ans_fd, pipe[1], fsize };
447 splice_out_g splice_out = { pipe[0], fd, fsize };
448
449 RETRY_LOOP: for() {
450 stats.tries++;
451 int have = need(header.res) + need(splice_in.res) + 1;
452 int idx = 0;
453 struct io_uring_sqe * sqes[3];
454 __u32 idxs[3];
455 struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
456
457 if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
458 if(need( header.res)) { fill(header , sqes[idx++]); }
459 fill(splice_out, sqes[idx]);
460
461 // Submit everything
462 asm volatile("": : :"memory");
463 cfa_io_submit( ctx, idxs, have, false );
464
465 // wait for the results
466 // Always wait for splice-in to complete as
467 // we may need to kill the connection if it fails
468 // If it already completed, this is a no-op
469 wait_and_process(splice_in, stats);
470
471 if(is_error(splice_in.res)) {
472 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
473 close(fd);
474 }
475
476 // Process the other 2
477 wait_and_process(header, stats);
478 wait_and_process(splice_out, stats);
479
480 if(is_done(splice_out.res)) {
481 break RETRY_LOOP;
482 }
483
484 // We need to wait for the completion if
485 // - both completed
486 // - the header failed
487 // -
488
489 if( is_error(header.res)
490 || is_error(splice_in.res)
491 || is_error(splice_out.res)) {
492 return -ECONNRESET;
493 }
494 }
495
496 return len + fsize;
497 #else
498 stats.tries++;
499 int ret = answer_header(fd, fsize);
500 if( ret < 0 ) { close(fd); return ret; }
501 return sendfile(pipe, fd, ans_fd, fsize);
502 #endif
503}
504
505[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
506 char * it = buffer;
507 size_t count = len - 1;
508 int rlen = 0;
509 READ:
510 for() {
511 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
512 // int ret = read(fd, (void*)it, count);
513 if(ret == 0 ) return [OK200, true, 0, 0];
514 if(ret < 0 ) {
515 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
516 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
517 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
518 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
519 }
520 it[ret + 1] = '\0';
521 rlen += ret;
522
523 if( strstr( it, "\r\n\r\n" ) ) break;
524
525 it += ret;
526 count -= ret;
527
528 if( count < 1 ) return [E414, false, 0, 0];
529 }
530
531 if( options.log ) {
532 write(sout, buffer, rlen);
533 sout | nl;
534 }
535
536 it = buffer;
537 int ret = memcmp(it, "GET /", 5);
538 if( ret != 0 ) return [E400, false, 0, 0];
539 it += 5;
540
541 char * end = strstr( it, " " );
542 return [OK200, false, it, end - it];
543}
544
545//=============================================================================================
546
547#include <clock.hfa>
548#include <time.hfa>
549#include <thread.hfa>
550
551const char * original_http_msgs[] = {
552 "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
553 "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n",
554 "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
555 "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
556 "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
557 "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
558 "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
559 "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
560};
561
562struct date_buffer {
563 https_msg_str strs[KNOWN_CODES];
564};
565
566thread DateFormater {
567 int idx;
568 date_buffer buffers[2];
569};
570
571void ?{}( DateFormater & this ) {
572 ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
573 this.idx = 0;
574 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
575 memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
576}
577
578void main(DateFormater & this) {
579 LOOP: for() {
580 waitfor( ^?{} : this) {
581 break LOOP;
582 }
583 or else {}
584
585
586 char buff[100];
587 Time now = timeHiRes();
588 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
589 // if( options.log ) sout | "Updated date to '" | buff | "'";
590
591 for(i; KNOWN_CODES) {
592 size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
593 this.buffers[this.idx].strs[i].len = len;
594 }
595
596 for(i; KNOWN_CODES) {
597 https_msg_str * next = &this.buffers[this.idx].strs[i];
598 __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
599 }
600 this.idx = (this.idx + 1) % 2;
601
602 // if( options.log ) sout | "Date thread sleeping";
603
604 sleep(1`s);
605 }
606}
607
608//=============================================================================================
609DateFormater * the_date_formatter;
610
611void init_protocol(void) {
612 the_date_formatter = alloc();
613 (*the_date_formatter){};
614}
615
616void deinit_protocol(void) {
617 ^(*the_date_formatter){};
618 free( the_date_formatter );
619}
Note: See TracBrowser for help on using the repository browser.