source: benchmark/io/http/http_ring.cpp @ ef0b456

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since ef0b456 was eeb4866, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Changed read/write to send/recv to work around small bug in io_uring.
Some clean-up.

  • Property mode set to 100644
File size: 23.0 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11// #define NOBATCHING
12// #define USE_ASYNC
13
14// Options passed to each threads
15struct __attribute__((aligned(128))) options_t {
16        // Data passed to accept
17        struct {
18                int sockfd;
19                struct sockaddr *addr;
20                socklen_t *addrlen;
21                int flags;
22        } acpt;
23
24        // Termination notification
25        int endfd;
26
27        // The ring to use for io
28        struct io_uring * ring;
29};
30
31//=========================================================
32// General statistics
33struct __attribute__((aligned(128))) stats_block_t {
34        struct {
35                volatile size_t conns = 0;
36                volatile size_t reads = 0;
37                volatile size_t writes = 0;
38                volatile size_t full_writes = 0;
39        } completions;
40
41        struct {
42                volatile size_t conns = 0;
43                struct {
44                        volatile size_t pipes = 0;
45                        volatile size_t reset = 0;
46                        volatile size_t other = 0;
47                } requests;
48
49                struct {
50                        volatile size_t pipes = 0;
51                        volatile size_t reset = 0;
52                        volatile size_t other = 0;
53                } answers;
54        } errors;
55
56        struct {
57                volatile size_t current = 0;
58                volatile size_t max = 0;
59                volatile size_t used = 0;
60        } conns;
61
62        volatile size_t recycle_errors = 0;
63};
64
65// Each thread gets its own block of stats
66// and there is a global block for tallying at the end
67thread_local stats_block_t stats;
68stats_block_t global_stats;
69
70// Get an array of current connections
71// This is just for debugging, to make sure
72// no two state-machines get the same fd
73const size_t array_max = 25000;
74class connection * volatile conns[array_max] = { 0 };
75
76// Max fd we've seen, keep track so it's convenient to adjust the array size after
77volatile int max_fd = 0;
78
79//=========================================================
80// Some small wrappers for ring operations used outside the connection state machine
81// get sqe + error handling
82static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
83        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
84        if(!sqe) {
85                std::cerr << "Insufficient entries in ring" << std::endl;
86                exit(EXIT_FAILURE);
87        }
88        return sqe;
89}
90
91// read of the event fd is not done by a connection
92// use nullptr as the user data
93static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
94        struct io_uring_sqe * sqe = get_sqe(ring);
95        io_uring_prep_read(sqe, fd, buffer, len, 0);
96        io_uring_sqe_set_data(sqe, nullptr);
97        io_uring_submit(ring);
98}
99
100//=========================================================
101// All answers are fixed and determined by the return code
102enum HttpCode {
103        OK200 = 0,
104        E400,
105        E404,
106        E405,
107        E408,
108        E413,
109        E414,
110        KNOWN_CODES
111};
112
113// Get a fix reply based on the return code
114const char * http_msgs[] = {
115        "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
116        "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
117        "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
118        "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
119        "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
120        "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
121        "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
122};
123static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
124
125// Pre-compute the length of these replys
126const size_t http_lens[] = {
127        strlen(http_msgs[0]),
128        strlen(http_msgs[1]),
129        strlen(http_msgs[2]),
130        strlen(http_msgs[3]),
131        strlen(http_msgs[4]),
132        strlen(http_msgs[5]),
133        strlen(http_msgs[6]),
134};
135static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
136
137//=========================================================
138// Finate state machine responsible for handling each connection
139class __attribute__((aligned(128))) connection {
140private:
141        // The state of the machine
142        enum {
143                ACCEPTING,  // Accept sent waiting for connection
144                REQUESTING, // Waiting for new request
145                ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
146        } state;
147
148        // The file descriptor of the connection
149        int fd;
150
151        // request data
152        static const size_t buffer_size = 1024; // Size of the read buffer
153        const char * buffer;                      // Buffer into which requests are read
154
155        // send data
156        size_t to_send;         // Data left to send
157        const char * iterator;  // Pointer to rest of the message to send
158
159        // stats
160        // how many requests/answers were complete, that is, a valid cqe was obtained
161        struct {
162                size_t requests = 0;
163                size_t answers = 0;
164        } stats;
165
166private:
167        connection()
168                : state(ACCEPTING)
169                , fd(0)
170                , buffer( new char[buffer_size])
171                , iterator(nullptr)
172        {}
173
174        ~connection() {
175                delete [] buffer;
176                ::stats.conns.current--;
177        }
178
179        // Close the current connection
180        void close(int err) {
181                // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
182                conns[fd] = nullptr;
183
184                if(fd != 0) {
185                        ::close(fd);
186                }
187                delete this;
188        }
189
190        //--------------------------------------------------
191        // Wrappers for submit so we can tweak it more easily
192        static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
193                (void)ring;
194                #ifdef USE_ASYNC
195                        io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
196                #endif
197                io_uring_sqe_set_data(sqe, conn);
198                #ifdef NOBATCHING
199                        io_uring_submit(ring);
200                #endif
201        }
202
203        void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
204                submit(ring, sqe, this);
205        }
206
207        //--------------------------------------------------
208        // get a new request from the client
209        void request(struct io_uring * ring) {
210                state = REQUESTING;
211                struct io_uring_sqe * sqe = get_sqe(ring);
212                io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
213                submit(ring, sqe);
214        }
215
216        //--------------------------------------------------
217        // Send a new answer based on a return code
218        void answer(struct io_uring * ring, HttpCode code) {
219                iterator = http_msgs[code];
220                to_send  = http_lens[code];
221                if(to_send != 124) {
222                        std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
223                }
224                answer(ring);
225        }
226
227        // send a new answer to the client
228        // Reused for incomplete writes
229        void answer(struct io_uring * ring) {
230                state = ANSWERING;
231                struct io_uring_sqe * sqe = get_sqe(ring);
232                io_uring_prep_send(sqe, fd, iterator, to_send, 0);
233                submit(ring, sqe);
234        }
235
236        //--------------------------------------------------
237        // Handle a new connection, results for getting an cqe while in the ACCEPTING state
238        void newconn(struct io_uring * ring, int ret) {
239                // Check errors
240                if( ret < 0 ) {
241                        int err = -ret;
242                        if( err == ECONNABORTED ) {
243                                ::stats.errors.conns++;
244                                this->close(err);
245                                return;
246                        }
247                        std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
248                        exit(EXIT_FAILURE);
249                }
250
251                // Count the connections
252                ::stats.completions.conns++;
253                ::stats.conns.current++;
254                if(::stats.conns.current > ::stats.conns.max) {
255                        ::stats.conns.max = ::stats.conns.current;
256                }
257
258                // Read on the data
259                fd = ret;
260                request(ring);
261
262                // check the max fd so we know if we exceeded the array
263                for(;;) {
264                        int expected = max_fd;
265                        if(expected >= fd) return;
266                        if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
267                }
268
269                // check if we have enough space to fit inside the array
270                if(fd >= array_max) {
271                        std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
272                        return;
273                }
274
275                // Put our connection into the global array
276                // No one else should be using it so if they are that's a bug
277                auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
278                if( exist ) {
279                        size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
280                        if( first == 0 ) {
281                                std::cerr << "First: accept has existing connection " << std::endl;
282                        }
283                }
284        }
285
286        // Handle a new request, results for getting an cqe while in the REQUESTING state
287        void newrequest(struct io_uring * ring, int res) {
288                // Check errors
289                if( res < 0 ) {
290                        int err = -res;
291                        switch(err) {
292                                case EPIPE:
293                                        ::stats.errors.requests.pipes++;
294                                        break;
295                                        // Don't fall through the get better stats
296                                case ECONNRESET:
297                                        ::stats.errors.requests.reset++;
298                                        break;
299                                default:
300                                        ::stats.errors.requests.other++;
301                                        std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
302                                        exit(EXIT_FAILURE);
303                        }
304
305                        // Connection failed, close it
306                        this->close(err);
307                        return;
308                }
309
310                // Update stats
311                ::stats.completions.reads++;
312
313                // Is this an EOF
314                if(res == 0) {
315                        // Yes, close the connection
316                        this->close(0);
317                        return;
318                }
319
320                // Find the end of the request header
321                const char * it = buffer;
322                if( !strstr( it, "\r\n\r\n" ) ) {
323                        // This state machine doesn't support incomplete reads
324                        // Print them to output so it's clear there is an issue
325                        std::cout << "Incomplete request" << std::endl;
326                        this->close(EBADR);
327                        return;
328                }
329
330                // Find the method to use
331                it = buffer;
332                int ret = memcmp(it, "GET ", 4);
333                if( ret != 0 ) {
334                        // We only support get, answer with an error
335                        answer(ring, E400);
336                        return;
337                }
338
339                // Find the target
340                it += 4;
341                ret = memcmp(it, "/plaintext", 10);
342                if( ret != 0 ) {
343                        // We only support /plaintext, answer with an error
344                        answer(ring, E404);
345                        return;
346                }
347
348                // Correct request, answer with the payload
349                this->stats.requests++;
350                answer(ring, OK200);
351        }
352
353        // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
354        void writedone(struct io_uring * ring, int res) {
355                // Check errors
356                if( res < 0 ) {
357                        int err = -res;
358                        switch(err) {
359                                case EPIPE:
360                                        ::stats.errors.answers.pipes++;
361                                        break;
362                                        // Don't fall through the get better stats
363                                case ECONNRESET:
364                                        ::stats.errors.answers.reset++;
365                                        break;
366                                default:
367                                        ::stats.errors.answers.other++;
368                                        std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
369                                        exit(EXIT_FAILURE);
370                        }
371
372                        this->close(err);
373                        return;
374                }
375
376                // Update stats
377                ::stats.completions.writes++;
378                if(res == 124) ::stats.completions.full_writes++;
379
380                // Is this write completed
381                if( res == to_send ) {
382                        // Yes, more stats
383                        this->stats.answers++;
384                        if(this->stats.answers == 1) ::stats.conns.used++;
385                        // Then read a new request
386                        request(ring);
387                        return;
388                }
389
390                // Not a completed read, push the rest
391                to_send -= res;
392                iterator += res;
393                answer(ring);
394        }
395public:
396        // Submit a call to accept and create a new connection object
397        static void accept(struct io_uring * ring, const struct options_t & opt) {
398                struct io_uring_sqe * sqe = get_sqe(ring);
399                io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
400                submit(ring, sqe, new connection());
401                // std::cout << "Submitted accept: " << req << std::endl;
402        }
403
404        // Handle a new cqe
405        void handle(struct io_uring * ring, int res, const struct options_t & opt) {
406                switch(state) {
407                case ACCEPTING:
408                        connection::accept(ring, opt);
409                        newconn(ring, res);
410                        break;
411                case REQUESTING:
412                        newrequest(ring, res);
413                        break;
414                case ANSWERING:
415                        writedone(ring, res);
416                        break;
417                }
418        }
419};
420
421//=========================================================
422// Main loop of the WebServer
423// Effectively uses one thread_local copy of everything per kernel thread
424void * proc_loop(void * arg) {
425        // Get the thread local argument
426        struct options_t & opt = *(struct options_t *)arg;
427        struct io_uring * ring = opt.ring;
428
429        // Track the shutdown using a event_fd
430        char endfd_buf[8];
431        ring_end(ring, opt.endfd, endfd_buf, 8);
432
433        // Accept our first connection
434        // May not take effect until io_uring_submit_and_wait
435        connection::accept(ring, opt);
436
437        int reset = 1;       // Counter to print stats once in a while
438        bool done = false;   // Are we done
439        size_t sqes = 0;     // Number of sqes we submitted
440        size_t call = 0;     // Number of submits we made
441        while(!done) {
442                // Submit all the answers we have and wait for responses
443                int ret = io_uring_submit_and_wait(ring, 1);
444
445                // check errors
446                if (ret < 0) {
447                        fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
448                        exit(EXIT_FAILURE);
449                }
450
451                // Check how good we are at batching sqes
452                sqes += ret;
453                call++;
454
455                struct io_uring_cqe *cqe;
456                unsigned head;
457                unsigned count = 0;
458
459                // go through all cqes
460                io_uring_for_each_cqe(ring, head, cqe) {
461                        if (0 == cqe->user_data) {
462                                done = true;
463                                break;
464                        }
465
466                        auto req = (class connection *)cqe->user_data;
467                        req->handle( ring, cqe->res, opt );
468
469                        // Every now and then, print some stats
470                        reset--;
471                        if(reset == 0) {
472                                std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
473                                // Reset to some random number of completions
474                                // use the ring_fd in the number of threads don't all print at once
475                                reset = 100000 + (100000 * (ring->ring_fd % 5));
476                        }
477
478                        // Keep track of how many cqes we have seen
479                        count++;
480                }
481
482                // Mark the cqes as seen
483                io_uring_cq_advance(ring, count);
484        }
485
486        // Tally all the thread local statistics
487        __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
488        __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
489        __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
490        __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
491        __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
492        __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
493        __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
494        __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
495        __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
496        __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
497        __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
498        __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
499        __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
500        __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
501
502        return nullptr;
503}
504
505//=========================================================
506#include <bit> // for ispow2
507
508extern "C" {
509        #include <pthread.h>      // for pthreads
510        #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
511        #include <sys/eventfd.h>  // use for termination
512        #include <sys/socket.h>   // for sockets in general
513        #include <netinet/in.h>   // for sockaddr_in, AF_INET
514}
515
516int main(int argc, char * argv[]) {
517        // Initialize the array of connection-fd associations
518        for(int i = 0; i < array_max; i++) {
519                conns[i] = nullptr;
520        }
521
522        // Make sure we ignore all sigpipes
523        signal(SIGPIPE, SIG_IGN);
524
525        // Default command line arguments
526        unsigned nthreads = 1;      // number of kernel threads
527        unsigned port = 8800;       // which port to listen on
528        unsigned entries = 256;     // number of entries per ring/kernel thread
529        unsigned backlog = 262144;  // backlog argument to listen
530        bool attach = false;        // Whether or not to attach all the rings
531        bool sqpoll = false;        // Whether or not to use SQ Polling
532
533        //===================
534        // Arguments Parsing
535        int c;
536        while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
537                switch (c)
538                {
539                case 't':
540                        nthreads = atoi(optarg);
541                        break;
542                case 'p':
543                        port = atoi(optarg);
544                        break;
545                case 'e':
546                        entries = atoi(optarg);
547                        break;
548                case 'b':
549                        backlog = atoi(optarg);
550                        break;
551                case 'a':
552                        attach = true;
553                        break;
554                case 'S':
555                        sqpoll = true;
556                        break;
557                case '?':
558                default:
559                        std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
560                        return EXIT_FAILURE;
561                }
562        }
563
564        if( !std::ispow2(entries) ) {
565                unsigned v = entries;
566                v--;
567                v |= v >> 1;
568                v |= v >> 2;
569                v |= v >> 4;
570                v |= v >> 8;
571                v |= v >> 16;
572                v++;
573                std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
574                entries = v;
575        }
576
577        //===================
578        // End FD
579        // Create a single event fd to notify the kernel threads when the server shutsdown
580        int efd = eventfd(0, EFD_SEMAPHORE);
581        if (efd < 0) {
582                std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
583                exit(EXIT_FAILURE);
584        }
585
586        //===================
587        // Open Socket
588        // Listen on specified port
589        std::cout << getpid() << " : Listening on port " << port << std::endl;
590        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
591        if(server_fd < 0) {
592                std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
593                exit(EXIT_FAILURE);
594        }
595
596        int ret = 0;
597        struct sockaddr_in address;
598        int addrlen = sizeof(address);
599        memset( (char *)&address, '\0', addrlen );
600        address.sin_family = AF_INET;
601        address.sin_addr.s_addr = htonl(INADDR_ANY);
602        address.sin_port = htons( port );
603
604        // In case the port is already in use, don't just return an error
605        // Linux is very slow at reclaiming port so just retry regularly
606        int waited = 0;
607        while(true) {
608                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
609                if(ret < 0) {
610                        if(errno == EADDRINUSE) {
611                                // Port is in used let's retry later
612                                if(waited == 0) {
613                                        std::cerr << "Waiting for port" << std::endl;
614                                } else {
615                                        // To be cure, print how long we have been waiting
616                                        std::cerr << "\r" << waited;
617                                        std::cerr.flush();
618                                }
619                                waited ++;
620                                usleep( 1000000 ); // Wait and retry
621                                continue;
622                        }
623                        // Some other error occured, this is a real error
624                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
625                        exit(EXIT_FAILURE);
626                }
627                break;
628        }
629
630        ret = listen( server_fd, backlog );
631        if(ret < 0) {
632                std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
633                exit(EXIT_FAILURE);
634        }
635
636        //===================
637        // Run Server Threads
638        std::cout << "Starting " << nthreads << " Threads";
639        if(attach) {
640                std::cout << " with attached Rings";
641        }
642        std::cout << std::endl;
643
644        // Create the desired number of kernel-threads and for each
645        // create a ring. Create the rings in the main so we can attach them
646        // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
647        // it's unlikely but better safe than sorry
648        struct __attribute__((aligned(128))) aligned_ring {
649                struct io_uring storage;
650        };
651        aligned_ring thrd_rings[nthreads];
652        pthread_t    thrd_hdls[nthreads];
653        options_t    thrd_opts[nthreads];
654        bool no_drops  = true;
655        bool fast_poll = true;
656        bool nfix_sqpl = true;
657        for(unsigned i = 0; i < nthreads; i++) {
658                struct io_uring_params p = { };
659
660                if(sqpoll) { // If sqpoll is on, add the flag
661                        p.flags |= IORING_SETUP_SQPOLL;
662                        p.sq_thread_idle = 100;
663                }
664
665                if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
666                        p.flags |= IORING_SETUP_ATTACH_WQ;
667                        p.wq_fd = thrd_rings[0].storage.ring_fd;
668                }
669
670                // Create the ring
671                io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
672
673                // Check if some of the note-worthy features are there
674                if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
675                if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
676                if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
677
678                // Write the socket options we want to the options we pass to the threads
679                thrd_opts[i].acpt.sockfd  = server_fd;
680                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
681                thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
682                thrd_opts[i].acpt.flags   = 0;
683                thrd_opts[i].endfd        = efd;
684                thrd_opts[i].ring         = &thrd_rings[i].storage;
685
686                int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
687                if (ret < 0) {
688                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
689                        exit(EXIT_FAILURE);
690                }
691        }
692
693        // Tell the user if the features are present
694        if( no_drops ) std::cout << "No Drop Present" << std::endl;
695        if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
696        if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
697
698        //===================
699        // Server Started
700        std::cout << "Server Started" << std::endl;
701        {
702                char buffer[128];
703                int ret;
704                do {
705                        // Wait for a Ctrl-D to close the server
706                        ret = read(STDIN_FILENO, buffer, 128);
707                        if(ret < 0) {
708                                std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
709                                exit(EXIT_FAILURE);
710                        }
711                        else if(ret > 0) {
712                                std::cout << "User inputed '";
713                                std::cout.write(buffer, ret);
714                                std::cout << "'" << std::endl;
715                        }
716                } while(ret != 0);
717
718                std::cout << "Shutdown received" << std::endl;
719        }
720
721        //===================
722        // Use eventfd_write to tell the threads we are closing
723        (std::cout << "Sending Shutdown to Threads... ").flush();
724        ret = eventfd_write(efd, nthreads);
725        if (ret < 0) {
726                std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
727                exit(EXIT_FAILURE);
728        }
729        std::cout << "done" << std::endl;
730
731        //===================
732        // Join all the threads and close the rings
733        (std::cout << "Stopping Threads Done... ").flush();
734        for(unsigned i = 0; i < nthreads; i++) {
735                void * retval;
736                int ret = pthread_join(thrd_hdls[i], &retval);
737                if (ret < 0) {
738                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
739                        exit(EXIT_FAILURE);
740                }
741
742                io_uring_queue_exit(thrd_opts[i].ring);
743        }
744        std::cout << "done" << std::endl;
745
746        //===================
747        // Close the sockets
748        (std::cout << "Closing Socket... ").flush();
749        ret = shutdown( server_fd, SHUT_RD );
750        if( ret < 0 ) {
751                std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
752                exit(EXIT_FAILURE);
753        }
754
755        ret = close(server_fd);
756        if (ret < 0) {
757                std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
758                exit(EXIT_FAILURE);
759        }
760        std::cout << "done" << std::endl << std::endl;
761
762        // Print stats and exit
763        std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
764        std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
765        std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
766        std::cout << "Max FD: " << max_fd << std::endl;
767        std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
768        std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
769        std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
770        std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
771}
772
773// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
Note: See TracBrowser for help on using the repository browser.