| [efdfdee] | 1 | #include <cstdio>
 | 
|---|
 | 2 | #include <cstdlib>
 | 
|---|
 | 3 | #include <cstring>
 | 
|---|
 | 4 | 
 | 
|---|
 | 5 | #include <iostream>
 | 
|---|
 | 6 | 
 | 
|---|
 | 7 | #include <signal.h>
 | 
|---|
 | 8 | #include <unistd.h>
 | 
|---|
 | 9 | #include <liburing.h>
 | 
|---|
 | 10 | 
 | 
|---|
| [eeb4866] | 11 | // #define NOBATCHING
 | 
|---|
 | 12 | // #define USE_ASYNC
 | 
|---|
 | 13 | 
 | 
|---|
// Per-thread configuration, handed to each worker through its start argument
struct __attribute__((aligned(128))) options_t {
        // Arguments forwarded to every accept request queued on the ring
        struct {
                int sockfd;                // socket the accept is issued on
                struct sockaddr * addr;    // address argument forwarded to accept
                socklen_t * addrlen;       // address-length argument forwarded to accept
                int flags;                 // flags argument forwarded to accept
                unsigned cnt;              // how many accept requests the worker queues at start-up
        } acpt;

        // File descriptor the worker reads from to learn about termination
        int endfd;

        // The ring this worker uses for all of its io
        struct io_uring * ring;
};
 | 
|---|
 | 31 | 
 | 
|---|
 | 32 | //=========================================================
 | 
|---|
 | 33 | // General statistics
 | 
|---|
 | 34 | struct __attribute__((aligned(128))) stats_block_t {
 | 
|---|
 | 35 |         struct {
 | 
|---|
 | 36 |                 volatile size_t conns = 0;
 | 
|---|
 | 37 |                 volatile size_t reads = 0;
 | 
|---|
 | 38 |                 volatile size_t writes = 0;
 | 
|---|
 | 39 |                 volatile size_t full_writes = 0;
 | 
|---|
 | 40 |         } completions;
 | 
|---|
| [3acbf89] | 41 | 
 | 
|---|
 | 42 |         struct {
 | 
|---|
| [761a246] | 43 |                 volatile size_t conns = 0;
 | 
|---|
 | 44 |                 struct {
 | 
|---|
 | 45 |                         volatile size_t pipes = 0;
 | 
|---|
 | 46 |                         volatile size_t reset = 0;
 | 
|---|
 | 47 |                         volatile size_t other = 0;
 | 
|---|
 | 48 |                 } requests;
 | 
|---|
 | 49 | 
 | 
|---|
 | 50 |                 struct {
 | 
|---|
 | 51 |                         volatile size_t pipes = 0;
 | 
|---|
 | 52 |                         volatile size_t reset = 0;
 | 
|---|
 | 53 |                         volatile size_t other = 0;
 | 
|---|
 | 54 |                 } answers;
 | 
|---|
 | 55 |         } errors;
 | 
|---|
 | 56 | 
 | 
|---|
 | 57 |         struct {
 | 
|---|
 | 58 |                 volatile size_t current = 0;
 | 
|---|
 | 59 |                 volatile size_t max = 0;
 | 
|---|
 | 60 |                 volatile size_t used = 0;
 | 
|---|
 | 61 |         } conns;
 | 
|---|
 | 62 | 
 | 
|---|
 | 63 |         volatile size_t recycle_errors = 0;
 | 
|---|
| [efdfdee] | 64 | };
 | 
|---|
 | 65 | 
 | 
|---|
| [761a246] | 66 | // Each thread gets its own block of stats
 | 
|---|
 | 67 | // and there is a global block for tallying at the end
 | 
|---|
 | 68 | thread_local stats_block_t stats;
 | 
|---|
 | 69 | stats_block_t global_stats;
 | 
|---|
 | 70 | 
 | 
|---|
// Per-thread scratch data: number of sqes prepared since the counter was last cleared
thread_local struct __attribute__((aligned(128))) {
        size_t to_submit{0};
} local;

// Table of live connections indexed by file descriptor.
// Debugging aid only: it exists to detect two state-machines
// being handed the same fd.
const size_t array_max = 25000;
class connection * volatile conns[array_max] = {};

// Highest fd seen so far, handy when deciding how large the table should be
volatile int max_fd = 0;
 | 
|---|
| [c05c58f] | 83 | 
 | 
|---|
| [efdfdee] | 84 | //=========================================================
 | 
|---|
| [761a246] | 85 | // Some small wrappers for ring operations used outside the connection state machine
 | 
|---|
 | 86 | // get sqe + error handling
 | 
|---|
| [3acbf89] | 87 | static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
 | 
|---|
| [efdfdee] | 88 |         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
 | 
|---|
| [3acbf89] | 89 |         if(!sqe) {
 | 
|---|
 | 90 |                 std::cerr << "Insufficient entries in ring" << std::endl;
 | 
|---|
 | 91 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 92 |         }
 | 
|---|
 | 93 |         return sqe;
 | 
|---|
 | 94 | }
 | 
|---|
 | 95 | 
 | 
|---|
| [761a246] | 96 | // read of the event fd is not done by a connection
 | 
|---|
 | 97 | // use nullptr as the user data
 | 
|---|
| [3acbf89] | 98 | static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
 | 
|---|
 | 99 |         struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
| [efdfdee] | 100 |         io_uring_prep_read(sqe, fd, buffer, len, 0);
 | 
|---|
| [761a246] | 101 |         io_uring_sqe_set_data(sqe, nullptr);
 | 
|---|
 | 102 |         io_uring_submit(ring);
 | 
|---|
| [efdfdee] | 103 | }
 | 
|---|
 | 104 | 
 | 
|---|
 | 105 | //=========================================================
 | 
|---|
| [761a246] | 106 | // All answers are fixed and determined by the return code
 | 
|---|
| [efdfdee] | 107 | enum HttpCode {
 | 
|---|
 | 108 |         OK200 = 0,
 | 
|---|
 | 109 |         E400,
 | 
|---|
 | 110 |         E404,
 | 
|---|
 | 111 |         E405,
 | 
|---|
 | 112 |         E408,
 | 
|---|
 | 113 |         E413,
 | 
|---|
 | 114 |         E414,
 | 
|---|
 | 115 |         KNOWN_CODES
 | 
|---|
 | 116 | };
 | 
|---|
 | 117 | 
 | 
