source: benchmark/io/http/http_ring.cpp@ c05c58f

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since c05c58f was c05c58f, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Minor improvements to http_ring

  • Property mode set to 100644
File size: 14.7 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
// Kind of operation a request_t was queued for. The completion loop switches
// on this value to pick the handler for a finished operation.
enum event_t {
    EVENT_END     = 0,  // read on the shutdown fd (see ring_end)
    EVENT_ACCEPT  = 1,  // accept on the listening socket (see ring_accept)
    EVENT_REQUEST = 2,  // read of a client request (see ring_request)
    EVENT_ANSWER  = 3   // write of a response (see ring_answer)
};
17
18struct __attribute__((aligned(128))) request_t {
19 event_t type;
20 int fd;
21 size_t length;
22 char * buff;
23 char data[0];
24
25 static struct request_t * create(event_t type, size_t extra) {
26 auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra);
27 ret->type = type;
28 ret->length = extra;
29 ret->buff = ret->data;
30 return ret;
31 }
32
33 static struct request_t * create(event_t type) {
34 return create(type, 0);
35 }
36};
37
// Arguments handed to one server thread (see proc_loop), plus the submission
// statistics that thread accumulates. Every field has a safe default so a
// default-constructed instance never holds indeterminate descriptors/pointers.
struct __attribute__((aligned(128))) options_t {
    // Parameters forwarded to each accept operation.
    struct {
        int sockfd = -1;                  // listening socket
        struct sockaddr *addr = nullptr;  // where the peer address is stored
        socklen_t *addrlen = nullptr;     // in/out size of *addr
        int flags = 0;                    // accept flags
    } acpt;

    int endfd = -1;                   // descriptor read to detect shutdown
    struct io_uring * ring = nullptr; // ring used by the thread

