source: benchmark/io/http/http_ring.cpp@ d60d30e

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since d60d30e was c235179, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added the option to pre-accept in httpring, this seems to have a major impact.

  • Property mode set to 100644
File size: 23.2 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11// #define NOBATCHING
12// #define USE_ASYNC
13
14// Options passed to each thread
// Per-thread configuration block handed to proc_loop through pthread_create.
// Over-aligned to 128 bytes so adjacent entries in an array of options do not
// share a cache line.
struct alignas(128) options_t {
	// Arguments forwarded verbatim to io_uring_prep_accept
	struct {
		int sockfd;               // listening socket
		struct sockaddr * addr;   // where the peer address is written
		socklen_t * addrlen;      // in/out length of *addr
		int flags;                // accept flags
		unsigned cnt;             // how many accepts each thread queues up front
	} acpt;

	// File descriptor read by each thread to learn about shutdown
	int endfd;

	// The ring this thread performs all of its io on
	struct io_uring * ring;
};
31
32//=========================================================
33// General statistics
// Block of counters describing what the server has done so far.
// Every counter starts at zero; the block is over-aligned to 128 bytes.
struct alignas(128) stats_block_t {
	// Failure tallies for one kind of operation, split by errno
	struct error_kinds_t {
		volatile size_t pipes = 0;   // EPIPE
		volatile size_t reset = 0;   // ECONNRESET
		volatile size_t other = 0;   // anything else
	};

	// Operations for which a successful completion was observed
	struct {
		volatile size_t conns = 0;
		volatile size_t reads = 0;
		volatile size_t writes = 0;
		volatile size_t full_writes = 0;
	} completions;

	// Operations that completed with an error
	struct {
		volatile size_t conns = 0;
		error_kinds_t requests;
		error_kinds_t answers;
	} errors;

	// Connection bookkeeping
	struct {
		volatile size_t current = 0;   // connections currently alive
		volatile size_t max = 0;       // high-water mark of 'current'
		volatile size_t used = 0;      // connections that got at least one full answer
	} conns;