|---|
// Fixed replies, one per HttpCode and in the same order as the enumerators.
// Only the 200 reply carries a body; every error reply has an empty one.
const char * http_msgs[] = {
        "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
        "HTTP/1.1 400 Bad Request\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
        "HTTP/1.1 404 Not Found\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
        // The reason phrase used to be cut short ("Method Not ")
        "HTTP/1.1 405 Method Not Allowed\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
        "HTTP/1.1 408 Request Timeout\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
        "HTTP/1.1 413 Payload Too Large\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
        "HTTP/1.1 414 URI Too Long\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
};
 | 
|---|
| [761a246] | 128 | static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
 | 
|---|
 | 129 | 
 | 
|---|
 | 130 | // Pre-compute the length of these replys
 | 
|---|
 | 131 | const size_t http_lens[] = {
 | 
|---|
 | 132 |         strlen(http_msgs[0]),
 | 
|---|
 | 133 |         strlen(http_msgs[1]),
 | 
|---|
 | 134 |         strlen(http_msgs[2]),
 | 
|---|
 | 135 |         strlen(http_msgs[3]),
 | 
|---|
 | 136 |         strlen(http_msgs[4]),
 | 
|---|
 | 137 |         strlen(http_msgs[5]),
 | 
|---|
 | 138 |         strlen(http_msgs[6]),
 | 
|---|
 | 139 | };
 | 
|---|
 | 140 | static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
 | 
|---|
| [efdfdee] | 141 | 
 | 
|---|
| [761a246] | 142 | //=========================================================
 | 
|---|
 | 143 | // Finate state machine responsible for handling each connection
 | 
|---|
 | 144 | class __attribute__((aligned(128))) connection {
 | 
|---|
 | 145 | private:
 | 
|---|
 | 146 |         // The state of the machine
 | 
|---|
 | 147 |         enum {
 | 
|---|
 | 148 |                 ACCEPTING,  // Accept sent waiting for connection
 | 
|---|
 | 149 |                 REQUESTING, // Waiting for new request
 | 
|---|
 | 150 |                 ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
 | 
|---|
 | 151 |         } state;
 | 
|---|
 | 152 | 
 | 
|---|
 | 153 |         // The file descriptor of the connection
 | 
|---|
 | 154 |         int fd;
 | 
|---|
| [efdfdee] | 155 | 
 | 
|---|
| [761a246] | 156 |         // request data
 | 
|---|
 | 157 |         static const size_t buffer_size = 1024; // Size of the read buffer
 | 
|---|
 | 158 |         const char * buffer;                      // Buffer into which requests are read
 | 
|---|
| [efdfdee] | 159 | 
 | 
|---|
| [761a246] | 160 |         // send data
 | 
|---|
 | 161 |         size_t to_send;         // Data left to send
 | 
|---|
 | 162 |         const char * iterator;  // Pointer to rest of the message to send
 | 
|---|
| [efdfdee] | 163 | 
 | 
|---|
| [761a246] | 164 |         // stats
 | 
|---|
 | 165 |         // how many requests/answers were complete, that is, a valid cqe was obtained
 | 
|---|
 | 166 |         struct {
 | 
|---|
 | 167 |                 size_t requests = 0;
 | 
|---|
 | 168 |                 size_t answers = 0;
 | 
|---|
 | 169 |         } stats;
 | 
|---|
 | 170 | 
 | 
|---|
 | 171 | private:
 | 
|---|
 | 172 |         connection()
 | 
|---|
 | 173 |                 : state(ACCEPTING)
 | 
|---|
 | 174 |                 , fd(0)
 | 
|---|
 | 175 |                 , buffer( new char[buffer_size])
 | 
|---|
 | 176 |                 , iterator(nullptr)
 | 
|---|
| [eeb4866] | 177 |         {}
 | 
|---|
| [efdfdee] | 178 | 
 | 
|---|
| [761a246] | 179 |         ~connection() {
 | 
|---|
 | 180 |                 delete [] buffer;
 | 
|---|
 | 181 |                 ::stats.conns.current--;
 | 
|---|
 | 182 |         }
 | 
|---|
| [efdfdee] | 183 | 
 | 
|---|
| [761a246] | 184 |         // Close the current connection
 | 
|---|
 | 185 |         void close(int err) {
 | 
|---|
 | 186 |                 // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
 | 
|---|
 | 187 |                 conns[fd] = nullptr;
 | 
|---|
| [efdfdee] | 188 | 
 | 
|---|
| [761a246] | 189 |                 if(fd != 0) {
 | 
|---|
 | 190 |                         ::close(fd);
 | 
|---|
 | 191 |                 }
 | 
|---|
 | 192 |                 delete this;
 | 
|---|
 | 193 |         }
 | 
|---|
| [efdfdee] | 194 | 
 | 
|---|
| [761a246] | 195 |         //--------------------------------------------------
 | 
|---|
 | 196 |         // Wrappers for submit so we can tweak it more easily
 | 
|---|
 | 197 |         static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
 | 
|---|
 | 198 |                 (void)ring;
 | 
|---|
| [b664af2] | 199 |                 local.to_submit++;
 | 
|---|
| [eeb4866] | 200 |                 #ifdef USE_ASYNC
 | 
|---|
 | 201 |                         io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
 | 
|---|
 | 202 |                 #endif
 | 
|---|
| [761a246] | 203 |                 io_uring_sqe_set_data(sqe, conn);
 | 
|---|
| [eeb4866] | 204 |                 #ifdef NOBATCHING
 | 
|---|
 | 205 |                         io_uring_submit(ring);
 | 
|---|
 | 206 |                 #endif
 | 
|---|
| [761a246] | 207 |         }
 | 
|---|
| [3acbf89] | 208 | 
 | 
|---|
| [761a246] | 209 |         void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
 | 
|---|
 | 210 |                 submit(ring, sqe, this);
 | 
|---|
 | 211 |         }
 | 
|---|
 | 212 | 
 | 
|---|
 | 213 |         //--------------------------------------------------
 | 
|---|
 | 214 |         // get a new request from the client
 | 
|---|
 | 215 |         void request(struct io_uring * ring) {
 | 
|---|
 | 216 |                 state = REQUESTING;
 | 
|---|
 | 217 |                 struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
| [eeb4866] | 218 |                 io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
 | 
|---|
| [761a246] | 219 |                 submit(ring, sqe);
 | 
|---|
 | 220 |         }
 | 