    // Running totals updated by the thread's submit loop.
    struct {
        size_t subs = 0;  // sum of the values returned by submit-and-wait
        size_t cnts = 0;  // number of submit-and-wait calls
    } result;
};
54
// Global counters. Nothing in the visible code reads or writes them; note that
// proc_loop declares its own local named `count`, which hides the global there.
// NOTE(review): `volatile` provides no atomicity or ordering between threads —
// confirm the intent before using these from the server threads.
volatile size_t total = 0;
volatile size_t count = 0;
57
58//=========================================================
59static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
60 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
61 if(!sqe) {
62 std::cerr << "Insufficient entries in ring" << std::endl;
63 exit(EXIT_FAILURE);
64 }
65 return sqe;
66}
67
// Hook called by the ring_* helpers after an entry has been prepared.
// It is deliberately a no-op: nothing is flushed here, and the queued entries
// are submitted in bulk by proc_loop through io_uring_submit_and_wait.
static void submit(struct io_uring * /* ring */) {
    // Disabled per-entry alternatives, kept for reference:
    //   io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
    //   io_uring_submit(ring);
}
72
73//=========================================================
74static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
75 struct io_uring_sqe * sqe = get_sqe(ring);
76 io_uring_prep_read(sqe, fd, buffer, len, 0);
77 io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
78 submit(ring);
79}
80
81static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
82 auto req = request_t::create(EVENT_ACCEPT);
83 struct io_uring_sqe * sqe = get_sqe(ring);
84 io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
85 io_uring_sqe_set_data(sqe, req);
86 submit(ring);
87 // std::cout << "Submitted accept: " << req << std::endl;
88}
89
90static void ring_request(struct io_uring * ring, int fd) {
91 size_t size = 1024;
92 auto req = request_t::create(EVENT_REQUEST, size);
93 req->fd = fd;
94
95 struct io_uring_sqe * sqe = get_sqe(ring);
96 io_uring_prep_read(sqe, fd, req->buff, size, 0);
97 io_uring_sqe_set_data(sqe, req);
98 submit(ring);
99 // std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
100}
101
102//=========================================================
// Index of each supported HTTP status in the http_msgs / http_codes tables.
// The enumerators must stay dense and start at zero; KNOWN_CODES is the
// number of entries and is checked against both tables.
enum HttpCode {
    OK200 = 0,   // 200 OK
    E400  = 1,   // 400 Bad Request
    E404  = 2,   // 404 Not Found
    E405  = 3,   // 405 Method Not Allowed
    E408  = 4,   // 408 Request Timeout
    E413  = 5,   // 413 Payload Too Large
    E414  = 6,   // 414 URI Too Long
    KNOWN_CODES  // count of the codes above
};
113
114const char * http_msgs[] = {
115 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",
116 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
117 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
118 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
119 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
120 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
121 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
122};
123
124static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
125
126const int http_codes[] = {
127 200,
128 400,
129 404,
130 405,
131 408,
132 413,
133 414,
134};
135
136static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
137
138int code_val(HttpCode code) {
139 return http_codes[code];
140}
141
142static void ring_answer(struct io_uring * ring, int fd, HttpCode code) {
143 size_t size = 256;
144 auto req = request_t::create(EVENT_ANSWER, size);
145 req->fd = fd;
146
147 const char * fmt = http_msgs[code];
148 const char * date = "";
149 size = snprintf(req->buff, size, fmt, date, size);
150
151 struct io_uring_sqe * sqe = get_sqe(ring);
152 io_uring_prep_write(sqe, fd, req->buff, size, 0);
153 io_uring_sqe_set_data(sqe, req);
154 submit(ring);
155 // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
156}
157
158static void ring_answer(struct io_uring * ring, int fd, const std::string &) {
159 // size_t size = 256;
160 // auto req = request_t::create(EVENT_ANSWER, size);
161 // req->fd = fd;
162
163 // const char * fmt = http_msgs[OK200];
164 // const char * date = "";
165 // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
166 // req->length = len;
167
168 // struct io_uring_sqe * sqe = get_sqe(ring);
169 // io_uring_prep_write(sqe, fd, req->buffer, len, 0);
170 // io_uring_sqe_set_data(sqe, req);
171 // submit(ring);
172 // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
173
174
175 static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \
176 "Content-Length: 15\r\n" \
177 "Content-Type: text/html\r\n" \
178 "Connection: keep-alive\r\n" \
179 "Server: testserver\r\n" \
180 "\r\n" \
181 "Hello, World!\r\n";
182
183 static const size_t RLEN = strlen(RESPONSE);
184
185 size_t size = 256;
186 auto req = request_t::create(EVENT_ANSWER, size);
187 req->fd = fd;
188 req->buff = (char*)RESPONSE;
189 req->length = RLEN;
190
191 // const char * fmt = http_msgs[OK200];
192 // const char * date = "";
193 // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
194 // req->length = len;
195
196 struct io_uring_sqe * sqe = get_sqe(ring);
197 io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0);
198 io_uring_sqe_set_data(sqe, req);
199 submit(ring);
200}
201
202//=========================================================
203static void handle_new_conn(struct io_uring * ring, int fd) {
204 if( fd < 0 ) {
205 int err = -fd;
206 if( err == ECONNABORTED ) return;
207 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
208 exit(EXIT_FAILURE);
209 }
210
211 ring_request(ring, fd);
212}
213
214static void handle_request(struct io_uring * ring, struct request_t * in, int res) {
215 if( res < 0 ) {
216 int err = -res;
217 switch(err) {
218 case EPIPE:
219 case ECONNRESET:
220 close(in->fd);
221 free(in);
222 return;
223 default:
224 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
225 exit(EXIT_FAILURE);
226 }
227 }
228
229 if(res == 0) {
230 close(in->fd);
231 free(in);
232 return;
233 }
234
235 const char * it = in->buff;
236 if( !strstr( it, "\r\n\r\n" ) ) {
237 std::cout << "Incomplete request" << std::endl;
238 close(in->fd);
239 free(in);
240 return;
241 }
242
243 it = in->buff;
244 const std::string reply = "Hello, World!\n";
245 int ret = memcmp(it, "GET ", 4);
246 if( ret != 0 ) {
247 ring_answer(ring, in->fd, E400);
248 goto NEXT;
249 }
250
251 it += 4;
252 ret = memcmp(it, "/plaintext", 10);
253 if( ret != 0 ) {
254 ring_answer(ring, in->fd, E404);
255 goto NEXT;
256 }
257
258 ring_answer(ring, in->fd, reply);
259
260 NEXT:
261 ring_request(ring, in->fd);
262 free(in);
263 return;
264}
265
266static void handle_answer(struct io_uring * ring, struct request_t * in, int res) {
267 if( res < 0 ) {
268 int err = -res;
269 switch(err) {
270 case EPIPE:
271 case ECONNRESET:
272 close(in->fd);
273 free(in);
274 return;
275 default:
276 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
277 exit(EXIT_FAILURE);
278 }
279 }
280
281 if( res >= in->length ) {
282 free(in);
283 return;
284 }
285
286 struct io_uring_sqe * sqe = get_sqe(ring);
287 io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0);
288 io_uring_sqe_set_data(sqe, in);
289 submit(ring);
290 // std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
291
292 ring_request(ring, in->fd);
293}
294
295//=========================================================
// Declaration of __io_uring_flush_sq with C linkage.
// NOTE(review): nothing in the visible code calls it — presumably a liburing
// internal kept for experiments; confirm before relying on it.
extern "C" {
extern int __io_uring_flush_sq(struct io_uring *ring);
}
299
300void * proc_loop(void * arg) {
301 size_t count = 0;
302 struct options_t & opt = *(struct options_t *)arg;
303
304 struct io_uring * ring = opt.ring;
305
306 char endfd_buf[8];
307 ring_end(ring, opt.endfd, endfd_buf, 8);
308
309 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
310
311 int reset = 1;
312 bool done = false;
313 while(!done) {
314 struct io_uring_cqe *cqe;
315 int ret;
316 while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) {
317 ret = io_uring_submit_and_wait(ring, 1);
318 if (ret < 0) {
319 fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) );
320 exit(EXIT_FAILURE);
321 }
322 opt.result.subs += ret;
323 opt.result.cnts++;
324 }
325
326 if (ret < 0 && -EAGAIN != ret) {
327 fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) );
328 exit(EXIT_FAILURE);
329 }
330
331 auto req = (struct request_t *)cqe->user_data;
332 // std::cout << req << " completed with " << cqe->res << std::endl;
333
334 switch(req->type) {
335 case EVENT_END:
336 done = true;
337 break;
338 case EVENT_ACCEPT:
339 handle_new_conn(ring, cqe->res);
340 free(req);
341 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
342 break;
343 case EVENT_REQUEST:
344 handle_request(ring, req, cqe->res);
345 break;
346 case EVENT_ANSWER:
347 handle_answer(ring, req, cqe->res);
348 break;
349 }
350
351 io_uring_cqe_seen(ring, cqe);
352 reset--;
353 if(reset == 0) {
354 size_t ltotal = opt.result.subs;
355 size_t lcount = opt.result.cnts;
356
357 std::cout << "Submit average: " << ltotal << "/" << lcount << "(" << (((double)ltotal) / lcount) << ")" << std::endl;
358 reset = 100000 + (100000 * (ring->ring_fd % 5));
359 }
360 }
361
362 return (void*)count;
363}
364
365//=========================================================
366struct __attribute__((aligned(128))) aligned_ring {
367 struct io_uring storage;
368};
369
370#include <bit>
371
372#include <pthread.h>
373extern "C" {
374 #include <signal.h>
375 #include <sys/eventfd.h>
376 #include <sys/socket.h>
377 #include <netinet/in.h>
378}
379
380int main(int argc, char * argv[]) {
381 signal(SIGPIPE, SIG_IGN);
382
383 unsigned nthreads = 1;
384 unsigned port = 8800;
385 unsigned entries = 256;
386 unsigned backlog = 10;
387 bool attach = false;
388 bool sqpoll = false;
389
390 //===================
391 // Arguments
392 int c;
393 while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
394 switch (c)
395 {
396 case 't':
397 nthreads = atoi(optarg);
398 break;
399 case 'p':
400 port = atoi(optarg);
401 break;
402 case 'e':
403 entries = atoi(optarg);
404 break;
405 case 'b':
406 backlog = atoi(optarg);
407 break;
408 case 'a':
409 attach = true;
410 break;
411 case 'S':
412 sqpoll = true;
413 break;
414 case '?':
415 default:
416 std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
417 return EXIT_FAILURE;
418 }
419 }
420
421 if( !std::ispow2(entries) ) {
422 unsigned v = entries;
423 v--;
424 v |= v >> 1;
425 v |= v >> 2;
426 v |= v >> 4;
427 v |= v >> 8;
428 v |= v >> 16;
429 v++;
430 std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
431 entries = v;
432 }
433
434 //===================
435 // End FD
436 int efd = eventfd(0, EFD_SEMAPHORE);
437 if (efd < 0) {
438 std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
439 exit(EXIT_FAILURE);
440 }
441
442 //===================
443 // Open Socket
444 std::cout << getpid() << " : Listening on port " << port << std::endl;
445 int server_fd = socket(AF_INET, SOCK_STREAM, 0);
446 if(server_fd < 0) {
447 std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
448 exit(EXIT_FAILURE);
449 }
450
451 int ret = 0;
452 struct sockaddr_in address;
453 int addrlen = sizeof(address);
454 memset( (char *)&address, '\0', addrlen );
455 address.sin_family = AF_INET;
456 address.sin_addr.s_addr = htonl(INADDR_ANY);
457 address.sin_port = htons( port );
458
459 int waited = 0;
460 while(true) {
461 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
462 if(ret < 0) {
463 if(errno == EADDRINUSE) {
464 if(waited == 0) {
465 std::cerr << "Waiting for port" << std::endl;
466 } else {
467 std::cerr << "\r" << waited;
468 std::cerr.flush();
469 }
470 waited ++;
471 usleep( 1000000 );
472 continue;
473 }
474 std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
475 exit(EXIT_FAILURE);
476 }
477 break;
478 }
479
480 ret = listen( server_fd, backlog );
481 if(ret < 0) {
482 std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
483 exit(EXIT_FAILURE);
484 }
485
486 //===================
487 // Run Server Threads
488 std::cout << "Starting " << nthreads << " Threads";
489 if(attach) {
490 std::cout << " with attached Rings";
491 }
492 std::cout << std::endl;
493
494 aligned_ring thrd_rings[nthreads];
495 pthread_t thrd_hdls[nthreads];
496 options_t thrd_opts[nthreads];
497 bool fast_poll = true;
498 bool nfix_sqpl = true;
499 for(unsigned i = 0; i < nthreads; i++) {
500 struct io_uring_params p = { };
501 if(sqpoll) {
502 p.flags |= IORING_SETUP_SQPOLL;
503 p.sq_thread_idle = 100;
504 }
505
506 if (attach && i != 0) {
507 p.flags |= IORING_SETUP_ATTACH_WQ;
508 p.wq_fd = thrd_rings[0].storage.ring_fd;
509 }
510 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
511
512 if(0 == (p.features & IORING_FEAT_FAST_POLL )) { fast_poll = false; }
513 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
514
515 thrd_opts[i].acpt.sockfd = server_fd;
516 thrd_opts[i].acpt.addr = (struct sockaddr *)&address;
517 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
518 thrd_opts[i].acpt.flags = 0;
519 thrd_opts[i].endfd = efd;
520 thrd_opts[i].ring = &thrd_rings[i].storage;
521
522 int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
523 if (ret < 0) {
524 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
525 exit(EXIT_FAILURE);
526 }
527 }
528
529 if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
530 if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
531
532 //===================
533 // Server Started
534 std::cout << "Server Started" << std::endl;
535 {
536 char buffer[128];
537 int ret;
538 do {
539 ret = read(STDIN_FILENO, buffer, 128);
540 if(ret < 0) {
541 std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
542 exit(EXIT_FAILURE);
543 }
544 else if(ret > 0) {
545 std::cout << "User inputed '";
546 std::cout.write(buffer, ret);
547 std::cout << "'" << std::endl;
548 }
549 } while(ret != 0);
550
551 std::cout << "Shutdown received" << std::endl;
552 }
553
554 //===================
555 (std::cout << "Sending Shutdown to Threads... ").flush();
556 ret = eventfd_write(efd, nthreads);
557 if (ret < 0) {
558 std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
559 exit(EXIT_FAILURE);
560 }
561 std::cout << "done" << std::endl;
562
563 //===================
564 (std::cout << "Stopping Threads Done... ").flush();
565 for(unsigned i = 0; i < nthreads; i++) {
566 void * retval;
567 int ret = pthread_join(thrd_hdls[i], &retval);
568 if (ret < 0) {
569 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
570 exit(EXIT_FAILURE);
571 }
572
573 io_uring_queue_exit(thrd_opts[i].ring);
574 }
575 std::cout << "done" << std::endl;
576
577 //===================
578 (std::cout << "Closing Socket... ").flush();
579 ret = shutdown( server_fd, SHUT_RD );
580 if( ret < 0 ) {
581 std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
582 exit(EXIT_FAILURE);
583 }
584
585 ret = close(server_fd);
586 if (ret < 0) {
587 std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
588 exit(EXIT_FAILURE);
589 }
590 std::cout << "done" << std::endl;
591}
Note: See TracBrowser for help on using the repository browser.