	// Times an accepted fd was found already registered in the debug array
	volatile size_t recycle_errors = 0;
};
65
66// Each thread gets its own block of stats
67// and there is a global block for tallying at the end
68thread_local stats_block_t stats;
69stats_block_t global_stats;
70
71// Get an array of current connections
72// This is just for debugging, to make sure
73// no two state-machines get the same fd
// Capacity of the debug table below; fds at or above this cannot be tracked
constexpr size_t array_max = 25000;
// Debug table indexed by file descriptor: which state machine owns that fd
class connection * volatile conns[array_max] = {};
76
77// Max fd we've seen, keep track so it's convenient to adjust the array size after
// Largest connection fd observed by any thread so far
volatile int max_fd{0};
79
80//=========================================================
81// Some small wrappers for ring operations used outside the connection state machine
82// get sqe + error handling
83static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
84 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
85 if(!sqe) {
86 std::cerr << "Insufficient entries in ring" << std::endl;
87 exit(EXIT_FAILURE);
88 }
89 return sqe;
90}
91
92// read of the event fd is not done by a connection
93// use nullptr as the user data
94static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
95 struct io_uring_sqe * sqe = get_sqe(ring);
96 io_uring_prep_read(sqe, fd, buffer, len, 0);
97 io_uring_sqe_set_data(sqe, nullptr);
98 io_uring_submit(ring);
99}
100
101//=========================================================
102// All answers are fixed and determined by the return code
// Identifies one of the canned replies; the value doubles as the index
// into the reply tables, so the numbering must stay dense and start at 0.
enum HttpCode {
	OK200 = 0,        // 200
	E400  = 1,        // 400
	E404  = 2,        // 404
	E405  = 3,        // 405
	E408  = 4,        // 408
	E413  = 5,        // 413
	E414  = 6,        // 414
	KNOWN_CODES = 7   // number of codes above, not a code itself
};
113
114// Get a fixed reply based on the return code
// Complete, ready-to-send HTTP/1.1 replies, one per HttpCode and in the same order.
// Only the 200 reply carries a body ("Hello, World!\r\n", 15 bytes) and asks for
// keep-alive; every error reply is header-only with a zero Content-Length.
const char * http_msgs[] = {
	"HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
	"HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	// The reason phrase used to be cut short ("Method Not ")
	"HTTP/1.1 405 Method Not Allowed\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
};
124static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
125
126// Pre-compute the length of these replies
127const size_t http_lens[] = {
128 strlen(http_msgs[0]),
129 strlen(http_msgs[1]),
130 strlen(http_msgs[2]),
131 strlen(http_msgs[3]),
132 strlen(http_msgs[4]),
133 strlen(http_msgs[5]),
134 strlen(http_msgs[6]),
135};
136static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
137
138//=========================================================
139// Finite state machine responsible for handling each connection
140class __attribute__((aligned(128))) connection {
141private:
142 // The state of the machine
143 enum {
144 ACCEPTING, // Accept sent waiting for connection
145 REQUESTING, // Waiting for new request
146 ANSWERING, // Either request received submitting answer or short answer sent, need to submit rest
147 } state;
148
149 // The file descriptor of the connection
150 int fd;
151
152 // request data
153 static const size_t buffer_size = 1024; // Size of the read buffer
154 const char * buffer; // Buffer into which requests are read
155
156 // send data
157 size_t to_send; // Data left to send
158 const char * iterator; // Pointer to rest of the message to send
159
160 // stats
161 // how many requests/answers were complete, that is, a valid cqe was obtained
162 struct {
163 size_t requests = 0;
164 size_t answers = 0;
165 } stats;
166
167private:
168 connection()
169 : state(ACCEPTING)
170 , fd(0)
171 , buffer( new char[buffer_size])
172 , iterator(nullptr)
173 {}
174
175 ~connection() {
176 delete [] buffer;
177 ::stats.conns.current--;
178 }
179
180 // Close the current connection
181 void close(int err) {
182 // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
183 conns[fd] = nullptr;
184
185 if(fd != 0) {
186 ::close(fd);
187 }
188 delete this;
189 }
190
191 //--------------------------------------------------
192 // Wrappers for submit so we can tweak it more easily
193 static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
194 (void)ring;
195 #ifdef USE_ASYNC
196 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
197 #endif
198 io_uring_sqe_set_data(sqe, conn);
199 #ifdef NOBATCHING
200 io_uring_submit(ring);
201 #endif
202 }
203
204 void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
205 submit(ring, sqe, this);
206 }
207
208 //--------------------------------------------------
209 // get a new request from the client
210 void request(struct io_uring * ring) {
211 state = REQUESTING;
212 struct io_uring_sqe * sqe = get_sqe(ring);
213 io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
214 submit(ring, sqe);
215 }
216
217 //--------------------------------------------------
218 // Send a new answer based on a return code
219 void answer(struct io_uring * ring, HttpCode code) {
220 iterator = http_msgs[code];
221 to_send = http_lens[code];
222 if(to_send != 124) {
223 std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
224 }
225 answer(ring);
226 }
227
228 // send a new answer to the client
229 // Reused for incomplete writes
230 void answer(struct io_uring * ring) {
231 state = ANSWERING;
232 struct io_uring_sqe * sqe = get_sqe(ring);
233 io_uring_prep_send(sqe, fd, iterator, to_send, 0);
234 submit(ring, sqe);
235 }
236
237 //--------------------------------------------------
238 // Handle a new connection, results for getting an cqe while in the ACCEPTING state
239 void newconn(struct io_uring * ring, int ret) {
240 // Check errors
241 if( ret < 0 ) {
242 int err = -ret;
243 if( err == ECONNABORTED ) {
244 ::stats.errors.conns++;
245 this->close(err);
246 return;
247 }
248 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
249 exit(EXIT_FAILURE);
250 }
251
252 // Count the connections
253 ::stats.completions.conns++;
254 ::stats.conns.current++;
255 if(::stats.conns.current > ::stats.conns.max) {
256 ::stats.conns.max = ::stats.conns.current;
257 }
258
259 // Read on the data
260 fd = ret;
261 request(ring);
262
263 // check the max fd so we know if we exceeded the array
264 for(;;) {
265 int expected = max_fd;
266 if(expected >= fd) return;
267 if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
268 }
269
270 // check if we have enough space to fit inside the array
271 if(fd >= array_max) {
272 std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
273 return;
274 }
275
276 // Put our connection into the global array
277 // No one else should be using it so if they are that's a bug
278 auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
279 if( exist ) {
280 size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
281 if( first == 0 ) {
282 std::cerr << "First: accept has existing connection " << std::endl;
283 }
284 }
285 }
286
287 // Handle a new request, results for getting an cqe while in the REQUESTING state
288 void newrequest(struct io_uring * ring, int res) {
289 // Check errors
290 if( res < 0 ) {
291 int err = -res;
292 switch(err) {
293 case EPIPE:
294 ::stats.errors.requests.pipes++;
295 break;
296 // Don't fall through the get better stats
297 case ECONNRESET:
298 ::stats.errors.requests.reset++;
299 break;
300 default:
301 ::stats.errors.requests.other++;
302 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
303 exit(EXIT_FAILURE);
304 }
305
306 // Connection failed, close it
307 this->close(err);
308 return;
309 }
310
311 // Update stats
312 ::stats.completions.reads++;
313
314 // Is this an EOF
315 if(res == 0) {
316 // Yes, close the connection
317 this->close(0);
318 return;
319 }
320
321 // Find the end of the request header
322 const char * it = buffer;
323 if( !strstr( it, "\r\n\r\n" ) ) {
324 // This state machine doesn't support incomplete reads
325 // Print them to output so it's clear there is an issue
326 std::cout << "Incomplete request" << std::endl;
327 this->close(EBADR);
328 return;
329 }
330
331 // Find the method to use
332 it = buffer;
333 int ret = memcmp(it, "GET ", 4);
334 if( ret != 0 ) {
335 // We only support get, answer with an error
336 answer(ring, E400);
337 return;
338 }
339
340 // Find the target
341 it += 4;
342 ret = memcmp(it, "/plaintext", 10);
343 if( ret != 0 ) {
344 // We only support /plaintext, answer with an error
345 answer(ring, E404);
346 return;
347 }
348
349 // Correct request, answer with the payload
350 this->stats.requests++;
351 answer(ring, OK200);
352 }
353
354 // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
355 void writedone(struct io_uring * ring, int res) {
356 // Check errors
357 if( res < 0 ) {
358 int err = -res;
359 switch(err) {
360 case EPIPE:
361 ::stats.errors.answers.pipes++;
362 break;
363 // Don't fall through the get better stats
364 case ECONNRESET:
365 ::stats.errors.answers.reset++;
366 break;
367 default:
368 ::stats.errors.answers.other++;
369 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
370 exit(EXIT_FAILURE);
371 }
372
373 this->close(err);
374 return;
375 }
376
377 // Update stats
378 ::stats.completions.writes++;
379 if(res == 124) ::stats.completions.full_writes++;
380
381 // Is this write completed
382 if( res == to_send ) {
383 // Yes, more stats
384 this->stats.answers++;
385 if(this->stats.answers == 1) ::stats.conns.used++;
386 // Then read a new request
387 request(ring);
388 return;
389 }
390
391 // Not a completed read, push the rest
392 to_send -= res;
393 iterator += res;
394 answer(ring);
395 }
396public:
397 // Submit a call to accept and create a new connection object
398 static void accept(struct io_uring * ring, const struct options_t & opt) {
399 struct io_uring_sqe * sqe = get_sqe(ring);
400 io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
401 submit(ring, sqe, new connection());
402 // std::cout << "Submitted accept: " << req << std::endl;
403 }
404
405 // Handle a new cqe
406 void handle(struct io_uring * ring, int res, const struct options_t & opt) {
407 switch(state) {
408 case ACCEPTING:
409 connection::accept(ring, opt);
410 newconn(ring, res);
411 break;
412 case REQUESTING:
413 newrequest(ring, res);
414 break;
415 case ANSWERING:
416 writedone(ring, res);
417 break;
418 }
419 }
420};
421
422//=========================================================
423// Main loop of the WebServer
424// Effectively uses one thread_local copy of everything per kernel thread
425void * proc_loop(void * arg) {
426 // Get the thread local argument
427 struct options_t & opt = *(struct options_t *)arg;
428 struct io_uring * ring = opt.ring;
429
430 // Track the shutdown using a event_fd
431 char endfd_buf[8];
432 ring_end(ring, opt.endfd, endfd_buf, 8);
433
434 // Accept our first connection
435 // May not take effect until io_uring_submit_and_wait
436 for(unsigned i = 0; i < opt.acpt.cnt; i++) {
437 connection::accept(ring, opt);
438 }
439
440 int reset = 1; // Counter to print stats once in a while
441 bool done = false; // Are we done
442 size_t sqes = 0; // Number of sqes we submitted
443 size_t call = 0; // Number of submits we made
444 while(!done) {
445 // Submit all the answers we have and wait for responses
446 int ret = io_uring_submit_and_wait(ring, 1);
447
448 // check errors
449 if (ret < 0) {
450 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
451 exit(EXIT_FAILURE);
452 }
453
454 // Check how good we are at batching sqes
455 sqes += ret;
456 call++;
457
458 struct io_uring_cqe *cqe;
459 unsigned head;
460 unsigned count = 0;
461
462 // go through all cqes
463 io_uring_for_each_cqe(ring, head, cqe) {
464 if (0 == cqe->user_data) {
465 done = true;
466 break;
467 }
468
469 auto req = (class connection *)cqe->user_data;
470 req->handle( ring, cqe->res, opt );
471
472 // Every now and then, print some stats
473 reset--;
474 if(reset == 0) {
475 std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
476 // Reset to some random number of completions
477 // use the ring_fd in the number of threads don't all print at once
478 reset = 100000 + (100000 * (ring->ring_fd % 5));
479 }
480
481 // Keep track of how many cqes we have seen
482 count++;
483 }
484
485 // Mark the cqes as seen
486 io_uring_cq_advance(ring, count);
487 }
488
489 // Tally all the thread local statistics
490 __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
491 __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
492 __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
493 __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
494 __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
495 __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
496 __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
497 __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
498 __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
499 __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
500 __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
501 __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
502 __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
503 __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
504
505 return nullptr;
506}
507
508//=========================================================
509#include <bit> // for ispow2
510
511extern "C" {
512 #include <pthread.h> // for pthreads
513 #include <signal.h> // for signal(SIGPIPE, SIG_IGN);
514 #include <sys/eventfd.h> // use for termination
515 #include <sys/socket.h> // for sockets in general
516 #include <netinet/in.h> // for sockaddr_in, AF_INET
517}
518
519int main(int argc, char * argv[]) {
520 // Initialize the array of connection-fd associations
521 for(int i = 0; i < array_max; i++) {
522 conns[i] = nullptr;
523 }
524
525 // Make sure we ignore all sigpipes
526 signal(SIGPIPE, SIG_IGN);
527
528 // Default command line arguments
529 unsigned nthreads = 1; // number of kernel threads
530 unsigned port = 8800; // which port to listen on
531 unsigned entries = 256; // number of entries per ring/kernel thread
532 unsigned backlog = 262144; // backlog argument to listen
533 unsigned preaccept = 1; // start by accepting X per threads
534 bool attach = false; // Whether or not to attach all the rings
535 bool sqpoll = false; // Whether or not to use SQ Polling
536
537 //===================
538 // Arguments Parsing
539 int c;
540 while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
541 switch (c)
542 {
543 case 't':
544 nthreads = atoi(optarg);
545 break;
546 case 'p':
547 port = atoi(optarg);
548 break;
549 case 'e':
550 entries = atoi(optarg);
551 break;
552 case 'b':
553 backlog = atoi(optarg);
554 break;
555 case 'c':
556 preaccept = atoi(optarg);
557 break;
558 case 'a':
559 attach = true;
560 break;
561 case 'S':
562 sqpoll = true;
563 break;
564 case '?':
565 default:
566 std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
567 return EXIT_FAILURE;
568 }
569 }
570
571 if( !std::ispow2(entries) ) {
572 unsigned v = entries;
573 v--;
574 v |= v >> 1;
575 v |= v >> 2;
576 v |= v >> 4;
577 v |= v >> 8;
578 v |= v >> 16;
579 v++;
580 std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
581 entries = v;
582 }
583
584 //===================
585 // End FD
586 // Create a single event fd to notify the kernel threads when the server shutsdown
587 int efd = eventfd(0, EFD_SEMAPHORE);
588 if (efd < 0) {
589 std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
590 exit(EXIT_FAILURE);
591 }
592
593 //===================
594 // Open Socket
595 // Listen on specified port
596 std::cout << getpid() << " : Listening on port " << port << std::endl;
597 int server_fd = socket(AF_INET, SOCK_STREAM, 0);
598 if(server_fd < 0) {
599 std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
600 exit(EXIT_FAILURE);
601 }
602
603 int ret = 0;
604 struct sockaddr_in address;
605 int addrlen = sizeof(address);
606 memset( (char *)&address, '\0', addrlen );
607 address.sin_family = AF_INET;
608 address.sin_addr.s_addr = htonl(INADDR_ANY);
609 address.sin_port = htons( port );
610
611 // In case the port is already in use, don't just return an error
612 // Linux is very slow at reclaiming port so just retry regularly
613 int waited = 0;
614 while(true) {
615 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
616 if(ret < 0) {
617 if(errno == EADDRINUSE) {
618 // Port is in used let's retry later
619 if(waited == 0) {
620 std::cerr << "Waiting for port" << std::endl;
621 } else {
622 // To be cure, print how long we have been waiting
623 std::cerr << "\r" << waited;
624 std::cerr.flush();
625 }
626 waited ++;
627 usleep( 1000000 ); // Wait and retry
628 continue;
629 }
630 // Some other error occured, this is a real error
631 std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
632 exit(EXIT_FAILURE);
633 }
634 break;
635 }
636
637 ret = listen( server_fd, backlog );
638 if(ret < 0) {
639 std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
640 exit(EXIT_FAILURE);
641 }
642
643 //===================
644 // Run Server Threads
645 std::cout << "Starting " << nthreads << " Threads";
646 if(attach) {
647 std::cout << " with attached Rings";
648 }
649 std::cout << std::endl;
650
651 // Create the desired number of kernel-threads and for each
652 // create a ring. Create the rings in the main so we can attach them
653 // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
654 // it's unlikely but better safe than sorry
655 struct __attribute__((aligned(128))) aligned_ring {
656 struct io_uring storage;
657 };
658 aligned_ring thrd_rings[nthreads];
659 pthread_t thrd_hdls[nthreads];
660 options_t thrd_opts[nthreads];
661 bool no_drops = true;
662 bool fast_poll = true;
663 bool nfix_sqpl = true;
664 for(unsigned i = 0; i < nthreads; i++) {
665 struct io_uring_params p = { };
666
667 if(sqpoll) { // If sqpoll is on, add the flag
668 p.flags |= IORING_SETUP_SQPOLL;
669 p.sq_thread_idle = 100;
670 }
671
672 if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
673 p.flags |= IORING_SETUP_ATTACH_WQ;
674 p.wq_fd = thrd_rings[0].storage.ring_fd;
675 }
676
677 // Create the ring
678 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
679
680 // Check if some of the note-worthy features are there
681 if(0 == (p.features & IORING_FEAT_NODROP )) { no_drops = false; }
682 if(0 == (p.features & IORING_FEAT_FAST_POLL )) { fast_poll = false; }
683 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
684
685 // Write the socket options we want to the options we pass to the threads
686 thrd_opts[i].acpt.sockfd = server_fd;
687 thrd_opts[i].acpt.addr = (struct sockaddr *)&address;
688 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
689 thrd_opts[i].acpt.flags = 0;
690 thrd_opts[i].acpt.cnt = preaccept;
691 thrd_opts[i].endfd = efd;
692 thrd_opts[i].ring = &thrd_rings[i].storage;
693
694 int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
695 if (ret < 0) {
696 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
697 exit(EXIT_FAILURE);
698 }
699 }
700
701 // Tell the user if the features are present
702 if( no_drops ) std::cout << "No Drop Present" << std::endl;
703 if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
704 if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
705
706 //===================
707 // Server Started
708 std::cout << "Server Started" << std::endl;
709 {
710 char buffer[128];
711 int ret;
712 do {
713 // Wait for a Ctrl-D to close the server
714 ret = read(STDIN_FILENO, buffer, 128);
715 if(ret < 0) {
716 std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
717 exit(EXIT_FAILURE);
718 }
719 else if(ret > 0) {
720 std::cout << "User inputed '";
721 std::cout.write(buffer, ret);
722 std::cout << "'" << std::endl;
723 }
724 } while(ret != 0);
725
726 std::cout << "Shutdown received" << std::endl;
727 }
728
729 //===================
730 // Use eventfd_write to tell the threads we are closing
731 (std::cout << "Sending Shutdown to Threads... ").flush();
732 ret = eventfd_write(efd, nthreads);
733 if (ret < 0) {
734 std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
735 exit(EXIT_FAILURE);
736 }
737 std::cout << "done" << std::endl;
738
739 //===================
740 // Join all the threads and close the rings
741 (std::cout << "Stopping Threads Done... ").flush();
742 for(unsigned i = 0; i < nthreads; i++) {
743 void * retval;
744 int ret = pthread_join(thrd_hdls[i], &retval);
745 if (ret < 0) {
746 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
747 exit(EXIT_FAILURE);
748 }
749
750 io_uring_queue_exit(thrd_opts[i].ring);
751 }
752 std::cout << "done" << std::endl;
753
754 //===================
755 // Close the sockets
756 (std::cout << "Closing Socket... ").flush();
757 ret = shutdown( server_fd, SHUT_RD );
758 if( ret < 0 ) {
759 std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
760 exit(EXIT_FAILURE);
761 }
762
763 ret = close(server_fd);
764 if (ret < 0) {
765 std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
766 exit(EXIT_FAILURE);
767 }
768 std::cout << "done" << std::endl << std::endl;
769
770 // Print stats and exit
771 std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
772 std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
773 std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
774 std::cout << "Max FD: " << max_fd << std::endl;
775 std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
776 std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
777 std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
778 std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
779}
780
781// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
Note: See TracBrowser for help on using the repository browser.