|---|
| [3acbf89] | 221 | 
 | 
|---|
| [761a246] | 222 |         //--------------------------------------------------
 | 
|---|
 | 223 |         // Send a new answer based on a return code
 | 
|---|
 | 224 |         void answer(struct io_uring * ring, HttpCode code) {
 | 
|---|
 | 225 |                 iterator = http_msgs[code];
 | 
|---|
 | 226 |                 to_send  = http_lens[code];
 | 
|---|
 | 227 |                 if(to_send != 124) {
 | 
|---|
 | 228 |                         std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
 | 
|---|
 | 229 |                 }
 | 
|---|
 | 230 |                 answer(ring);
 | 
|---|
 | 231 |         }
 | 
|---|
| [3acbf89] | 232 | 
 | 
|---|
| [761a246] | 233 |         // send a new answer to the client
 | 
|---|
 | 234 |         // Reused for incomplete writes
 | 
|---|
 | 235 |         void answer(struct io_uring * ring) {
 | 
|---|
 | 236 |                 state = ANSWERING;
 | 
|---|
 | 237 |                 struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
| [eeb4866] | 238 |                 io_uring_prep_send(sqe, fd, iterator, to_send, 0);
 | 
|---|
| [761a246] | 239 |                 submit(ring, sqe);
 | 
|---|
 | 240 |         }
 | 
|---|
| [3acbf89] | 241 | 
 | 
|---|
| [761a246] | 242 |         //--------------------------------------------------
 | 
|---|
 | 243 |         // Handle a new connection, results for getting an cqe while in the ACCEPTING state
 | 
|---|
 | 244 |         void newconn(struct io_uring * ring, int ret) {
 | 
|---|
 | 245 |                 // Check errors
 | 
|---|
 | 246 |                 if( ret < 0 ) {
 | 
|---|
 | 247 |                         int err = -ret;
 | 
|---|
 | 248 |                         if( err == ECONNABORTED ) {
 | 
|---|
 | 249 |                                 ::stats.errors.conns++;
 | 
|---|
 | 250 |                                 this->close(err);
 | 
|---|
 | 251 |                                 return;
 | 
|---|
 | 252 |                         }
 | 
|---|
 | 253 |                         std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 254 |                         exit(EXIT_FAILURE);
 | 
|---|
 | 255 |                 }
 | 
|---|
| [3acbf89] | 256 | 
 | 
|---|
| [761a246] | 257 |                 // Count the connections
 | 
|---|
 | 258 |                 ::stats.completions.conns++;
 | 
|---|
| [eeb4866] | 259 |                 ::stats.conns.current++;
 | 
|---|
 | 260 |                 if(::stats.conns.current > ::stats.conns.max) {
 | 
|---|
 | 261 |                         ::stats.conns.max = ::stats.conns.current;
 | 
|---|
 | 262 |                 }
 | 
|---|
| [3acbf89] | 263 | 
 | 
|---|
| [761a246] | 264 |                 // Read on the data
 | 
|---|
 | 265 |                 fd = ret;
 | 
|---|
 | 266 |                 request(ring);
 | 
|---|
| [efdfdee] | 267 | 
 | 
|---|
| [761a246] | 268 |                 // check the max fd so we know if we exceeded the array
 | 
|---|
 | 269 |                 for(;;) {
 | 
|---|
 | 270 |                         int expected = max_fd;
 | 
|---|
 | 271 |                         if(expected >= fd) return;
 | 
|---|
 | 272 |                         if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
 | 
|---|
 | 273 |                 }
 | 
|---|
| [efdfdee] | 274 | 
 | 
|---|
| [761a246] | 275 |                 // check if we have enough space to fit inside the array
 | 
|---|
 | 276 |                 if(fd >= array_max) {
 | 
|---|
 | 277 |                         std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
 | 
|---|
 | 278 |                         return;
 | 
|---|
 | 279 |                 }
 | 
|---|
| [efdfdee] | 280 | 
 | 
|---|
| [761a246] | 281 |                 // Put our connection into the global array
 | 
|---|
 | 282 |                 // No one else should be using it so if they are that's a bug
 | 
|---|
 | 283 |                 auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
 | 
|---|
 | 284 |                 if( exist ) {
 | 
|---|
 | 285 |                         size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
 | 
|---|
 | 286 |                         if( first == 0 ) {
 | 
|---|
 | 287 |                                 std::cerr << "First: accept has existing connection " << std::endl;
 | 
|---|
 | 288 |                         }
 | 
|---|
 | 289 |                 }
 | 
|---|
| [efdfdee] | 290 |         }
 | 
|---|
 | 291 | 
 | 
|---|
| [761a246] | 292 |         // Handle a new request, results for getting an cqe while in the REQUESTING state
 | 
