source: benchmark/io/http/protocol.cfa@ 4ecc35a

ADT ast-experimental enum pthread-emulation qualifiedEnum
Last change on this file since 4ecc35a was 93829cb, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added helper function to zero out sqes.
Not done in allocate since so it can be inlined and redudant writes eliminated.

  • Property mode set to 100644
File size: 14.9 KB
RevLine 
[0aec496]1#include "protocol.hfa"
2
[c82af9f]3#define _GNU_SOURCE
4extern "C" {
5 #include <fcntl.h>
6}
[8c43d05]7
[857a1c6]8#define xstr(s) str(s)
9#define str(s) #s
10
[8c43d05]11#include <fstream.hfa>
[0aec496]12#include <iofwd.hfa>
[3f39009]13#include <io/types.hfa>
14#include <mutex_stmt.hfa>
[0aec496]15
16#include <assert.h>
17// #include <stdio.h> // Don't use stdio.h, too slow to compile
18extern "C" {
19 int snprintf ( char * s, size_t n, const char * format, ... );
[8c43d05]20 // #include <linux/io_uring.h>
[0aec496]21}
22#include <string.h>
23#include <errno.h>
24
[d11d6eb]25#include "options.hfa"
[0aec496]26
[ed2cb3c]27#define PLAINTEXT_1WRITE
[857a1c6]28#define PLAINTEXT_MEMCPY
[ed2cb3c]29#define PLAINTEXT_NOCOPY
[3f39009]30#define LINKED_IO
[ed2cb3c]31
[2caed18]32struct https_msg_str {
33 char msg[512];
34 size_t len;
[0aec496]35};
36
[2caed18]37const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };
38
[2ecbd7b]39_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
40
[2caed18]41const int http_codes[KNOWN_CODES] = {
[ed2cb3c]42 200,
[2ecbd7b]43 200,
44 400,
45 404,
[b57db73]46 405,
[ee59ede]47 408,
[2ecbd7b]48 413,
49 414,
50};
51
52_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
53
54int code_val(HttpCode code) {
55 return http_codes[code];
56}
[0aec496]57
[3f39009]58static inline int answer( int fd, const char * it, int len ) {
[0aec496]59 while(len > 0) {
60 // Call write
[2cd784a]61 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
[ee59ede]62 if( ret < 0 ) {
[3f39009]63 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
[ee59ede]64 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
65
66 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
67 }
[0aec496]68
69 // update it/len
70 it += ret;
71 len -= ret;
72 }
73 return 0;
74}
75
76int answer_error( int fd, HttpCode code ) {
77 /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
78 int idx = (int)code;
[2caed18]79 return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
[0aec496]80}
81
[3f39009]82static int fill_header(char * it, size_t size) {
[2caed18]83 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
84 it += http_msgs[OK200]->len;
85 int len = http_msgs[OK200]->len;
86 len += snprintf(it, 512 - len, "%d \n\n", size);
[3f39009]87 return len;
88}
89
90static int answer_header( int fd, size_t size ) {
91 char buffer[512];
92 int len = fill_header(buffer, size);
[0aec496]93 return answer( fd, buffer, len );
94}
95
[ed2cb3c]96#if defined(PLAINTEXT_NOCOPY)
97int answer_plaintext( int fd ) {
[857a1c6]98 return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
99}
100#elif defined(PLAINTEXT_MEMCPY)
101#define TEXTSIZE 15
102int answer_plaintext( int fd ) {
103 char text[] = "Hello, World!\n\n";
104 char ts[] = xstr(TEXTSIZE) " \n\n";
105 _Static_assert(sizeof(text) - 1 == TEXTSIZE);
106 char buffer[512 + TEXTSIZE];
107 char * it = buffer;
108 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
109 it += http_msgs[OK200]->len;
110 int len = http_msgs[OK200]->len;
111 memcpy(it, ts, sizeof(ts) - 1);
112 it += sizeof(ts) - 1;
113 len += sizeof(ts) - 1;
114 memcpy(it, text, TEXTSIZE);
115 return answer(fd, buffer, len + TEXTSIZE);
[ed2cb3c]116}
117#elif defined(PLAINTEXT_1WRITE)
[187fdb8]118int answer_plaintext( int fd ) {
[857a1c6]119 char text[] = "Hello, World!\n\n";
[187fdb8]120 char buffer[512 + sizeof(text)];
121 char * it = buffer;
122 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
123 it += http_msgs[OK200]->len;
124 int len = http_msgs[OK200]->len;
125 int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
126 it += r;
127 len += r;
128 memcpy(it, text, sizeof(text));
129 return answer(fd, buffer, len + sizeof(text));
130}
131#else
132int answer_plaintext( int fd ) {
[857a1c6]133 char text[] = "Hello, World!\n\n";
[187fdb8]134 int ret = answer_header(fd, sizeof(text));
[ba77750]135 if( ret < 0 ) return ret;
[187fdb8]136 return answer(fd, text, sizeof(text));
[ba77750]137}
[187fdb8]138#endif
[ba77750]139
[7270432]140int answer_empty( int fd ) {
141 return answer_header(fd, 0);
142}
143
[3f39009]144static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
145 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
146 off_t offset = 0;
147 ssize_t ret;
148 SPLICE1: while(count > 0) {
149 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
150 if( ret < 0 ) {
151 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
152 if( errno == ECONNRESET ) return -ECONNRESET;
153 if( errno == EPIPE ) return -EPIPE;
154 abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
155 }
156
157 count -= ret;
158 offset += ret;
159 size_t in_pipe = ret;
160 SPLICE2: while(in_pipe > 0) {
161 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
162 if( ret < 0 ) {
163 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
164 if( errno == ECONNRESET ) return -ECONNRESET;
165 if( errno == EPIPE ) return -EPIPE;
166 abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
167 }
168 in_pipe -= ret;
169 }
170
171 }
172 return count;
173}
174
175enum FSM_STATE {
176 Initial,
177 Retry,
178 Error,
179 Done,
180};
181
182struct FSM_Result {
183 FSM_STATE state;
184 int error;
185};
186
187static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
188static inline bool is_error(FSM_Result & this) { return Error == this.state; }
189static inline bool is_done(FSM_Result & this) { return Done == this.state; }
190
191static inline int error(FSM_Result & this, int error) {
192 this.error = error;
193 this.state = Error;
194 return error;
195}
196
197static inline int done(FSM_Result & this) {
198 this.state = Done;
199 return 0;
200}
201
202static inline int retry(FSM_Result & this) {
203 this.state = Retry;
204 return 0;
205}
206
207static inline int need(FSM_Result & this) {
208 switch(this.state) {
209 case Initial:
210 case Retry:
211 return 1;
212 case Error:
213 if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
214 case Done:
215 return 0;
216 }
217}
218
219// Generator that handles sending the header
220generator header_g {
221 io_future_t f;
222 const char * next;
223 int fd; size_t len;
224 FSM_Result res;
225};
226
227static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
228 this.next = it;
229 this.fd = fd;
230 this.len = len;
231}
232
233static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
234 zero_sqe(sqe);
235 sqe->opcode = IORING_OP_SEND;
236 sqe->user_data = (uintptr_t)&this.f;
237 sqe->flags = IOSQE_IO_LINK;
238 sqe->fd = this.fd;
239 sqe->addr = (uintptr_t)this.next;
240 sqe->len = this.len;
241}
242
243static inline int error(header_g & this, int error) {
244 int ret = close(this.fd);
245 if( ret != 0 ) {
246 mutex(serr) serr | "Failed to close fd" | errno;
247 }
248 return error(this.res, error);
249}
250
251static inline int wait_and_process(header_g & this) {
252 wait(this.f);
253
254 // Did something crazy happen?
255 if(this.f.result > this.len) {
256 mutex(serr) serr | "HEADER sent too much!";
257 return error(this, -ERANGE);
258 }
259
260 // Something failed?
261 if(this.f.result < 0) {
262 int error = -this.f.result;
263 if( error == ECONNRESET ) return error(this, -ECONNRESET);
264 if( error == EPIPE ) return error(this, -EPIPE);
265 if( error == ECANCELED ) {
266 mutex(serr) serr | "HEADER was cancelled, WTF!";
267 return error(this, -ECONNRESET);
268 }
269 if( error == EAGAIN || error == EWOULDBLOCK) {
270 mutex(serr) serr | "HEADER got eagain, WTF!";
271 return error(this, -ECONNRESET);
272 }
273 }
274
275 // Done?
276 if(this.f.result == this.len) {
277 return done(this.res);
278 }
279
280 // It must be a Short read
281 this.len -= this.f.result;
282 this.next += this.f.result;
283 reset(this.f);
284 return retry(this.res);
285}
286
287// Generator that handles splicing in a file
288struct splice_in_t {
289 io_future_t f;
290 int fd; int pipe; size_t len; off_t off;
291 FSM_Result res;
292};
293
294static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
295 this.fd = fd;
296 this.pipe = pipe;
297 this.len = len;
298 this.off = 0;
299}
300
301static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
302 zero_sqe(sqe);
303 sqe->opcode = IORING_OP_SPLICE;
304 sqe->user_data = (uintptr_t)&this.f;
305 sqe->flags = 0;
306 sqe->splice_fd_in = this.fd;
307 sqe->splice_off_in = this.off;
308 sqe->fd = this.pipe;
309 sqe->off = (__u64)-1;
310 sqe->len = this.len;
311 sqe->splice_flags = SPLICE_F_MOVE;
312}
313
314static inline int wait_and_process(splice_in_t & this) {
315 wait(this.f);
316
317 // Something failed?
318 if(this.f.result < 0) {
319 int error = -this.f.result;
320 if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
321 if( error == EPIPE ) return error(this.res, -EPIPE);
322 if( error == ECANCELED ) {
323 mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
324 return error(this.res, -ECONNRESET);
325 }
326 if( error == EAGAIN || error == EWOULDBLOCK) {
327 mutex(serr) serr | "SPLICE IN got eagain, WTF!";
328 return error(this.res, -ECONNRESET);
329 }
330 }
331
[644162a]332 // Did something crazy happen?
333 if(this.f.result > this.len) {
334 mutex(serr) serr | "SPLICE IN spliced too much!";
335 return error(this.res, -ERANGE);
336 }
337
[3f39009]338 // Done?
339 if(this.f.result == this.len) {
340 return done(this.res);
341 }
342
343 // It must be a Short read
344 this.len -= this.f.result;
345 this.off += this.f.result;
346 reset(this.f);
347 return retry(this.res);
348}
349
350generator splice_out_g {
351 io_future_t f;
352 int pipe; int fd; size_t len;
353 FSM_Result res;
354};
355
356static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
357 this.pipe = pipe;
358 this.fd = fd;
359 this.len = len;
360}
361
362static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
363 zero_sqe(sqe);
364 sqe->opcode = IORING_OP_SPLICE;
365 sqe->user_data = (uintptr_t)&this.f;
366 sqe->flags = 0;
367 sqe->splice_fd_in = this.pipe;
368 sqe->splice_off_in = (__u64)-1;
369 sqe->fd = this.fd;
370 sqe->off = (__u64)-1;
371 sqe->len = this.len;
372 sqe->splice_flags = SPLICE_F_MOVE;
373}
374
375static inline int error(splice_out_g & this, int error) {
376 int ret = close(this.fd);
377 if( ret != 0 ) {
378 mutex(serr) serr | "Failed to close fd" | errno;
379 }
380 return error(this.res, error);
381}
382
383static inline void wait_and_process(splice_out_g & this) {
384 wait(this.f);
385
386 // Something failed?
387 if(this.f.result < 0) {
388 int error = -this.f.result;
389 if( error == ECONNRESET ) return error(this, -ECONNRESET);
390 if( error == EPIPE ) return error(this, -EPIPE);
391 if( error == ECANCELED ) {
392 this.f.result = 0;
393 goto SHORT_WRITE;
394 }
395 if( error == EAGAIN || error == EWOULDBLOCK) {
396 mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
397 return error(this, -ECONNRESET);
398 }
399 }
400
[644162a]401 // Did something crazy happen?
402 if(this.f.result > this.len) {
403 mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
404 return error(this.res, -ERANGE);
405 }
406
[3f39009]407 // Done?
408 if(this.f.result == this.len) {
409 return done(this.res);
410 }
411
412SHORT_WRITE:
413 // It must be a Short Write
414 this.len -= this.f.result;
415 reset(this.f);
416 return retry(this.res);
417}
418
419int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) {
420 #if defined(LINKED_IO)
421 char buffer[512];
422 int len = fill_header(buffer, fsize);
423 header_g header = { fd, buffer, len };
424 splice_in_t splice_in = { ans_fd, pipe[1], fsize };
425 splice_out_g splice_out = { pipe[0], fd, fsize };
426
427 RETRY_LOOP: for() {
428 int have = need(header.res) + need(splice_in.res) + 1;
429 int idx = 0;
430 struct io_uring_sqe * sqes[3];
431 __u32 idxs[3];
432 struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
433
434 if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
435 if(need( header.res)) { fill(header , sqes[idx++]); }
436 fill(splice_out, sqes[idx]);
437
438 // Submit everything
439 asm volatile("": : :"memory");
440 cfa_io_submit( ctx, idxs, have, false );
441
442 // wait for the results
443 // Always wait for splice-in to complete as
444 // we may need to kill the connection if it fails
445 // If it already completed, this is a no-op
446 wait_and_process(splice_in);
447
448 if(is_error(splice_in.res)) {
449 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
450 close(fd);
451 }
452
453 // Process the other 2
454 wait_and_process(header);
455 wait_and_process(splice_out);
456
457 if(is_done(splice_out.res)) {
458 break RETRY_LOOP;
459 }
460
461 // We need to wait for the completion if
462 // - both completed
463 // - the header failed
464 // -
465
466 if( is_error(header.res)
467 || is_error(splice_in.res)
468 || is_error(splice_out.res)) {
469 return -ECONNRESET;
470 }
471 }
472
473 return len + fsize;
474 #else
475 int ret = answer_header(fd, fsize);
[42b7fa5f]476 if( ret < 0 ) { close(fd); return ret; }
[3f39009]477 return sendfile(pipe, fd, ans_fd, fsize);
478 #endif
479}
[ba77750]480
[4f762d3]481[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
[0aec496]482 char * it = buffer;
483 size_t count = len - 1;
484 int rlen = 0;
485 READ:
486 for() {
[2cd784a]487 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
[c3ee5f3]488 // int ret = read(fd, (void*)it, count);
[d11d6eb]489 if(ret == 0 ) return [OK200, true, 0, 0];
[0aec496]490 if(ret < 0 ) {
491 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
[3f39009]492 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
493 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
[0aec496]494 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
495 }
496 it[ret + 1] = '\0';
497 rlen += ret;
498
499 if( strstr( it, "\r\n\r\n" ) ) break;
500
501 it += ret;
502 count -= ret;
503
[d11d6eb]504 if( count < 1 ) return [E414, false, 0, 0];
[0aec496]505 }
506
[8c43d05]507 if( options.log ) {
508 write(sout, buffer, rlen);
509 sout | nl;
510 }
[0aec496]511
512 it = buffer;
513 int ret = memcmp(it, "GET /", 5);
[d11d6eb]514 if( ret != 0 ) return [E400, false, 0, 0];
[0aec496]515 it += 5;
516
517 char * end = strstr( it, " " );
518 return [OK200, false, it, end - it];
[c82af9f]519}
520
[c3ee5f3]521//=============================================================================================
522
523#include <clock.hfa>
524#include <time.hfa>
525#include <thread.hfa>
526
[2caed18]527const char * original_http_msgs[] = {
[1db1454]528 "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
[52d2545]529 "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n",
[1db1454]530 "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
531 "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
532 "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
533 "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
534 "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
535 "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
[2caed18]536};
537
[c3ee5f3]538struct date_buffer {
[2caed18]539 https_msg_str strs[KNOWN_CODES];
[c3ee5f3]540};
541
542thread DateFormater {
543 int idx;
544 date_buffer buffers[2];
545};
546
547void ?{}( DateFormater & this ) {
[348f81d5]548 ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
[c3ee5f3]549 this.idx = 0;
[2caed18]550 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
551 memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
[c3ee5f3]552}
553
554void main(DateFormater & this) {
555 LOOP: for() {
556 waitfor( ^?{} : this) {
557 break LOOP;
558 }
559 or else {}
560
[2caed18]561
562 char buff[100];
[e54d0c3]563 Time now = timeHiRes();
[2caed18]564 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
[3f39009]565 // if( options.log ) sout | "Updated date to '" | buff | "'";
[ece0e80]566
[2caed18]567 for(i; KNOWN_CODES) {
568 size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
569 this.buffers[this.idx].strs[i].len = len;
570 }
[c3ee5f3]571
[2caed18]572 for(i; KNOWN_CODES) {
573 https_msg_str * next = &this.buffers[this.idx].strs[i];
574 __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
575 }
[c3ee5f3]576 this.idx = (this.idx + 1) % 2;
577
[3f39009]578 // if( options.log ) sout | "Date thread sleeping";
[2cd784a]579
[c3ee5f3]580 sleep(1`s);
581 }
582}
583
584//=============================================================================================
585DateFormater * the_date_formatter;
586
587void init_protocol(void) {
588 the_date_formatter = alloc();
589 (*the_date_formatter){};
590}
591
592void deinit_protocol(void) {
593 ^(*the_date_formatter){};
594 free( the_date_formatter );
[e54d0c3]595}
Note: See TracBrowser for help on using the repository browser.