source: benchmark/io/http/protocol.cfa@ 6dc17a3d

ADT ast-experimental enum pthread-emulation qualifiedEnum
Last change on this file since 6dc17a3d was 644162a, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Fix problem in splice use with signed/unsigned comparison.

  • Property mode set to 100644
File size: 15.2 KB
RevLine 
[0aec496]1#include "protocol.hfa"
2
[c82af9f]3#define _GNU_SOURCE
4extern "C" {
5 #include <fcntl.h>
6}
[8c43d05]7
[857a1c6]8#define xstr(s) str(s)
9#define str(s) #s
10
[8c43d05]11#include <fstream.hfa>
[0aec496]12#include <iofwd.hfa>
[3f39009]13#include <io/types.hfa>
14#include <mutex_stmt.hfa>
[0aec496]15
16#include <assert.h>
17// #include <stdio.h> // Don't use stdio.h, too slow to compile
18extern "C" {
19 int snprintf ( char * s, size_t n, const char * format, ... );
[8c43d05]20 // #include <linux/io_uring.h>
[0aec496]21}
22#include <string.h>
23#include <errno.h>
24
[d11d6eb]25#include "options.hfa"
[0aec496]26
[ed2cb3c]27#define PLAINTEXT_1WRITE
[857a1c6]28#define PLAINTEXT_MEMCPY
[ed2cb3c]29#define PLAINTEXT_NOCOPY
[3f39009]30#define LINKED_IO
[ed2cb3c]31
// One formatted HTTP message together with its length in bytes.
struct https_msg_str {
	char msg[512];	// formatted message text
	size_t len;	// number of bytes of msg that senders transmit
};
36
// Currently published message for each known code.
// Every entry starts out null; the DateFormater thread atomically swaps in a
// freshly formatted message each time it refreshes the date string.
const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };

_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
40
// Numeric HTTP status for each HttpCode value, indexed by the enum value.
// NOTE(review): the two leading 200 entries presumably correspond to OK200 and
// OK200_PlainText — confirm against the HttpCode declaration.
const int http_codes[KNOWN_CODES] = {
	200,
	200,
	400,
	404,
	405,
	408,
	413,
	414,
};