|---|
| [9715567] | 293 |         void newrequest(struct io_uring * ring, int res) {
 | 
|---|
| [761a246] | 294 |                 // Check errors
 | 
|---|
| [9715567] | 295 |                 if( res < 0 ) {
 | 
|---|
 | 296 |                         int err = -res;
 | 
|---|
| [761a246] | 297 |                         switch(err) {
 | 
|---|
 | 298 |                                 case EPIPE:
 | 
|---|
 | 299 |                                         ::stats.errors.requests.pipes++;
 | 
|---|
 | 300 |                                         break;
 | 
|---|
 | 301 |                                         // Don't fall through the get better stats
 | 
|---|
 | 302 |                                 case ECONNRESET:
 | 
|---|
 | 303 |                                         ::stats.errors.requests.reset++;
 | 
|---|
 | 304 |                                         break;
 | 
|---|
 | 305 |                                 default:
 | 
|---|
 | 306 |                                         ::stats.errors.requests.other++;
 | 
|---|
 | 307 |                                         std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
 | 
|---|
 | 308 |                                         exit(EXIT_FAILURE);
 | 
|---|
 | 309 |                         }
 | 
|---|
| [efdfdee] | 310 | 
 | 
|---|
| [761a246] | 311 |                         // Connection failed, close it
 | 
|---|
 | 312 |                         this->close(err);
 | 
|---|
 | 313 |                         return;
 | 
|---|
| [efdfdee] | 314 |                 }
 | 
|---|
 | 315 | 
 | 
|---|
| [761a246] | 316 |                 // Update stats
 | 
|---|
 | 317 |                 ::stats.completions.reads++;
 | 
|---|
| [efdfdee] | 318 | 
 | 
|---|
| [761a246] | 319 |                 // Is this an EOF
 | 
|---|
| [9715567] | 320 |                 if(res == 0) {
 | 
|---|
| [761a246] | 321 |                         // Yes, close the connection
 | 
|---|
 | 322 |                         this->close(0);
 | 
|---|
 | 323 |                         return;
 | 
|---|
 | 324 |                 }
 | 
|---|
| [efdfdee] | 325 | 
 | 
|---|
| [761a246] | 326 |                 // Find the end of the request header
 | 
|---|
 | 327 |                 const char * it = buffer;
 | 
|---|
 | 328 |                 if( !strstr( it, "\r\n\r\n" ) ) {
 | 
|---|
 | 329 |                         // This state machine doesn't support incomplete reads
 | 
|---|
 | 330 |                         // Print them to output so it's clear there is an issue
 | 
|---|
 | 331 |                         std::cout << "Incomplete request" << std::endl;
 | 
|---|
 | 332 |                         this->close(EBADR);
 | 
|---|
 | 333 |                         return;
 | 
|---|
 | 334 |                 }
 | 
|---|
 | 335 | 
 | 
|---|
 | 336 |                 // Find the method to use
 | 
|---|
 | 337 |                 it = buffer;
 | 
|---|
 | 338 |                 int ret = memcmp(it, "GET ", 4);
 | 
|---|
 | 339 |                 if( ret != 0 ) {
 | 
|---|
 | 340 |                         // We only support get, answer with an error
 | 
|---|
 | 341 |                         answer(ring, E400);
 | 
|---|
 | 342 |                         return;
 | 
|---|
 | 343 |                 }
 | 
|---|
 | 344 | 
 | 
|---|
 | 345 |                 // Find the target
 | 
|---|
 | 346 |                 it += 4;
 | 
|---|
 | 347 |                 ret = memcmp(it, "/plaintext", 10);
 | 
|---|
 | 348 |                 if( ret != 0 ) {
 | 
|---|
 | 349 |                         // We only support /plaintext, answer with an error
 | 
|---|
 | 350 |                         answer(ring, E404);
 | 
|---|
 | 351 |                         return;
 | 
|---|
 | 352 |                 }
 | 
|---|
| [efdfdee] | 353 | 
 | 
|---|
| [761a246] | 354 |                 // Correct request, answer with the payload
 | 
|---|
 | 355 |                 this->stats.requests++;
 | 
|---|
 | 356 |                 answer(ring, OK200);
 | 
|---|
| [efdfdee] | 357 |         }
 | 
|---|
 | 358 | 
 | 
|---|
| [761a246] | 359 |         // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
 | 
|---|
 | 360 |         void writedone(struct io_uring * ring, int res) {
 | 
|---|
 | 361 |                 // Check errors
 | 
|---|
 | 362 |                 if( res < 0 ) {
 | 
|---|
 | 363 |                         int err = -res;
 | 
|---|
 | 364 |                         switch(err) {
 | 
|---|
 | 365 |                                 case EPIPE:
 | 
|---|
 | 366 |                                         ::stats.errors.answers.pipes++;
 | 
|---|
 | 367 |                                         break;
 | 
|---|
 | 368 |                                         // Don't fall through the get better stats
 | 
|---|
 | 369 |                                 case ECONNRESET:
 | 
|---|
 | 370 |                                         ::stats.errors.answers.reset++;
 | 
|---|
 | 371 |                                         break;
 | 
|---|
 | 372 |                                 default:
 | 
|---|
 | 373 |                                         ::stats.errors.answers.other++;
 | 
|---|
 | 374 |                                         std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
 | 
|---|
 | 375 |                                         exit(EXIT_FAILURE);
 | 
|---|
 | 376 |                         }
 | 
|---|
| [efdfdee] | 377 | 
 | 
|---|
| [761a246] | 378 |                         this->close(err);
 | 
|---|
 | 379 |                         return;
 | 
|---|
 | 380 |                 }
 | 
|---|
| [efdfdee] | 381 | 
 | 
|---|
| [761a246] | 382 |                 // Update stats
 | 
|---|
 | 383 |                 ::stats.completions.writes++;
 | 
|---|
 | 384 |                 if(res == 124) ::stats.completions.full_writes++;
 | 
|---|
 | 385 | 
 | 
|---|
 | 386 |                 // Is this write completed
 | 
|---|
 | 387 |                 if( res == to_send ) {
 | 
|---|
 | 388 |                         // Yes, more stats
 | 
|---|
| [eeb4866] | 389 |                         this->stats.answers++;
 | 
|---|
 | 390 |                         if(this->stats.answers == 1) ::stats.conns.used++;
 | 
|---|
| [761a246] | 391 |                         // Then read a new request
 | 
|---|
 | 392 |                         request(ring);
 | 
|---|
 | 393 |                         return;
 | 
|---|
| [efdfdee] | 394 |                 }
 | 
|---|
 | 395 | 
 | 
|---|
| [761a246] | 396 |                 // Not a completed read, push the rest
 | 
|---|
 | 397 |                 to_send -= res;
 | 
|---|
 | 398 |                 iterator += res;
 | 
|---|
 | 399 |                 answer(ring);
 | 
|---|
 | 400 |         }
 | 
|---|
 | 401 | public:
 | 
|---|
 | 402 |         // Submit a call to accept and create a new connection object
 | 
|---|
 | 403 |         static void accept(struct io_uring * ring, const struct options_t & opt) {
 | 
|---|
 | 404 |                 struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
 | 405 |                 io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
 | 
|---|
 | 406 |                 submit(ring, sqe, new connection());
 | 
|---|
 | 407 |                 // std::cout << "Submitted accept: " << req << std::endl;
 | 
|---|
| [efdfdee] | 408 |         }
 | 
|---|
 | 409 | 
 | 
|---|
| [761a246] | 410 |         // Handle a new cqe
 | 
|---|
 | 411 |         void handle(struct io_uring * ring, int res, const struct options_t & opt) {
 | 
|---|
 | 412 |                 switch(state) {
 | 
|---|
 | 413 |                 case ACCEPTING:
 | 
|---|
| [b664af2] | 414 |                         // connection::accept(ring, opt);
 | 
|---|
| [761a246] | 415 |                         newconn(ring, res);
 | 
|---|
 | 416 |                         break;
 | 
|---|
 | 417 |                 case REQUESTING:
 | 
|---|
 | 418 |                         newrequest(ring, res);
 | 
|---|
 | 419 |                         break;
 | 
|---|
 | 420 |                 case ANSWERING:
 | 
|---|
 | 421 |                         writedone(ring, res);
 | 
|---|
 | 422 |                         break;
 | 
|---|
 | 423 |                 }
 | 
|---|
 | 424 |         }
 | 
