source: benchmark/io/http/http_ring.cpp @ df56e25

Last change on this file since df56e25 was 1db1454, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Fixed Typo

  • Property mode set to 100644
File size: 23.9 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11// #define NOBATCHING
12// #define USE_ASYNC
13
14// Options passed to each threads
15struct __attribute__((aligned(128))) options_t {
16        // Data passed to accept
17        struct {
18                int sockfd;
19                struct sockaddr *addr;
20                socklen_t *addrlen;
21                int flags;
22                unsigned cnt;
23        } acpt;
24
25        // Termination notification
26        int endfd;
27
28        // The ring to use for io
29        struct io_uring * ring;
30};
31
32//=========================================================
33// General statistics
34struct __attribute__((aligned(128))) stats_block_t {
35        struct {
36                volatile size_t conns = 0;
37                volatile size_t reads = 0;
38                volatile size_t writes = 0;
39                volatile size_t full_writes = 0;
40        } completions;
41
42        struct {
43                volatile size_t conns = 0;
44                struct {
45                        volatile size_t pipes = 0;
46                        volatile size_t reset = 0;
47                        volatile size_t other = 0;
48                } requests;
49
50                struct {
51                        volatile size_t pipes = 0;
52                        volatile size_t reset = 0;
53                        volatile size_t other = 0;
54                } answers;
55        } errors;
56
57        struct {
58                volatile size_t current = 0;
59                volatile size_t max = 0;
60                volatile size_t used = 0;
61        } conns;
62
63        volatile size_t recycle_errors = 0;
64};
65
66// Each thread gets its own block of stats
67// and there is a global block for tallying at the end
68thread_local stats_block_t stats;
69stats_block_t global_stats;
70
71thread_local struct __attribute__((aligned(128))) {
72        size_t to_submit = 0;
73} local;
74
75// Get an array of current connections
76// This is just for debugging, to make sure
77// no two state-machines get the same fd
78const size_t array_max = 25000;
79class connection * volatile conns[array_max] = { 0 };
80
81// Max fd we've seen, keep track so it's convenient to adjust the array size after
82volatile int max_fd = 0;
83
84//=========================================================
85// Some small wrappers for ring operations used outside the connection state machine
86// get sqe + error handling
87static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
88        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
89        if(!sqe) {
90                std::cerr << "Insufficient entries in ring" << std::endl;
91                exit(EXIT_FAILURE);
92        }
93        return sqe;
94}
95
96// read of the event fd is not done by a connection
97// use nullptr as the user data
98static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
99        struct io_uring_sqe * sqe = get_sqe(ring);
100        io_uring_prep_read(sqe, fd, buffer, len, 0);
101        io_uring_sqe_set_data(sqe, nullptr);
102        io_uring_submit(ring);
103}
104
105//=========================================================
106// All answers are fixed and determined by the return code
107enum HttpCode {
108        OK200 = 0,
109        E400,
110        E404,
111        E405,
112        E408,
113        E413,
114        E414,
115        KNOWN_CODES
116};
117
118// Get a fix reply based on the return code
119const char * http_msgs[] = {
120        "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
121        "HTTP/1.1 400 Bad Request\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
122        "HTTP/1.1 404 Not Found\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
123        "HTTP/1.1 405 Method Not \r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
124        "HTTP/1.1 408 Request Timeout\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
125        "HTTP/1.1 413 Payload Too Large\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
126        "HTTP/1.1 414 URI Too Long\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
127};
128static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
129
130// Pre-compute the length of these replys
131const size_t http_lens[] = {
132        strlen(http_msgs[0]),
133        strlen(http_msgs[1]),
134        strlen(http_msgs[2]),
135        strlen(http_msgs[3]),
136        strlen(http_msgs[4]),
137        strlen(http_msgs[5]),
138        strlen(http_msgs[6]),
139};
140static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
141
142//=========================================================
143// Finate state machine responsible for handling each connection
144class __attribute__((aligned(128))) connection {
145private:
146        // The state of the machine
147        enum {
148                ACCEPTING,  // Accept sent waiting for connection
149                REQUESTING, // Waiting for new request
150                ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
151        } state;
152
153        // The file descriptor of the connection
154        int fd;
155
156        // request data
157        static const size_t buffer_size = 1024; // Size of the read buffer
158        const char * buffer;                      // Buffer into which requests are read
159
160        // send data
161        size_t to_send;         // Data left to send
162        const char * iterator;  // Pointer to rest of the message to send
163
164        // stats
165        // how many requests/answers were complete, that is, a valid cqe was obtained
166        struct {
167                size_t requests = 0;
168                size_t answers = 0;
169        } stats;
170
171private:
172        connection()
173                : state(ACCEPTING)
174                , fd(0)
175                , buffer( new char[buffer_size])
176                , iterator(nullptr)
177        {}
178
179        ~connection() {
180                delete [] buffer;
181                ::stats.conns.current--;
182        }
183
184        // Close the current connection
185        void close(int err) {
186                // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
187                conns[fd] = nullptr;
188
189                if(fd != 0) {
190                        ::close(fd);
191                }
192                delete this;
193        }
194
195        //--------------------------------------------------
196        // Wrappers for submit so we can tweak it more easily
197        static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
198                (void)ring;
199                local.to_submit++;
200                #ifdef USE_ASYNC
201                        io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
202                #endif
203                io_uring_sqe_set_data(sqe, conn);
204                #ifdef NOBATCHING
205                        io_uring_submit(ring);
206                #endif
207        }
208
209        void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
210                submit(ring, sqe, this);
211        }
212
213        //--------------------------------------------------
214        // get a new request from the client
215        void request(struct io_uring * ring) {
216                state = REQUESTING;
217                struct io_uring_sqe * sqe = get_sqe(ring);
218                io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
219                submit(ring, sqe);
220        }
221
222        //--------------------------------------------------
223        // Send a new answer based on a return code
224        void answer(struct io_uring * ring, HttpCode code) {
225                iterator = http_msgs[code];
226                to_send  = http_lens[code];
227                if(to_send != 124) {
228                        std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
229                }
230                answer(ring);
231        }
232
233        // send a new answer to the client
234        // Reused for incomplete writes
235        void answer(struct io_uring * ring) {
236                state = ANSWERING;
237                struct io_uring_sqe * sqe = get_sqe(ring);
238                io_uring_prep_send(sqe, fd, iterator, to_send, 0);
239                submit(ring, sqe);
240        }
241
242        //--------------------------------------------------
243        // Handle a new connection, results for getting an cqe while in the ACCEPTING state
244        void newconn(struct io_uring * ring, int ret) {
245                // Check errors
246                if( ret < 0 ) {
247                        int err = -ret;
248                        if( err == ECONNABORTED ) {
249                                ::stats.errors.conns++;
250                                this->close(err);
251                                return;
252                        }
253                        std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
254                        exit(EXIT_FAILURE);
255                }
256
257                // Count the connections
258                ::stats.completions.conns++;
259                ::stats.conns.current++;
260                if(::stats.conns.current > ::stats.conns.max) {
261                        ::stats.conns.max = ::stats.conns.current;
262                }
263
264                // Read on the data
265                fd = ret;
266                request(ring);
267
268                // check the max fd so we know if we exceeded the array
269                for(;;) {
270                        int expected = max_fd;
271                        if(expected >= fd) return;
272                        if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
273                }
274
275                // check if we have enough space to fit inside the array
276                if(fd >= array_max) {
277                        std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
278                        return;
279                }
280
281                // Put our connection into the global array
282                // No one else should be using it so if they are that's a bug
283                auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
284                if( exist ) {
285                        size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
286                        if( first == 0 ) {
287                                std::cerr << "First: accept has existing connection " << std::endl;
288                        }
289                }
290        }
291
292        // Handle a new request, results for getting an cqe while in the REQUESTING state
293        void newrequest(struct io_uring * ring, int res) {
294                // Check errors
295                if( res < 0 ) {
296                        int err = -res;
297                        switch(err) {
298                                case EPIPE:
299                                        ::stats.errors.requests.pipes++;
300                                        break;
301                                        // Don't fall through the get better stats
302                                case ECONNRESET:
303                                        ::stats.errors.requests.reset++;
304                                        break;
305                                default:
306                                        ::stats.errors.requests.other++;
307                                        std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
308                                        exit(EXIT_FAILURE);
309                        }
310
311                        // Connection failed, close it
312                        this->close(err);
313                        return;
314                }
315
316                // Update stats
317                ::stats.completions.reads++;
318
319                // Is this an EOF
320                if(res == 0) {
321                        // Yes, close the connection
322                        this->close(0);
323                        return;
324                }
325
326                // Find the end of the request header
327                const char * it = buffer;
328                if( !strstr( it, "\r\n\r\n" ) ) {
329                        // This state machine doesn't support incomplete reads
330                        // Print them to output so it's clear there is an issue
331                        std::cout << "Incomplete request" << std::endl;
332                        this->close(EBADR);
333                        return;
334                }
335
336                // Find the method to use
337                it = buffer;
338                int ret = memcmp(it, "GET ", 4);
339                if( ret != 0 ) {
340                        // We only support get, answer with an error
341                        answer(ring, E400);
342                        return;
343                }
344
345                // Find the target
346                it += 4;
347                ret = memcmp(it, "/plaintext", 10);
348                if( ret != 0 ) {
349                        // We only support /plaintext, answer with an error
350                        answer(ring, E404);
351                        return;
352                }
353
354                // Correct request, answer with the payload
355                this->stats.requests++;
356                answer(ring, OK200);
357        }
358
359        // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
360        void writedone(struct io_uring * ring, int res) {
361                // Check errors
362                if( res < 0 ) {
363                        int err = -res;
364                        switch(err) {
365                                case EPIPE:
366                                        ::stats.errors.answers.pipes++;
367                                        break;
368                                        // Don't fall through the get better stats
369                                case ECONNRESET:
370                                        ::stats.errors.answers.reset++;
371                                        break;
372                                default:
373                                        ::stats.errors.answers.other++;
374                                        std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
375                                        exit(EXIT_FAILURE);
376                        }
377
378                        this->close(err);
379                        return;
380                }
381
382                // Update stats
383                ::stats.completions.writes++;
384                if(res == 124) ::stats.completions.full_writes++;
385
386                // Is this write completed
387                if( res == to_send ) {
388                        // Yes, more stats
389                        this->stats.answers++;
390                        if(this->stats.answers == 1) ::stats.conns.used++;
391                        // Then read a new request
392                        request(ring);
393                        return;
394                }
395
396                // Not a completed read, push the rest
397                to_send -= res;
398                iterator += res;
399                answer(ring);
400        }
401public:
402        // Submit a call to accept and create a new connection object
403        static void accept(struct io_uring * ring, const struct options_t & opt) {
404                struct io_uring_sqe * sqe = get_sqe(ring);
405                io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
406                submit(ring, sqe, new connection());
407                // std::cout << "Submitted accept: " << req << std::endl;
408        }
409
410        // Handle a new cqe
411        void handle(struct io_uring * ring, int res, const struct options_t & opt) {
412                switch(state) {
413                case ACCEPTING:
414                        // connection::accept(ring, opt);
415                        newconn(ring, res);
416                        break;
417                case REQUESTING:
418                        newrequest(ring, res);
419                        break;
420                case ANSWERING:
421                        writedone(ring, res);
422                        break;
423                }
424        }
425};
426
427//=========================================================
428extern "C" {
429        #include <sys/eventfd.h>  // use for termination
430}
431
432// Main loop of the WebServer
433// Effectively uses one thread_local copy of everything per kernel thread
434void * proc_loop(void * arg) {
435        // Get the thread local argument
436        struct options_t & opt = *(struct options_t *)arg;
437        struct io_uring * ring = opt.ring;
438
439        int blockfd = eventfd(0, 0);
440        if (blockfd < 0) {
441                fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) );
442                exit(EXIT_FAILURE);
443        }
444
445        int ret = io_uring_register_eventfd(ring, blockfd);
446        if (ret < 0) {
447                fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
448                exit(EXIT_FAILURE);
449        }
450
451        // Track the shutdown using a event_fd
452        char endfd_buf[8];
453        ring_end(ring, opt.endfd, endfd_buf, 8);
454
455        // Accept our first connection
456        // May not take effect until io_uring_submit_and_wait
457        for(unsigned i = 0; i < opt.acpt.cnt; i++) {
458                connection::accept(ring, opt);
459        }
460
461        int reset = 1;       // Counter to print stats once in a while
462        bool done = false;   // Are we done
463        size_t sqes = 0;     // Number of sqes we submitted
464        size_t call = 0;     // Number of submits we made
465        while(!done) {
466                // Submit all the answers we have and wait for responses
467                int ret = io_uring_submit(ring);
468                local.to_submit = 0;
469
470                // check errors
471                if (ret < 0) {
472                        fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
473                        exit(EXIT_FAILURE);
474                }
475
476                // Check how good we are at batching sqes
477                sqes += ret;
478                call++;
479
480
481                eventfd_t val;
482                ret = eventfd_read(blockfd, &val);
483
484                // check errors
485                if (ret < 0) {
486                        fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) );
487                        exit(EXIT_FAILURE);
488                }
489
490                struct io_uring_cqe *cqe;
491                unsigned head;
492                unsigned count = 0;
493
494                // go through all cqes
495                io_uring_for_each_cqe(ring, head, cqe) {
496                        if (0 == cqe->user_data) {
497                                done = true;
498                                break;
499                        }
500
501                        if(local.to_submit > 30) break;
502
503                        auto req = (class connection *)cqe->user_data;
504                        req->handle( ring, cqe->res, opt );
505
506                        // Every now and then, print some stats
507                        reset--;
508                        if(reset == 0) {
509                                std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
510                                // Reset to some random number of completions
511                                // use the ring_fd in the number of threads don't all print at once
512                                reset = 100000 + (100000 * (ring->ring_fd % 5));
513                        }
514
515                        // Keep track of how many cqes we have seen
516                        count++;
517                }
518
519                // Mark the cqes as seen
520                io_uring_cq_advance(ring, count);
521        }
522
523        // Tally all the thread local statistics
524        __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
525        __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
526        __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
527        __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
528        __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
529        __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
530        __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
531        __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
532        __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
533        __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
534        __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
535        __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
536        __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
537        __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
538
539        return nullptr;
540}
541
542//=========================================================
543#include <bit> // for ispow2
544
545extern "C" {
546        #include <pthread.h>      // for pthreads
547        #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
548        #include <sys/socket.h>   // for sockets in general
549        #include <netinet/in.h>   // for sockaddr_in, AF_INET
550}
551
552int main(int argc, char * argv[]) {
553        // Initialize the array of connection-fd associations
554        for(int i = 0; i < array_max; i++) {
555                conns[i] = nullptr;
556        }
557
558        // Make sure we ignore all sigpipes
559        signal(SIGPIPE, SIG_IGN);
560
561        // Default command line arguments
562        unsigned nthreads = 1;      // number of kernel threads
563        unsigned port = 8800;       // which port to listen on
564        unsigned entries = 256;     // number of entries per ring/kernel thread
565        unsigned backlog = 262144;  // backlog argument to listen
566        unsigned preaccept = 1;     // start by accepting X per threads
567        bool attach = false;        // Whether or not to attach all the rings
568        bool sqpoll = false;        // Whether or not to use SQ Polling
569
570        //===================
571        // Arguments Parsing
572        int c;
573        while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
574                switch (c)
575                {
576                case 't':
577                        nthreads = atoi(optarg);
578                        break;
579                case 'p':
580                        port = atoi(optarg);
581                        break;
582                case 'e':
583                        entries = atoi(optarg);
584                        break;
585                case 'b':
586                        backlog = atoi(optarg);
587                        break;
588                case 'c':
589                        preaccept = atoi(optarg);
590                        break;
591                case 'a':
592                        attach = true;
593                        break;
594                case 'S':
595                        sqpoll = true;
596                        break;
597                case '?':
598                default:
599                        std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
600                        return EXIT_FAILURE;
601                }
602        }
603
604        if( !std::ispow2(entries) ) {
605                unsigned v = entries;
606                v--;
607                v |= v >> 1;
608                v |= v >> 2;
609                v |= v >> 4;
610                v |= v >> 8;
611                v |= v >> 16;
612                v++;
613                std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
614                entries = v;
615        }
616
617        //===================
618        // End FD
619        // Create a single event fd to notify the kernel threads when the server shutsdown
620        int efd = eventfd(0, EFD_SEMAPHORE);
621        if (efd < 0) {
622                std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
623                exit(EXIT_FAILURE);
624        }
625
626        //===================
627        // Open Socket
628        // Listen on specified port
629        std::cout << getpid() << " : Listening on port " << port << std::endl;
630        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
631        if(server_fd < 0) {
632                std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
633                exit(EXIT_FAILURE);
634        }
635
636        int ret = 0;
637        struct sockaddr_in address;
638        int addrlen = sizeof(address);
639        memset( (char *)&address, '\0', addrlen );
640        address.sin_family = AF_INET;
641        address.sin_addr.s_addr = htonl(INADDR_ANY);
642        address.sin_port = htons( port );
643
644        // In case the port is already in use, don't just return an error
645        // Linux is very slow at reclaiming port so just retry regularly
646        int waited = 0;
647        while(true) {
648                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
649                if(ret < 0) {
650                        if(errno == EADDRINUSE) {
651                                // Port is in used let's retry later
652                                if(waited == 0) {
653                                        std::cerr << "Waiting for port" << std::endl;
654                                } else {
655                                        // To be cure, print how long we have been waiting
656                                        std::cerr << "\r" << waited;
657                                        std::cerr.flush();
658                                }
659                                waited ++;
660                                usleep( 1000000 ); // Wait and retry
661                                continue;
662                        }
663                        // Some other error occured, this is a real error
664                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
665                        exit(EXIT_FAILURE);
666                }
667                break;
668        }
669
670        ret = listen( server_fd, backlog );
671        if(ret < 0) {
672                std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
673                exit(EXIT_FAILURE);
674        }
675
676        //===================
677        // Run Server Threads
678        std::cout << "Starting " << nthreads << " Threads";
679        if(attach) {
680                std::cout << " with attached Rings";
681        }
682        std::cout << std::endl;
683
684        // Create the desired number of kernel-threads and for each
685        // create a ring. Create the rings in the main so we can attach them
686        // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
687        // it's unlikely but better safe than sorry
688        struct __attribute__((aligned(128))) aligned_ring {
689                struct io_uring storage;
690        };
691        aligned_ring thrd_rings[nthreads];
692        pthread_t    thrd_hdls[nthreads];
693        options_t    thrd_opts[nthreads];
694        bool no_drops  = true;
695        bool fast_poll = true;
696        bool nfix_sqpl = true;
697        for(unsigned i = 0; i < nthreads; i++) {
698                struct io_uring_params p = { };
699
700                if(sqpoll) { // If sqpoll is on, add the flag
701                        p.flags |= IORING_SETUP_SQPOLL;
702                        p.sq_thread_idle = 100;
703                }
704
705                if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
706                        p.flags |= IORING_SETUP_ATTACH_WQ;
707                        p.wq_fd = thrd_rings[0].storage.ring_fd;
708                }
709
710                // Create the ring
711                io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
712
713                // Check if some of the note-worthy features are there
714                if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
715                if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
716                if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
717
718                // Write the socket options we want to the options we pass to the threads
719                thrd_opts[i].acpt.sockfd  = server_fd;
720                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
721                thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
722                thrd_opts[i].acpt.flags   = 0;
723                thrd_opts[i].acpt.cnt     = preaccept;
724                thrd_opts[i].endfd        = efd;
725                thrd_opts[i].ring         = &thrd_rings[i].storage;
726
727                int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
728                if (ret < 0) {
729                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
730                        exit(EXIT_FAILURE);
731                }
732        }
733
734        // Tell the user if the features are present
735        if( no_drops ) std::cout << "No Drop Present" << std::endl;
736        if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
737        if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
738
739        //===================
740        // Server Started
741        std::cout << "Server Started" << std::endl;
742        {
743                char buffer[128];
744                int ret;
745                do {
746                        // Wait for a Ctrl-D to close the server
747                        ret = read(STDIN_FILENO, buffer, 128);
748                        if(ret < 0) {
749                                std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
750                                exit(EXIT_FAILURE);
751                        }
752                        else if(ret > 0) {
753                                std::cout << "User inputed '";
754                                std::cout.write(buffer, ret);
755                                std::cout << "'" << std::endl;
756                        }
757                } while(ret != 0);
758
759                std::cout << "Shutdown received" << std::endl;
760        }
761
762        //===================
763        // Use eventfd_write to tell the threads we are closing
764        (std::cout << "Sending Shutdown to Threads... ").flush();
765        ret = eventfd_write(efd, nthreads);
766        if (ret < 0) {
767                std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
768                exit(EXIT_FAILURE);
769        }
770        std::cout << "done" << std::endl;
771
772        //===================
773        // Join all the threads and close the rings
774        (std::cout << "Stopping Threads Done... ").flush();
775        for(unsigned i = 0; i < nthreads; i++) {
776                void * retval;
777                int ret = pthread_join(thrd_hdls[i], &retval);
778                if (ret < 0) {
779                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
780                        exit(EXIT_FAILURE);
781                }
782
783                io_uring_queue_exit(thrd_opts[i].ring);
784        }
785        std::cout << "done" << std::endl;
786
787        //===================
788        // Close the sockets
789        (std::cout << "Closing Socket... ").flush();
790        ret = shutdown( server_fd, SHUT_RD );
791        if( ret < 0 ) {
792                std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
793                exit(EXIT_FAILURE);
794        }
795
796        ret = close(server_fd);
797        if (ret < 0) {
798                std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
799                exit(EXIT_FAILURE);
800        }
801        std::cout << "done" << std::endl << std::endl;
802
803        // Print stats and exit
804        std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
805        std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
806        std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
807        std::cout << "Max FD: " << max_fd << std::endl;
808        std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
809        std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
810        std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
811        std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
812}
813
814// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
Note: See TracBrowser for help on using the repository browser.