Ignore:
Timestamp:
Jan 28, 2021, 9:46:29 PM (9 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
9715567
Parents:
c05c58f
Message:

Massive changes to how http_ring works

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/http_ring.cpp

    rc05c58f r761a246  
    99#include <liburing.h>
    1010
    11 typedef enum {
    12         EVENT_END,
    13         EVENT_ACCEPT,
    14         EVENT_REQUEST,
    15         EVENT_ANSWER
    16 } event_t;
    17 
    18 struct __attribute__((aligned(128))) request_t {
    19         event_t type;
    20         int fd;
    21         size_t length;
    22         char * buff;
    23         char data[0];
    24 
    25         static struct request_t * create(event_t type, size_t extra) {
    26                 auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra);
    27                 ret->type = type;
    28                 ret->length = extra;
    29                 ret->buff = ret->data;
    30                 return ret;
    31         }
    32 
    33         static struct request_t * create(event_t type) {
    34                 return create(type, 0);
    35         }
    36 };
    37 
     11// Options passed to each threads
    3812struct __attribute__((aligned(128))) options_t {
     13        // Data passed to accept
    3914        struct {
    4015                int sockfd;
     
    4419        } acpt;
    4520
     21        // Termination notification
    4622        int endfd;
     23
     24        // The ring to use for io
    4725        struct io_uring * ring;
    48 
     26};
     27
     28//=========================================================
     29// General statistics
     30struct __attribute__((aligned(128))) stats_block_t {
    4931        struct {
    50                 size_t subs = 0;
    51                 size_t cnts = 0;
    52         } result;
     32                volatile size_t conns = 0;
     33                volatile size_t reads = 0;
     34                volatile size_t writes = 0;
     35                volatile size_t full_writes = 0;
     36        } completions;
     37
     38        struct {
     39                volatile size_t conns = 0;
     40                struct {
     41                        volatile size_t pipes = 0;
     42                        volatile size_t reset = 0;
     43                        volatile size_t other = 0;
     44                } requests;
     45
     46                struct {
     47                        volatile size_t pipes = 0;
     48                        volatile size_t reset = 0;
     49                        volatile size_t other = 0;
     50                } answers;
     51        } errors;
     52
     53        struct {
     54                volatile size_t current = 0;
     55                volatile size_t max = 0;
     56                volatile size_t used = 0;
     57        } conns;
     58
     59        volatile size_t recycle_errors = 0;
    5360};
    5461
    55 volatile size_t total = 0;
    56 volatile size_t count = 0;
     62// Each thread gets its own block of stats
     63// and there is a global block for tallying at the end
     64thread_local stats_block_t stats;
     65stats_block_t global_stats;
     66
     67// Get an array of current connections
     68// This is just for debugging, to make sure
     69// no two state-machines get the same fd
     70const size_t array_max = 25000;
     71class connection * volatile conns[array_max] = { 0 };
     72
     73// Max fd we've seen, keep track so it's convenient to adjust the array size after
     74volatile int max_fd = 0;
    5775
    5876//=========================================================
     77// Some small wrappers for ring operations used outside the connection state machine
     78// get sqe + error handling
    5979static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
    6080        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
     
    6686}
    6787
    68 static void submit(struct io_uring * ) {
    69         // io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
    70         // io_uring_submit(ring);
    71 }
    72 
    73 //=========================================================
     88// read of the event fd is not done by a connection
     89// use nullptr as the user data
    7490static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
    7591        struct io_uring_sqe * sqe = get_sqe(ring);
    7692        io_uring_prep_read(sqe, fd, buffer, len, 0);
    77         io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
    78         submit(ring);
     93        io_uring_sqe_set_data(sqe, nullptr);
     94        io_uring_submit(ring);
    7995}
    8096
    81 static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
    82         auto req = request_t::create(EVENT_ACCEPT);
    83         struct io_uring_sqe * sqe = get_sqe(ring);
    84         io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    85         io_uring_sqe_set_data(sqe, req);
    86         submit(ring);
    87         // std::cout << "Submitted accept: " << req << std::endl;
    88 }
    89 
    90 static void ring_request(struct io_uring * ring, int fd) {
    91         size_t size = 1024;
    92         auto req = request_t::create(EVENT_REQUEST, size);
    93         req->fd = fd;
    94 
    95         struct io_uring_sqe * sqe = get_sqe(ring);
    96         io_uring_prep_read(sqe, fd, req->buff, size, 0);
    97         io_uring_sqe_set_data(sqe, req);
    98         submit(ring);
    99         // std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    100 }
    101 
    10297//=========================================================
     98// All answers are fixed and determined by the return code
    10399enum HttpCode {
    104100        OK200 = 0,
     
    112108};
    113109
     110// Get a fix reply based on the return code
    114111const char * http_msgs[] = {
    115         "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",
    116         "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    117         "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    118         "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    119         "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    120         "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    121         "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     112        "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
     113        "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     114        "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     115        "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     116        "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     117        "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
     118        "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
    122119};
    123 
    124 static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
    125 
    126 const int http_codes[] = {
    127         200,
    128         400,
    129         404,
    130         405,
    131         408,
    132         413,
    133         414,
     120static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
     121
     122// Pre-compute the length of these replys
     123const size_t http_lens[] = {
     124        strlen(http_msgs[0]),
     125        strlen(http_msgs[1]),
     126        strlen(http_msgs[2]),
     127        strlen(http_msgs[3]),
     128        strlen(http_msgs[4]),
     129        strlen(http_msgs[5]),
     130        strlen(http_msgs[6]),
    134131};
    135 
    136 static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
    137 
    138 int code_val(HttpCode code) {
    139         return http_codes[code];
    140 }
    141 
    142 static void ring_answer(struct io_uring * ring, int fd, HttpCode code) {
    143         size_t size = 256;
    144         auto req = request_t::create(EVENT_ANSWER, size);
    145         req->fd = fd;
    146 
    147         const char * fmt = http_msgs[code];
    148         const char * date = "";
    149         size = snprintf(req->buff, size, fmt, date, size);
    150 
    151         struct io_uring_sqe * sqe = get_sqe(ring);
    152         io_uring_prep_write(sqe, fd, req->buff, size, 0);
    153         io_uring_sqe_set_data(sqe, req);
    154         submit(ring);
    155         // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    156 }
    157 
    158 static void ring_answer(struct io_uring * ring, int fd, const std::string &) {
    159         // size_t size = 256;
    160         // auto req = request_t::create(EVENT_ANSWER, size);
    161         // req->fd = fd;
    162 
    163         // const char * fmt = http_msgs[OK200];
    164         // const char * date = "";
    165         // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
    166         // req->length = len;
    167 
    168         // struct io_uring_sqe * sqe = get_sqe(ring);
    169         // io_uring_prep_write(sqe, fd, req->buffer, len, 0);
    170         // io_uring_sqe_set_data(sqe, req);
    171         // submit(ring);
    172         // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    173 
    174 
    175         static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \
    176                                                 "Content-Length: 15\r\n" \
    177                                                 "Content-Type: text/html\r\n" \
    178                                                 "Connection: keep-alive\r\n" \
    179                                                 "Server: testserver\r\n" \
    180                                                 "\r\n" \
    181                                                 "Hello, World!\r\n";
    182 
    183         static const size_t RLEN = strlen(RESPONSE);
    184 
    185         size_t size = 256;
    186         auto req = request_t::create(EVENT_ANSWER, size);
    187         req->fd = fd;
    188         req->buff = (char*)RESPONSE;
    189         req->length = RLEN;
    190 
    191         // const char * fmt = http_msgs[OK200];
    192         // const char * date = "";
    193         // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
    194         // req->length = len;
    195 
    196         struct io_uring_sqe * sqe = get_sqe(ring);
    197         io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0);
    198         io_uring_sqe_set_data(sqe, req);
    199         submit(ring);
    200 }
     132static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
    201133
    202134//=========================================================
    203 static void handle_new_conn(struct io_uring * ring, int fd) {
    204         if( fd < 0 ) {
    205                 int err = -fd;
    206                 if( err == ECONNABORTED ) return;
    207                 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
    208                 exit(EXIT_FAILURE);
    209         }
    210 
    211         ring_request(ring, fd);
    212 }
    213 
    214 static void handle_request(struct io_uring * ring, struct request_t * in, int res) {
    215         if( res < 0 ) {
    216                 int err = -res;
    217                 switch(err) {
    218                         case EPIPE:
    219                         case ECONNRESET:
    220                                 close(in->fd);
    221                                 free(in);
     135// Finate state machine responsible for handling each connection
     136class __attribute__((aligned(128))) connection {
     137private:
     138        // The state of the machine
     139        enum {
     140                ACCEPTING,  // Accept sent waiting for connection
     141                REQUESTING, // Waiting for new request
     142                ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
     143        } state;
     144
     145        // The file descriptor of the connection
     146        int fd;
     147
     148        // request data
     149        static const size_t buffer_size = 1024; // Size of the read buffer
     150        const char * buffer;                      // Buffer into which requests are read
     151
     152        // send data
     153        size_t to_send;         // Data left to send
     154        const char * iterator;  // Pointer to rest of the message to send
     155
     156        // stats
     157        // how many requests/answers were complete, that is, a valid cqe was obtained
     158        struct {
     159                size_t requests = 0;
     160                size_t answers = 0;
     161        } stats;
     162
     163private:
     164        connection()
     165                : state(ACCEPTING)
     166                , fd(0)
     167                , buffer( new char[buffer_size])
     168                , iterator(nullptr)
     169        {
     170                ::stats.conns.max++;
     171                ::stats.conns.current++;
     172        }
     173
     174        ~connection() {
     175                delete [] buffer;
     176                ::stats.conns.current--;
     177        }
     178
     179        // Close the current connection
     180        void close(int err) {
     181                // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
     182                conns[fd] = nullptr;
     183
     184                if(fd != 0) {
     185                        ::close(fd);
     186                }
     187                delete this;
     188        }
     189
     190        //--------------------------------------------------
     191        // Wrappers for submit so we can tweak it more easily
     192        static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
     193                (void)ring;
     194                // io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
     195                io_uring_sqe_set_data(sqe, conn);
     196                // io_uring_submit(ring);
     197        }
     198
     199        void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
     200                submit(ring, sqe, this);
     201        }
     202
     203        //--------------------------------------------------
     204        // get a new request from the client
     205        void request(struct io_uring * ring) {
     206                state = REQUESTING;
     207                struct io_uring_sqe * sqe = get_sqe(ring);
     208                io_uring_prep_read(sqe, fd, (void*)buffer, buffer_size, 0);
     209                submit(ring, sqe);
     210        }
     211
     212        //--------------------------------------------------
     213        // Send a new answer based on a return code
     214        void answer(struct io_uring * ring, HttpCode code) {
     215                iterator = http_msgs[code];
     216                to_send  = http_lens[code];
     217                if(to_send != 124) {
     218                        std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
     219                }
     220                answer(ring);
     221        }
     222
     223        // send a new answer to the client
     224        // Reused for incomplete writes
     225        void answer(struct io_uring * ring) {
     226                state = ANSWERING;
     227                struct io_uring_sqe * sqe = get_sqe(ring);
     228                io_uring_prep_write(sqe, fd, iterator, to_send, 0);
     229                submit(ring, sqe);
     230        }
     231
     232        //--------------------------------------------------
     233        // Handle a new connection, results for getting an cqe while in the ACCEPTING state
     234        void newconn(struct io_uring * ring, int ret) {
     235                // Check errors
     236                if( ret < 0 ) {
     237                        int err = -ret;
     238                        if( err == ECONNABORTED ) {
     239                                ::stats.errors.conns++;
     240                                this->close(err);
    222241                                return;
    223                         default:
    224                                 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
    225                                 exit(EXIT_FAILURE);
    226                 }
    227         }
    228 
    229         if(res == 0) {
    230                 close(in->fd);
    231                 free(in);
    232                 return;
    233         }
    234 
    235         const char * it = in->buff;
    236         if( !strstr( it, "\r\n\r\n" ) ) {
    237                 std::cout << "Incomplete request" << std::endl;
    238                 close(in->fd);
    239                 free(in);
    240                 return;
    241         }
    242 
    243         it = in->buff;
    244         const std::string reply = "Hello, World!\n";
    245         int ret = memcmp(it, "GET ", 4);
    246         if( ret != 0 ) {
    247                 ring_answer(ring, in->fd, E400);
    248                 goto NEXT;
    249         }
    250 
    251         it += 4;
    252         ret = memcmp(it, "/plaintext", 10);
    253         if( ret != 0 ) {
    254                 ring_answer(ring, in->fd, E404);
    255                 goto NEXT;
    256         }
    257 
    258         ring_answer(ring, in->fd, reply);
    259 
    260         NEXT:
    261                 ring_request(ring, in->fd);
    262                 free(in);
    263                 return;
    264 }
    265 
    266 static void handle_answer(struct io_uring * ring, struct request_t * in, int res) {
    267         if( res < 0 ) {
    268                 int err = -res;
    269                 switch(err) {
    270                         case EPIPE:
    271                         case ECONNRESET:
    272                                 close(in->fd);
    273                                 free(in);
    274                                 return;
    275                         default:
    276                                 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
    277                                 exit(EXIT_FAILURE);
    278                 }
    279         }
    280 
    281         if( res >= in->length ) {
    282                 free(in);
    283                 return;
    284         }
    285 
    286         struct io_uring_sqe * sqe = get_sqe(ring);
    287         io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0);
    288         io_uring_sqe_set_data(sqe, in);
    289         submit(ring);
    290         // std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
    291 
    292         ring_request(ring, in->fd);
    293 }
     242                        }
     243                        std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
     244                        exit(EXIT_FAILURE);
     245                }
     246
     247                // Count the connections
     248                ::stats.completions.conns++;
     249
     250                // Read on the data
     251                fd = ret;
     252                request(ring);
     253
     254                // check the max fd so we know if we exceeded the array
     255                for(;;) {
     256                        int expected = max_fd;
     257                        if(expected >= fd) return;
     258                        if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
     259                }
     260
     261                // check if we have enough space to fit inside the array
     262                if(fd >= array_max) {
     263                        std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
     264                        return;
     265                }
     266
     267                // Put our connection into the global array
     268                // No one else should be using it so if they are that's a bug
     269                auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
     270                if( exist ) {
     271                        size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
     272                        if( first == 0 ) {
     273                                std::cerr << "First: accept has existing connection " << std::endl;
     274                        }
     275                }
     276        }
     277
     278        // Handle a new request, results for getting an cqe while in the REQUESTING state
     279        void newrequest(struct io_uring * ring, int ret) {
     280                // Check errors
     281                if( ret < 0 ) {
     282                        int err = -ret;
     283                        switch(err) {
     284                                case EPIPE:
     285                                        ::stats.errors.requests.pipes++;
     286                                        break;
     287                                        // Don't fall through the get better stats
     288                                case ECONNRESET:
     289                                        ::stats.errors.requests.reset++;
     290                                        break;
     291                                default:
     292                                        ::stats.errors.requests.other++;
     293                                        std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
     294                                        exit(EXIT_FAILURE);
     295                        }
     296
     297                        // Connection failed, close it
     298                        this->close(err);
     299                        return;
     300                }
     301
     302                // Update stats
     303                ::stats.completions.reads++;
     304
     305                // Is this an EOF
     306                if(ret == 0) {
     307                        // Yes, close the connection
     308                        this->close(0);
     309                        return;
     310                }
     311
     312                // Find the end of the request header
     313                const char * it = buffer;
     314                if( !strstr( it, "\r\n\r\n" ) ) {
     315                        // This state machine doesn't support incomplete reads
     316                        // Print them to output so it's clear there is an issue
     317                        std::cout << "Incomplete request" << std::endl;
     318                        this->close(EBADR);
     319                        return;
     320                }
     321
     322                // Find the method to use
     323                it = buffer;
     324                int ret = memcmp(it, "GET ", 4);
     325                if( ret != 0 ) {
     326                        // We only support get, answer with an error
     327                        answer(ring, E400);
     328                        return;
     329                }
     330
     331                // Find the target
     332                it += 4;
     333                ret = memcmp(it, "/plaintext", 10);
     334                if( ret != 0 ) {
     335                        // We only support /plaintext, answer with an error
     336                        answer(ring, E404);
     337                        return;
     338                }
     339
     340                // Correct request, answer with the payload
     341                this->stats.requests++;
     342                answer(ring, OK200);
     343        }
     344
     345        // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
     346        void writedone(struct io_uring * ring, int res) {
     347                // Check errors
     348                if( res < 0 ) {
     349                        int err = -res;
     350                        switch(err) {
     351                                case EPIPE:
     352                                        ::stats.errors.answers.pipes++;
     353                                        break;
     354                                        // Don't fall through the get better stats
     355                                case ECONNRESET:
     356                                        ::stats.errors.answers.reset++;
     357                                        break;
     358                                default:
     359                                        ::stats.errors.answers.other++;
     360                                        std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
     361                                        exit(EXIT_FAILURE);
     362                        }
     363
     364                        this->close(err);
     365                        return;
     366                }
     367
     368                // Update stats
     369                ::stats.completions.writes++;
     370                if(res == 124) ::stats.completions.full_writes++;
     371
     372                // Is this write completed
     373                if( res == to_send ) {
     374                        // Yes, more stats
     375                        stats.answers++;
     376                        if(stats.answers == 2) ::stats.conns.used++;
     377                        // Then read a new request
     378                        request(ring);
     379                        return;
     380                }
     381
     382                // Not a completed read, push the rest
     383                to_send -= res;
     384                iterator += res;
     385                answer(ring);
     386        }
     387public:
     388        // Submit a call to accept and create a new connection object
     389        static void accept(struct io_uring * ring, const struct options_t & opt) {
     390                struct io_uring_sqe * sqe = get_sqe(ring);
     391                io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
     392                submit(ring, sqe, new connection());
     393                // std::cout << "Submitted accept: " << req << std::endl;
     394        }
     395
     396        // Handle a new cqe
     397        void handle(struct io_uring * ring, int res, const struct options_t & opt) {
     398                switch(state) {
     399                case ACCEPTING:
     400                        connection::accept(ring, opt);
     401                        newconn(ring, res);
     402                        break;
     403                case REQUESTING:
     404                        newrequest(ring, res);
     405                        break;
     406                case ANSWERING:
     407                        writedone(ring, res);
     408                        break;
     409                }
     410        }
     411};
    294412
    295413//=========================================================
    296 extern "C" {
    297 extern int __io_uring_flush_sq(struct io_uring *ring);
    298 }
    299 
     414// Main loop of the WebServer
     415// Effectively uses one thread_local copy of everything per kernel thread
    300416void * proc_loop(void * arg) {
    301         size_t count = 0;
     417        // Get the thread local argument
    302418        struct options_t & opt = *(struct options_t *)arg;
    303 
    304419        struct io_uring * ring = opt.ring;
    305420
     421        // Track the shutdown using a event_fd
    306422        char endfd_buf[8];
    307423        ring_end(ring, opt.endfd, endfd_buf, 8);
    308424
    309         ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
    310 
    311         int reset = 1;
    312         bool done = false;
     425        // Accept our first connection
     426        // May not take effect until io_uring_submit_and_wait
     427        connection::accept(ring, opt);
     428
     429        int reset = 1;       // Counter to print stats once in a while
     430        bool done = false;   // Are we done
     431        size_t sqes = 0;     // Number of sqes we submitted
     432        size_t call = 0;     // Number of submits we made
    313433        while(!done) {
    314                 struct io_uring_cqe *cqe;
    315                 int ret;
    316                 while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) {
    317                         ret = io_uring_submit_and_wait(ring, 1);
    318                         if (ret < 0) {
    319                                 fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) );
    320                                 exit(EXIT_FAILURE);
    321                         }
    322                         opt.result.subs += ret;
    323                         opt.result.cnts++;
    324                 }
    325 
    326                 if (ret < 0 && -EAGAIN != ret) {
    327                         fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) );
     434                // Submit all the answers we have and wait for responses
     435                int ret = io_uring_submit_and_wait(ring, 1);
     436
     437                // check errors
     438                if (ret < 0) {
     439                        fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
    328440                        exit(EXIT_FAILURE);
    329441                }
    330442
    331                 auto req = (struct request_t *)cqe->user_data;
    332                 // std::cout << req << " completed with " << cqe->res << std::endl;
    333 
    334                 switch(req->type) {
    335                         case EVENT_END:
     443                // Check how good we are at batching sqes
     444                sqes += ret;
     445                call++;
     446
     447                struct io_uring_cqe *cqe;
     448                unsigned head;
     449                unsigned count = 0;
     450
     451                // go through all cqes
     452                io_uring_for_each_cqe(ring, head, cqe) {
     453                        if (0 == cqe->user_data) {
    336454                                done = true;
    337455                                break;
    338                         case EVENT_ACCEPT:
    339                                 handle_new_conn(ring, cqe->res);
    340                                 free(req);
    341                                 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
    342                                 break;
    343                         case EVENT_REQUEST:
    344                                 handle_request(ring, req, cqe->res);
    345                                 break;
    346                         case EVENT_ANSWER:
    347                                 handle_answer(ring, req, cqe->res);
    348                                 break;
    349                 }
    350 
    351                 io_uring_cqe_seen(ring, cqe);
    352                 reset--;
    353                 if(reset == 0) {
    354                         size_t ltotal = opt.result.subs;
    355                         size_t lcount = opt.result.cnts;
    356 
    357                         std::cout << "Submit average: " << ltotal << "/" << lcount << "(" << (((double)ltotal) / lcount) << ")" << std::endl;
    358                         reset = 100000 + (100000 * (ring->ring_fd % 5));
    359                 }
    360         }
    361 
    362         return (void*)count;
     456                        }
     457
     458                        auto req = (class connection *)cqe->user_data;
     459                        req->handle( ring, cqe->res, opt );
     460
     461                        reset--;
     462                        if(reset == 0) {
     463                                std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
     464                                reset = 100000 + (100000 * (ring->ring_fd % 5));
     465                        }
     466
     467                        // Keep track of how many cqes we have seen
     468                        count++;
     469                }
     470
     471                // Mark the cqes as seen
     472                io_uring_cq_advance(ring, count);
     473        }
     474
     475        // Tally all the thread local statistics
     476        __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
     477        __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
     478        __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
     479        __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
     480        __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
     481        __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
     482        __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
     483        __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
     484        __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
     485        __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
     486        __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
     487        __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
     488        __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
     489        __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
     490
     491        return nullptr;
    363492}
    364493
    365494//=========================================================
    366 struct __attribute__((aligned(128))) aligned_ring {
    367         struct io_uring storage;
    368 };
    369 
    370 #include <bit>
    371 
    372 #include <pthread.h>
     495#include <bit> // for ispow2
     496
    373497extern "C" {
    374         #include <signal.h>
    375         #include <sys/eventfd.h>
    376         #include <sys/socket.h>
    377         #include <netinet/in.h>
     498        #include <pthread.h>      // for pthreads
     499        #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
     500        #include <sys/eventfd.h>  // use for termination
     501        #include <sys/socket.h>   // for sockets in general
     502        #include <netinet/in.h>   // for sockaddr_in, AF_INET
    378503}
    379504
    380505int main(int argc, char * argv[]) {
     506        // Initialize the array of connection-fd associations
     507        for(int i = 0; i < array_max; i++) {
     508                conns[i] = nullptr;
     509        }
     510
     511        // Make sure we ignore all sigpipes
    381512        signal(SIGPIPE, SIG_IGN);
    382513
    383         unsigned nthreads = 1;
    384         unsigned port = 8800;
    385         unsigned entries = 256;
    386         unsigned backlog = 10;
    387         bool attach = false;
    388         bool sqpoll = false;
     514        // Default command line arguments
     515        unsigned nthreads = 1;      // number of kernel threads
     516        unsigned port = 8800;       // which port to listen on
     517        unsigned entries = 256;     // number of entries per ring/kernel thread
     518        unsigned backlog = 262144;  // backlog argument to listen
     519        bool attach = false;        // Whether or not to attach all the rings
     520        bool sqpoll = false;        // Whether or not to use SQ Polling
    389521
    390522        //===================
    391         // Arguments
     523        // Arguments Parsing
    392524        int c;
    393525        while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
     
    434566        //===================
    435567        // End FD
     568        // Create a single event fd to notify the kernel threads when the server shutsdown
    436569        int efd = eventfd(0, EFD_SEMAPHORE);
    437570        if (efd < 0) {
     
    442575        //===================
    443576        // Open Socket
     577        // Listen on specified port
    444578        std::cout << getpid() << " : Listening on port " << port << std::endl;
    445579        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
     
    457591        address.sin_port = htons( port );
    458592
     593        // In case the port is already in use, don't just return an error
     594        // Linux is very slow at reclaiming port so just retry regularly
    459595        int waited = 0;
    460596        while(true) {
     
    462598                if(ret < 0) {
    463599                        if(errno == EADDRINUSE) {
     600                                // Port is in used let's retry later
    464601                                if(waited == 0) {
    465602                                        std::cerr << "Waiting for port" << std::endl;
    466603                                } else {
     604                                        // To be cure, print how long we have been waiting
    467605                                        std::cerr << "\r" << waited;
    468606                                        std::cerr.flush();
    469607                                }
    470608                                waited ++;
    471                                 usleep( 1000000 );
     609                                usleep( 1000000 ); // Wait and retry
    472610                                continue;
    473611                        }
     612                        // Some other error occured, this is a real error
    474613                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
    475614                        exit(EXIT_FAILURE);
     
    492631        std::cout << std::endl;
    493632
     633        // Create the desired number of kernel-threads and for each
     634        // create a ring. Create the rings in the main so we can attach them
     635        // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
     636        // it's unlikely but better safe than sorry
     637        struct __attribute__((aligned(128))) aligned_ring {
     638                struct io_uring storage;
     639        };
    494640        aligned_ring thrd_rings[nthreads];
    495641        pthread_t    thrd_hdls[nthreads];
    496642        options_t    thrd_opts[nthreads];
     643        bool no_drops  = true;
    497644        bool fast_poll = true;
    498645        bool nfix_sqpl = true;
    499646        for(unsigned i = 0; i < nthreads; i++) {
    500647                struct io_uring_params p = { };
    501                 if(sqpoll) {
     648
     649                if(sqpoll) { // If sqpoll is on, add the flag
    502650                        p.flags |= IORING_SETUP_SQPOLL;
    503651                        p.sq_thread_idle = 100;
    504652                }
    505653
    506                 if (attach && i != 0) {
     654                if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
    507655                        p.flags |= IORING_SETUP_ATTACH_WQ;
    508656                        p.wq_fd = thrd_rings[0].storage.ring_fd;
    509657                }
     658
     659                // Create the ring
    510660                io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
    511661
     662                // Check if some of the note-worthy features are there
     663                if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
    512664                if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
    513665                if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
    514666
     667                // Write the socket options we want to the options we pass to the threads
    515668                thrd_opts[i].acpt.sockfd  = server_fd;
    516669                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
     
    527680        }
    528681
     682        // Tell the user if the features are present
     683        if( no_drops ) std::cout << "No Drop Present" << std::endl;
    529684        if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
    530685        if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
     
    537692                int ret;
    538693                do {
     694                        // Wait for a Ctrl-D to close the server
    539695                        ret = read(STDIN_FILENO, buffer, 128);
    540696                        if(ret < 0) {
     
    553709
    554710        //===================
     711        // Use eventfd_write to tell the threads we are closing
    555712        (std::cout << "Sending Shutdown to Threads... ").flush();
    556713        ret = eventfd_write(efd, nthreads);
     
    562719
    563720        //===================
     721        // Join all the threads and close the rings
    564722        (std::cout << "Stopping Threads Done... ").flush();
    565723        for(unsigned i = 0; i < nthreads; i++) {
     
    576734
    577735        //===================
     736        // Close the sockets
    578737        (std::cout << "Closing Socket... ").flush();
    579738        ret = shutdown( server_fd, SHUT_RD );
     
    588747                exit(EXIT_FAILURE);
    589748        }
    590         std::cout << "done" << std::endl;
     749        std::cout << "done" << std::endl << std::endl;
     750
     751        // Print stats and exit
     752        std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
     753        std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
     754        std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
     755        std::cout << "Max FD: " << max_fd << std::endl;
     756        std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
     757        std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
     758        std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
    591759}
Note: See TracChangeset for help on using the changeset viewer.