|---|
 | 425 | };
 | 
|---|
| [efdfdee] | 426 | 
 | 
|---|
 | 427 | //=========================================================
 | 
|---|
| [b664af2] | 428 | extern "C" {
 | 
|---|
 | 429 |         #include <sys/eventfd.h>  // use for termination
 | 
|---|
 | 430 | }
 | 
|---|
 | 431 | 
 | 
|---|
| [761a246] | 432 | // Main loop of the WebServer
 | 
|---|
 | 433 | // Effectively uses one thread_local copy of everything per kernel thread
 | 
|---|
| [efdfdee] | 434 | void * proc_loop(void * arg) {
 | 
|---|
| [761a246] | 435 |         // Get the thread local argument
 | 
|---|
| [3acbf89] | 436 |         struct options_t & opt = *(struct options_t *)arg;
 | 
|---|
| [f3e87af] | 437 |         struct io_uring * ring = opt.ring;
 | 
|---|
| [efdfdee] | 438 | 
 | 
|---|
| [b664af2] | 439 |         int blockfd = eventfd(0, 0);
 | 
|---|
 | 440 |         if (blockfd < 0) {
 | 
|---|
 | 441 |                 fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) );
 | 
|---|
 | 442 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 443 |         }
 | 
|---|
 | 444 | 
 | 
|---|
 | 445 |         int ret = io_uring_register_eventfd(ring, blockfd);
 | 
|---|
 | 446 |         if (ret < 0) {
 | 
|---|
 | 447 |                 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
 | 
|---|
 | 448 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 449 |         }
 | 
|---|
 | 450 | 
 | 
|---|
| [761a246] | 451 |         // Track the shutdown using a event_fd
 | 
|---|
| [efdfdee] | 452 |         char endfd_buf[8];
 | 
|---|
 | 453 |         ring_end(ring, opt.endfd, endfd_buf, 8);
 | 
|---|
 | 454 | 
 | 
|---|
| [761a246] | 455 |         // Accept our first connection
 | 
|---|
 | 456 |         // May not take effect until io_uring_submit_and_wait
 | 
|---|
| [c235179] | 457 |         for(unsigned i = 0; i < opt.acpt.cnt; i++) {
 | 
|---|
 | 458 |                 connection::accept(ring, opt);
 | 
|---|
 | 459 |         }
 | 
|---|
| [efdfdee] | 460 | 
 | 
|---|
| [761a246] | 461 |         int reset = 1;       // Counter to print stats once in a while
 | 
|---|
 | 462 |         bool done = false;   // Are we done
 | 
|---|
 | 463 |         size_t sqes = 0;     // Number of sqes we submitted
 | 
|---|
 | 464 |         size_t call = 0;     // Number of submits we made
 | 
|---|
| [efdfdee] | 465 |         while(!done) {
 | 
|---|
| [761a246] | 466 |                 // Submit all the answers we have and wait for responses
 | 
|---|
| [a80db97] | 467 |                 int ret = io_uring_submit(ring);
 | 
|---|
| [b664af2] | 468 |                 local.to_submit = 0;
 | 
|---|
| [3acbf89] | 469 | 
 | 
|---|
| [761a246] | 470 |                 // check errors
 | 
|---|
 | 471 |                 if (ret < 0) {
 | 
|---|
 | 472 |                         fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
 | 
|---|
| [efdfdee] | 473 |                         exit(EXIT_FAILURE);
 | 
|---|
 | 474 |                 }
 | 
|---|
 | 475 | 
 | 
|---|
| [761a246] | 476 |                 // Check how good we are at batching sqes
 | 
|---|
 | 477 |                 sqes += ret;
 | 
|---|
 | 478 |                 call++;
 | 
|---|
 | 479 | 
 | 
|---|
| [b664af2] | 480 | 
 | 
|---|
| [a80db97] | 481 |                 eventfd_t val;
 | 
|---|
 | 482 |                 ret = eventfd_read(blockfd, &val);
 | 
|---|
| [b664af2] | 483 | 
 | 
|---|
| [a80db97] | 484 |                 // check errors
 | 
|---|
 | 485 |                 if (ret < 0) {
 | 
|---|
 | 486 |                         fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) );
 | 
|---|
 | 487 |                         exit(EXIT_FAILURE);
 | 
|---|
 | 488 |                 }
 | 
|---|
| [b664af2] | 489 | 
 | 
|---|
| [761a246] | 490 |                 struct io_uring_cqe *cqe;
 | 
|---|
 | 491 |                 unsigned head;
 | 
|---|
 | 492 |                 unsigned count = 0;
 | 
|---|
| [efdfdee] | 493 | 
 | 
|---|
| [761a246] | 494 |                 // go through all cqes
 | 
|---|
 | 495 |                 io_uring_for_each_cqe(ring, head, cqe) {
 | 
|---|
 | 496 |                         if (0 == cqe->user_data) {
 | 
|---|
| [efdfdee] | 497 |                                 done = true;
 | 
|---|
 | 498 |                                 break;
 | 
|---|
| [761a246] | 499 |                         }
 | 
|---|
| [efdfdee] | 500 | 
 | 
|---|
| [b664af2] | 501 |                         if(local.to_submit > 30) break;
 | 
|---|
 | 502 | 
 | 
|---|
| [761a246] | 503 |                         auto req = (class connection *)cqe->user_data;
 | 
|---|
 | 504 |                         req->handle( ring, cqe->res, opt );
 | 
|---|
| [c05c58f] | 505 | 
 | 
|---|
| [eeb4866] | 506 |                         // Every now and then, print some stats
 | 
|---|
| [761a246] | 507 |                         reset--;
 | 
|---|
 | 508 |                         if(reset == 0) {
 | 
|---|
 | 509 |                                 std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
 | 
|---|
| [eeb4866] | 510 |                                 // Reset to some random number of completions
 | 
|---|
 | 511 |                                 // use the ring_fd in the number of threads don't all print at once
 | 
|---|
| [761a246] | 512 |                                 reset = 100000 + (100000 * (ring->ring_fd % 5));
 | 
|---|
 | 513 |                         }
 | 
|---|
 | 514 | 
 | 
|---|
 | 515 |                         // Keep track of how many cqes we have seen
 | 
|---|
 | 516 |                         count++;
 | 
|---|
| [c05c58f] | 517 |                 }
 | 