_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
53
// Translate an HttpCode into its numeric HTTP status by table lookup.
int code_val(HttpCode code) {
	const int status = http_codes[code];
	return status;
}
[0aec496]57
// Send exactly `len` bytes starting at `it` on `fd`, looping over partial sends.
// Returns 0 once everything was sent, -ECONNRESET if the peer went away (the fd
// is closed in that case) and -EAGAIN if the send would block.
// Any other failure aborts the program.
static inline int answer( int fd, const char * it, int len ) {
	const char * cursor = it;
	for( int remaining = len; remaining > 0; ) {
		int sent = cfa_send(fd, cursor, remaining, 0, CFA_IO_LAZY);
		if( sent >= 0 ) {
			// partial progress: move past what was accepted
			cursor += sent;
			remaining -= sent;
			continue;
		}

		if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
		if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;

		abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
	}
	return 0;
}
75
// Send the preformatted response for an error code.
// `code` must be a known code other than OK200. Returns whatever answer() returns.
int answer_error( int fd, HttpCode code ) {
	/* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
	// Load the published pointer once so that msg and len always come from the
	// same buffer, even if the date thread swaps the entry concurrently.
	const https_msg_str * msg = http_msgs[(int)code];
	return answer( fd, msg->msg, msg->len );
}
81
// Build the "200 OK" header for a body of `size` bytes into `it`.
// `it` must have room for 512 bytes (the bound passed to snprintf assumes it).
// Returns the number of header bytes produced.
static int fill_header(char * it, size_t size) {
	// Load the published pointer once so msg and len come from the same buffer.
	const https_msg_str * hdr = http_msgs[OK200];
	memcpy(it, hdr->msg, hdr->len);
	it += hdr->len;
	int len = hdr->len;
	// size is a size_t, so it must be printed with %zu rather than %d.
	len += snprintf(it, 512 - len, "%zu \n\n", size);
	return len;
}
89
// Format the "200 OK" header for a body of `size` bytes and send it on `fd`.
// Returns the result of answer().
static int answer_header( int fd, size_t size ) {
	char header[512];
	const int header_len = fill_header(header, size);
	return answer( fd, header, header_len );
}
95
// answer_plaintext: send the fixed "Hello, World!" response on `fd`.
// Exactly one of the variants below is compiled, selected by the PLAINTEXT_*
// macros; each returns the result of answer().
#if defined(PLAINTEXT_NOCOPY)
// Send the fully preformatted response straight from the published message.
int answer_plaintext( int fd ) {
	// Single load so msg and len come from the same published buffer.
	const https_msg_str * msg = http_msgs[OK200_PlainText];
	return answer(fd, msg->msg, msg->len);
}
#elif defined(PLAINTEXT_MEMCPY)
#define TEXTSIZE 15
// Assemble header, content length and body with memcpy, then send in one call.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	char ts[] = xstr(TEXTSIZE) " \n\n";
	_Static_assert(sizeof(text) - 1 == TEXTSIZE);
	const https_msg_str * hdr = http_msgs[OK200];
	char buffer[512 + TEXTSIZE];
	char * it = buffer;
	memcpy(it, hdr->msg, hdr->len);
	it += hdr->len;
	int len = hdr->len;
	memcpy(it, ts, sizeof(ts) - 1);
	it += sizeof(ts) - 1;
	len += sizeof(ts) - 1;
	memcpy(it, text, TEXTSIZE);
	return answer(fd, buffer, len + TEXTSIZE);
}
#elif defined(PLAINTEXT_1WRITE)
// Format the content length with snprintf, append the body, send in one call.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	const https_msg_str * hdr = http_msgs[OK200];
	char buffer[512 + sizeof(text)];
	char * it = buffer;
	memcpy(it, hdr->msg, hdr->len);
	it += hdr->len;
	int len = hdr->len;
	// sizeof yields a size_t, so it must be printed with %zu rather than %d.
	int r = snprintf(it, 512 - len, "%zu \n\n", sizeof(text));
	it += r;
	len += r;
	memcpy(it, text, sizeof(text));
	return answer(fd, buffer, len + sizeof(text));
}
#else
// Send the header and the body as two separate answer() calls.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	int ret = answer_header(fd, sizeof(text));
	if( ret < 0 ) return ret;
	return answer(fd, text, sizeof(text));
}
#endif
[ba77750]139
// Reply with a "200 OK" header that announces a zero-length body.
int answer_empty( int fd ) {
	const size_t body_size = 0;
	return answer_header(fd, body_size);
}
143
// Copy `count` bytes from file `ans_fd` to socket `fd`, going through `pipe`
// with two splice calls (file -> pipe[1], then pipe[0] -> socket).
// Returns -ECONNRESET / -EPIPE if the peer went away, otherwise the number of
// bytes left unsent (0 on completion). Any other failure aborts the program.
// NOTE(review): a splice that returns 0 (e.g. file shorter than `count`) makes
// no progress and is retried indefinitely — confirm callers always pass the
// real file size.
static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
	unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
	off_t offset = 0;
	ssize_t ret;
	SPLICE1: while(count > 0) {
		ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
		if( ret < 0 ) {
			// Only "would block" is retried; the test used to be inverted, which
			// retried real errors forever and aborted on EAGAIN.
			if( errno == EAGAIN || errno == EWOULDBLOCK) continue SPLICE1;
			if( errno == ECONNRESET ) return -ECONNRESET;
			if( errno == EPIPE ) return -EPIPE;
			abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
		}

		count -= ret;
		offset += ret;

		// Drain what was just placed in the pipe before reading more of the file.
		size_t in_pipe = ret;
		SPLICE2: while(in_pipe > 0) {
			ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
			if( ret < 0 ) {
				if( errno == EAGAIN || errno == EWOULDBLOCK) continue SPLICE2;
				if( errno == ECONNRESET ) return -ECONNRESET;
				if( errno == EPIPE ) return -EPIPE;
				abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
			}
			in_pipe -= ret;
		}

	}
	return count;
}
174
// Clear the submission-queue-entry fields that the fill() routines do not
// always set themselves. opcode and user_data are left untouched because every
// fill() assigns them right after this call.
// (fd, off, addr and len used to be cleared twice; the duplicate stores are gone.)
static void zero_sqe(struct io_uring_sqe * sqe) {
	sqe->flags = 0;
	sqe->ioprio = 0;
	sqe->fd = 0;
	sqe->off = 0;
	sqe->addr = 0;
	sqe->len = 0;
	sqe->fsync_flags = 0;
	sqe->__pad2[0] = 0;
	sqe->__pad2[1] = 0;
	sqe->__pad2[2] = 0;
}
191
// Progress of one asynchronous operation, as tracked by FSM_Result.
enum FSM_STATE {
	Initial,	// state set by the FSM_Result constructor
	Retry,		// set by retry(): the operation completed only partially
	Error,		// set by error(): the operation failed
	Done,		// set by done(): the operation completed fully
};
198
// State of one asynchronous operation plus the error code recorded for it.
struct FSM_Result {
	FSM_STATE state;	// current progress
	int error;		// value passed to error(); 0 until then
};
203
// Constructor: a fresh result starts in Initial with no error recorded.
static inline void ?{}(FSM_Result & this) {
	this.error = 0;
	this.state = Initial;
}

// True when the operation has been marked as failed.
static inline bool is_error(FSM_Result & this) {
	return this.state == Error;
}

// True when the operation has been marked as complete.
static inline bool is_done(FSM_Result & this) {
	return this.state == Done;
}
207
// Mark the operation as failed, remember `error`, and hand `error` back.
static inline int error(FSM_Result & this, int error) {
	this.state = Error;
	this.error = error;
	return error;
}

// Mark the operation as complete; always returns 0.
static inline int done(FSM_Result & this) {
	this.state = Done;
	return 0;
}

// Mark the operation as needing resubmission; always returns 0.
static inline int retry(FSM_Result & this) {
	this.state = Retry;
	return 0;
}
223
// Tell whether the operation still has to be submitted.
// Returns 1 for Initial and Retry, 0 for Error and Done.
static inline int need(FSM_Result & this) {
	switch(this.state) {
	case Initial:
	case Retry:
		return 1;
	case Error:
		if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
		// fallthrough: a failed operation is not submitted again
	case Done:
		return 0;
	}
	// Unexpected state value: previously control ran off the end of a non-void
	// function here. Treat it as "nothing to submit".
	return 0;
}
235
// Generator that handles sending the header
generator header_g {
	io_future_t f;		// completion future; its address is stored in the sqe user_data
	const char * next;	// first byte that still has to be sent
	int fd; size_t len;	// destination descriptor; number of bytes still to send
	FSM_Result res;		// progress of the send
};
243
// Constructor: send `len` bytes starting at `it` to descriptor `fd`.
static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
	this.fd = fd;
	this.len = len;
	this.next = it;
}
249
// Describe the pending header send in `sqe`.
// The entry carries IOSQE_IO_LINK and completes through this.f.
static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	// what to send and where
	sqe->fd = this.fd;
	sqe->addr = (uintptr_t)this.next;
	sqe->len = this.len;
	// how to submit it and where the completion is reported
	sqe->opcode = IORING_OP_SEND;
	sqe->flags = IOSQE_IO_LINK;
	sqe->user_data = (uintptr_t)&this.f;
}
259
// Fail the header send: close the destination descriptor (reporting a failed
// close on serr), then record `error` in the result and return it.
static inline int error(header_g & this, int error) {
	if( close(this.fd) != 0 ) {
		mutex(serr) serr | "Failed to close fd" | errno;
	}
	return error(this.res, error);
}
267
// Wait for the header send to complete and update this.res accordingly:
// error on failure, done when everything was sent, retry on a short send.
// Returns the value produced by error()/done()/retry().
static inline int wait_and_process(header_g & this) {
	wait(this.f);

	// Something failed?
	// This has to be tested before the range check below: comparing a negative
	// result against the unsigned this.len would otherwise misreport every
	// failure as "sent too much" (same ordering as the splice variants).
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) return error(this, -ECONNRESET);
		if( error == EPIPE ) return error(this, -EPIPE);
		if( error == ECANCELED ) {
			mutex(serr) serr | "HEADER was cancelled, WTF!";
			return error(this, -ECONNRESET);
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "HEADER got eagain, WTF!";
			return error(this, -ECONNRESET);
		}
	}

	// Did something crazy happen?
	// (Any negative result not handled above also ends up here.)
	if(this.f.result > this.len) {
		mutex(serr) serr | "HEADER sent too much!";
		return error(this, -ERANGE);
	}

	// Done?
	if(this.f.result == this.len) {
		return done(this.res);
	}

	// It must be a short write: skip what was sent and ask for a resubmission
	this.len -= this.f.result;
	this.next += this.f.result;
	reset(this.f);
	return retry(this.res);
}
303
// State for splicing a file into the pipe
struct splice_in_t {
	io_future_t f;	// completion future; its address is stored in the sqe user_data
	int fd; int pipe; size_t len; off_t off;	// source fd, pipe end written to, bytes left, current source offset
	FSM_Result res;	// progress of the splice
};
310
// Constructor: splice `len` bytes from `fd`, starting at offset 0, into `pipe`.
static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
	this.off = 0;
	this.len = len;
	this.pipe = pipe;
	this.fd = fd;
}
317
// Describe the pending file -> pipe splice in `sqe`; completes through this.f.
static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	// source: the file, read from the current offset
	sqe->splice_fd_in = this.fd;
	sqe->splice_off_in = this.off;
	// destination: the pipe, with the offset set to -1
	sqe->fd = this.pipe;
	sqe->off = (__u64)-1;
	sqe->len = this.len;
	// operation and completion routing
	sqe->opcode = IORING_OP_SPLICE;
	sqe->splice_flags = SPLICE_F_MOVE;
	sqe->flags = 0;
	sqe->user_data = (uintptr_t)&this.f;
}
330
// Wait for the file -> pipe splice and update this.res accordingly:
// error on failure, done when everything was spliced, retry on a short splice.
// Unlike the header/splice-out variants, no descriptor is closed here.
static inline int wait_and_process(splice_in_t & this) {
	wait(this.f);

	// Failure path: map the known error codes
	if(this.f.result < 0) {
		const int err = -this.f.result;
		if( err == ECONNRESET ) return error(this.res, -ECONNRESET);
		if( err == EPIPE ) return error(this.res, -EPIPE);
		if( err == ECANCELED ) {
			mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
			return error(this.res, -ECONNRESET);
		}
		if( err == EAGAIN || err == EWOULDBLOCK) {
			mutex(serr) serr | "SPLICE IN got eagain, WTF!";
			return error(this.res, -ECONNRESET);
		}
	}

	// More than requested cannot legitimately happen
	if(this.f.result > this.len) {
		mutex(serr) serr | "SPLICE IN spliced too much!";
		return error(this.res, -ERANGE);
	}

	// Everything was transferred
	if(this.f.result == this.len) {
		return done(this.res);
	}

	// Short splice: advance past what was transferred and resubmit
	this.off += this.f.result;
	this.len -= this.f.result;
	reset(this.f);
	return retry(this.res);
}
366
// Generator that handles splicing from the pipe to the destination descriptor
generator splice_out_g {
	io_future_t f;	// completion future; its address is stored in the sqe user_data
	int pipe; int fd; size_t len;	// pipe end read from, destination fd, bytes left to move
	FSM_Result res;	// progress of the splice
};
372
// Constructor: splice `len` bytes out of `pipe` into descriptor `fd`.
static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
	this.len = len;
	this.fd = fd;
	this.pipe = pipe;
}
378
// Describe the pending pipe -> descriptor splice in `sqe`; completes through this.f.
static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	// source: the pipe, with the offset set to -1
	sqe->splice_fd_in = this.pipe;
	sqe->splice_off_in = (__u64)-1;
	// destination: the output descriptor, with the offset set to -1
	sqe->fd = this.fd;
	sqe->off = (__u64)-1;
	sqe->len = this.len;
	// operation and completion routing
	sqe->opcode = IORING_OP_SPLICE;
	sqe->splice_flags = SPLICE_F_MOVE;
	sqe->flags = 0;
	sqe->user_data = (uintptr_t)&this.f;
}
391
// Fail the splice-out: close the destination descriptor (reporting a failed
// close on serr), then record `error` in the result and return it.
static inline int error(splice_out_g & this, int error) {
	if( close(this.fd) != 0 ) {
		mutex(serr) serr | "Failed to close fd" | errno;
	}
	return error(this.res, error);
}
399
// Wait for the pipe -> descriptor splice and update this.res accordingly:
// error on failure, done when everything was spliced, retry otherwise.
// Returns the value produced by error()/done()/retry(). The function used to
// be declared void even though every path returns a value; it now returns int
// like the header and splice-in variants (callers that ignore it are unaffected).
static inline int wait_and_process(splice_out_g & this) {
	wait(this.f);

	// Something failed?
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) return error(this, -ECONNRESET);
		if( error == EPIPE ) return error(this, -EPIPE);
		if( error == ECANCELED ) {
			// A cancelled splice moved nothing: treat it as a zero-byte short write
			this.f.result = 0;
			goto SHORT_WRITE;
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
			return error(this, -ECONNRESET);
		}
	}

	// Did something crazy happen?
	// NOTE(review): this path records the error without closing this.fd, unlike
	// the other failure paths above — confirm that is intended.
	if(this.f.result > this.len) {
		mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
		return error(this.res, -ERANGE);
	}

	// Done?
	if(this.f.result == this.len) {
		return done(this.res);
	}

SHORT_WRITE:
	// It must be a Short Write
	this.len -= this.f.result;
	reset(this.f);
	return retry(this.res);
}
435
// Send a "200 OK" header followed by `fsize` bytes of file `ans_fd` to `fd`,
// moving the file data through `pipe`.
// With LINKED_IO: the header send, the file -> pipe splice and the pipe -> fd
// splice are submitted together and resubmitted until the splice-out is done.
// Returns len + fsize (header bytes plus file size) on success and -ECONNRESET
// if any of the three operations failed.
// Without LINKED_IO: sends the header with answer_header() and then calls
// sendfile(); returns the negative header result on failure (closing fd),
// otherwise the sendfile() result.
int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) {
	#if defined(LINKED_IO)
		char buffer[512];
		int len = fill_header(buffer, fsize);
		header_g header = { fd, buffer, len };
		splice_in_t splice_in = { ans_fd, pipe[1], fsize };
		splice_out_g splice_out = { pipe[0], fd, fsize };

		RETRY_LOOP: for() {
			// One entry per operation that still needs submitting; the splice-out
			// is submitted on every iteration, hence the + 1.
			int have = need(header.res) + need(splice_in.res) + 1;
			int idx = 0;
			struct io_uring_sqe * sqes[3];
			__u32 idxs[3];
			struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);

			// Fill order is splice-in, header, splice-out. The header entry is
			// flagged IOSQE_IO_LINK by its fill(); presumably that orders the
			// splice-out after the header send — confirm against io_uring docs.
			if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
			if(need( header.res)) { fill(header , sqes[idx++]); }
			fill(splice_out, sqes[idx]);

			// Submit everything
			asm volatile("": : :"memory");
			cfa_io_submit( ctx, idxs, have, false );

			// wait for the results
			// Always wait for splice-in to complete as
			// we may need to kill the connection if it fails
			// If it already completed, this is a no-op
			wait_and_process(splice_in);

			// NOTE(review): the error paths of header and splice_out also close
			// this descriptor, so fd may be closed more than once — verify.
			if(is_error(splice_in.res)) {
				mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
				close(fd);
			}

			// Process the other 2
			wait_and_process(header);
			wait_and_process(splice_out);

			// All the data reached the destination: success
			if(is_done(splice_out.res)) {
				break RETRY_LOOP;
			}

			// Otherwise give up as soon as any of the three operations failed;
			// if none failed, loop and resubmit whatever is still needed.
			if( is_error(header.res)
			 || is_error(splice_in.res)
			 || is_error(splice_out.res)) {
				return -ECONNRESET;
			}
		}

		return len + fsize;
	#else
		int ret = answer_header(fd, fsize);
		if( ret < 0 ) { close(fd); return ret; }
		return sendfile(pipe, fd, ans_fd, fsize);
	#endif
}
[ba77750]497
// Read one HTTP request from `fd` into `buffer` (capacity `len`) and extract
// the requested path.
// Returns [code, closed, file, len]:
//   - [OK200, true, 0, 0]   the peer closed the connection (recv returned 0)
//   - [E408, true, 0, 0]    connection reset / broken pipe (fd is closed here)
//   - [E414, false, 0, 0]   the buffer filled up before the request ended
//   - [E400, false, 0, 0]   not a "GET /" request, or no space after the path
//   - [OK200, false, file, len] `file` points into `buffer` just after "GET /"
//                           and `len` is the length of the path
// Any other receive failure aborts the program.
[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
	char * it = buffer;
	size_t count = len - 1;	// keep one byte for the terminator
	int rlen = 0;
	READ:
	for() {
		int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
		if(ret == 0 ) return [OK200, true, 0, 0];
		if(ret < 0 ) {
			if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
			if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
			if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
			abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		// Terminate right after the received bytes. Writing at ret + 1 left an
		// uninitialized byte in the string and could write one past the buffer.
		it[ret] = '\0';
		rlen += ret;

		// Search the whole request so far: the end-of-headers marker may be
		// split across two receives.
		if( strstr( buffer, "\r\n\r\n" ) ) break;

		it += ret;
		count -= ret;

		if( count < 1 ) return [E414, false, 0, 0];
	}

	if( options.log ) {
		write(sout, buffer, rlen);
		sout | nl;
	}

	it = buffer;
	int ret = memcmp(it, "GET /", 5);
	if( ret != 0 ) return [E400, false, 0, 0];
	it += 5;

	// The path ends at the first space; a request without one is malformed.
	char * end = strstr( it, " " );
	if( end == 0p ) return [E400, false, 0, 0];
	return [OK200, false, it, end - it];
}
537
[c3ee5f3]538//=============================================================================================
539
540#include <clock.hfa>
541#include <time.hfa>
542#include <thread.hfa>
543
// printf-style template for each known code, in the same order as http_msgs /
// http_codes. Each template contains exactly one %s, which the DateFormater
// thread replaces with the current date string.
const char * original_http_msgs[] = {
	"HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
	"HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n",
	"HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
};

// The formatter loop indexes this table with every value below KNOWN_CODES, so
// it must have exactly that many entries (same check as the other code tables).
_Static_assert( KNOWN_CODES == (sizeof(original_http_msgs) / sizeof(original_http_msgs[0])));
554
// One complete set of formatted messages, one per known code.
struct date_buffer {
	https_msg_str strs[KNOWN_CODES];
};
558
// Thread that periodically re-formats every message with the current date.
// It alternates between two buffers so that the set being written is not the
// one it published most recently.
thread DateFormater {
	int idx;			// index of the buffer that will be written next
	date_buffer buffers[2];		// the two alternating message sets
};
563
// Constructor: name the thread, place it on the first configured cluster
// instance, zero both message buffers and start with buffer 0.
void ?{}( DateFormater & this ) {
	((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
	// the two buffers are contiguous array elements, so clear them in one call
	memset( this.buffers, 0, sizeof(this.buffers) );
	this.idx = 0;
}
570
// Thread body: once per second, format every message template with the current
// date into the spare buffer, publish the new messages through http_msgs, and
// switch to the other buffer. The loop ends when the destructor is called.
void main(DateFormater & this) {
	LOOP: for() {
		// Non-blocking check for a pending destructor call
		waitfor( ^?{} : this) {
			break LOOP;
		}
		or else {}


		char buff[100];
		Time now = timeHiRes();
		strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
		// if( options.log ) sout | "Updated date to '" | buff | "'";

		for(i; KNOWN_CODES) {
			https_msg_str * slot = &this.buffers[this.idx].strs[i];
			int written = snprintf( slot->msg, sizeof(slot->msg), original_http_msgs[i], buff );
			// snprintf reports the length it WANTED to write (or a negative value
			// on error); never publish a length larger than what is actually stored.
			if( written < 0 ) written = 0;
			else if( (size_t)written >= sizeof(slot->msg) ) written = sizeof(slot->msg) - 1;
			slot->len = written;
		}

		// Publish only after every message of the set has been formatted
		for(i; KNOWN_CODES) {
			https_msg_str * next = &this.buffers[this.idx].strs[i];
			__atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
		}
		this.idx = (this.idx + 1) % 2;

		// if( options.log ) sout | "Date thread sleeping";

		sleep(1`s);
	}
}
600
601//=============================================================================================
// The date-formatting thread object; allocated and constructed by
// init_protocol(), destroyed and freed by deinit_protocol().
DateFormater * the_date_formatter;

// Create the date formatter: allocate its storage, then run the DateFormater
// constructor on it.
void init_protocol(void) {
	the_date_formatter = alloc();
	(*the_date_formatter){};
}
608
// Tear down the date formatter: run its destructor, then release its storage.
void deinit_protocol(void) {
	^(*the_date_formatter){};
	free( the_date_formatter );
}
Note: See TracBrowser for help on using the repository browser.