source: benchmark/io/http/http_ring.cpp@ d23c0b2

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since d23c0b2 was eeb4866, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Changed read/write to send/recv to work around small bug in io_uring.
Some clean-up.

  • Property mode set to 100644
File size: 23.0 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11// #define NOBATCHING
12// #define USE_ASYNC
13
14// Options passed to each threads
15struct __attribute__((aligned(128))) options_t {
16 // Data passed to accept
17 struct {
18 int sockfd;
19 struct sockaddr *addr;
20 socklen_t *addrlen;
21 int flags;
22 } acpt;
23
24 // Termination notification
25 int endfd;
26
27 // The ring to use for io
28 struct io_uring * ring;
29};
30
31//=========================================================
32// General statistics
33struct __attribute__((aligned(128))) stats_block_t {
34 struct {
35 volatile size_t conns = 0;
36 volatile size_t reads = 0;
37 volatile size_t writes = 0;
38 volatile size_t full_writes = 0;
39 } completions;
40
41 struct {
42 volatile size_t conns = 0;
43 struct {
44 volatile size_t pipes = 0;
45 volatile size_t reset = 0;
46 volatile size_t other = 0;
47 } requests;
48
49 struct {
50 volatile size_t pipes = 0;
51 volatile size_t reset = 0;
52 volatile size_t other = 0;
53 } answers;
54 } errors;
55
56 struct {
57 volatile size_t current = 0;
58 volatile size_t max = 0;
59 volatile size_t used = 0;
60 } conns;
61
62 volatile size_t recycle_errors = 0;
63};
64
65// Each thread gets its own block of stats
66// and there is a global block for tallying at the end
67thread_local stats_block_t stats;
68stats_block_t global_stats;
69
70// Get an array of current connections
71// This is just for debugging, to make sure
72// no two state-machines get the same fd
73const size_t array_max = 25000;
74class connection * volatile conns[array_max] = { 0 };
75
76// Max fd we've seen, keep track so it's convenient to adjust the array size after
77volatile int max_fd = 0;
78
79//=========================================================
80// Some small wrappers for ring operations used outside the connection state machine
81// get sqe + error handling
82static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
83 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
84 if(!sqe) {
85 std::cerr << "Insufficient entries in ring" << std::endl;
86 exit(EXIT_FAILURE);
87 }
88 return sqe;
89}
90
91// read of the event fd is not done by a connection
92// use nullptr as the user data
93static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
94 struct io_uring_sqe * sqe = get_sqe(ring);
95 io_uring_prep_read(sqe, fd, buffer, len, 0);
96 io_uring_sqe_set_data(sqe, nullptr);
97 io_uring_submit(ring);
98}
99
100//=========================================================
101// All answers are fixed and determined by the return code
102enum HttpCode {
103 OK200 = 0,
104 E400,
105 E404,
106 E405,
107 E408,
108 E413,
109 E414,
110 KNOWN_CODES
111};
112
113// Get a fix reply based on the return code
114const char * http_msgs[] = {
115 "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
116 "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
117 "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
118 "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
119 "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
120 "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
121 "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
122};
123static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
124
125// Pre-compute the length of these replys
126const size_t http_lens[] = {
127 strlen(http_msgs[0]),
128 strlen(http_msgs[1]),
129 strlen(http_msgs[2]),
130 strlen(http_msgs[3]),
131 strlen(http_msgs[4]),
132 strlen(http_msgs[5]),
133 strlen(http_msgs[6]),
134};
135static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
136
137//=========================================================
138// Finate state machine responsible for handling each connection
139class __attribute__((aligned(128))) connection {
140private:
141 // The state of the machine
142 enum {
143 ACCEPTING, // Accept sent waiting for connection
144 REQUESTING, // Waiting for new request
145 ANSWERING, // Either request received submitting answer or short answer sent, need to submit rest
146 } state;
147
148 // The file descriptor of the connection
149 int fd;
150
151 // request data
152 static const size_t buffer_size = 1024; // Size of the read buffer
153 const char * buffer; // Buffer into which requests are read
154
155 // send data
156 size_t to_send; // Data left to send
157 const char * iterator; // Pointer to rest of the message to send
158
159 // stats
160 // how many requests/answers were complete, that is, a valid cqe was obtained
161 struct {
162 size_t requests = 0;
163 size_t answers = 0;
164 } stats;
165
166private:
167 connection()
168 : state(ACCEPTING)
169 , fd(0)
170 , buffer( new char[buffer_size])
171 , iterator(nullptr)
172 {}
173
174 ~connection() {
175 delete [] buffer;
176 ::stats.conns.current--;
177 }
178
179 // Close the current connection
180 void close(int err) {
181 // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
182 conns[fd] = nullptr;
183
184 if(fd != 0) {
185 ::close(fd);
186 }
187 delete this;
188 }
189
190 //--------------------------------------------------
191 // Wrappers for submit so we can tweak it more easily
192 static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
193 (void)ring;
194 #ifdef USE_ASYNC
195 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
196 #endif
197 io_uring_sqe_set_data(sqe, conn);
198 #ifdef NOBATCHING
199 io_uring_submit(ring);
200 #endif
201 }
202
203 void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
204 submit(ring, sqe, this);
205 }
206
207 //--------------------------------------------------
208 // get a new request from the client
209 void request(struct io_uring * ring) {
210 state = REQUESTING;
211 struct io_uring_sqe * sqe = get_sqe(ring);
212 io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
213 submit(ring, sqe);
214 }
215
216 //--------------------------------------------------
217 // Send a new answer based on a return code
218 void answer(struct io_uring * ring, HttpCode code) {
219 iterator = http_msgs[code];
220 to_send = http_lens[code];
221 if(to_send != 124) {
222 std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
223 }
224 answer(ring);
225 }
226
227 // send a new answer to the client
228 // Reused for incomplete writes
229 void answer(struct io_uring * ring) {
230 state = ANSWERING;
231 struct io_uring_sqe * sqe = get_sqe(ring);
232 io_uring_prep_send(sqe, fd, iterator, to_send, 0);
233 submit(ring, sqe);
234 }
235
236 //--------------------------------------------------
237 // Handle a new connection, results for getting an cqe while in the ACCEPTING state
238 void newconn(struct io_uring * ring, int ret) {
239 // Check errors
240 if( ret < 0 ) {
241 int err = -ret;
242 if( err == ECONNABORTED ) {
243 ::stats.errors.conns++;
244 this->close(err);
245 return;
246 }
247 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
248 exit(EXIT_FAILURE);
249 }
250
251 // Count the connections
252 ::stats.completions.conns++;
253 ::stats.conns.current++;
254 if(::stats.conns.current > ::stats.conns.max) {
255 ::stats.conns.max = ::stats.conns.current;
256 }
257
258 // Read on the data
259 fd = ret;
260 request(ring);
261
262 // check the max fd so we know if we exceeded the array
263 for(;;) {
264 int expected = max_fd;
265 if(expected >= fd) return;
266 if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
267 }
268
269 // check if we have enough space to fit inside the array
270 if(fd >= array_max) {
271 std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
272 return;
273 }
274
275 // Put our connection into the global array
276 // No one else should be using it so if they are that's a bug
277 auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
278 if( exist ) {
279 size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
280 if( first == 0 ) {
281 std::cerr << "First: accept has existing connection " << std::endl;
282 }
283 }
284 }
285
286 // Handle a new request, results for getting an cqe while in the REQUESTING state
287 void newrequest(struct io_uring * ring, int res) {
288 // Check errors
289 if( res < 0 ) {
290 int err = -res;
291 switch(err) {
292 case EPIPE:
293 ::stats.errors.requests.pipes++;
294 break;
295 // Don't fall through the get better stats
296 case ECONNRESET:
297 ::stats.errors.requests.reset++;
298 break;
299 default:
300 ::stats.errors.requests.other++;
301 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
302 exit(EXIT_FAILURE);
303 }
304
305 // Connection failed, close it
306 this->close(err);
307 return;
308 }
309
310 // Update stats
311 ::stats.completions.reads++;
312
313 // Is this an EOF
314 if(res == 0) {
315 // Yes, close the connection
316 this->close(0);
317 return;
318 }
319
320 // Find the end of the request header
321 const char * it = buffer;
322 if( !strstr( it, "\r\n\r\n" ) ) {
323 // This state machine doesn't support incomplete reads
324 // Print them to output so it's clear there is an issue
325 std::cout << "Incomplete request" << std::endl;
326 this->close(EBADR);
327 return;
328 }
329
330 // Find the method to use
331 it = buffer;
332 int ret = memcmp(it, "GET ", 4);
333 if( ret != 0 ) {
334 // We only support get, answer with an error
335 answer(ring, E400);
336 return;
337 }
338
339 // Find the target
340 it += 4;
341 ret = memcmp(it, "/plaintext", 10);
342 if( ret != 0 ) {
343 // We only support /plaintext, answer with an error
344 answer(ring, E404);
345 return;
346 }
347
348 // Correct request, answer with the payload
349 this->stats.requests++;
350 answer(ring, OK200);
351 }
352
353 // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
354 void writedone(struct io_uring * ring, int res) {
355 // Check errors
356 if( res < 0 ) {
357 int err = -res;
358 switch(err) {
359 case EPIPE:
360 ::stats.errors.answers.pipes++;
361 break;
362 // Don't fall through the get better stats
363 case ECONNRESET:
364 ::stats.errors.answers.reset++;
365 break;
366 default:
367 ::stats.errors.answers.other++;
368 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
369 exit(EXIT_FAILURE);
370 }
371
372 this->close(err);
373 return;
374 }
375
376 // Update stats
377 ::stats.completions.writes++;
378 if(res == 124) ::stats.completions.full_writes++;
379
380 // Is this write completed
381 if( res == to_send ) {
382 // Yes, more stats
383 this->stats.answers++;
384 if(this->stats.answers == 1) ::stats.conns.used++;
385 // Then read a new request
386 request(ring);
387 return;
388 }
389
390 // Not a completed read, push the rest
391 to_send -= res;
392 iterator += res;
393 answer(ring);
394 }
395public:
396 // Submit a call to accept and create a new connection object
397 static void accept(struct io_uring * ring, const struct options_t & opt) {
398 struct io_uring_sqe * sqe = get_sqe(ring);
399 io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
400 submit(ring, sqe, new connection());
401 // std::cout << "Submitted accept: " << req << std::endl;
402 }
403
404 // Handle a new cqe
405 void handle(struct io_uring * ring, int res, const struct options_t & opt) {
406 switch(state) {
407 case ACCEPTING:
408 connection::accept(ring, opt);
409 newconn(ring, res);
410 break;
411 case REQUESTING:
412 newrequest(ring, res);
413 break;
414 case ANSWERING:
415 writedone(ring, res);
416 break;
417 }
418 }
419};
420
421//=========================================================
422// Main loop of the WebServer
423// Effectively uses one thread_local copy of everything per kernel thread
424void * proc_loop(void * arg) {
425 // Get the thread local argument
426 struct options_t & opt = *(struct options_t *)arg;
427 struct io_uring * ring = opt.ring;
428
429 // Track the shutdown using a event_fd
430 char endfd_buf[8];
431 ring_end(ring, opt.endfd, endfd_buf, 8);
432
433 // Accept our first connection
434 // May not take effect until io_uring_submit_and_wait
435 connection::accept(ring, opt);
436
437 int reset = 1; // Counter to print stats once in a while
438 bool done = false; // Are we done
439 size_t sqes = 0; // Number of sqes we submitted
440 size_t call = 0; // Number of submits we made
441 while(!done) {
442 // Submit all the answers we have and wait for responses
443 int ret = io_uring_submit_and_wait(ring, 1);
444
445 // check errors
446 if (ret < 0) {
447 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
448 exit(EXIT_FAILURE);
449 }
450
451 // Check how good we are at batching sqes
452 sqes += ret;
453 call++;
454
455 struct io_uring_cqe *cqe;
456 unsigned head;
457 unsigned count = 0;
458
459 // go through all cqes
460 io_uring_for_each_cqe(ring, head, cqe) {
461 if (0 == cqe->user_data) {
462 done = true;
463 break;
464 }
465
466 auto req = (class connection *)cqe->user_data;
467 req->handle( ring, cqe->res, opt );
468
469 // Every now and then, print some stats
470 reset--;
471 if(reset == 0) {
472 std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
473 // Reset to some random number of completions
474 // use the ring_fd in the number of threads don't all print at once
475 reset = 100000 + (100000 * (ring->ring_fd % 5));
476 }
477
478 // Keep track of how many cqes we have seen
479 count++;
480 }
481
482 // Mark the cqes as seen
483 io_uring_cq_advance(ring, count);
484 }
485
486 // Tally all the thread local statistics
487 __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
488 __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
489 __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
490 __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
491 __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
492 __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
493 __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
494 __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
495 __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
496 __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
497 __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
498 __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
499 __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
500 __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
501
502 return nullptr;
503}
504
505//=========================================================
506#include <bit> // for ispow2
507
508extern "C" {
509 #include <pthread.h> // for pthreads
510 #include <signal.h> // for signal(SIGPIPE, SIG_IGN);
511 #include <sys/eventfd.h> // use for termination
512 #include <sys/socket.h> // for sockets in general
513 #include <netinet/in.h> // for sockaddr_in, AF_INET
514}
515
516int main(int argc, char * argv[]) {
517 // Initialize the array of connection-fd associations
518 for(int i = 0; i < array_max; i++) {
519 conns[i] = nullptr;
520 }
521
522 // Make sure we ignore all sigpipes
523 signal(SIGPIPE, SIG_IGN);
524
525 // Default command line arguments
526 unsigned nthreads = 1; // number of kernel threads
527 unsigned port = 8800; // which port to listen on
528 unsigned entries = 256; // number of entries per ring/kernel thread
529 unsigned backlog = 262144; // backlog argument to listen
530 bool attach = false; // Whether or not to attach all the rings
531 bool sqpoll = false; // Whether or not to use SQ Polling
532
533 //===================
534 // Arguments Parsing
535 int c;
536 while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
537 switch (c)
538 {
539 case 't':
540 nthreads = atoi(optarg);
541 break;
542 case 'p':
543 port = atoi(optarg);
544 break;
545 case 'e':
546 entries = atoi(optarg);
547 break;
548 case 'b':
549 backlog = atoi(optarg);
550 break;
551 case 'a':
552 attach = true;
553 break;
554 case 'S':
555 sqpoll = true;
556 break;
557 case '?':
558 default:
559 std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
560 return EXIT_FAILURE;
561 }
562 }
563
564 if( !std::ispow2(entries) ) {
565 unsigned v = entries;
566 v--;
567 v |= v >> 1;
568 v |= v >> 2;
569 v |= v >> 4;
570 v |= v >> 8;
571 v |= v >> 16;
572 v++;
573 std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
574 entries = v;
575 }
576
577 //===================
578 // End FD
579 // Create a single event fd to notify the kernel threads when the server shutsdown
580 int efd = eventfd(0, EFD_SEMAPHORE);
581 if (efd < 0) {
582 std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
583 exit(EXIT_FAILURE);
584 }
585
586 //===================
587 // Open Socket
588 // Listen on specified port
589 std::cout << getpid() << " : Listening on port " << port << std::endl;
590 int server_fd = socket(AF_INET, SOCK_STREAM, 0);
591 if(server_fd < 0) {
592 std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
593 exit(EXIT_FAILURE);
594 }
595
596 int ret = 0;
597 struct sockaddr_in address;
598 int addrlen = sizeof(address);
599 memset( (char *)&address, '\0', addrlen );
600 address.sin_family = AF_INET;
601 address.sin_addr.s_addr = htonl(INADDR_ANY);
602 address.sin_port = htons( port );
603
604 // In case the port is already in use, don't just return an error
605 // Linux is very slow at reclaiming port so just retry regularly
606 int waited = 0;
607 while(true) {
608 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
609 if(ret < 0) {
610 if(errno == EADDRINUSE) {
611 // Port is in used let's retry later
612 if(waited == 0) {
613 std::cerr << "Waiting for port" << std::endl;
614 } else {
615 // To be cure, print how long we have been waiting
616 std::cerr << "\r" << waited;
617 std::cerr.flush();
618 }
619 waited ++;
620 usleep( 1000000 ); // Wait and retry
621 continue;
622 }
623 // Some other error occured, this is a real error
624 std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
625 exit(EXIT_FAILURE);
626 }
627 break;
628 }
629
630 ret = listen( server_fd, backlog );
631 if(ret < 0) {
632 std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
633 exit(EXIT_FAILURE);
634 }
635
636 //===================
637 // Run Server Threads
638 std::cout << "Starting " << nthreads << " Threads";
639 if(attach) {
640 std::cout << " with attached Rings";
641 }
642 std::cout << std::endl;
643
644 // Create the desired number of kernel-threads and for each
645 // create a ring. Create the rings in the main so we can attach them
646 // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
647 // it's unlikely but better safe than sorry
648 struct __attribute__((aligned(128))) aligned_ring {
649 struct io_uring storage;
650 };
651 aligned_ring thrd_rings[nthreads];
652 pthread_t thrd_hdls[nthreads];
653 options_t thrd_opts[nthreads];
654 bool no_drops = true;
655 bool fast_poll = true;
656 bool nfix_sqpl = true;
657 for(unsigned i = 0; i < nthreads; i++) {
658 struct io_uring_params p = { };
659
660 if(sqpoll) { // If sqpoll is on, add the flag
661 p.flags |= IORING_SETUP_SQPOLL;
662 p.sq_thread_idle = 100;
663 }
664
665 if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
666 p.flags |= IORING_SETUP_ATTACH_WQ;
667 p.wq_fd = thrd_rings[0].storage.ring_fd;
668 }
669
670 // Create the ring
671 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
672
673 // Check if some of the note-worthy features are there
674 if(0 == (p.features & IORING_FEAT_NODROP )) { no_drops = false; }
675 if(0 == (p.features & IORING_FEAT_FAST_POLL )) { fast_poll = false; }
676 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
677
678 // Write the socket options we want to the options we pass to the threads
679 thrd_opts[i].acpt.sockfd = server_fd;
680 thrd_opts[i].acpt.addr = (struct sockaddr *)&address;
681 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
682 thrd_opts[i].acpt.flags = 0;
683 thrd_opts[i].endfd = efd;
684 thrd_opts[i].ring = &thrd_rings[i].storage;
685
686 int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
687 if (ret < 0) {
688 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
689 exit(EXIT_FAILURE);
690 }
691 }
692
693 // Tell the user if the features are present
694 if( no_drops ) std::cout << "No Drop Present" << std::endl;
695 if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
696 if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
697
698 //===================
699 // Server Started
700 std::cout << "Server Started" << std::endl;
701 {
702 char buffer[128];
703 int ret;
704 do {
705 // Wait for a Ctrl-D to close the server
706 ret = read(STDIN_FILENO, buffer, 128);
707 if(ret < 0) {
708 std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
709 exit(EXIT_FAILURE);
710 }
711 else if(ret > 0) {
712 std::cout << "User inputed '";
713 std::cout.write(buffer, ret);
714 std::cout << "'" << std::endl;
715 }
716 } while(ret != 0);
717
718 std::cout << "Shutdown received" << std::endl;
719 }
720
721 //===================
722 // Use eventfd_write to tell the threads we are closing
723 (std::cout << "Sending Shutdown to Threads... ").flush();
724 ret = eventfd_write(efd, nthreads);
725 if (ret < 0) {
726 std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
727 exit(EXIT_FAILURE);
728 }
729 std::cout << "done" << std::endl;
730
731 //===================
732 // Join all the threads and close the rings
733 (std::cout << "Stopping Threads Done... ").flush();
734 for(unsigned i = 0; i < nthreads; i++) {
735 void * retval;
736 int ret = pthread_join(thrd_hdls[i], &retval);
737 if (ret < 0) {
738 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
739 exit(EXIT_FAILURE);
740 }
741
742 io_uring_queue_exit(thrd_opts[i].ring);
743 }
744 std::cout << "done" << std::endl;
745
746 //===================
747 // Close the sockets
748 (std::cout << "Closing Socket... ").flush();
749 ret = shutdown( server_fd, SHUT_RD );
750 if( ret < 0 ) {
751 std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
752 exit(EXIT_FAILURE);
753 }
754
755 ret = close(server_fd);
756 if (ret < 0) {
757 std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
758 exit(EXIT_FAILURE);
759 }
760 std::cout << "done" << std::endl << std::endl;
761
762 // Print stats and exit
763 std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
764 std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
765 std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
766 std::cout << "Max FD: " << max_fd << std::endl;
767 std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
768 std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
769 std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
770 std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
771}
772
773// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
Note: See TracBrowser for help on using the repository browser.