source: benchmark/io/http/http_ring.cpp@ fdfb0ba

Last change on this file since fdfb0ba was a80db97, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Http_ring now uses eventfd_read instead of io_uring_wait

File size: 23.9 KB
#include <cerrno>  // for errno and the E* error codes used below
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <iostream>

#include <signal.h>
#include <unistd.h>
#include <liburing.h>

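// Compile-time switches, both off by default:
//  - NOBATCHING: submit every sqe immediately instead of batching them
//    until the next pass of the main loop.
//  - USE_ASYNC:  set IOSQE_ASYNC on every sqe so the kernel issues the
//    operation asynchronously from the start.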
// #define NOBATCHING
// #define USE_ASYNC

// Options passed to each thread
struct __attribute__((aligned(128))) options_t {
	// Data passed to accept
	struct {
		int sockfd;
		struct sockaddr *addr;
		socklen_t *addrlen;
		int flags;
		unsigned cnt;
	} acpt;

	// Termination notification
	int endfd;

	// The ring to use for io
	struct io_uring * ring;
};

//=========================================================
// General statistics
struct __attribute__((aligned(128))) stats_block_t {
	struct {
		volatile size_t conns = 0;
		volatile size_t reads = 0;
		volatile size_t writes = 0;
		volatile size_t full_writes = 0;
	} completions;

	struct {
		volatile size_t conns = 0;
		struct {
			volatile size_t pipes = 0;
			volatile size_t reset = 0;
			volatile size_t other = 0;
		} requests;

		struct {
			volatile size_t pipes = 0;
			volatile size_t reset = 0;
			volatile size_t other = 0;
		} answers;
	} errors;

	struct {
		volatile size_t current = 0;
		volatile size_t max = 0;
		volatile size_t used = 0;
	} conns;

	volatile size_t recycle_errors = 0;
};

// Each thread gets its own block of stats
// and there is a global block for tallying at the end
thread_local stats_block_t stats;
stats_block_t global_stats;

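// Thread-local count of sqes prepared since the last io_uring_submit;
// the main loop uses it to bound how many completions it handles before
// submitting the batched sqes.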
thread_local struct __attribute__((aligned(128))) {
	size_t to_submit = 0;
} local;

// Get an array of current connections
// This is just for debugging, to make sure
// no two state-machines get the same fd
const size_t array_max = 25000;
class connection * volatile conns[array_max] = { 0 };

// Max fd we've seen, kept so it's convenient to adjust the array size afterwards
volatile int max_fd = 0;

//=========================================================
// Some small wrappers for ring operations used outside the connection state machine
// get sqe + error handling
static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
	struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
	if(!sqe) {
		std::cerr << "Insufficient entries in ring" << std::endl;
		exit(EXIT_FAILURE);
	}
	return sqe;
}

// read of the event fd is not done by a connection
// use nullptr as the user data
static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
	struct io_uring_sqe * sqe = get_sqe(ring);
	io_uring_prep_read(sqe, fd, buffer, len, 0);
	io_uring_sqe_set_data(sqe, nullptr);
	io_uring_submit(ring);
}

//=========================================================
// All answers are fixed and determined by the return code
enum HttpCode {
	OK200 = 0,
	E400,
	E404,
	E405,
	E408,
	E413,
	E414,
	KNOWN_CODES
};

// Get a fixed reply based on the return code
const char * http_msgs[] = {
	"HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
	"HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
};
static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );

// Pre-compute the length of these replies
const size_t http_lens[] = {
	strlen(http_msgs[0]),
	strlen(http_msgs[1]),
	strlen(http_msgs[2]),
	strlen(http_msgs[3]),
	strlen(http_msgs[4]),
	strlen(http_msgs[5]),
	strlen(http_msgs[6]),
};
static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );

//=========================================================
// Finite state machine responsible for handling each connection
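// Lifecycle: each object starts with a pending accept (ACCEPTING); once the
// accept completes it alternates between reading a request (REQUESTING) and
// sending the reply (ANSWERING), resubmitting short writes, until an error
// or EOF closes the connection.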
class __attribute__((aligned(128))) connection {
private:
	// The state of the machine
	enum {
		ACCEPTING,  // Accept sent, waiting for a connection
		REQUESTING, // Waiting for a new request
		ANSWERING,  // Request received and answer being submitted, or short write sent and the rest still needs submitting
	} state;

	// The file descriptor of the connection
	int fd;

	// request data
	static const size_t buffer_size = 1024; // Size of the read buffer
	const char * buffer;                    // Buffer into which requests are read

	// send data
	size_t to_send;        // Data left to send
	const char * iterator; // Pointer to rest of the message to send

	// stats
	// how many requests/answers were complete, that is, a valid cqe was obtained
	struct {
		size_t requests = 0;
		size_t answers = 0;
	} stats;

private:
	connection()
		: state(ACCEPTING)
		, fd(0)
		, buffer( new char[buffer_size])
		, iterator(nullptr)
	{}

	~connection() {
		delete [] buffer;
		::stats.conns.current--;
	}

	// Close the current connection
	void close(int err) {
		// std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
		if(fd < (int)array_max) conns[fd] = nullptr;

		if(fd != 0) {
			::close(fd);
		}
		delete this;
	}

	//--------------------------------------------------
	// Wrappers for submit so we can tweak it more easily
	static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
		(void)ring;
		local.to_submit++;
		#ifdef USE_ASYNC
			io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
		#endif
		io_uring_sqe_set_data(sqe, conn);
		#ifdef NOBATCHING
			io_uring_submit(ring);
		#endif
	}

	void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
		submit(ring, sqe, this);
	}

	//--------------------------------------------------
	// get a new request from the client
	void request(struct io_uring * ring) {
		state = REQUESTING;
		struct io_uring_sqe * sqe = get_sqe(ring);
		// Leave room for a null terminator so the request can be searched as a string
		io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size - 1, 0);
		submit(ring, sqe);
	}

	//--------------------------------------------------
	// Send a new answer based on a return code
	void answer(struct io_uring * ring, HttpCode code) {
		iterator = http_msgs[code];
		to_send = http_lens[code];
		if(to_send != http_lens[OK200]) {
			std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
		}
		answer(ring);
	}

	// send a new answer to the client
	// Reused for incomplete writes
	void answer(struct io_uring * ring) {
		state = ANSWERING;
		struct io_uring_sqe * sqe = get_sqe(ring);
		io_uring_prep_send(sqe, fd, iterator, to_send, 0);
		submit(ring, sqe);
	}

	//--------------------------------------------------
	// Handle a new connection; called when a cqe arrives while in the ACCEPTING state
	void newconn(struct io_uring * ring, int ret) {
		// Check errors
		if( ret < 0 ) {
			int err = -ret;
			if( err == ECONNABORTED ) {
				::stats.errors.conns++;
				this->close(err);
				return;
			}
			std::cerr << "accept error: (" << err << ") " << strerror(err) << std::endl;
			exit(EXIT_FAILURE);
		}

		// Count the connections
		::stats.completions.conns++;
		::stats.conns.current++;
		if(::stats.conns.current > ::stats.conns.max) {
			::stats.conns.max = ::stats.conns.current;
		}

		// Read on the data
		fd = ret;
		request(ring);

		// check the max fd so we know if we exceeded the array
		for(;;) {
			int expected = max_fd;
			if(expected >= fd) break;
			if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
		}

		// check if we have enough space to fit inside the array
		if(fd >= (int)array_max) {
			std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
			return;
		}

		// Put our connection into the global array
		// No one else should be using it so if they are that's a bug
		auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
		if( exist ) {
			size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
			if( first == 0 ) {
				std::cerr << "First: accept has existing connection " << std::endl;
			}
		}
	}

	// Handle a new request; called when a cqe arrives while in the REQUESTING state
	void newrequest(struct io_uring * ring, int res) {
		// Check errors
		if( res < 0 ) {
			int err = -res;
			switch(err) {
			case EPIPE:
				::stats.errors.requests.pipes++;
				break;
			// No fallthrough, so the stats stay separate
			case ECONNRESET:
				::stats.errors.requests.reset++;
				break;
			default:
				::stats.errors.requests.other++;
				std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
				exit(EXIT_FAILURE);
			}

			// Connection failed, close it
			this->close(err);
			return;
		}

		// Update stats
		::stats.completions.reads++;

		// Is this an EOF
		if(res == 0) {
			// Yes, close the connection
			this->close(0);
			return;
		}

		// Null-terminate what was read so the buffer can be searched as a string
		const_cast<char *>(buffer)[res] = '\0';

		// Find the end of the request header
		const char * it = buffer;
		if( !strstr( it, "\r\n\r\n" ) ) {
			// This state machine doesn't support incomplete reads
			// Print them to output so it's clear there is an issue
			std::cout << "Incomplete request" << std::endl;
			this->close(EBADR);
			return;
		}

		// Find the method to use
		it = buffer;
		int ret = memcmp(it, "GET ", 4);
		if( ret != 0 ) {
			// We only support GET, answer with an error
			answer(ring, E400);
			return;
		}

		// Find the target
		it += 4;
		ret = memcmp(it, "/plaintext", 10);
		if( ret != 0 ) {
			// We only support /plaintext, answer with an error
			answer(ring, E404);
			return;
		}

		// Correct request, answer with the payload
		this->stats.requests++;
		answer(ring, OK200);
	}

	// Handle a partial or full answer sent; called when a cqe arrives while in the ANSWERING state
	void writedone(struct io_uring * ring, int res) {
		// Check errors
		if( res < 0 ) {
			int err = -res;
			switch(err) {
			case EPIPE:
				::stats.errors.answers.pipes++;
				break;
			// No fallthrough, so the stats stay separate
			case ECONNRESET:
				::stats.errors.answers.reset++;
				break;
			default:
				::stats.errors.answers.other++;
				std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
				exit(EXIT_FAILURE);
			}

			this->close(err);
			return;
		}

		// Update stats
		::stats.completions.writes++;
		if((size_t)res == http_lens[OK200]) ::stats.completions.full_writes++;

		// Is this write completed
		if( (size_t)res == to_send ) {
			// Yes, more stats
			this->stats.answers++;
			if(this->stats.answers == 1) ::stats.conns.used++;
			// Then read a new request
			request(ring);
			return;
		}

		// Not a completed write, push the rest
		to_send -= res;
		iterator += res;
		answer(ring);
	}
public:
	// Submit a call to accept and create a new connection object
	static void accept(struct io_uring * ring, const struct options_t & opt) {
		struct io_uring_sqe * sqe = get_sqe(ring);
		io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
		submit(ring, sqe, new connection());
		// std::cout << "Submitted accept: " << req << std::endl;
	}

	// Handle a new cqe
	void handle(struct io_uring * ring, int res, const struct options_t & opt) {
		switch(state) {
		case ACCEPTING:
			// connection::accept(ring, opt);
			newconn(ring, res);
			break;
		case REQUESTING:
			newrequest(ring, res);
			break;
		case ANSWERING:
			writedone(ring, res);
			break;
		}
	}
};

//=========================================================
extern "C" {
	#include <sys/eventfd.h> // used for termination
}

// Main loop of the WebServer
// Effectively uses one thread_local copy of everything per kernel thread
void * proc_loop(void * arg) {
	// Get the thread local argument
	struct options_t & opt = *(struct options_t *)arg;
	struct io_uring * ring = opt.ring;

	int blockfd = eventfd(0, 0);
	if (blockfd < 0) {
		fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) );
		exit(EXIT_FAILURE);
	}

	int ret = io_uring_register_eventfd(ring, blockfd);
	if (ret < 0) {
		fprintf( stderr, "io_uring register eventfd error: (%d) %s\n", (int)-ret, strerror(-ret) );
		exit(EXIT_FAILURE);
	}

	// Track the shutdown using an eventfd
	// Its read completes with nullptr user data, which the loop below treats as the shutdown signal
	char endfd_buf[8];
	ring_end(ring, opt.endfd, endfd_buf, 8);

	// Accept our first connections
	// May not take effect until the next io_uring_submit
	for(unsigned i = 0; i < opt.acpt.cnt; i++) {
		connection::accept(ring, opt);
	}

	int reset = 1;     // Counter to print stats once in a while
	bool done = false; // Are we done
	size_t sqes = 0;   // Number of sqes we submitted
	size_t call = 0;   // Number of submits we made
	while(!done) {
		// Submit all the sqes we have prepared; the wait happens on the eventfd below
		int ret = io_uring_submit(ring);
		local.to_submit = 0;

		// check errors
		if (ret < 0) {
			fprintf( stderr, "io_uring submit error: (%d) %s\n", (int)-ret, strerror(-ret) );
			exit(EXIT_FAILURE);
		}

		// Check how good we are at batching sqes
		sqes += ret;
		call++;

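		// Instead of io_uring_wait_cqe, block on the eventfd registered with the
		// ring above: eventfd_read returns once the kernel has posted completions.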
		eventfd_t val;
		ret = eventfd_read(blockfd, &val);

		// check errors
		if (ret < 0) {
			fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) );
			exit(EXIT_FAILURE);
		}

		struct io_uring_cqe *cqe;
		unsigned head;
		unsigned count = 0;

		// go through all cqes
		io_uring_for_each_cqe(ring, head, cqe) {
			if (0 == cqe->user_data) {
				done = true;
				break;
			}

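			// Cap how many new sqes we prepare before going around to submit them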
			if(local.to_submit > 30) break;

			auto req = (class connection *)cqe->user_data;
			req->handle( ring, cqe->res, opt );

			// Every now and then, print some stats
			reset--;
			if(reset == 0) {
				std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
				// Reset to some random number of completions
				// use the ring_fd so the threads don't all print at once
				reset = 100000 + (100000 * (ring->ring_fd % 5));
			}

			// Keep track of how many cqes we have seen
			count++;
		}

		// Mark the cqes as seen
		io_uring_cq_advance(ring, count);
	}

	// Tally all the thread local statistics
	__atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );

	return nullptr;
}

//=========================================================
#include <bit> // for ispow2

extern "C" {
	#include <pthread.h>    // for pthreads
	#include <signal.h>     // for signal(SIGPIPE, SIG_IGN);
	#include <sys/socket.h> // for sockets in general
	#include <netinet/in.h> // for sockaddr_in, AF_INET
}

int main(int argc, char * argv[]) {
	// Initialize the array of connection-fd associations
	for(size_t i = 0; i < array_max; i++) {
		conns[i] = nullptr;
	}

	// Make sure we ignore all sigpipes
	signal(SIGPIPE, SIG_IGN);

	// Default command line arguments
	unsigned nthreads = 1;      // number of kernel threads
	unsigned port = 8800;       // which port to listen on
	unsigned entries = 256;     // number of entries per ring/kernel thread
	unsigned backlog = 262144;  // backlog argument to listen
	unsigned preaccept = 1;     // start by accepting X per thread
	bool attach = false;        // Whether or not to attach all the rings
	bool sqpoll = false;        // Whether or not to use SQ Polling

	//===================
	// Arguments Parsing
	int c;
	while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
		switch (c)
		{
		case 't':
			nthreads = atoi(optarg);
			break;
		case 'p':
			port = atoi(optarg);
			break;
		case 'e':
			entries = atoi(optarg);
			break;
		case 'b':
			backlog = atoi(optarg);
			break;
		case 'c':
			preaccept = atoi(optarg);
			break;
		case 'a':
			attach = true;
			break;
		case 'S':
			sqpoll = true;
			break;
		case '?':
		default:
			std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -c <preaccept> -aS" << std::endl;
			return EXIT_FAILURE;
		}
	}

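	// If the requested number of entries is not a power of two, round it up
	// to the next one using the classic bit-smearing trick.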
	if( !std::ispow2(entries) ) {
		unsigned v = entries;
		v--;
		v |= v >> 1;
		v |= v >> 2;
		v |= v >> 4;
		v |= v >> 8;
		v |= v >> 16;
		v++;
		std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
		entries = v;
	}

	//===================
	// End FD
	// Create a single eventfd to notify the kernel threads when the server shuts down
	int efd = eventfd(0, EFD_SEMAPHORE);
	if (efd < 0) {
		std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	//===================
	// Open Socket
	// Listen on specified port
	std::cout << getpid() << " : Listening on port " << port << std::endl;
	int server_fd = socket(AF_INET, SOCK_STREAM, 0);
	if(server_fd < 0) {
		std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	int ret = 0;
	struct sockaddr_in address;
	int addrlen = sizeof(address);
	memset( (char *)&address, '\0', addrlen );
	address.sin_family = AF_INET;
	address.sin_addr.s_addr = htonl(INADDR_ANY);
	address.sin_port = htons( port );

	// In case the port is already in use, don't just return an error
	// Linux is very slow at reclaiming ports so just retry regularly
	int waited = 0;
	while(true) {
		ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
		if(ret < 0) {
			if(errno == EADDRINUSE) {
				// Port is in use, let's retry later
				if(waited == 0) {
					std::cerr << "Waiting for port" << std::endl;
				} else {
					// Print how long we have been waiting so it's clear we are still trying
					std::cerr << "\r" << waited;
					std::cerr.flush();
				}
				waited ++;
				usleep( 1000000 ); // Wait and retry
				continue;
			}
			// Some other error occurred, this is a real error
			std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
			exit(EXIT_FAILURE);
		}
		break;
	}

	ret = listen( server_fd, backlog );
	if(ret < 0) {
		std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	//===================
	// Run Server Threads
	std::cout << "Starting " << nthreads << " Threads";
	if(attach) {
		std::cout << " with attached Rings";
	}
	std::cout << std::endl;

	// Create the desired number of kernel threads and a ring for each.
	// Create the rings in main so we can attach them to one another.
	// Since the rings are all in a dense VLA, align them so we don't get false sharing;
	// it's unlikely but better safe than sorry.
	struct __attribute__((aligned(128))) aligned_ring {
		struct io_uring storage;
	};
	aligned_ring thrd_rings[nthreads];
	pthread_t thrd_hdls[nthreads];
	options_t thrd_opts[nthreads];
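	// Track whether every ring reports these io_uring features:
	//  - IORING_FEAT_NODROP:          completions are never dropped on CQ overflow
	//  - IORING_FEAT_FAST_POLL:       the kernel polls not-yet-ready fds internally
	//  - IORING_FEAT_SQPOLL_NONFIXED: SQPOLL works without registered (fixed) files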
	bool no_drops  = true;
	bool fast_poll = true;
	bool nfix_sqpl = true;
	for(unsigned i = 0; i < nthreads; i++) {
		struct io_uring_params p = { };

		if(sqpoll) { // If sqpoll is on, add the flag
			p.flags |= IORING_SETUP_SQPOLL;
			p.sq_thread_idle = 100;
		}

		if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
			p.flags |= IORING_SETUP_ATTACH_WQ;
			p.wq_fd = thrd_rings[0].storage.ring_fd;
		}

		// Create the ring and make sure it actually succeeded
		int ring_ret = io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
		if (ring_ret < 0) {
			std::cerr << "io_uring setup error: (" << -ring_ret << ") " << strerror(-ring_ret) << std::endl;
			exit(EXIT_FAILURE);
		}

		// Check if some of the noteworthy features are there
		if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
		if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
		if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }

		// Write the socket options we want to the options we pass to the threads
		thrd_opts[i].acpt.sockfd  = server_fd;
		thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
		thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
		thrd_opts[i].acpt.flags   = 0;
		thrd_opts[i].acpt.cnt     = preaccept;
		thrd_opts[i].endfd        = efd;
		thrd_opts[i].ring         = &thrd_rings[i].storage;

		// pthread_create returns an error number instead of setting errno
		int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
		if (ret != 0) {
			std::cerr << "pthread create error: (" << ret << ") " << strerror(ret) << std::endl;
			exit(EXIT_FAILURE);
		}
	}

	// Tell the user if the features are present
	if( no_drops ) std::cout << "No Drop Present" << std::endl;
	if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
	if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;

	//===================
	// Server Started
	std::cout << "Server Started" << std::endl;
	{
		char buffer[128];
		int ret;
		do {
			// Wait for a Ctrl-D to close the server
			ret = read(STDIN_FILENO, buffer, 128);
			if(ret < 0) {
				std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
				exit(EXIT_FAILURE);
			}
			else if(ret > 0) {
				std::cout << "User typed '";
				std::cout.write(buffer, ret);
				std::cout << "'" << std::endl;
			}
		} while(ret != 0);

		std::cout << "Shutdown received" << std::endl;
	}

	//===================
	// Use eventfd_write to tell the threads we are closing
	(std::cout << "Sending Shutdown to Threads... ").flush();
	ret = eventfd_write(efd, nthreads);
	if (ret < 0) {
		std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}
	std::cout << "done" << std::endl;

	//===================
	// Join all the threads and close the rings
	(std::cout << "Stopping Threads... ").flush();
	for(unsigned i = 0; i < nthreads; i++) {
		void * retval;
		int ret = pthread_join(thrd_hdls[i], &retval);
		if (ret != 0) {
			std::cerr << "pthread join error: (" << ret << ") " << strerror(ret) << std::endl;
			exit(EXIT_FAILURE);
		}

		io_uring_queue_exit(thrd_opts[i].ring);
	}
	std::cout << "done" << std::endl;

	//===================
	// Close the sockets
	(std::cout << "Closing Socket... ").flush();
	ret = shutdown( server_fd, SHUT_RD );
	if( ret < 0 ) {
		std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	ret = close(server_fd);
	if (ret < 0) {
		std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}
	std::cout << "done" << std::endl << std::endl;

	// Print stats and exit
	std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
	std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
	std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
	std::cout << "Max FD: " << max_fd << std::endl;
	std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
	std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
	std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
	std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
}

// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //