source: benchmark/io/http/http_ring.cpp @ 95b3a9c

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 95b3a9c was c235179, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Added the option to pre-accept in httpring, this seems to have a major impact.

  • Property mode set to 100644
File size: 23.2 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11// #define NOBATCHING
12// #define USE_ASYNC
13
14// Options passed to each threads
15struct __attribute__((aligned(128))) options_t {
16        // Data passed to accept
17        struct {
18                int sockfd;
19                struct sockaddr *addr;
20                socklen_t *addrlen;
21                int flags;
22                unsigned cnt;
23        } acpt;
24
25        // Termination notification
26        int endfd;
27
28        // The ring to use for io
29        struct io_uring * ring;
30};
31
32//=========================================================
33// General statistics
34struct __attribute__((aligned(128))) stats_block_t {
35        struct {
36                volatile size_t conns = 0;
37                volatile size_t reads = 0;
38                volatile size_t writes = 0;
39                volatile size_t full_writes = 0;
40        } completions;
41
42        struct {
43                volatile size_t conns = 0;
44                struct {
45                        volatile size_t pipes = 0;
46                        volatile size_t reset = 0;
47                        volatile size_t other = 0;
48                } requests;
49
50                struct {
51                        volatile size_t pipes = 0;
52                        volatile size_t reset = 0;
53                        volatile size_t other = 0;
54                } answers;
55        } errors;
56
57        struct {
58                volatile size_t current = 0;
59                volatile size_t max = 0;
60                volatile size_t used = 0;
61        } conns;
62
63        volatile size_t recycle_errors = 0;
64};
65
66// Each thread gets its own block of stats
67// and there is a global block for tallying at the end
68thread_local stats_block_t stats;
69stats_block_t global_stats;
70
71// Get an array of current connections
72// This is just for debugging, to make sure
73// no two state-machines get the same fd
74const size_t array_max = 25000;
75class connection * volatile conns[array_max] = { 0 };
76
77// Max fd we've seen, keep track so it's convenient to adjust the array size after
78volatile int max_fd = 0;
79
80//=========================================================
81// Some small wrappers for ring operations used outside the connection state machine
82// get sqe + error handling
83static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
84        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
85        if(!sqe) {
86                std::cerr << "Insufficient entries in ring" << std::endl;
87                exit(EXIT_FAILURE);
88        }
89        return sqe;
90}
91
92// read of the event fd is not done by a connection
93// use nullptr as the user data
94static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
95        struct io_uring_sqe * sqe = get_sqe(ring);
96        io_uring_prep_read(sqe, fd, buffer, len, 0);
97        io_uring_sqe_set_data(sqe, nullptr);
98        io_uring_submit(ring);
99}
100
101//=========================================================
102// All answers are fixed and determined by the return code
103enum HttpCode {
104        OK200 = 0,
105        E400,
106        E404,
107        E405,
108        E408,
109        E413,
110        E414,
111        KNOWN_CODES
112};
113
114// Get a fix reply based on the return code
115const char * http_msgs[] = {
116        "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
117        "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
118        "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
119        "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
120        "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
121        "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
122        "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
123};
124static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
125
126// Pre-compute the length of these replys
127const size_t http_lens[] = {
128        strlen(http_msgs[0]),
129        strlen(http_msgs[1]),
130        strlen(http_msgs[2]),
131        strlen(http_msgs[3]),
132        strlen(http_msgs[4]),
133        strlen(http_msgs[5]),
134        strlen(http_msgs[6]),
135};
136static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
137
138//=========================================================
139// Finate state machine responsible for handling each connection
140class __attribute__((aligned(128))) connection {
141private:
142        // The state of the machine
143        enum {
144                ACCEPTING,  // Accept sent waiting for connection
145                REQUESTING, // Waiting for new request
146                ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
147        } state;
148
149        // The file descriptor of the connection
150        int fd;
151
152        // request data
153        static const size_t buffer_size = 1024; // Size of the read buffer
154        const char * buffer;                      // Buffer into which requests are read
155
156        // send data
157        size_t to_send;         // Data left to send
158        const char * iterator;  // Pointer to rest of the message to send
159
160        // stats
161        // how many requests/answers were complete, that is, a valid cqe was obtained
162        struct {
163                size_t requests = 0;
164                size_t answers = 0;
165        } stats;
166
167private:
168        connection()
169                : state(ACCEPTING)
170                , fd(0)
171                , buffer( new char[buffer_size])
172                , iterator(nullptr)
173        {}
174
175        ~connection() {
176                delete [] buffer;
177                ::stats.conns.current--;
178        }
179
180        // Close the current connection
181        void close(int err) {
182                // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
183                conns[fd] = nullptr;
184
185                if(fd != 0) {
186                        ::close(fd);
187                }
188                delete this;
189        }
190
191        //--------------------------------------------------
192        // Wrappers for submit so we can tweak it more easily
193        static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
194                (void)ring;
195                #ifdef USE_ASYNC
196                        io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
197                #endif
198                io_uring_sqe_set_data(sqe, conn);
199                #ifdef NOBATCHING
200                        io_uring_submit(ring);
201                #endif
202        }
203
204        void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
205                submit(ring, sqe, this);
206        }
207
208        //--------------------------------------------------
209        // get a new request from the client
210        void request(struct io_uring * ring) {
211                state = REQUESTING;
212                struct io_uring_sqe * sqe = get_sqe(ring);
213                io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
214                submit(ring, sqe);
215        }
216
217        //--------------------------------------------------
218        // Send a new answer based on a return code
219        void answer(struct io_uring * ring, HttpCode code) {
220                iterator = http_msgs[code];
221                to_send  = http_lens[code];
222                if(to_send != 124) {
223                        std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
224                }
225                answer(ring);
226        }
227
228        // send a new answer to the client
229        // Reused for incomplete writes
230        void answer(struct io_uring * ring) {
231                state = ANSWERING;
232                struct io_uring_sqe * sqe = get_sqe(ring);
233                io_uring_prep_send(sqe, fd, iterator, to_send, 0);
234                submit(ring, sqe);
235        }
236
237        //--------------------------------------------------
238        // Handle a new connection, results for getting an cqe while in the ACCEPTING state
239        void newconn(struct io_uring * ring, int ret) {
240                // Check errors
241                if( ret < 0 ) {
242                        int err = -ret;
243                        if( err == ECONNABORTED ) {
244                                ::stats.errors.conns++;
245                                this->close(err);
246                                return;
247                        }
248                        std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
249                        exit(EXIT_FAILURE);
250                }
251
252                // Count the connections
253                ::stats.completions.conns++;
254                ::stats.conns.current++;
255                if(::stats.conns.current > ::stats.conns.max) {
256                        ::stats.conns.max = ::stats.conns.current;
257                }
258
259                // Read on the data
260                fd = ret;
261                request(ring);
262
263                // check the max fd so we know if we exceeded the array
264                for(;;) {
265                        int expected = max_fd;
266                        if(expected >= fd) return;
267                        if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
268                }
269
270                // check if we have enough space to fit inside the array
271                if(fd >= array_max) {
272                        std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
273                        return;
274                }
275
276                // Put our connection into the global array
277                // No one else should be using it so if they are that's a bug
278                auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
279                if( exist ) {
280                        size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
281                        if( first == 0 ) {
282                                std::cerr << "First: accept has existing connection " << std::endl;
283                        }
284                }
285        }
286
287        // Handle a new request, results for getting an cqe while in the REQUESTING state
288        void newrequest(struct io_uring * ring, int res) {
289                // Check errors
290                if( res < 0 ) {
291                        int err = -res;
292                        switch(err) {
293                                case EPIPE:
294                                        ::stats.errors.requests.pipes++;
295                                        break;
296                                        // Don't fall through the get better stats
297                                case ECONNRESET:
298                                        ::stats.errors.requests.reset++;
299                                        break;
300                                default:
301                                        ::stats.errors.requests.other++;
302                                        std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
303                                        exit(EXIT_FAILURE);
304                        }
305
306                        // Connection failed, close it
307                        this->close(err);
308                        return;
309                }
310
311                // Update stats
312                ::stats.completions.reads++;
313
314                // Is this an EOF
315                if(res == 0) {
316                        // Yes, close the connection
317                        this->close(0);
318                        return;
319                }
320
321                // Find the end of the request header
322                const char * it = buffer;
323                if( !strstr( it, "\r\n\r\n" ) ) {
324                        // This state machine doesn't support incomplete reads
325                        // Print them to output so it's clear there is an issue
326                        std::cout << "Incomplete request" << std::endl;
327                        this->close(EBADR);
328                        return;
329                }
330
331                // Find the method to use
332                it = buffer;
333                int ret = memcmp(it, "GET ", 4);
334                if( ret != 0 ) {
335                        // We only support get, answer with an error
336                        answer(ring, E400);
337                        return;
338                }
339
340                // Find the target
341                it += 4;
342                ret = memcmp(it, "/plaintext", 10);
343                if( ret != 0 ) {
344                        // We only support /plaintext, answer with an error
345                        answer(ring, E404);
346                        return;
347                }
348
349                // Correct request, answer with the payload
350                this->stats.requests++;
351                answer(ring, OK200);
352        }
353
354        // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
355        void writedone(struct io_uring * ring, int res) {
356                // Check errors
357                if( res < 0 ) {
358                        int err = -res;
359                        switch(err) {
360                                case EPIPE:
361                                        ::stats.errors.answers.pipes++;
362                                        break;
363                                        // Don't fall through the get better stats
364                                case ECONNRESET:
365                                        ::stats.errors.answers.reset++;
366                                        break;
367                                default:
368                                        ::stats.errors.answers.other++;
369                                        std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
370                                        exit(EXIT_FAILURE);
371                        }
372
373                        this->close(err);
374                        return;
375                }
376
377                // Update stats
378                ::stats.completions.writes++;
379                if(res == 124) ::stats.completions.full_writes++;
380
381                // Is this write completed
382                if( res == to_send ) {
383                        // Yes, more stats
384                        this->stats.answers++;
385                        if(this->stats.answers == 1) ::stats.conns.used++;
386                        // Then read a new request
387                        request(ring);
388                        return;
389                }
390
391                // Not a completed read, push the rest
392                to_send -= res;
393                iterator += res;
394                answer(ring);
395        }
396public:
397        // Submit a call to accept and create a new connection object
398        static void accept(struct io_uring * ring, const struct options_t & opt) {
399                struct io_uring_sqe * sqe = get_sqe(ring);
400                io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
401                submit(ring, sqe, new connection());
402                // std::cout << "Submitted accept: " << req << std::endl;
403        }
404
405        // Handle a new cqe
406        void handle(struct io_uring * ring, int res, const struct options_t & opt) {
407                switch(state) {
408                case ACCEPTING:
409                        connection::accept(ring, opt);
410                        newconn(ring, res);
411                        break;
412                case REQUESTING:
413                        newrequest(ring, res);
414                        break;
415                case ANSWERING:
416                        writedone(ring, res);
417                        break;
418                }
419        }
420};
421
422//=========================================================
423// Main loop of the WebServer
424// Effectively uses one thread_local copy of everything per kernel thread
425void * proc_loop(void * arg) {
426        // Get the thread local argument
427        struct options_t & opt = *(struct options_t *)arg;
428        struct io_uring * ring = opt.ring;
429
430        // Track the shutdown using a event_fd
431        char endfd_buf[8];
432        ring_end(ring, opt.endfd, endfd_buf, 8);
433
434        // Accept our first connection
435        // May not take effect until io_uring_submit_and_wait
436        for(unsigned i = 0; i < opt.acpt.cnt; i++) {
437                connection::accept(ring, opt);
438        }
439
440        int reset = 1;       // Counter to print stats once in a while
441        bool done = false;   // Are we done
442        size_t sqes = 0;     // Number of sqes we submitted
443        size_t call = 0;     // Number of submits we made
444        while(!done) {
445                // Submit all the answers we have and wait for responses
446                int ret = io_uring_submit_and_wait(ring, 1);
447
448                // check errors
449                if (ret < 0) {
450                        fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
451                        exit(EXIT_FAILURE);
452                }
453
454                // Check how good we are at batching sqes
455                sqes += ret;
456                call++;
457
458                struct io_uring_cqe *cqe;
459                unsigned head;
460                unsigned count = 0;
461
462                // go through all cqes
463                io_uring_for_each_cqe(ring, head, cqe) {
464                        if (0 == cqe->user_data) {
465                                done = true;
466                                break;
467                        }
468
469                        auto req = (class connection *)cqe->user_data;
470                        req->handle( ring, cqe->res, opt );
471
472                        // Every now and then, print some stats
473                        reset--;
474                        if(reset == 0) {
475                                std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
476                                // Reset to some random number of completions
477                                // use the ring_fd in the number of threads don't all print at once
478                                reset = 100000 + (100000 * (ring->ring_fd % 5));
479                        }
480
481                        // Keep track of how many cqes we have seen
482                        count++;
483                }
484
485                // Mark the cqes as seen
486                io_uring_cq_advance(ring, count);
487        }
488
489        // Tally all the thread local statistics
490        __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
491        __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
492        __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
493        __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
494        __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
495        __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
496        __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
497        __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
498        __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
499        __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
500        __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
501        __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
502        __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
503        __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
504
505        return nullptr;
506}
507
508//=========================================================
509#include <bit> // for ispow2
510
511extern "C" {
512        #include <pthread.h>      // for pthreads
513        #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
514        #include <sys/eventfd.h>  // use for termination
515        #include <sys/socket.h>   // for sockets in general
516        #include <netinet/in.h>   // for sockaddr_in, AF_INET
517}
518
519int main(int argc, char * argv[]) {
520        // Initialize the array of connection-fd associations
521        for(int i = 0; i < array_max; i++) {
522                conns[i] = nullptr;
523        }
524
525        // Make sure we ignore all sigpipes
526        signal(SIGPIPE, SIG_IGN);
527
528        // Default command line arguments
529        unsigned nthreads = 1;      // number of kernel threads
530        unsigned port = 8800;       // which port to listen on
531        unsigned entries = 256;     // number of entries per ring/kernel thread
532        unsigned backlog = 262144;  // backlog argument to listen
533        unsigned preaccept = 1;     // start by accepting X per threads
534        bool attach = false;        // Whether or not to attach all the rings
535        bool sqpoll = false;        // Whether or not to use SQ Polling
536
537        //===================
538        // Arguments Parsing
539        int c;
540        while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
541                switch (c)
542                {
543                case 't':
544                        nthreads = atoi(optarg);
545                        break;
546                case 'p':
547                        port = atoi(optarg);
548                        break;
549                case 'e':
550                        entries = atoi(optarg);
551                        break;
552                case 'b':
553                        backlog = atoi(optarg);
554                        break;
555                case 'c':
556                        preaccept = atoi(optarg);
557                        break;
558                case 'a':
559                        attach = true;
560                        break;
561                case 'S':
562                        sqpoll = true;
563                        break;
564                case '?':
565                default:
566                        std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
567                        return EXIT_FAILURE;
568                }
569        }
570
571        if( !std::ispow2(entries) ) {
572                unsigned v = entries;
573                v--;
574                v |= v >> 1;
575                v |= v >> 2;
576                v |= v >> 4;
577                v |= v >> 8;
578                v |= v >> 16;
579                v++;
580                std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
581                entries = v;
582        }
583
584        //===================
585        // End FD
586        // Create a single event fd to notify the kernel threads when the server shutsdown
587        int efd = eventfd(0, EFD_SEMAPHORE);
588        if (efd < 0) {
589                std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
590                exit(EXIT_FAILURE);
591        }
592
593        //===================
594        // Open Socket
595        // Listen on specified port
596        std::cout << getpid() << " : Listening on port " << port << std::endl;
597        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
598        if(server_fd < 0) {
599                std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
600                exit(EXIT_FAILURE);
601        }
602
603        int ret = 0;
604        struct sockaddr_in address;
605        int addrlen = sizeof(address);
606        memset( (char *)&address, '\0', addrlen );
607        address.sin_family = AF_INET;
608        address.sin_addr.s_addr = htonl(INADDR_ANY);
609        address.sin_port = htons( port );
610
611        // In case the port is already in use, don't just return an error
612        // Linux is very slow at reclaiming port so just retry regularly
613        int waited = 0;
614        while(true) {
615                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
616                if(ret < 0) {
617                        if(errno == EADDRINUSE) {
618                                // Port is in used let's retry later
619                                if(waited == 0) {
620                                        std::cerr << "Waiting for port" << std::endl;
621                                } else {
622                                        // To be cure, print how long we have been waiting
623                                        std::cerr << "\r" << waited;
624                                        std::cerr.flush();
625                                }
626                                waited ++;
627                                usleep( 1000000 ); // Wait and retry
628                                continue;
629                        }
630                        // Some other error occured, this is a real error
631                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
632                        exit(EXIT_FAILURE);
633                }
634                break;
635        }
636
637        ret = listen( server_fd, backlog );
638        if(ret < 0) {
639                std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
640                exit(EXIT_FAILURE);
641        }
642
643        //===================
644        // Run Server Threads
645        std::cout << "Starting " << nthreads << " Threads";
646        if(attach) {
647                std::cout << " with attached Rings";
648        }
649        std::cout << std::endl;
650
651        // Create the desired number of kernel-threads and for each
652        // create a ring. Create the rings in the main so we can attach them
653        // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
654        // it's unlikely but better safe than sorry
655        struct __attribute__((aligned(128))) aligned_ring {
656                struct io_uring storage;
657        };
658        aligned_ring thrd_rings[nthreads];
659        pthread_t    thrd_hdls[nthreads];
660        options_t    thrd_opts[nthreads];
661        bool no_drops  = true;
662        bool fast_poll = true;
663        bool nfix_sqpl = true;
664        for(unsigned i = 0; i < nthreads; i++) {
665                struct io_uring_params p = { };
666
667                if(sqpoll) { // If sqpoll is on, add the flag
668                        p.flags |= IORING_SETUP_SQPOLL;
669                        p.sq_thread_idle = 100;
670                }
671
672                if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
673                        p.flags |= IORING_SETUP_ATTACH_WQ;
674                        p.wq_fd = thrd_rings[0].storage.ring_fd;
675                }
676
677                // Create the ring
678                io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
679
680                // Check if some of the note-worthy features are there
681                if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
682                if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
683                if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
684
685                // Write the socket options we want to the options we pass to the threads
686                thrd_opts[i].acpt.sockfd  = server_fd;
687                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
688                thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
689                thrd_opts[i].acpt.flags   = 0;
690                thrd_opts[i].acpt.cnt     = preaccept;
691                thrd_opts[i].endfd        = efd;
692                thrd_opts[i].ring         = &thrd_rings[i].storage;
693
694                int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
695                if (ret < 0) {
696                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
697                        exit(EXIT_FAILURE);
698                }
699        }
700
701        // Tell the user if the features are present
702        if( no_drops ) std::cout << "No Drop Present" << std::endl;
703        if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
704        if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
705
706        //===================
707        // Server Started
708        std::cout << "Server Started" << std::endl;
709        {
710                char buffer[128];
711                int ret;
712                do {
713                        // Wait for a Ctrl-D to close the server
714                        ret = read(STDIN_FILENO, buffer, 128);
715                        if(ret < 0) {
716                                std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
717                                exit(EXIT_FAILURE);
718                        }
719                        else if(ret > 0) {
720                                std::cout << "User inputed '";
721                                std::cout.write(buffer, ret);
722                                std::cout << "'" << std::endl;
723                        }
724                } while(ret != 0);
725
726                std::cout << "Shutdown received" << std::endl;
727        }
728
729        //===================
730        // Use eventfd_write to tell the threads we are closing
731        (std::cout << "Sending Shutdown to Threads... ").flush();
732        ret = eventfd_write(efd, nthreads);
733        if (ret < 0) {
734                std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
735                exit(EXIT_FAILURE);
736        }
737        std::cout << "done" << std::endl;
738
739        //===================
740        // Join all the threads and close the rings
741        (std::cout << "Stopping Threads Done... ").flush();
742        for(unsigned i = 0; i < nthreads; i++) {
743                void * retval;
744                int ret = pthread_join(thrd_hdls[i], &retval);
745                if (ret < 0) {
746                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
747                        exit(EXIT_FAILURE);
748                }
749
750                io_uring_queue_exit(thrd_opts[i].ring);
751        }
752        std::cout << "done" << std::endl;
753
754        //===================
755        // Close the sockets
756        (std::cout << "Closing Socket... ").flush();
757        ret = shutdown( server_fd, SHUT_RD );
758        if( ret < 0 ) {
759                std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
760                exit(EXIT_FAILURE);
761        }
762
763        ret = close(server_fd);
764        if (ret < 0) {
765                std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
766                exit(EXIT_FAILURE);
767        }
768        std::cout << "done" << std::endl << std::endl;
769
770        // Print stats and exit
771        std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
772        std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
773        std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
774        std::cout << "Max FD: " << max_fd << std::endl;
775        std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
776        std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
777        std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
778        std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
779}
780
781// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
Note: See TracBrowser for help on using the repository browser.