source: benchmark/io/http/http_ring.cpp@ d60d30e

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since d60d30e was c235179, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added the option to pre-accept in httpring, this seems to have a major impact.

  • Property mode set to 100644
File size: 23.2 KB
RevLine 
[efdfdee]1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
[eeb4866]11// #define NOBATCHING
12// #define USE_ASYNC
13
// Options handed to each worker thread (one instance per thread).
// The type is over-aligned to 128 bytes, presumably so that neighbouring
// array elements do not end up sharing a cache line.
struct alignas(128) options_t {
	// Arguments forwarded verbatim to every accept request
	struct {
		int sockfd;               // listening socket
		struct sockaddr * addr;   // where the peer address is written
		socklen_t * addrlen;      // in/out length of addr
		int flags;                // flags argument of accept
		unsigned cnt;             // how many accepts to queue at start-up
	} acpt;

	// File descriptor read by the thread to learn about termination
	int endfd;

	// The ring this thread uses for all of its io
	struct io_uring * ring;
};
31
//=========================================================
// General statistics
// Every counter starts at zero. The block is over-aligned to 128 bytes.
struct alignas(128) stats_block_t {
	// Number of successful completions, by kind of operation
	struct {
		volatile size_t conns{0};
		volatile size_t reads{0};
		volatile size_t writes{0};
		volatile size_t full_writes{0};
	} completions;

	// Number of failed completions, by kind of operation and error
	struct {
		volatile size_t conns{0};

		struct {
			volatile size_t pipes{0};
			volatile size_t reset{0};
			volatile size_t other{0};
		} requests;

		struct {
			volatile size_t pipes{0};
			volatile size_t reset{0};
			volatile size_t other{0};
		} answers;
	} errors;

	// Connection bookkeeping
	struct {
		volatile size_t current{0};
		volatile size_t max{0};
		volatile size_t used{0};
	} conns;

	// Times an fd was accepted while its slot in the debug array was still taken
	volatile size_t recycle_errors{0};
};
65
[761a246]66// Each thread gets its own block of stats
67// and there is a global block for tallying at the end
68thread_local stats_block_t stats;
69stats_block_t global_stats;
70
71// Get an array of current connections
72// This is just for debugging, to make sure
73// no two state-machines get the same fd
74const size_t array_max = 25000;
75class connection * volatile conns[array_max] = { 0 };
76
77// Max fd we've seen, keep track so it's convenient to adjust the array size after
78volatile int max_fd = 0;
[c05c58f]79
[efdfdee]80//=========================================================
[761a246]81// Some small wrappers for ring operations used outside the connection state machine
82// get sqe + error handling
[3acbf89]83static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
[efdfdee]84 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
[3acbf89]85 if(!sqe) {
86 std::cerr << "Insufficient entries in ring" << std::endl;
87 exit(EXIT_FAILURE);
88 }
89 return sqe;
90}
91
[761a246]92// read of the event fd is not done by a connection
93// use nullptr as the user data
[3acbf89]94static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
95 struct io_uring_sqe * sqe = get_sqe(ring);
[efdfdee]96 io_uring_prep_read(sqe, fd, buffer, len, 0);
[761a246]97 io_uring_sqe_set_data(sqe, nullptr);
98 io_uring_submit(ring);
[efdfdee]99}
100
//=========================================================
// All answers are fixed and determined by the return code.
// The enumerators are used as indices into http_msgs and http_lens,
// KNOWN_CODES is the number of replies and must stay last.
enum HttpCode {
	OK200 = 0,
	E400,
	E404,
	E405,
	E408,
	E413,
	E414,
	KNOWN_CODES
};

// Get a fixed reply based on the return code (indexed by HttpCode)
const char * http_msgs[] = {
	"HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
	"HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 405 Method Not Allowed\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
};
static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );

// Pre-compute the length of these replies (indexed by HttpCode).
// Must list exactly one entry per message above.
const size_t http_lens[] = {
	strlen(http_msgs[0]),
	strlen(http_msgs[1]),
	strlen(http_msgs[2]),
	strlen(http_msgs[3]),
	strlen(http_msgs[4]),
	strlen(http_msgs[5]),
	strlen(http_msgs[6]),
};
static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
[efdfdee]137
[761a246]138//=========================================================
139// Finate state machine responsible for handling each connection
140class __attribute__((aligned(128))) connection {
141private:
142 // The state of the machine
143 enum {
144 ACCEPTING, // Accept sent waiting for connection
145 REQUESTING, // Waiting for new request
146 ANSWERING, // Either request received submitting answer or short answer sent, need to submit rest
147 } state;
148
149 // The file descriptor of the connection
150 int fd;
[efdfdee]151
[761a246]152 // request data
153 static const size_t buffer_size = 1024; // Size of the read buffer
154 const char * buffer; // Buffer into which requests are read
[efdfdee]155
[761a246]156 // send data
157 size_t to_send; // Data left to send
158 const char * iterator; // Pointer to rest of the message to send
[efdfdee]159
[761a246]160 // stats
161 // how many requests/answers were complete, that is, a valid cqe was obtained
162 struct {
163 size_t requests = 0;
164 size_t answers = 0;
165 } stats;
166
167private:
168 connection()
169 : state(ACCEPTING)
170 , fd(0)
171 , buffer( new char[buffer_size])
172 , iterator(nullptr)
[eeb4866]173 {}
[efdfdee]174
[761a246]175 ~connection() {
176 delete [] buffer;
177 ::stats.conns.current--;
178 }
[efdfdee]179
[761a246]180 // Close the current connection
181 void close(int err) {
182 // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
183 conns[fd] = nullptr;
[efdfdee]184
[761a246]185 if(fd != 0) {
186 ::close(fd);
187 }
188 delete this;
189 }
[efdfdee]190
[761a246]191 //--------------------------------------------------
192 // Wrappers for submit so we can tweak it more easily
193 static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
194 (void)ring;
[eeb4866]195 #ifdef USE_ASYNC
196 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
197 #endif
[761a246]198 io_uring_sqe_set_data(sqe, conn);
[eeb4866]199 #ifdef NOBATCHING
200 io_uring_submit(ring);
201 #endif
[761a246]202 }
[3acbf89]203
[761a246]204 void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
205 submit(ring, sqe, this);
206 }
207
208 //--------------------------------------------------
209 // get a new request from the client
210 void request(struct io_uring * ring) {
211 state = REQUESTING;
212 struct io_uring_sqe * sqe = get_sqe(ring);
[eeb4866]213 io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
[761a246]214 submit(ring, sqe);
215 }
[3acbf89]216
[761a246]217 //--------------------------------------------------
218 // Send a new answer based on a return code
219 void answer(struct io_uring * ring, HttpCode code) {
220 iterator = http_msgs[code];
221 to_send = http_lens[code];
222 if(to_send != 124) {
223 std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
224 }
225 answer(ring);
226 }
[3acbf89]227
[761a246]228 // send a new answer to the client
229 // Reused for incomplete writes
230 void answer(struct io_uring * ring) {
231 state = ANSWERING;
232 struct io_uring_sqe * sqe = get_sqe(ring);
[eeb4866]233 io_uring_prep_send(sqe, fd, iterator, to_send, 0);
[761a246]234 submit(ring, sqe);
235 }
[3acbf89]236
[761a246]237 //--------------------------------------------------
238 // Handle a new connection, results for getting an cqe while in the ACCEPTING state
239 void newconn(struct io_uring * ring, int ret) {
240 // Check errors
241 if( ret < 0 ) {
242 int err = -ret;
243 if( err == ECONNABORTED ) {
244 ::stats.errors.conns++;
245 this->close(err);
246 return;
247 }
248 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
249 exit(EXIT_FAILURE);
250 }
[3acbf89]251
[761a246]252 // Count the connections
253 ::stats.completions.conns++;
[eeb4866]254 ::stats.conns.current++;
255 if(::stats.conns.current > ::stats.conns.max) {
256 ::stats.conns.max = ::stats.conns.current;
257 }
[3acbf89]258
[761a246]259 // Read on the data
260 fd = ret;
261 request(ring);
[efdfdee]262
[761a246]263 // check the max fd so we know if we exceeded the array
264 for(;;) {
265 int expected = max_fd;
266 if(expected >= fd) return;
267 if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
268 }
[efdfdee]269
[761a246]270 // check if we have enough space to fit inside the array
271 if(fd >= array_max) {
272 std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
273 return;
274 }
[efdfdee]275
[761a246]276 // Put our connection into the global array
277 // No one else should be using it so if they are that's a bug
278 auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
279 if( exist ) {
280 size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
281 if( first == 0 ) {
282 std::cerr << "First: accept has existing connection " << std::endl;
283 }
284 }
[efdfdee]285 }
286
[761a246]287 // Handle a new request, results for getting an cqe while in the REQUESTING state
[9715567]288 void newrequest(struct io_uring * ring, int res) {
[761a246]289 // Check errors
[9715567]290 if( res < 0 ) {
291 int err = -res;
[761a246]292 switch(err) {
293 case EPIPE:
294 ::stats.errors.requests.pipes++;
295 break;
296 // Don't fall through the get better stats
297 case ECONNRESET:
298 ::stats.errors.requests.reset++;
299 break;
300 default:
301 ::stats.errors.requests.other++;
302 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
303 exit(EXIT_FAILURE);
304 }
[efdfdee]305
[761a246]306 // Connection failed, close it
307 this->close(err);
308 return;
[efdfdee]309 }
310
[761a246]311 // Update stats
312 ::stats.completions.reads++;
[efdfdee]313
[761a246]314 // Is this an EOF
[9715567]315 if(res == 0) {
[761a246]316 // Yes, close the connection
317 this->close(0);
318 return;
319 }
[efdfdee]320
[761a246]321 // Find the end of the request header
322 const char * it = buffer;
323 if( !strstr( it, "\r\n\r\n" ) ) {
324 // This state machine doesn't support incomplete reads
325 // Print them to output so it's clear there is an issue
326 std::cout << "Incomplete request" << std::endl;
327 this->close(EBADR);
328 return;
329 }
330
331 // Find the method to use
332 it = buffer;
333 int ret = memcmp(it, "GET ", 4);
334 if( ret != 0 ) {
335 // We only support get, answer with an error
336 answer(ring, E400);
337 return;
338 }
339
340 // Find the target
341 it += 4;
342 ret = memcmp(it, "/plaintext", 10);
343 if( ret != 0 ) {
344 // We only support /plaintext, answer with an error
345 answer(ring, E404);
346 return;
347 }
[efdfdee]348
[761a246]349 // Correct request, answer with the payload
350 this->stats.requests++;
351 answer(ring, OK200);
[efdfdee]352 }
353
[761a246]354 // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
355 void writedone(struct io_uring * ring, int res) {
356 // Check errors
357 if( res < 0 ) {
358 int err = -res;
359 switch(err) {
360 case EPIPE:
361 ::stats.errors.answers.pipes++;
362 break;
363 // Don't fall through the get better stats
364 case ECONNRESET:
365 ::stats.errors.answers.reset++;
366 break;
367 default:
368 ::stats.errors.answers.other++;
369 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
370 exit(EXIT_FAILURE);
371 }
[efdfdee]372
[761a246]373 this->close(err);
374 return;
375 }
[efdfdee]376
[761a246]377 // Update stats
378 ::stats.completions.writes++;
379 if(res == 124) ::stats.completions.full_writes++;
380
381 // Is this write completed
382 if( res == to_send ) {
383 // Yes, more stats
[eeb4866]384 this->stats.answers++;
385 if(this->stats.answers == 1) ::stats.conns.used++;
[761a246]386 // Then read a new request
387 request(ring);
388 return;
[efdfdee]389 }
390
[761a246]391 // Not a completed read, push the rest
392 to_send -= res;
393 iterator += res;
394 answer(ring);
395 }
396public:
397 // Submit a call to accept and create a new connection object
398 static void accept(struct io_uring * ring, const struct options_t & opt) {
399 struct io_uring_sqe * sqe = get_sqe(ring);
400 io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
401 submit(ring, sqe, new connection());
402 // std::cout << "Submitted accept: " << req << std::endl;
[efdfdee]403 }
404
[761a246]405 // Handle a new cqe
406 void handle(struct io_uring * ring, int res, const struct options_t & opt) {
407 switch(state) {
408 case ACCEPTING:
409 connection::accept(ring, opt);
410 newconn(ring, res);
411 break;
412 case REQUESTING:
413 newrequest(ring, res);
414 break;
415 case ANSWERING:
416 writedone(ring, res);
417 break;
418 }
419 }
420};
[efdfdee]421
422//=========================================================
[761a246]423// Main loop of the WebServer
424// Effectively uses one thread_local copy of everything per kernel thread
[efdfdee]425void * proc_loop(void * arg) {
[761a246]426 // Get the thread local argument
[3acbf89]427 struct options_t & opt = *(struct options_t *)arg;
[f3e87af]428 struct io_uring * ring = opt.ring;
[efdfdee]429
[761a246]430 // Track the shutdown using a event_fd
[efdfdee]431 char endfd_buf[8];
432 ring_end(ring, opt.endfd, endfd_buf, 8);
433
[761a246]434 // Accept our first connection
435 // May not take effect until io_uring_submit_and_wait
[c235179]436 for(unsigned i = 0; i < opt.acpt.cnt; i++) {
437 connection::accept(ring, opt);
438 }
[efdfdee]439
[761a246]440 int reset = 1; // Counter to print stats once in a while
441 bool done = false; // Are we done
442 size_t sqes = 0; // Number of sqes we submitted
443 size_t call = 0; // Number of submits we made
[efdfdee]444 while(!done) {
[761a246]445 // Submit all the answers we have and wait for responses
446 int ret = io_uring_submit_and_wait(ring, 1);
[3acbf89]447
[761a246]448 // check errors
449 if (ret < 0) {
450 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
[efdfdee]451 exit(EXIT_FAILURE);
452 }
453
[761a246]454 // Check how good we are at batching sqes
455 sqes += ret;
456 call++;
457
458 struct io_uring_cqe *cqe;
459 unsigned head;
460 unsigned count = 0;
[efdfdee]461
[761a246]462 // go through all cqes
463 io_uring_for_each_cqe(ring, head, cqe) {
464 if (0 == cqe->user_data) {
[efdfdee]465 done = true;
466 break;
[761a246]467 }
[efdfdee]468
[761a246]469 auto req = (class connection *)cqe->user_data;
470 req->handle( ring, cqe->res, opt );
[c05c58f]471
[eeb4866]472 // Every now and then, print some stats
[761a246]473 reset--;
474 if(reset == 0) {
475 std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
[eeb4866]476 // Reset to some random number of completions
477 // use the ring_fd in the number of threads don't all print at once
[761a246]478 reset = 100000 + (100000 * (ring->ring_fd % 5));
479 }
480
481 // Keep track of how many cqes we have seen
482 count++;
[c05c58f]483 }
[761a246]484
485 // Mark the cqes as seen
486 io_uring_cq_advance(ring, count);
[efdfdee]487 }
488
[761a246]489 // Tally all the thread local statistics
490 __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
491 __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
492 __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
493 __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
494 __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
495 __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
496 __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
497 __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
498 __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
499 __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
500 __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
501 __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
502 __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
503 __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
504
505 return nullptr;
[efdfdee]506}
507
508//=========================================================
[761a246]509#include <bit> // for ispow2
[3acbf89]510
[efdfdee]511extern "C" {
[761a246]512 #include <pthread.h> // for pthreads
513 #include <signal.h> // for signal(SIGPIPE, SIG_IGN);
514 #include <sys/eventfd.h> // use for termination
515 #include <sys/socket.h> // for sockets in general
516 #include <netinet/in.h> // for sockaddr_in, AF_INET
[efdfdee]517}
518
[3acbf89]519int main(int argc, char * argv[]) {
[761a246]520 // Initialize the array of connection-fd associations
521 for(int i = 0; i < array_max; i++) {
522 conns[i] = nullptr;
523 }
524
525 // Make sure we ignore all sigpipes
[efdfdee]526 signal(SIGPIPE, SIG_IGN);
527
[761a246]528 // Default command line arguments
529 unsigned nthreads = 1; // number of kernel threads
530 unsigned port = 8800; // which port to listen on
531 unsigned entries = 256; // number of entries per ring/kernel thread
532 unsigned backlog = 262144; // backlog argument to listen
[c235179]533 unsigned preaccept = 1; // start by accepting X per threads
[761a246]534 bool attach = false; // Whether or not to attach all the rings
535 bool sqpoll = false; // Whether or not to use SQ Polling
[efdfdee]536
[3acbf89]537 //===================
[761a246]538 // Arguments Parsing
[3acbf89]539 int c;
[c235179]540 while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
[3acbf89]541 switch (c)
542 {
543 case 't':
544 nthreads = atoi(optarg);
545 break;
546 case 'p':
547 port = atoi(optarg);
548 break;
549 case 'e':
550 entries = atoi(optarg);
551 break;
552 case 'b':
553 backlog = atoi(optarg);
554 break;
[c235179]555 case 'c':
556 preaccept = atoi(optarg);
557 break;
[f3e87af]558 case 'a':
559 attach = true;
560 break;
[c05c58f]561 case 'S':
562 sqpoll = true;
563 break;
[3acbf89]564 case '?':
565 default:
[c05c58f]566 std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
[3acbf89]567 return EXIT_FAILURE;
568 }
569 }
570
571 if( !std::ispow2(entries) ) {
572 unsigned v = entries;
573 v--;
574 v |= v >> 1;
575 v |= v >> 2;
576 v |= v >> 4;
577 v |= v >> 8;
578 v |= v >> 16;
579 v++;
580 std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
581 entries = v;
582 }
583
[efdfdee]584 //===================
585 // End FD
[761a246]586 // Create a single event fd to notify the kernel threads when the server shutsdown
[efdfdee]587 int efd = eventfd(0, EFD_SEMAPHORE);
588 if (efd < 0) {
589 std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
590 exit(EXIT_FAILURE);
591 }
592
593 //===================
594 // Open Socket
[761a246]595 // Listen on specified port
[efdfdee]596 std::cout << getpid() << " : Listening on port " << port << std::endl;
597 int server_fd = socket(AF_INET, SOCK_STREAM, 0);
598 if(server_fd < 0) {
599 std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
600 exit(EXIT_FAILURE);
601 }
602
603 int ret = 0;
604 struct sockaddr_in address;
605 int addrlen = sizeof(address);
606 memset( (char *)&address, '\0', addrlen );
607 address.sin_family = AF_INET;
608 address.sin_addr.s_addr = htonl(INADDR_ANY);
609 address.sin_port = htons( port );
610
[761a246]611 // In case the port is already in use, don't just return an error
612 // Linux is very slow at reclaiming port so just retry regularly
[efdfdee]613 int waited = 0;
614 while(true) {
615 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
616 if(ret < 0) {
617 if(errno == EADDRINUSE) {
[761a246]618 // Port is in used let's retry later
[efdfdee]619 if(waited == 0) {
620 std::cerr << "Waiting for port" << std::endl;
621 } else {
[761a246]622 // To be cure, print how long we have been waiting
[efdfdee]623 std::cerr << "\r" << waited;
624 std::cerr.flush();
625 }
626 waited ++;
[761a246]627 usleep( 1000000 ); // Wait and retry
[efdfdee]628 continue;
629 }
[761a246]630 // Some other error occured, this is a real error
[efdfdee]631 std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
632 exit(EXIT_FAILURE);
633 }
634 break;
635 }
636
637 ret = listen( server_fd, backlog );
638 if(ret < 0) {
639 std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
640 exit(EXIT_FAILURE);
641 }
642
643 //===================
644 // Run Server Threads
[f3e87af]645 std::cout << "Starting " << nthreads << " Threads";
646 if(attach) {
647 std::cout << " with attached Rings";
648 }
649 std::cout << std::endl;
650
[761a246]651 // Create the desired number of kernel-threads and for each
652 // create a ring. Create the rings in the main so we can attach them
653 // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
654 // it's unlikely but better safe than sorry
655 struct __attribute__((aligned(128))) aligned_ring {
656 struct io_uring storage;
657 };
[f3e87af]658 aligned_ring thrd_rings[nthreads];
659 pthread_t thrd_hdls[nthreads];
660 options_t thrd_opts[nthreads];
[761a246]661 bool no_drops = true;
[c05c58f]662 bool fast_poll = true;
663 bool nfix_sqpl = true;
[efdfdee]664 for(unsigned i = 0; i < nthreads; i++) {
[c05c58f]665 struct io_uring_params p = { };
[761a246]666
667 if(sqpoll) { // If sqpoll is on, add the flag
[c05c58f]668 p.flags |= IORING_SETUP_SQPOLL;
669 p.sq_thread_idle = 100;
[f3e87af]670 }
[c05c58f]671
[761a246]672 if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
[c05c58f]673 p.flags |= IORING_SETUP_ATTACH_WQ;
[f3e87af]674 p.wq_fd = thrd_rings[0].storage.ring_fd;
675 }
[761a246]676
677 // Create the ring
[c05c58f]678 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
679
[761a246]680 // Check if some of the note-worthy features are there
681 if(0 == (p.features & IORING_FEAT_NODROP )) { no_drops = false; }
[c05c58f]682 if(0 == (p.features & IORING_FEAT_FAST_POLL )) { fast_poll = false; }
683 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
[f3e87af]684
[761a246]685 // Write the socket options we want to the options we pass to the threads
[efdfdee]686 thrd_opts[i].acpt.sockfd = server_fd;
687 thrd_opts[i].acpt.addr = (struct sockaddr *)&address;
688 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
689 thrd_opts[i].acpt.flags = 0;
[c235179]690 thrd_opts[i].acpt.cnt = preaccept;
[f3e87af]691 thrd_opts[i].endfd = efd;
692 thrd_opts[i].ring = &thrd_rings[i].storage;
[efdfdee]693
694 int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
695 if (ret < 0) {
696 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
697 exit(EXIT_FAILURE);
698 }
699 }
700
[761a246]701 // Tell the user if the features are present
702 if( no_drops ) std::cout << "No Drop Present" << std::endl;
[c05c58f]703 if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
704 if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
705
[efdfdee]706 //===================
707 // Server Started
708 std::cout << "Server Started" << std::endl;
709 {
710 char buffer[128];
711 int ret;
712 do {
[761a246]713 // Wait for a Ctrl-D to close the server
[efdfdee]714 ret = read(STDIN_FILENO, buffer, 128);
715 if(ret < 0) {
716 std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
717 exit(EXIT_FAILURE);
718 }
719 else if(ret > 0) {
720 std::cout << "User inputed '";
721 std::cout.write(buffer, ret);
722 std::cout << "'" << std::endl;
723 }
724 } while(ret != 0);
725
726 std::cout << "Shutdown received" << std::endl;
727 }
728
729 //===================
[761a246]730 // Use eventfd_write to tell the threads we are closing
[efdfdee]731 (std::cout << "Sending Shutdown to Threads... ").flush();
732 ret = eventfd_write(efd, nthreads);
733 if (ret < 0) {
734 std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
735 exit(EXIT_FAILURE);
736 }
737 std::cout << "done" << std::endl;
738
739 //===================
[761a246]740 // Join all the threads and close the rings
[efdfdee]741 (std::cout << "Stopping Threads Done... ").flush();
742 for(unsigned i = 0; i < nthreads; i++) {
743 void * retval;
744 int ret = pthread_join(thrd_hdls[i], &retval);
745 if (ret < 0) {
746 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
747 exit(EXIT_FAILURE);
748 }
[f3e87af]749
750 io_uring_queue_exit(thrd_opts[i].ring);
[efdfdee]751 }
752 std::cout << "done" << std::endl;
753
754 //===================
[761a246]755 // Close the sockets
[efdfdee]756 (std::cout << "Closing Socket... ").flush();
757 ret = shutdown( server_fd, SHUT_RD );
758 if( ret < 0 ) {
759 std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
760 exit(EXIT_FAILURE);
761 }
762
763 ret = close(server_fd);
764 if (ret < 0) {
765 std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
766 exit(EXIT_FAILURE);
767 }
[761a246]768 std::cout << "done" << std::endl << std::endl;
769
770 // Print stats and exit
771 std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
772 std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
773 std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
774 std::cout << "Max FD: " << max_fd << std::endl;
775 std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
[eeb4866]776 std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
[761a246]777 std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
778 std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
[eeb4866]779}
780
781// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
Note: See TracBrowser for help on using the repository browser.