Ignore:
Timestamp:
Feb 4, 2021, 10:03:29 PM (9 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
5ce9bea
Parents:
c292244 (diff), 9af0fe2d (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/http_ring.cpp

    rc292244 ref0b456  
    99#include <liburing.h>
    1010
    11 typedef enum {
    12         EVENT_END,
    13         EVENT_ACCEPT,
    14         EVENT_REQUEST,
    15         EVENT_ANSWER
    16 } event_t;
    17 
    18 struct __attribute__((aligned(128))) request_t {
    19         event_t type;
    20         int fd;
    21         size_t length;
    22         char * buff;
    23         char data[0];
    24 
    25         static struct request_t * create(event_t type, size_t extra) {
    26                 auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra);
    27                 ret->type = type;
    28                 ret->length = extra;
    29                 ret->buff = ret->data;
    30                 return ret;
    31         }
    32 
    33         static struct request_t * create(event_t type) {
    34                 return create(type, 0);
    35         }
    36 };
    37 
     11// #define NOBATCHING
     12// #define USE_ASYNC
     13
     14// Options passed to each threads
    3815struct __attribute__((aligned(128))) options_t {
     16        // Data passed to accept
    3917        struct {
    4018                int sockfd;
     
    4422        } acpt;
    4523
     24        // Termination notification
    4625        int endfd;
     26
     27        // The ring to use for io
    4728        struct io_uring * ring;
    48 
     29};
     30
     31//=========================================================
     32// General statistics
     33struct __attribute__((aligned(128))) stats_block_t {
    4934        struct {
    50                 size_t subs = 0;
    51                 size_t cnts = 0;
    52         } result;
     35                volatile size_t conns = 0;
     36                volatile size_t reads = 0;
     37                volatile size_t writes = 0;
     38                volatile size_t full_writes = 0;
     39        } completions;
     40
     41        struct {
     42                volatile size_t conns = 0;
     43                struct {
     44                        volatile size_t pipes = 0;
     45                        volatile size_t reset = 0;
     46                        volatile size_t other = 0;
     47                } requests;
     48
     49                struct {
     50                        volatile size_t pipes = 0;
     51                        volatile size_t reset = 0;
     52                        volatile size_t other = 0;
     53                } answers;
     54        } errors;
     55
     56        struct {
     57                volatile size_t current = 0;
     58                volatile size_t max = 0;
     59                volatile size_t used = 0;
     60        } conns;
     61
     62        volatile size_t recycle_errors = 0;
    5363};
    5464
     65// Each thread gets its own block of stats
     66// and there is a global block for tallying at the end
     67thread_local stats_block_t stats;
     68stats_block_t global_stats;
     69
     70// Get an array of current connections
     71// This is just for debugging, to make sure
     72// no two state-machines get the same fd
     73const size_t array_max = 25000;
     74class connection * volatile conns[array_max] = { 0 };
     75
     76// Max fd we've seen, keep track so it's convenient to adjust the array size after
     77volatile int max_fd = 0;
     78
    5579//=========================================================
     80// Some small wrappers for ring operations used outside the connection state machine
     81// get sqe + error handling
    5682static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
    5783        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
     
    6389}
    6490
    65 static void submit(struct io_uring * ) {
    66         // io_uring_submit(ring);
    67 }
    68 
    69 //=========================================================
     91// read of the event fd is not done by a connection
     92// use nullptr as the user data
    7093static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
    7194        struct io_uring_sqe * sqe = get_sqe(ring);
    7295        io_uring_prep_read(sqe, fd, buffer, len, 0);
    73         io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
    74         submit(ring);
     96        io_uring_sqe_set_data(sqe, nullptr);
     97        io_uring_submit(ring);
    7598}
    7699
    77 static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
    78         auto req = request_t::create(EVENT_ACCEPT);
    79         struct io_uring_sqe * sqe = get_sqe(ring);
    80         io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    81         io_uring_sqe_set_data(sqe, req);
    82         submit(ring);
    83         // std::cout << "Submitted accept: " << req << std::endl;
    84 }
    85 
    86 static void ring_request(struct io_uring * ring, int fd) {
    87         size_t size = 1024;
    88         auto req = request_t::create(EVENT_REQUEST, size);
    89         req->fd = fd;
    90 
    91         struct io_uring_sqe * sqe = get_sqe(ring);
    92         io_uring_prep_read(sqe, fd, req->buff, size, 0);
    93         io_uring_sqe_set_data(sqe, req);
    94         submit(ring);
    95         // std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    96 }
    97 
    98100//=========================================================
     101// All answers are fixed and determined by the return code
    99102enum HttpCode {
    100103        OK200 = 0,
     
    108111};
    109112
     113// Get a fix reply based on the return code
    110114const char * http_msgs[] = {
    111         "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",
    112         "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    113         "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    114         "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    115         "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    116         "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    117         "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     115        "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
     116        "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     117        "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     118        "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     119        "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     120        "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     121        "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
    118122};
    119 
    120 static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
    121 
    122 const int http_codes[] = {
    123         200,
    124         400,
    125         404,
    126         405,
    127         408,
    128         413,
    129         414,
     123static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
     124
     125// Pre-compute the length of these replys
     126const size_t http_lens[] = {
     127        strlen(http_msgs[0]),
     128        strlen(http_msgs[1]),
     129        strlen(http_msgs[2]),
     130        strlen(http_msgs[3]),
     131        strlen(http_msgs[4]),
     132        strlen(http_msgs[5]),
     133        strlen(http_msgs[6]),
    130134};
    131 
    132 static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
    133 
    134 int code_val(HttpCode code) {
    135         return http_codes[code];
    136 }
    137 
    138 static void ring_answer(struct io_uring * ring, int fd, HttpCode code) {
    139         size_t size = 256;
    140         auto req = request_t::create(EVENT_ANSWER, size);
    141         req->fd = fd;
    142 
    143         const char * fmt = http_msgs[code];
    144         const char * date = "";
    145         size = snprintf(req->buff, size, fmt, date, size);
    146 
    147         struct io_uring_sqe * sqe = get_sqe(ring);
    148         io_uring_prep_write(sqe, fd, req->buff, size, 0);
    149         io_uring_sqe_set_data(sqe, req);
    150         submit(ring);
    151         // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    152 }
    153 
    154 static void ring_answer(struct io_uring * ring, int fd, const std::string &) {
    155         // size_t size = 256;
    156         // auto req = request_t::create(EVENT_ANSWER, size);
    157         // req->fd = fd;
    158 
    159         // const char * fmt = http_msgs[OK200];
    160         // const char * date = "";
    161         // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
    162         // req->length = len;
    163 
    164         // struct io_uring_sqe * sqe = get_sqe(ring);
    165         // io_uring_prep_write(sqe, fd, req->buffer, len, 0);
    166         // io_uring_sqe_set_data(sqe, req);
    167         // submit(ring);
    168         // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    169 
    170 
    171         static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \
    172                                                 "Content-Length: 15\r\n" \
    173                                                 "Content-Type: text/html\r\n" \
    174                                                 "Connection: keep-alive\r\n" \
    175                                                 "Server: testserver\r\n" \
    176                                                 "\r\n" \
    177                                                 "Hello, World!\r\n";
    178 
    179         static const size_t RLEN = strlen(RESPONSE);
    180 
    181         size_t size = 256;
    182         auto req = request_t::create(EVENT_ANSWER, size);
    183         req->fd = fd;
    184         req->buff = (char*)RESPONSE;
    185         req->length = RLEN;
    186 
    187         // const char * fmt = http_msgs[OK200];
    188         // const char * date = "";
    189         // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
    190         // req->length = len;
    191 
    192         struct io_uring_sqe * sqe = get_sqe(ring);
    193         io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0);
    194         io_uring_sqe_set_data(sqe, req);
    195         submit(ring);
    196 }
     135static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
    197136
    198137//=========================================================
    199 static void handle_new_conn(struct io_uring * ring, int fd) {
    200         if( fd < 0 ) {
    201                 int err = -fd;
    202                 if( err == ECONNABORTED ) return;
    203                 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
    204                 exit(EXIT_FAILURE);
    205         }
    206 
    207         ring_request(ring, fd);
    208 }
    209 
    210 static void handle_request(struct io_uring * ring, struct request_t * in, int res) {
    211         if( res < 0 ) {
    212                 int err = -res;
    213                 switch(err) {
    214                         case EPIPE:
    215                         case ECONNRESET:
    216                                 close(in->fd);
    217                                 free(in);
     138// Finate state machine responsible for handling each connection
     139class __attribute__((aligned(128))) connection {
     140private:
     141        // The state of the machine
     142        enum {
     143                ACCEPTING,  // Accept sent waiting for connection
     144                REQUESTING, // Waiting for new request
     145                ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
     146        } state;
     147
     148        // The file descriptor of the connection
     149        int fd;
     150
     151        // request data
     152        static const size_t buffer_size = 1024; // Size of the read buffer
     153        const char * buffer;                      // Buffer into which requests are read
     154
     155        // send data
     156        size_t to_send;         // Data left to send
     157        const char * iterator;  // Pointer to rest of the message to send
     158
     159        // stats
     160        // how many requests/answers were complete, that is, a valid cqe was obtained
     161        struct {
     162                size_t requests = 0;
     163                size_t answers = 0;
     164        } stats;
     165
     166private:
     167        connection()
     168                : state(ACCEPTING)
     169                , fd(0)
     170                , buffer( new char[buffer_size])
     171                , iterator(nullptr)
     172        {}
     173
     174        ~connection() {
     175                delete [] buffer;
     176                ::stats.conns.current--;
     177        }
     178
     179        // Close the current connection
     180        void close(int err) {
     181                // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
     182                conns[fd] = nullptr;
     183
     184                if(fd != 0) {
     185                        ::close(fd);
     186                }
     187                delete this;
     188        }
     189
     190        //--------------------------------------------------
     191        // Wrappers for submit so we can tweak it more easily
     192        static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
     193                (void)ring;
     194                #ifdef USE_ASYNC
     195                        io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
     196                #endif
     197                io_uring_sqe_set_data(sqe, conn);
     198                #ifdef NOBATCHING
     199                        io_uring_submit(ring);
     200                #endif
     201        }
     202
     203        void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
     204                submit(ring, sqe, this);
     205        }
     206
     207        //--------------------------------------------------
     208        // get a new request from the client
     209        void request(struct io_uring * ring) {
     210                state = REQUESTING;
     211                struct io_uring_sqe * sqe = get_sqe(ring);
     212                io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
     213                submit(ring, sqe);
     214        }
     215
     216        //--------------------------------------------------
     217        // Send a new answer based on a return code
     218        void answer(struct io_uring * ring, HttpCode code) {
     219                iterator = http_msgs[code];
     220                to_send  = http_lens[code];
     221                if(to_send != 124) {
     222                        std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
     223                }
     224                answer(ring);
     225        }
     226
     227        // send a new answer to the client
     228        // Reused for incomplete writes
     229        void answer(struct io_uring * ring) {
     230                state = ANSWERING;
     231                struct io_uring_sqe * sqe = get_sqe(ring);
     232                io_uring_prep_send(sqe, fd, iterator, to_send, 0);
     233                submit(ring, sqe);
     234        }
     235
     236        //--------------------------------------------------
     237        // Handle a new connection, results for getting an cqe while in the ACCEPTING state
     238        void newconn(struct io_uring * ring, int ret) {
     239                // Check errors
     240                if( ret < 0 ) {
     241                        int err = -ret;
     242                        if( err == ECONNABORTED ) {
     243                                ::stats.errors.conns++;
     244                                this->close(err);
    218245                                return;
    219                         default:
    220                                 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
    221                                 exit(EXIT_FAILURE);
    222                 }
    223         }
    224 
    225         if(res == 0) {
    226                 close(in->fd);
    227                 free(in);
    228                 return;
    229         }
    230 
    231         const char * it = in->buff;
    232         if( !strstr( it, "\r\n\r\n" ) ) {
    233                 std::cout << "Incomplete request" << std::endl;
    234                 close(in->fd);
    235                 free(in);
    236                 return;
    237         }
    238 
    239         it = in->buff;
    240         const std::string reply = "Hello, World!\n";
    241         int ret = memcmp(it, "GET ", 4);
    242         if( ret != 0 ) {
    243                 ring_answer(ring, in->fd, E400);
    244                 goto NEXT;
    245         }
    246 
    247         it += 4;
    248         ret = memcmp(it, "/plaintext", 10);
    249         if( ret != 0 ) {
    250                 ring_answer(ring, in->fd, E404);
    251                 goto NEXT;
    252         }
    253 
    254         ring_answer(ring, in->fd, reply);
    255 
    256         NEXT:
    257                 ring_request(ring, in->fd);
    258                 return;
    259 }
    260 
    261 static void handle_answer(struct io_uring * ring, struct request_t * in, int res) {
    262         if( res < 0 ) {
    263                 int err = -res;
    264                 switch(err) {
    265                         case EPIPE:
    266                         case ECONNRESET:
    267                                 close(in->fd);
    268                                 free(in);
    269                                 return;
    270                         default:
    271                                 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
    272                                 exit(EXIT_FAILURE);
    273                 }
    274         }
    275 
    276         if( res >= in->length ) {
    277                 free(in);
    278                 return;
    279         }
    280 
    281         struct io_uring_sqe * sqe = get_sqe(ring);
    282         io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0);
    283         io_uring_sqe_set_data(sqe, in);
    284         submit(ring);
    285         // std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
    286 
    287         ring_request(ring, in->fd);
    288 }
     246                        }
     247                        std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
     248                        exit(EXIT_FAILURE);
     249                }
     250
     251                // Count the connections
     252                ::stats.completions.conns++;
     253                ::stats.conns.current++;
     254                if(::stats.conns.current > ::stats.conns.max) {
     255                        ::stats.conns.max = ::stats.conns.current;
     256                }
     257
     258                // Read on the data
     259                fd = ret;
     260                request(ring);
     261
     262                // check the max fd so we know if we exceeded the array
     263                for(;;) {
     264                        int expected = max_fd;
     265                        if(expected >= fd) return;
     266                        if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
     267                }
     268
     269                // check if we have enough space to fit inside the array
     270                if(fd >= array_max) {
     271                        std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
     272                        return;
     273                }
     274
     275                // Put our connection into the global array
     276                // No one else should be using it so if they are that's a bug
     277                auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
     278                if( exist ) {
     279                        size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
     280                        if( first == 0 ) {
     281                                std::cerr << "First: accept has existing connection " << std::endl;
     282                        }
     283                }
     284        }
     285
     286        // Handle a new request, results for getting an cqe while in the REQUESTING state
     287        void newrequest(struct io_uring * ring, int res) {
     288                // Check errors
     289                if( res < 0 ) {
     290                        int err = -res;
     291                        switch(err) {
     292                                case EPIPE:
     293                                        ::stats.errors.requests.pipes++;
     294                                        break;
     295                                        // Don't fall through the get better stats
     296                                case ECONNRESET:
     297                                        ::stats.errors.requests.reset++;
     298                                        break;
     299                                default:
     300                                        ::stats.errors.requests.other++;
     301                                        std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
     302                                        exit(EXIT_FAILURE);
     303                        }
     304
     305                        // Connection failed, close it
     306                        this->close(err);
     307                        return;
     308                }
     309
     310                // Update stats
     311                ::stats.completions.reads++;
     312
     313                // Is this an EOF
     314                if(res == 0) {
     315                        // Yes, close the connection
     316                        this->close(0);
     317                        return;
     318                }
     319
     320                // Find the end of the request header
     321                const char * it = buffer;
     322                if( !strstr( it, "\r\n\r\n" ) ) {
     323                        // This state machine doesn't support incomplete reads
     324                        // Print them to output so it's clear there is an issue
     325                        std::cout << "Incomplete request" << std::endl;
     326                        this->close(EBADR);
     327                        return;
     328                }
     329
     330                // Find the method to use
     331                it = buffer;
     332                int ret = memcmp(it, "GET ", 4);
     333                if( ret != 0 ) {
     334                        // We only support get, answer with an error
     335                        answer(ring, E400);
     336                        return;
     337                }
     338
     339                // Find the target
     340                it += 4;
     341                ret = memcmp(it, "/plaintext", 10);
     342                if( ret != 0 ) {
     343                        // We only support /plaintext, answer with an error
     344                        answer(ring, E404);
     345                        return;
     346                }
     347
     348                // Correct request, answer with the payload
     349                this->stats.requests++;
     350                answer(ring, OK200);
     351        }
     352
     353        // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
     354        void writedone(struct io_uring * ring, int res) {
     355                // Check errors
     356                if( res < 0 ) {
     357                        int err = -res;
     358                        switch(err) {
     359                                case EPIPE:
     360                                        ::stats.errors.answers.pipes++;
     361                                        break;
     362                                        // Don't fall through the get better stats
     363                                case ECONNRESET:
     364                                        ::stats.errors.answers.reset++;
     365                                        break;
     366                                default:
     367                                        ::stats.errors.answers.other++;
     368                                        std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
     369                                        exit(EXIT_FAILURE);
     370                        }
     371
     372                        this->close(err);
     373                        return;
     374                }
     375
     376                // Update stats
     377                ::stats.completions.writes++;
     378                if(res == 124) ::stats.completions.full_writes++;
     379
     380                // Is this write completed
     381                if( res == to_send ) {
     382                        // Yes, more stats
     383                        this->stats.answers++;
     384                        if(this->stats.answers == 1) ::stats.conns.used++;
     385                        // Then read a new request
     386                        request(ring);
     387                        return;
     388                }
     389
     390                // Not a completed read, push the rest
     391                to_send -= res;
     392                iterator += res;
     393                answer(ring);
     394        }
     395public:
     396        // Submit a call to accept and create a new connection object
     397        static void accept(struct io_uring * ring, const struct options_t & opt) {
     398                struct io_uring_sqe * sqe = get_sqe(ring);
     399                io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
     400                submit(ring, sqe, new connection());
     401                // std::cout << "Submitted accept: " << req << std::endl;
     402        }
     403
     404        // Handle a new cqe
     405        void handle(struct io_uring * ring, int res, const struct options_t & opt) {
     406                switch(state) {
     407                case ACCEPTING:
     408                        connection::accept(ring, opt);
     409                        newconn(ring, res);
     410                        break;
     411                case REQUESTING:
     412                        newrequest(ring, res);
     413                        break;
     414                case ANSWERING:
     415                        writedone(ring, res);
     416                        break;
     417                }
     418        }
     419};
    289420
    290421//=========================================================
    291 extern "C" {
    292 extern int __io_uring_flush_sq(struct io_uring *ring);
    293 }
    294 
     422// Main loop of the WebServer
     423// Effectively uses one thread_local copy of everything per kernel thread
    295424void * proc_loop(void * arg) {
    296         size_t count = 0;
     425        // Get the thread local argument
    297426        struct options_t & opt = *(struct options_t *)arg;
    298 
    299427        struct io_uring * ring = opt.ring;
    300428
     429        // Track the shutdown using a event_fd
    301430        char endfd_buf[8];
    302431        ring_end(ring, opt.endfd, endfd_buf, 8);
    303432
    304         ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
    305 
    306         bool done = false;
     433        // Accept our first connection
     434        // May not take effect until io_uring_submit_and_wait
     435        connection::accept(ring, opt);
     436
     437        int reset = 1;       // Counter to print stats once in a while
     438        bool done = false;   // Are we done
     439        size_t sqes = 0;     // Number of sqes we submitted
     440        size_t call = 0;     // Number of submits we made
    307441        while(!done) {
    308                 struct io_uring_cqe *cqe;
    309                 int ret;
    310                 while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) {
    311                         ret = io_uring_submit_and_wait(ring, 1);
    312                         if (ret < 0) {
    313                                 fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) );
    314                                 exit(EXIT_FAILURE);
    315                         }
    316                         opt.result.subs += ret;
    317                         opt.result.cnts++;
    318                 }
    319 
    320                 if (ret < 0 && -EAGAIN != ret) {
    321                         fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) );
     442                // Submit all the answers we have and wait for responses
     443                int ret = io_uring_submit_and_wait(ring, 1);
     444
     445                // check errors
     446                if (ret < 0) {
     447                        fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
    322448                        exit(EXIT_FAILURE);
    323449                }
    324450
    325                 auto req = (struct request_t *)cqe->user_data;
    326                 // std::cout << req << " completed with " << cqe->res << std::endl;
    327 
    328                 switch(req->type) {
    329                         case EVENT_END:
     451                // Check how good we are at batching sqes
     452                sqes += ret;
     453                call++;
     454
     455                struct io_uring_cqe *cqe;
     456                unsigned head;
     457                unsigned count = 0;
     458
     459                // go through all cqes
     460                io_uring_for_each_cqe(ring, head, cqe) {
     461                        if (0 == cqe->user_data) {
    330462                                done = true;
    331463                                break;
    332                         case EVENT_ACCEPT:
    333                                 handle_new_conn(ring, cqe->res);
    334                                 free(req);
    335                                 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
    336                                 break;
    337                         case EVENT_REQUEST:
    338                                 handle_request(ring, req, cqe->res);
    339                                 break;
    340                         case EVENT_ANSWER:
    341                                 handle_answer(ring, req, cqe->res);
    342                                 break;
    343                 }
    344 
    345                 io_uring_cqe_seen(ring, cqe);
    346         }
    347 
    348         return (void*)count;
     464                        }
     465
     466                        auto req = (class connection *)cqe->user_data;
     467                        req->handle( ring, cqe->res, opt );
     468
     469                        // Every now and then, print some stats
     470                        reset--;
     471                        if(reset == 0) {
     472                                std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
     473                                // Reset to some random number of completions
     474                                // use the ring_fd in the number of threads don't all print at once
     475                                reset = 100000 + (100000 * (ring->ring_fd % 5));
     476                        }
     477
     478                        // Keep track of how many cqes we have seen
     479                        count++;
     480                }
     481
     482                // Mark the cqes as seen
     483                io_uring_cq_advance(ring, count);
     484        }
     485
     486        // Tally all the thread local statistics
     487        __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
     488        __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
     489        __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
     490        __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
     491        __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
     492        __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
     493        __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
     494        __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
     495        __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
     496        __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
     497        __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
     498        __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
     499        __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
     500        __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
     501
     502        return nullptr;
    349503}
    350504
    351505//=========================================================
    352 struct __attribute__((aligned(128))) aligned_ring {
    353         struct io_uring storage;
    354 };
    355 
    356 #include <bit>
    357 
    358 #include <pthread.h>
     506#include <bit> // for ispow2
     507
    359508extern "C" {
    360         #include <signal.h>
    361         #include <sys/eventfd.h>
    362         #include <sys/socket.h>
    363         #include <netinet/in.h>
     509        #include <pthread.h>      // for pthreads
     510        #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
     511        #include <sys/eventfd.h>  // use for termination
     512        #include <sys/socket.h>   // for sockets in general
     513        #include <netinet/in.h>   // for sockaddr_in, AF_INET
    364514}
    365515
    366516int main(int argc, char * argv[]) {
     517        // Initialize the array of connection-fd associations
     518        for(int i = 0; i < array_max; i++) {
     519                conns[i] = nullptr;
     520        }
     521
     522        // Make sure we ignore all sigpipes
    367523        signal(SIGPIPE, SIG_IGN);
    368524
    369         unsigned nthreads = 1;
    370         unsigned port = 8800;
    371         unsigned entries = 256;
    372         unsigned backlog = 10;
    373         bool attach = false;
     525        // Default command line arguments
     526        unsigned nthreads = 1;      // number of kernel threads
     527        unsigned port = 8800;       // which port to listen on
     528        unsigned entries = 256;     // number of entries per ring/kernel thread
     529        unsigned backlog = 262144;  // backlog argument to listen
     530        bool attach = false;        // Whether or not to attach all the rings
     531        bool sqpoll = false;        // Whether or not to use SQ Polling
    374532
    375533        //===================
    376         // Arguments
     534        // Arguments Parsing
    377535        int c;
    378         while ((c = getopt (argc, argv, "t:p:e:b:a")) != -1) {
     536        while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
    379537                switch (c)
    380538                {
     
    394552                        attach = true;
    395553                        break;
     554                case 'S':
     555                        sqpoll = true;
     556                        break;
    396557                case '?':
    397558                default:
    398                         std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -a" << std::endl;
     559                        std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
    399560                        return EXIT_FAILURE;
    400561                }
     
    416577        //===================
    417578        // End FD
     579        // Create a single event fd to notify the kernel threads when the server shutsdown
    418580        int efd = eventfd(0, EFD_SEMAPHORE);
    419581        if (efd < 0) {
     
    424586        //===================
    425587        // Open Socket
     588        // Listen on specified port
    426589        std::cout << getpid() << " : Listening on port " << port << std::endl;
    427590        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
     
    439602        address.sin_port = htons( port );
    440603
     604        // In case the port is already in use, don't just return an error
     605        // Linux is very slow at reclaiming port so just retry regularly
    441606        int waited = 0;
    442607        while(true) {
     
    444609                if(ret < 0) {
    445610                        if(errno == EADDRINUSE) {
     611                                // Port is in used let's retry later
    446612                                if(waited == 0) {
    447613                                        std::cerr << "Waiting for port" << std::endl;
    448614                                } else {
     615                                        // To be cure, print how long we have been waiting
    449616                                        std::cerr << "\r" << waited;
    450617                                        std::cerr.flush();
    451618                                }
    452619                                waited ++;
    453                                 usleep( 1000000 );
     620                                usleep( 1000000 ); // Wait and retry
    454621                                continue;
    455622                        }
     623                        // Some other error occured, this is a real error
    456624                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
    457625                        exit(EXIT_FAILURE);
     
    474642        std::cout << std::endl;
    475643
     644        // Create the desired number of kernel-threads and for each
     645        // create a ring. Create the rings in the main so we can attach them
     646        // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
     647        // it's unlikely but better safe than sorry
     648        struct __attribute__((aligned(128))) aligned_ring {
     649                struct io_uring storage;
     650        };
    476651        aligned_ring thrd_rings[nthreads];
    477652        pthread_t    thrd_hdls[nthreads];
    478653        options_t    thrd_opts[nthreads];
     654        bool no_drops  = true;
     655        bool fast_poll = true;
     656        bool nfix_sqpl = true;
    479657        for(unsigned i = 0; i < nthreads; i++) {
    480                 if(!attach || i == 0) {
    481                         io_uring_queue_init(entries, &thrd_rings[i].storage, 0);
    482                 }
    483                 else {
    484                         struct io_uring_params p;
    485                         memset(&p, 0, sizeof(p));
    486                         p.flags = IORING_SETUP_ATTACH_WQ;
     658                struct io_uring_params p = { };
     659
     660                if(sqpoll) { // If sqpoll is on, add the flag
     661                        p.flags |= IORING_SETUP_SQPOLL;
     662                        p.sq_thread_idle = 100;
     663                }
     664
     665                if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
     666                        p.flags |= IORING_SETUP_ATTACH_WQ;
    487667                        p.wq_fd = thrd_rings[0].storage.ring_fd;
    488                         io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
    489                 }
    490 
     668                }
     669
     670                // Create the ring
     671                io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
     672
     673                // Check if some of the note-worthy features are there
     674                if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
     675                if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
     676                if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
     677
     678                // Write the socket options we want to the options we pass to the threads
    491679                thrd_opts[i].acpt.sockfd  = server_fd;
    492680                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
     
    502690                }
    503691        }
     692
     693        // Tell the user if the features are present
     694        if( no_drops ) std::cout << "No Drop Present" << std::endl;
     695        if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
     696        if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
    504697
    505698        //===================
     
    510703                int ret;
    511704                do {
     705                        // Wait for a Ctrl-D to close the server
    512706                        ret = read(STDIN_FILENO, buffer, 128);
    513707                        if(ret < 0) {
     
    526720
    527721        //===================
     722        // Use eventfd_write to tell the threads we are closing
    528723        (std::cout << "Sending Shutdown to Threads... ").flush();
    529724        ret = eventfd_write(efd, nthreads);
     
    535730
    536731        //===================
     732        // Join all the threads and close the rings
    537733        (std::cout << "Stopping Threads Done... ").flush();
    538         size_t total = 0;
    539         size_t count = 0;
    540734        for(unsigned i = 0; i < nthreads; i++) {
    541735                void * retval;
     
    545739                        exit(EXIT_FAILURE);
    546740                }
    547                 // total += (size_t)retval;
    548                 total += thrd_opts[i].result.subs;
    549                 count += thrd_opts[i].result.cnts;
    550741
    551742                io_uring_queue_exit(thrd_opts[i].ring);
    552743        }
    553744        std::cout << "done" << std::endl;
    554         std::cout << "Submit average: " << total << "/" << count << "(" << (((double)total) / count) << ")" << std::endl;
    555745
    556746        //===================
     747        // Close the sockets
    557748        (std::cout << "Closing Socket... ").flush();
    558749        ret = shutdown( server_fd, SHUT_RD );
     
    567758                exit(EXIT_FAILURE);
    568759        }
    569         std::cout << "done" << std::endl;
     760        std::cout << "done" << std::endl << std::endl;
     761
     762        // Print stats and exit
     763        std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
     764        std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
     765        std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
     766        std::cout << "Max FD: " << max_fd << std::endl;
     767        std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
     768        std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
     769        std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
     770        std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
    570771}
     772
     773// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
Note: See TracChangeset for help on using the changeset viewer.