|---|
| [761a246] | 518 | 
 | 
|---|
 | 519 |                 // Mark the cqes as seen
 | 
|---|
 | 520 |                 io_uring_cq_advance(ring, count);
 | 
|---|
| [efdfdee] | 521 |         }
 | 
|---|
 | 522 | 
 | 
|---|
| [761a246] | 523 |         // Tally all the thread local statistics
 | 
|---|
 | 524 |         __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
 | 
|---|
 | 525 |         __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
 | 
|---|
 | 526 |         __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
 | 
|---|
 | 527 |         __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
 | 
|---|
 | 528 |         __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
 | 
|---|
 | 529 |         __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
 | 
|---|
 | 530 |         __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
 | 
|---|
 | 531 |         __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
 | 
|---|
 | 532 |         __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
 | 
|---|
 | 533 |         __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
 | 
|---|
 | 534 |         __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
 | 
|---|
 | 535 |         __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
 | 
|---|
 | 536 |         __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
 | 
|---|
 | 537 |         __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
 | 
|---|
 | 538 | 
 | 
|---|
 | 539 |         return nullptr;
 | 
|---|
| [efdfdee] | 540 | }
 | 
|---|
 | 541 | 
 | 
|---|
 | 542 | //=========================================================
 | 
|---|
| [761a246] | 543 | #include <bit> // for ispow2
 | 
|---|
| [3acbf89] | 544 | 
 | 
|---|
| [efdfdee] | 545 | extern "C" {
 | 
|---|
| [761a246] | 546 |         #include <pthread.h>      // for pthreads
 | 
|---|
 | 547 |         #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
 | 
|---|
 | 548 |         #include <sys/socket.h>   // for sockets in general
 | 
|---|
 | 549 |         #include <netinet/in.h>   // for sockaddr_in, AF_INET
 | 
|---|
| [efdfdee] | 550 | }
 | 
|---|
 | 551 | 
 | 
|---|
| [3acbf89] | 552 | int main(int argc, char * argv[]) {
 | 
|---|
| [761a246] | 553 |         // Initialize the array of connection-fd associations
 | 
|---|
 | 554 |         for(int i = 0; i < array_max; i++) {
 | 
|---|
 | 555 |                 conns[i] = nullptr;
 | 
|---|
 | 556 |         }
 | 
|---|
 | 557 | 
 | 
|---|
 | 558 |         // Make sure we ignore all sigpipes
 | 
|---|
| [efdfdee] | 559 |         signal(SIGPIPE, SIG_IGN);
 | 
|---|
 | 560 | 
 | 
|---|
| [761a246] | 561 |         // Default command line arguments
 | 
|---|
 | 562 |         unsigned nthreads = 1;      // number of kernel threads
 | 
|---|
 | 563 |         unsigned port = 8800;       // which port to listen on
 | 
|---|
 | 564 |         unsigned entries = 256;     // number of entries per ring/kernel thread
 | 
|---|
 | 565 |         unsigned backlog = 262144;  // backlog argument to listen
 | 
|---|
| [c235179] | 566 |         unsigned preaccept = 1;     // start by accepting X per threads
 | 
|---|
| [761a246] | 567 |         bool attach = false;        // Whether or not to attach all the rings
 | 
|---|
 | 568 |         bool sqpoll = false;        // Whether or not to use SQ Polling
 | 
|---|
| [efdfdee] | 569 | 
 | 
|---|
| [3acbf89] | 570 |         //===================
 | 
|---|
| [761a246] | 571 |         // Arguments Parsing
 | 
|---|
| [3acbf89] | 572 |         int c;
 | 
|---|
| [c235179] | 573 |         while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
 | 
|---|
| [3acbf89] | 574 |                 switch (c)
 | 
|---|
 | 575 |                 {
 | 
|---|
 | 576 |                 case 't':
 | 
|---|
 | 577 |                         nthreads = atoi(optarg);
 | 
|---|
 | 578 |                         break;
 | 
|---|
 | 579 |                 case 'p':
 | 
|---|
 | 580 |                         port = atoi(optarg);
 | 
|---|
 | 581 |                         break;
 | 
|---|
 | 582 |                 case 'e':
 | 
|---|
 | 583 |                         entries = atoi(optarg);
 | 
|---|
 | 584 |                         break;
 | 
|---|
 | 585 |                 case 'b':
 | 
|---|
 | 586 |                         backlog = atoi(optarg);
 | 
|---|
 | 587 |                         break;
 | 
|---|
| [c235179] | 588 |                 case 'c':
 | 
|---|
 | 589 |                         preaccept = atoi(optarg);
 | 
|---|
 | 590 |                         break;
 | 
|---|
| [f3e87af] | 591 |                 case 'a':
 | 
|---|
 | 592 |                         attach = true;
 | 
|---|
 | 593 |                         break;
 | 
|---|
| [c05c58f] | 594 |                 case 'S':
 | 
|---|
 | 595 |                         sqpoll = true;
 | 
|---|
 | 596 |                         break;
 | 
|---|
| [3acbf89] | 597 |                 case '?':
 | 
|---|
 | 598 |                 default:
 | 
|---|
| [c05c58f] | 599 |                         std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
 | 
|---|
| [3acbf89] | 600 |                         return EXIT_FAILURE;
 | 
|---|
 | 601 |                 }
 | 
|---|
 | 602 |         }
 | 
|---|
 | 603 | 
 | 
|---|
 | 604 |         if( !std::ispow2(entries) ) {
 | 
|---|
 | 605 |                 unsigned v = entries;
 | 
|---|
 | 606 |                 v--;
 | 
|---|
 | 607 |                 v |= v >> 1;
 | 
|---|
 | 608 |                 v |= v >> 2;
 | 
|---|
 | 609 |                 v |= v >> 4;
 | 
|---|
 | 610 |                 v |= v >> 8;
 | 
|---|
 | 611 |                 v |= v >> 16;
 | 
|---|
 | 612 |                 v++;
 | 
|---|
 | 613 |                 std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
 | 
|---|
 | 614 |                 entries = v;
 | 
|---|
 | 615 |         }
 | 
|---|
 | 616 | 
 | 
|---|
| [efdfdee] | 617 |         //===================
 | 
