source: benchmark/io/http/http_ring.cpp @ 761a246

arm-ehenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 761a246 was 761a246, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Massive changes to how http_ring works

  • Property mode set to 100644
File size: 22.5 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11// Options passed to each threads
12struct __attribute__((aligned(128))) options_t {
13        // Data passed to accept
14        struct {
15                int sockfd;
16                struct sockaddr *addr;
17                socklen_t *addrlen;
18                int flags;
19        } acpt;
20
21        // Termination notification
22        int endfd;
23
24        // The ring to use for io
25        struct io_uring * ring;
26};
27
28//=========================================================
29// General statistics
30struct __attribute__((aligned(128))) stats_block_t {
31        struct {
32                volatile size_t conns = 0;
33                volatile size_t reads = 0;
34                volatile size_t writes = 0;
35                volatile size_t full_writes = 0;
36        } completions;
37
38        struct {
39                volatile size_t conns = 0;
40                struct {
41                        volatile size_t pipes = 0;
42                        volatile size_t reset = 0;
43                        volatile size_t other = 0;
44                } requests;
45
46                struct {
47                        volatile size_t pipes = 0;
48                        volatile size_t reset = 0;
49                        volatile size_t other = 0;
50                } answers;
51        } errors;
52
53        struct {
54                volatile size_t current = 0;
55                volatile size_t max = 0;
56                volatile size_t used = 0;
57        } conns;
58
59        volatile size_t recycle_errors = 0;
60};
61
62// Each thread gets its own block of stats
63// and there is a global block for tallying at the end
64thread_local stats_block_t stats;
65stats_block_t global_stats;
66
67// Get an array of current connections
68// This is just for debugging, to make sure
69// no two state-machines get the same fd
70const size_t array_max = 25000;
71class connection * volatile conns[array_max] = { 0 };
72
73// Max fd we've seen, keep track so it's convenient to adjust the array size after
74volatile int max_fd = 0;
75
76//=========================================================
77// Some small wrappers for ring operations used outside the connection state machine
78// get sqe + error handling
79static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
80        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
81        if(!sqe) {
82                std::cerr << "Insufficient entries in ring" << std::endl;
83                exit(EXIT_FAILURE);
84        }
85        return sqe;
86}
87
88// read of the event fd is not done by a connection
89// use nullptr as the user data
90static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
91        struct io_uring_sqe * sqe = get_sqe(ring);
92        io_uring_prep_read(sqe, fd, buffer, len, 0);
93        io_uring_sqe_set_data(sqe, nullptr);
94        io_uring_submit(ring);
95}
96
97//=========================================================
98// All answers are fixed and determined by the return code
99enum HttpCode {
100        OK200 = 0,
101        E400,
102        E404,
103        E405,
104        E408,
105        E413,
106        E414,
107        KNOWN_CODES
108};
109
110// Get a fix reply based on the return code
111const char * http_msgs[] = {
112        "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
113        "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
114        "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
115        "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
116        "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
117        "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
118        "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
119};
120static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
121
122// Pre-compute the length of these replys
123const size_t http_lens[] = {
124        strlen(http_msgs[0]),
125        strlen(http_msgs[1]),
126        strlen(http_msgs[2]),
127        strlen(http_msgs[3]),
128        strlen(http_msgs[4]),
129        strlen(http_msgs[5]),
130        strlen(http_msgs[6]),
131};
132static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
133
134//=========================================================
135// Finate state machine responsible for handling each connection
136class __attribute__((aligned(128))) connection {
137private:
138        // The state of the machine
139        enum {
140                ACCEPTING,  // Accept sent waiting for connection
141                REQUESTING, // Waiting for new request
142                ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
143        } state;
144
145        // The file descriptor of the connection
146        int fd;
147
148        // request data
149        static const size_t buffer_size = 1024; // Size of the read buffer
150        const char * buffer;                      // Buffer into which requests are read
151
152        // send data
153        size_t to_send;         // Data left to send
154        const char * iterator;  // Pointer to rest of the message to send
155
156        // stats
157        // how many requests/answers were complete, that is, a valid cqe was obtained
158        struct {
159                size_t requests = 0;
160                size_t answers = 0;
161        } stats;
162
163private:
164        connection()
165                : state(ACCEPTING)
166                , fd(0)
167                , buffer( new char[buffer_size])
168                , iterator(nullptr)
169        {
170                ::stats.conns.max++;
171                ::stats.conns.current++;
172        }
173
174        ~connection() {
175                delete [] buffer;
176                ::stats.conns.current--;
177        }
178
179        // Close the current connection
180        void close(int err) {
181                // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
182                conns[fd] = nullptr;
183
184                if(fd != 0) {
185                        ::close(fd);
186                }
187                delete this;
188        }
189
190        //--------------------------------------------------
191        // Wrappers for submit so we can tweak it more easily
192        static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
193                (void)ring;
194                // io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
195                io_uring_sqe_set_data(sqe, conn);
196                // io_uring_submit(ring);
197        }
198
199        void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
200                submit(ring, sqe, this);
201        }
202
203        //--------------------------------------------------
204        // get a new request from the client
205        void request(struct io_uring * ring) {
206                state = REQUESTING;
207                struct io_uring_sqe * sqe = get_sqe(ring);
208                io_uring_prep_read(sqe, fd, (void*)buffer, buffer_size, 0);
209                submit(ring, sqe);
210        }
211
212        //--------------------------------------------------
213        // Send a new answer based on a return code
214        void answer(struct io_uring * ring, HttpCode code) {
215                iterator = http_msgs[code];
216                to_send  = http_lens[code];
217                if(to_send != 124) {
218                        std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
219                }
220                answer(ring);
221        }
222
223        // send a new answer to the client
224        // Reused for incomplete writes
225        void answer(struct io_uring * ring) {
226                state = ANSWERING;
227                struct io_uring_sqe * sqe = get_sqe(ring);
228                io_uring_prep_write(sqe, fd, iterator, to_send, 0);
229                submit(ring, sqe);
230        }
231
232        //--------------------------------------------------
233        // Handle a new connection, results for getting an cqe while in the ACCEPTING state
234        void newconn(struct io_uring * ring, int ret) {
235                // Check errors
236                if( ret < 0 ) {
237                        int err = -ret;
238                        if( err == ECONNABORTED ) {
239                                ::stats.errors.conns++;
240                                this->close(err);
241                                return;
242                        }
243                        std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
244                        exit(EXIT_FAILURE);
245                }
246
247                // Count the connections
248                ::stats.completions.conns++;
249
250                // Read on the data
251                fd = ret;
252                request(ring);
253
254                // check the max fd so we know if we exceeded the array
255                for(;;) {
256                        int expected = max_fd;
257                        if(expected >= fd) return;
258                        if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
259                }
260
261                // check if we have enough space to fit inside the array
262                if(fd >= array_max) {
263                        std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
264                        return;
265                }
266
267                // Put our connection into the global array
268                // No one else should be using it so if they are that's a bug
269                auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
270                if( exist ) {
271                        size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
272                        if( first == 0 ) {
273                                std::cerr << "First: accept has existing connection " << std::endl;
274                        }
275                }
276        }
277
278        // Handle a new request, results for getting an cqe while in the REQUESTING state
279        void newrequest(struct io_uring * ring, int ret) {
280                // Check errors
281                if( ret < 0 ) {
282                        int err = -ret;
283                        switch(err) {
284                                case EPIPE:
285                                        ::stats.errors.requests.pipes++;
286                                        break;
287                                        // Don't fall through the get better stats
288                                case ECONNRESET:
289                                        ::stats.errors.requests.reset++;
290                                        break;
291                                default:
292                                        ::stats.errors.requests.other++;
293                                        std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
294                                        exit(EXIT_FAILURE);
295                        }
296
297                        // Connection failed, close it
298                        this->close(err);
299                        return;
300                }
301
302                // Update stats
303                ::stats.completions.reads++;
304
305                // Is this an EOF
306                if(ret == 0) {
307                        // Yes, close the connection
308                        this->close(0);
309                        return;
310                }
311
312                // Find the end of the request header
313                const char * it = buffer;
314                if( !strstr( it, "\r\n\r\n" ) ) {
315                        // This state machine doesn't support incomplete reads
316                        // Print them to output so it's clear there is an issue
317                        std::cout << "Incomplete request" << std::endl;
318                        this->close(EBADR);
319                        return;
320                }
321
322                // Find the method to use
323                it = buffer;
324                int ret = memcmp(it, "GET ", 4);
325                if( ret != 0 ) {
326                        // We only support get, answer with an error
327                        answer(ring, E400);
328                        return;
329                }
330
331                // Find the target
332                it += 4;
333                ret = memcmp(it, "/plaintext", 10);
334                if( ret != 0 ) {
335                        // We only support /plaintext, answer with an error
336                        answer(ring, E404);
337                        return;
338                }
339
340                // Correct request, answer with the payload
341                this->stats.requests++;
342                answer(ring, OK200);
343        }
344
345        // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
346        void writedone(struct io_uring * ring, int res) {
347                // Check errors
348                if( res < 0 ) {
349                        int err = -res;
350                        switch(err) {
351                                case EPIPE:
352                                        ::stats.errors.answers.pipes++;
353                                        break;
354                                        // Don't fall through the get better stats
355                                case ECONNRESET:
356                                        ::stats.errors.answers.reset++;
357                                        break;
358                                default:
359                                        ::stats.errors.answers.other++;
360                                        std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
361                                        exit(EXIT_FAILURE);
362                        }
363
364                        this->close(err);
365                        return;
366                }
367
368                // Update stats
369                ::stats.completions.writes++;
370                if(res == 124) ::stats.completions.full_writes++;
371
372                // Is this write completed
373                if( res == to_send ) {
374                        // Yes, more stats
375                        stats.answers++;
376                        if(stats.answers == 2) ::stats.conns.used++;
377                        // Then read a new request
378                        request(ring);
379                        return;
380                }
381
382                // Not a completed read, push the rest
383                to_send -= res;
384                iterator += res;
385                answer(ring);
386        }
387public:
388        // Submit a call to accept and create a new connection object
389        static void accept(struct io_uring * ring, const struct options_t & opt) {
390                struct io_uring_sqe * sqe = get_sqe(ring);
391                io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
392                submit(ring, sqe, new connection());
393                // std::cout << "Submitted accept: " << req << std::endl;
394        }
395
396        // Handle a new cqe
397        void handle(struct io_uring * ring, int res, const struct options_t & opt) {
398                switch(state) {
399                case ACCEPTING:
400                        connection::accept(ring, opt);
401                        newconn(ring, res);
402                        break;
403                case REQUESTING:
404                        newrequest(ring, res);
405                        break;
406                case ANSWERING:
407                        writedone(ring, res);
408                        break;
409                }
410        }
411};
412
413//=========================================================
414// Main loop of the WebServer
415// Effectively uses one thread_local copy of everything per kernel thread
416void * proc_loop(void * arg) {
417        // Get the thread local argument
418        struct options_t & opt = *(struct options_t *)arg;
419        struct io_uring * ring = opt.ring;
420
421        // Track the shutdown using a event_fd
422        char endfd_buf[8];
423        ring_end(ring, opt.endfd, endfd_buf, 8);
424
425        // Accept our first connection
426        // May not take effect until io_uring_submit_and_wait
427        connection::accept(ring, opt);
428
429        int reset = 1;       // Counter to print stats once in a while
430        bool done = false;   // Are we done
431        size_t sqes = 0;     // Number of sqes we submitted
432        size_t call = 0;     // Number of submits we made
433        while(!done) {
434                // Submit all the answers we have and wait for responses
435                int ret = io_uring_submit_and_wait(ring, 1);
436
437                // check errors
438                if (ret < 0) {
439                        fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
440                        exit(EXIT_FAILURE);
441                }
442
443                // Check how good we are at batching sqes
444                sqes += ret;
445                call++;
446
447                struct io_uring_cqe *cqe;
448                unsigned head;
449                unsigned count = 0;
450
451                // go through all cqes
452                io_uring_for_each_cqe(ring, head, cqe) {
453                        if (0 == cqe->user_data) {
454                                done = true;
455                                break;
456                        }
457
458                        auto req = (class connection *)cqe->user_data;
459                        req->handle( ring, cqe->res, opt );
460
461                        reset--;
462                        if(reset == 0) {
463                                std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
464                                reset = 100000 + (100000 * (ring->ring_fd % 5));
465                        }
466
467                        // Keep track of how many cqes we have seen
468                        count++;
469                }
470
471                // Mark the cqes as seen
472                io_uring_cq_advance(ring, count);
473        }
474
475        // Tally all the thread local statistics
476        __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
477        __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
478        __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
479        __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
480        __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
481        __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
482        __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
483        __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
484        __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
485        __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
486        __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
487        __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
488        __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
489        __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
490
491        return nullptr;
492}
493
494//=========================================================
495#include <bit> // for ispow2
496
497extern "C" {
498        #include <pthread.h>      // for pthreads
499        #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
500        #include <sys/eventfd.h>  // use for termination
501        #include <sys/socket.h>   // for sockets in general
502        #include <netinet/in.h>   // for sockaddr_in, AF_INET
503}
504
505int main(int argc, char * argv[]) {
506        // Initialize the array of connection-fd associations
507        for(int i = 0; i < array_max; i++) {
508                conns[i] = nullptr;
509        }
510
511        // Make sure we ignore all sigpipes
512        signal(SIGPIPE, SIG_IGN);
513
514        // Default command line arguments
515        unsigned nthreads = 1;      // number of kernel threads
516        unsigned port = 8800;       // which port to listen on
517        unsigned entries = 256;     // number of entries per ring/kernel thread
518        unsigned backlog = 262144;  // backlog argument to listen
519        bool attach = false;        // Whether or not to attach all the rings
520        bool sqpoll = false;        // Whether or not to use SQ Polling
521
522        //===================
523        // Arguments Parsing
524        int c;
525        while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
526                switch (c)
527                {
528                case 't':
529                        nthreads = atoi(optarg);
530                        break;
531                case 'p':
532                        port = atoi(optarg);
533                        break;
534                case 'e':
535                        entries = atoi(optarg);
536                        break;
537                case 'b':
538                        backlog = atoi(optarg);
539                        break;
540                case 'a':
541                        attach = true;
542                        break;
543                case 'S':
544                        sqpoll = true;
545                        break;
546                case '?':
547                default:
548                        std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
549                        return EXIT_FAILURE;
550                }
551        }
552
553        if( !std::ispow2(entries) ) {
554                unsigned v = entries;
555                v--;
556                v |= v >> 1;
557                v |= v >> 2;
558                v |= v >> 4;
559                v |= v >> 8;
560                v |= v >> 16;
561                v++;
562                std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
563                entries = v;
564        }
565
566        //===================
567        // End FD
568        // Create a single event fd to notify the kernel threads when the server shutsdown
569        int efd = eventfd(0, EFD_SEMAPHORE);
570        if (efd < 0) {
571                std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
572                exit(EXIT_FAILURE);
573        }
574
575        //===================
576        // Open Socket
577        // Listen on specified port
578        std::cout << getpid() << " : Listening on port " << port << std::endl;
579        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
580        if(server_fd < 0) {
581                std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
582                exit(EXIT_FAILURE);
583        }
584
585        int ret = 0;
586        struct sockaddr_in address;
587        int addrlen = sizeof(address);
588        memset( (char *)&address, '\0', addrlen );
589        address.sin_family = AF_INET;
590        address.sin_addr.s_addr = htonl(INADDR_ANY);
591        address.sin_port = htons( port );
592
593        // In case the port is already in use, don't just return an error
594        // Linux is very slow at reclaiming port so just retry regularly
595        int waited = 0;
596        while(true) {
597                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
598                if(ret < 0) {
599                        if(errno == EADDRINUSE) {
600                                // Port is in used let's retry later
601                                if(waited == 0) {
602                                        std::cerr << "Waiting for port" << std::endl;
603                                } else {
604                                        // To be cure, print how long we have been waiting
605                                        std::cerr << "\r" << waited;
606                                        std::cerr.flush();
607                                }
608                                waited ++;
609                                usleep( 1000000 ); // Wait and retry
610                                continue;
611                        }
612                        // Some other error occured, this is a real error
613                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
614                        exit(EXIT_FAILURE);
615                }
616                break;
617        }
618
619        ret = listen( server_fd, backlog );
620        if(ret < 0) {
621                std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
622                exit(EXIT_FAILURE);
623        }
624
625        //===================
626        // Run Server Threads
627        std::cout << "Starting " << nthreads << " Threads";
628        if(attach) {
629                std::cout << " with attached Rings";
630        }
631        std::cout << std::endl;
632
633        // Create the desired number of kernel-threads and for each
634        // create a ring. Create the rings in the main so we can attach them
635        // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
636        // it's unlikely but better safe than sorry
637        struct __attribute__((aligned(128))) aligned_ring {
638                struct io_uring storage;
639        };
640        aligned_ring thrd_rings[nthreads];
641        pthread_t    thrd_hdls[nthreads];
642        options_t    thrd_opts[nthreads];
643        bool no_drops  = true;
644        bool fast_poll = true;
645        bool nfix_sqpl = true;
646        for(unsigned i = 0; i < nthreads; i++) {
647                struct io_uring_params p = { };
648
649                if(sqpoll) { // If sqpoll is on, add the flag
650                        p.flags |= IORING_SETUP_SQPOLL;
651                        p.sq_thread_idle = 100;
652                }
653
654                if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
655                        p.flags |= IORING_SETUP_ATTACH_WQ;
656                        p.wq_fd = thrd_rings[0].storage.ring_fd;
657                }
658
659                // Create the ring
660                io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
661
662                // Check if some of the note-worthy features are there
663                if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
664                if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
665                if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
666
667                // Write the socket options we want to the options we pass to the threads
668                thrd_opts[i].acpt.sockfd  = server_fd;
669                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
670                thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
671                thrd_opts[i].acpt.flags   = 0;
672                thrd_opts[i].endfd        = efd;
673                thrd_opts[i].ring         = &thrd_rings[i].storage;
674
675                int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
676                if (ret < 0) {
677                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
678                        exit(EXIT_FAILURE);
679                }
680        }
681
682        // Tell the user if the features are present
683        if( no_drops ) std::cout << "No Drop Present" << std::endl;
684        if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
685        if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
686
687        //===================
688        // Server Started
689        std::cout << "Server Started" << std::endl;
690        {
691                char buffer[128];
692                int ret;
693                do {
694                        // Wait for a Ctrl-D to close the server
695                        ret = read(STDIN_FILENO, buffer, 128);
696                        if(ret < 0) {
697                                std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
698                                exit(EXIT_FAILURE);
699                        }
700                        else if(ret > 0) {
701                                std::cout << "User inputed '";
702                                std::cout.write(buffer, ret);
703                                std::cout << "'" << std::endl;
704                        }
705                } while(ret != 0);
706
707                std::cout << "Shutdown received" << std::endl;
708        }
709
710        //===================
711        // Use eventfd_write to tell the threads we are closing
712        (std::cout << "Sending Shutdown to Threads... ").flush();
713        ret = eventfd_write(efd, nthreads);
714        if (ret < 0) {
715                std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
716                exit(EXIT_FAILURE);
717        }
718        std::cout << "done" << std::endl;
719
720        //===================
721        // Join all the threads and close the rings
722        (std::cout << "Stopping Threads Done... ").flush();
723        for(unsigned i = 0; i < nthreads; i++) {
724                void * retval;
725                int ret = pthread_join(thrd_hdls[i], &retval);
726                if (ret < 0) {
727                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
728                        exit(EXIT_FAILURE);
729                }
730
731                io_uring_queue_exit(thrd_opts[i].ring);
732        }
733        std::cout << "done" << std::endl;
734
735        //===================
736        // Close the sockets
737        (std::cout << "Closing Socket... ").flush();
738        ret = shutdown( server_fd, SHUT_RD );
739        if( ret < 0 ) {
740                std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
741                exit(EXIT_FAILURE);
742        }
743
744        ret = close(server_fd);
745        if (ret < 0) {
746                std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
747                exit(EXIT_FAILURE);
748        }
749        std::cout << "done" << std::endl << std::endl;
750
751        // Print stats and exit
752        std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
753        std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
754        std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
755        std::cout << "Max FD: " << max_fd << std::endl;
756        std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
757        std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
758        std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
759}
Note: See TracBrowser for help on using the repository browser.