|---|
 | 618 |         // End FD
 | 
|---|
| [761a246] | 619 |         // Create a single event fd to notify the kernel threads when the server shutsdown
 | 
|---|
| [efdfdee] | 620 |         int efd = eventfd(0, EFD_SEMAPHORE);
 | 
|---|
 | 621 |         if (efd < 0) {
 | 
|---|
 | 622 |                 std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 623 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 624 |         }
 | 
|---|
 | 625 | 
 | 
|---|
 | 626 |         //===================
 | 
|---|
 | 627 |         // Open Socket
 | 
|---|
| [761a246] | 628 |         // Listen on specified port
 | 
|---|
| [efdfdee] | 629 |         std::cout << getpid() << " : Listening on port " << port << std::endl;
 | 
|---|
 | 630 |         int server_fd = socket(AF_INET, SOCK_STREAM, 0);
 | 
|---|
 | 631 |         if(server_fd < 0) {
 | 
|---|
 | 632 |                 std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 633 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 634 |         }
 | 
|---|
 | 635 | 
 | 
|---|
 | 636 |         int ret = 0;
 | 
|---|
 | 637 |         struct sockaddr_in address;
 | 
|---|
 | 638 |         int addrlen = sizeof(address);
 | 
|---|
 | 639 |         memset( (char *)&address, '\0', addrlen );
 | 
|---|
 | 640 |         address.sin_family = AF_INET;
 | 
|---|
 | 641 |         address.sin_addr.s_addr = htonl(INADDR_ANY);
 | 
|---|
 | 642 |         address.sin_port = htons( port );
 | 
|---|
 | 643 | 
 | 
|---|
| [761a246] | 644 |         // In case the port is already in use, don't just return an error
 | 
|---|
 | 645 |         // Linux is very slow at reclaiming port so just retry regularly
 | 
|---|
| [efdfdee] | 646 |         int waited = 0;
 | 
|---|
 | 647 |         while(true) {
 | 
|---|
 | 648 |                 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
 | 
|---|
 | 649 |                 if(ret < 0) {
 | 
|---|
 | 650 |                         if(errno == EADDRINUSE) {
 | 
|---|
| [761a246] | 651 |                                 // Port is in used let's retry later
 | 
|---|
| [efdfdee] | 652 |                                 if(waited == 0) {
 | 
|---|
 | 653 |                                         std::cerr << "Waiting for port" << std::endl;
 | 
|---|
 | 654 |                                 } else {
 | 
|---|
| [761a246] | 655 |                                         // To be cure, print how long we have been waiting
 | 
|---|
| [efdfdee] | 656 |                                         std::cerr << "\r" << waited;
 | 
|---|
 | 657 |                                         std::cerr.flush();
 | 
|---|
 | 658 |                                 }
 | 
|---|
 | 659 |                                 waited ++;
 | 
|---|
| [761a246] | 660 |                                 usleep( 1000000 ); // Wait and retry
 | 
|---|
| [efdfdee] | 661 |                                 continue;
 | 
|---|
 | 662 |                         }
 | 
|---|
| [761a246] | 663 |                         // Some other error occured, this is a real error
 | 
|---|
| [efdfdee] | 664 |                         std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 665 |                         exit(EXIT_FAILURE);
 | 
|---|
 | 666 |                 }
 | 
|---|
 | 667 |                 break;
 | 
|---|
 | 668 |         }
 | 
|---|
 | 669 | 
 | 
|---|
 | 670 |         ret = listen( server_fd, backlog );
 | 
|---|
 | 671 |         if(ret < 0) {
 | 
|---|
 | 672 |                 std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 673 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 674 |         }
 | 
|---|
 | 675 | 
 | 
|---|
 | 676 |         //===================
 | 
|---|
 | 677 |         // Run Server Threads
 | 
|---|
| [f3e87af] | 678 |         std::cout << "Starting " << nthreads << " Threads";
 | 
|---|
 | 679 |         if(attach) {
 | 
|---|
 | 680 |                 std::cout << " with attached Rings";
 | 
|---|
 | 681 |         }
 | 
|---|
 | 682 |         std::cout << std::endl;
 | 
|---|
 | 683 | 
 | 
|---|
| [761a246] | 684 |         // Create the desired number of kernel-threads and for each
 | 
|---|
 | 685 |         // create a ring. Create the rings in the main so we can attach them
 | 
|---|
 | 686 |         // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
 | 
|---|
 | 687 |         // it's unlikely but better safe than sorry
 | 
|---|
 | 688 |         struct __attribute__((aligned(128))) aligned_ring {
 | 
|---|
 | 689 |                 struct io_uring storage;
 | 
|---|
 | 690 |         };
 | 
|---|
| [f3e87af] | 691 |         aligned_ring thrd_rings[nthreads];
 | 
|---|
 | 692 |         pthread_t    thrd_hdls[nthreads];
 | 
|---|
 | 693 |         options_t    thrd_opts[nthreads];
 | 
|---|
| [761a246] | 694 |         bool no_drops  = true;
 | 
|---|
| [c05c58f] | 695 |         bool fast_poll = true;
 | 
|---|
 | 696 |         bool nfix_sqpl = true;
 | 
|---|
| [efdfdee] | 697 |         for(unsigned i = 0; i < nthreads; i++) {
 | 
|---|
| [c05c58f] | 698 |                 struct io_uring_params p = { };
 | 
|---|
| [761a246] | 699 | 
 | 
|---|
 | 700 |                 if(sqpoll) { // If sqpoll is on, add the flag
 | 
|---|
| [c05c58f] | 701 |                         p.flags |= IORING_SETUP_SQPOLL;
 | 
|---|
 | 702 |                         p.sq_thread_idle = 100;
 | 
|---|
| [f3e87af] | 703 |                 }
 | 
|---|
| [c05c58f] | 704 | 
 | 
|---|
| [761a246] | 705 |                 if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
 | 
|---|
| [c05c58f] | 706 |                         p.flags |= IORING_SETUP_ATTACH_WQ;
 | 
|---|
| [f3e87af] | 707 |                         p.wq_fd = thrd_rings[0].storage.ring_fd;
 | 
|---|
 | 708 |                 }
 | 
|---|
| [761a246] | 709 | 
 | 
|---|
 | 710 |                 // Create the ring
 | 
|---|
| [c05c58f] | 711 |                 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
 | 
|---|
 | 712 | 
 | 
|---|
| [761a246] | 713 |                 // Check if some of the note-worthy features are there
 | 
|---|
 | 714 |                 if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
 | 
|---|
| [c05c58f] | 715 |                 if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
 | 
|---|
 | 716 |                 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
 | 
|---|
| [f3e87af] | 717 | 
 | 
|---|
| [761a246] | 718 |                 // Write the socket options we want to the options we pass to the threads
 | 
|---|
| [efdfdee] | 719 |                 thrd_opts[i].acpt.sockfd  = server_fd;
 | 
|---|
 | 720 |                 thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
 | 
|---|
 | 721 |                 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
 | 
|---|
 | 722 |                 thrd_opts[i].acpt.flags   = 0;
 | 
|---|
| [c235179] | 723 |                 thrd_opts[i].acpt.cnt     = preaccept;
 | 
|---|
| [f3e87af] | 724 |                 thrd_opts[i].endfd        = efd;
 | 
|---|
 | 725 |                 thrd_opts[i].ring         = &thrd_rings[i].storage;
 | 
|---|
| [efdfdee] | 726 | 
 | 
|---|
 | 727 |                 int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
 | 
|---|
 | 728 |                 if (ret < 0) {
 | 
|---|
 | 729 |                         std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 730 |                         exit(EXIT_FAILURE);
 | 
|---|
 | 731 |                 }
 | 
|---|
 | 732 |         }
 | 
|---|
 | 733 | 
 | 
|---|
| [761a246] | 734 |         // Tell the user if the features are present
 | 
|---|
 | 735 |         if( no_drops ) std::cout << "No Drop Present" << std::endl;
 | 
|---|
| [c05c58f] | 736 |         if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
 | 
|---|
 | 737 |         if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
 | 
|---|
 | 738 | 
 | 
|---|
| [efdfdee] | 739 |         //===================
 | 
|---|
 | 740 |         // Server Started
 | 
|---|
 | 741 |         std::cout << "Server Started" << std::endl;
 | 
|---|
 | 742 |         {
 | 
|---|
 | 743 |                 char buffer[128];
 | 
|---|
 | 744 |                 int ret;
 | 
|---|
 | 745 |                 do {
 | 
|---|
| [761a246] | 746 |                         // Wait for a Ctrl-D to close the server
 | 
|---|
| [efdfdee] | 747 |                         ret = read(STDIN_FILENO, buffer, 128);
 | 
|---|
 | 748 |                         if(ret < 0) {
 | 
|---|
 | 749 |                                 std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 750 |                                 exit(EXIT_FAILURE);
 | 
|---|
 | 751 |                         }
 | 
|---|
 | 752 |                         else if(ret > 0) {
 | 
|---|
 | 753 |                                 std::cout << "User inputed '";
 | 
|---|
 | 754 |                                 std::cout.write(buffer, ret);
 | 
|---|
 | 755 |                                 std::cout << "'" << std::endl;
 | 
|---|
 | 756 |                         }
 | 
|---|
 | 757 |                 } while(ret != 0);
 | 
|---|
 | 758 | 
 | 
|---|
 | 759 |                 std::cout << "Shutdown received" << std::endl;
 | 
|---|
 | 760 |         }
 | 
|---|
 | 761 | 
 | 
|---|
 | 762 |         //===================
 | 
|---|
| [761a246] | 763 |         // Use eventfd_write to tell the threads we are closing
 | 
|---|
| [efdfdee] | 764 |         (std::cout << "Sending Shutdown to Threads... ").flush();
 | 
|---|
 | 765 |         ret = eventfd_write(efd, nthreads);
 | 
|---|
 | 766 |         if (ret < 0) {
 | 
|---|
 | 767 |                 std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 768 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 769 |         }
 | 
|---|
 | 770 |         std::cout << "done" << std::endl;
 | 
|---|
 | 771 | 
 | 
|---|
 | 772 |         //===================
 | 
|---|
| [761a246] | 773 |         // Join all the threads and close the rings
 | 
|---|
| [efdfdee] | 774 |         (std::cout << "Stopping Threads Done... ").flush();
 | 
|---|
 | 775 |         for(unsigned i = 0; i < nthreads; i++) {
 | 
|---|
 | 776 |                 void * retval;
 | 
|---|
 | 777 |                 int ret = pthread_join(thrd_hdls[i], &retval);
 | 
|---|
 | 778 |                 if (ret < 0) {
 | 
|---|
 | 779 |                         std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 780 |                         exit(EXIT_FAILURE);
 | 
|---|
 | 781 |                 }
 | 
|---|
| [f3e87af] | 782 | 
 | 
|---|
 | 783 |                 io_uring_queue_exit(thrd_opts[i].ring);
 | 
|---|
| [efdfdee] | 784 |         }
 | 
|---|
 | 785 |         std::cout << "done" << std::endl;
 | 
|---|
 | 786 | 
 | 
|---|
 | 787 |         //===================
 | 
|---|
| [761a246] | 788 |         // Close the sockets
 | 
|---|
| [efdfdee] | 789 |         (std::cout << "Closing Socket... ").flush();
 | 
|---|
 | 790 |         ret = shutdown( server_fd, SHUT_RD );
 | 
|---|
 | 791 |         if( ret < 0 ) {
 | 
|---|
 | 792 |                 std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 793 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 794 |         }
 | 
|---|
 | 795 | 
 | 
|---|
 | 796 |         ret = close(server_fd);
 | 
|---|
 | 797 |         if (ret < 0) {
 | 
|---|
 | 798 |                 std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
 | 799 |                 exit(EXIT_FAILURE);
 | 
|---|
 | 800 |         }
 | 
|---|
| [761a246] | 801 |         std::cout << "done" << std::endl << std::endl;
 | 
|---|
 | 802 | 
 | 
|---|
 | 803 |         // Print stats and exit
 | 
|---|
 | 804 |         std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
 | 
|---|
 | 805 |         std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
 | 
|---|
 | 806 |         std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
 | 
|---|
 | 807 |         std::cout << "Max FD: " << max_fd << std::endl;
 | 
|---|
 | 808 |         std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
 | 
|---|
| [eeb4866] | 809 |         std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
 | 
|---|
| [761a246] | 810 |         std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
 | 
|---|
 | 811 |         std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
 | 
|---|
| [eeb4866] | 812 | }
 | 
|---|
 | 813 | 
 | 
|---|
 | 814 | // compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
 | 
|---|