| 1 | #include <cstdio>
 | 
|---|
| 2 | #include <cstdlib>
 | 
|---|
| 3 | #include <cstring>
 | 
|---|
| 4 | 
 | 
|---|
| 5 | #include <iostream>
 | 
|---|
| 6 | 
 | 
|---|
| 7 | #include <signal.h>
 | 
|---|
| 8 | #include <unistd.h>
 | 
|---|
| 9 | #include <liburing.h>
 | 
|---|
| 10 | 
 | 
|---|
| 11 | // #define NOBATCHING
 | 
|---|
| 12 | // #define USE_ASYNC
 | 
|---|
| 13 | 
 | 
|---|
| 14 | // Options passed to each thread
 | 
|---|
/// Per-thread configuration consumed by proc_loop.
/// The type is over-aligned to 128 bytes.
struct alignas(128) options_t {
	/// Arguments handed unchanged to io_uring_prep_accept for every accept,
	/// plus the number of accepts proc_loop queues before entering its loop.
	struct {
		int sockfd;
		struct sockaddr *addr;
		socklen_t *addrlen;
		int flags;
		unsigned cnt;
	} acpt;

	/// File descriptor read through the ring with null user data;
	/// the completion of that read makes proc_loop stop.
	int endfd;

	/// Ring on which this thread submits and reaps all of its I/O.
	struct io_uring * ring;
};
 | 
|---|
| 31 | 
 | 
|---|
| 32 | //=========================================================
 | 
|---|
| 33 | // General statistics
 | 
|---|
/// One block of counters. Every field starts at zero; the type is
/// over-aligned to 128 bytes.
struct alignas(128) stats_block_t {
	/// Operations whose completion carried a non-negative result.
	struct {
		volatile size_t conns{0};        // accepts
		volatile size_t reads{0};        // receives
		volatile size_t writes{0};       // sends, partial or complete
		volatile size_t full_writes{0};  // sends that reported exactly 124 bytes
	} completions;

	/// Operations whose completion carried an error.
	struct {
		volatile size_t conns{0};        // accepts failing with ECONNABORTED

		/// Receive failures, split by errno.
		struct {
			volatile size_t pipes{0};    // EPIPE
			volatile size_t reset{0};    // ECONNRESET
			volatile size_t other{0};    // anything else
		} requests;

		/// Send failures, split by errno.
		struct {
			volatile size_t pipes{0};    // EPIPE
			volatile size_t reset{0};    // ECONNRESET
			volatile size_t other{0};    // anything else
		} answers;
	} errors;

	/// Connection bookkeeping.
	struct {
		volatile size_t current{0};      // currently open
		volatile size_t max{0};          // highest value `current` has reached
		volatile size_t used{0};         // connections that completed at least one answer
	} conns;

	/// Times a slot of the fd -> connection table was found already occupied.
	volatile size_t recycle_errors{0};
};
 | 
|---|
| 65 | 
 | 
|---|
| 66 | // Each thread gets its own block of stats
 | 
|---|
| 67 | // and there is a global block for tallying at the end
 | 
|---|
| 68 | thread_local stats_block_t stats;
 | 
|---|
| 69 | stats_block_t global_stats;
 | 
|---|
| 70 | 
 | 
|---|
| 71 | thread_local struct __attribute__((aligned(128))) {
 | 
|---|
| 72 |         size_t to_submit = 0;
 | 
|---|
| 73 | } local;
 | 
|---|
| 74 | 
 | 
|---|
| 75 | // Get an array of current connections
 | 
|---|
| 76 | // This is just for debugging, to make sure
 | 
|---|
| 77 | // no two state-machines get the same fd
 | 
|---|
| 78 | const size_t array_max = 25000;
 | 
|---|
| 79 | class connection * volatile conns[array_max] = { 0 };
 | 
|---|
| 80 | 
 | 
|---|
| 81 | // Max fd we've seen, keep track so it's convenient to adjust the array size after
 | 
|---|
| 82 | volatile int max_fd = 0;
 | 
|---|
| 83 | 
 | 
|---|
| 84 | //=========================================================
 | 
|---|
| 85 | // Some small wrappers for ring operations used outside the connection state machine
 | 
|---|
| 86 | // get sqe + error handling
 | 
|---|
| 87 | static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
 | 
|---|
| 88 |         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
 | 
|---|
| 89 |         if(!sqe) {
 | 
|---|
| 90 |                 std::cerr << "Insufficient entries in ring" << std::endl;
 | 
|---|
| 91 |                 exit(EXIT_FAILURE);
 | 
|---|
| 92 |         }
 | 
|---|
| 93 |         return sqe;
 | 
|---|
| 94 | }
 | 
|---|
| 95 | 
 | 
|---|
| 96 | // read of the event fd is not done by a connection
 | 
|---|
| 97 | // use nullptr as the user data
 | 
|---|
| 98 | static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
 | 
|---|
| 99 |         struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
| 100 |         io_uring_prep_read(sqe, fd, buffer, len, 0);
 | 
|---|
| 101 |         io_uring_sqe_set_data(sqe, nullptr);
 | 
|---|
| 102 |         io_uring_submit(ring);
 | 
|---|
| 103 | }
 | 
|---|
| 104 | 
 | 
|---|
| 105 | //=========================================================
 | 
|---|
| 106 | // All answers are fixed and determined by the return code
 | 
|---|
/// Status codes the server can answer with. The numeric value of each
/// enumerator is its index into the reply tables, so values are contiguous
/// from zero and KNOWN_CODES is the number of real codes.
enum HttpCode {
	OK200       = 0,
	E400        = 1,
	E404        = 2,
	E405        = 3,
	E408        = 4,
	E413        = 5,
	E414        = 6,
	KNOWN_CODES = 7
};
 | 
|---|
| 117 | 
 | 
|---|
| 118 | // Get a fixed reply based on the return code
 | 
|---|
/// Complete, pre-rendered HTTP replies, indexed by HttpCode.
/// Each entry is written as adjacent string literals, one header per line;
/// the compiler concatenates them into a single string. Only the 200 reply
/// carries a body and a Connection header.
const char * http_msgs[] = {
	// OK200
	"HTTP/1.1 200 OK\r\n"
	"Server: HttpForall\r\n"
	"Content-Type: text/plain\r\n"
	"Content-Length: 15\r\n"
	"Connection: keep-alive\r\n"
	"\r\n"
	"Hello, World!\r\n",

	// E400
	"HTTP/1.1 400 Bad Request\r\n"
	"Server: HttpForall\r\n"
	"Content-Type: text/plain\r\n"
	"Content-Length: 0 \r\n"
	"\r\n",

	// E404
	"HTTP/1.1 404 Not Found\r\n"
	"Server: HttpForall\r\n"
	"Content-Type: text/plain\r\n"
	"Content-Length: 0 \r\n"
	"\r\n",

	// E405 (the reason phrase used to be cut short as "Method Not ")
	"HTTP/1.1 405 Method Not Allowed\r\n"
	"Server: HttpForall\r\n"
	"Content-Type: text/plain\r\n"
	"Content-Length: 0 \r\n"
	"\r\n",

	// E408
	"HTTP/1.1 408 Request Timeout\r\n"
	"Server: HttpForall\r\n"
	"Content-Type: text/plain\r\n"
	"Content-Length: 0 \r\n"
	"\r\n",

	// E413
	"HTTP/1.1 413 Payload Too Large\r\n"
	"Server: HttpForall\r\n"
	"Content-Type: text/plain\r\n"
	"Content-Length: 0 \r\n"
	"\r\n",

	// E414
	"HTTP/1.1 414 URI Too Long\r\n"
	"Server: HttpForall\r\n"
	"Content-Type: text/plain\r\n"
	"Content-Length: 0 \r\n"
	"\r\n",
};
 | 
|---|
| 128 | static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
 | 
|---|
| 129 | 
 | 
|---|
| 130 | // Pre-compute the length of these replies
 | 
|---|
| 131 | const size_t http_lens[] = {
 | 
|---|
| 132 |         strlen(http_msgs[0]),
 | 
|---|
| 133 |         strlen(http_msgs[1]),
 | 
|---|
| 134 |         strlen(http_msgs[2]),
 | 
|---|
| 135 |         strlen(http_msgs[3]),
 | 
|---|
| 136 |         strlen(http_msgs[4]),
 | 
|---|
| 137 |         strlen(http_msgs[5]),
 | 
|---|
| 138 |         strlen(http_msgs[6]),
 | 
|---|
| 139 | };
 | 
|---|
| 140 | static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
 | 
|---|
| 141 | 
 | 
|---|
| 142 | //=========================================================
 | 
|---|
| 143 | // Finite state machine responsible for handling each connection
 | 
|---|
| 144 | class __attribute__((aligned(128))) connection {
 | 
|---|
| 145 | private:
 | 
|---|
| 146 |         // The state of the machine
 | 
|---|
| 147 |         enum {
 | 
|---|
| 148 |                 ACCEPTING,  // Accept sent waiting for connection
 | 
|---|
| 149 |                 REQUESTING, // Waiting for new request
 | 
|---|
| 150 |                 ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
 | 
|---|
| 151 |         } state;
 | 
|---|
| 152 | 
 | 
|---|
| 153 |         // The file descriptor of the connection
 | 
|---|
| 154 |         int fd;
 | 
|---|
| 155 | 
 | 
|---|
| 156 |         // request data
 | 
|---|
| 157 |         static const size_t buffer_size = 1024; // Size of the read buffer
 | 
|---|
| 158 |         const char * buffer;                      // Buffer into which requests are read
 | 
|---|
| 159 | 
 | 
|---|
| 160 |         // send data
 | 
|---|
| 161 |         size_t to_send;         // Data left to send
 | 
|---|
| 162 |         const char * iterator;  // Pointer to rest of the message to send
 | 
|---|
| 163 | 
 | 
|---|
| 164 |         // stats
 | 
|---|
| 165 |         // how many requests/answers were complete, that is, a valid cqe was obtained
 | 
|---|
| 166 |         struct {
 | 
|---|
| 167 |                 size_t requests = 0;
 | 
|---|
| 168 |                 size_t answers = 0;
 | 
|---|
| 169 |         } stats;
 | 
|---|
| 170 | 
 | 
|---|
| 171 | private:
 | 
|---|
| 172 |         connection()
 | 
|---|
| 173 |                 : state(ACCEPTING)
 | 
|---|
| 174 |                 , fd(0)
 | 
|---|
| 175 |                 , buffer( new char[buffer_size])
 | 
|---|
| 176 |                 , iterator(nullptr)
 | 
|---|
| 177 |         {}
 | 
|---|
| 178 | 
 | 
|---|
| 179 |         ~connection() {
 | 
|---|
| 180 |                 delete [] buffer;
 | 
|---|
| 181 |                 ::stats.conns.current--;
 | 
|---|
| 182 |         }
 | 
|---|
| 183 | 
 | 
|---|
| 184 |         // Close the current connection
 | 
|---|
| 185 |         void close(int err) {
 | 
|---|
| 186 |                 // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
 | 
|---|
| 187 |                 conns[fd] = nullptr;
 | 
|---|
| 188 | 
 | 
|---|
| 189 |                 if(fd != 0) {
 | 
|---|
| 190 |                         ::close(fd);
 | 
|---|
| 191 |                 }
 | 
|---|
| 192 |                 delete this;
 | 
|---|
| 193 |         }
 | 
|---|
| 194 | 
 | 
|---|
| 195 |         //--------------------------------------------------
 | 
|---|
| 196 |         // Wrappers for submit so we can tweak it more easily
 | 
|---|
| 197 |         static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
 | 
|---|
| 198 |                 (void)ring;
 | 
|---|
| 199 |                 local.to_submit++;
 | 
|---|
| 200 |                 #ifdef USE_ASYNC
 | 
|---|
| 201 |                         io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
 | 
|---|
| 202 |                 #endif
 | 
|---|
| 203 |                 io_uring_sqe_set_data(sqe, conn);
 | 
|---|
| 204 |                 #ifdef NOBATCHING
 | 
|---|
| 205 |                         io_uring_submit(ring);
 | 
|---|
| 206 |                 #endif
 | 
|---|
| 207 |         }
 | 
|---|
| 208 | 
 | 
|---|
| 209 |         void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
 | 
|---|
| 210 |                 submit(ring, sqe, this);
 | 
|---|
| 211 |         }
 | 
|---|
| 212 | 
 | 
|---|
| 213 |         //--------------------------------------------------
 | 
|---|
| 214 |         // get a new request from the client
 | 
|---|
| 215 |         void request(struct io_uring * ring) {
 | 
|---|
| 216 |                 state = REQUESTING;
 | 
|---|
| 217 |                 struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
| 218 |                 io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0);
 | 
|---|
| 219 |                 submit(ring, sqe);
 | 
|---|
| 220 |         }
 | 
|---|
| 221 | 
 | 
|---|
| 222 |         //--------------------------------------------------
 | 
|---|
| 223 |         // Send a new answer based on a return code
 | 
|---|
| 224 |         void answer(struct io_uring * ring, HttpCode code) {
 | 
|---|
| 225 |                 iterator = http_msgs[code];
 | 
|---|
| 226 |                 to_send  = http_lens[code];
 | 
|---|
| 227 |                 if(to_send != 124) {
 | 
|---|
| 228 |                         std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
 | 
|---|
| 229 |                 }
 | 
|---|
| 230 |                 answer(ring);
 | 
|---|
| 231 |         }
 | 
|---|
| 232 | 
 | 
|---|
| 233 |         // send a new answer to the client
 | 
|---|
| 234 |         // Reused for incomplete writes
 | 
|---|
| 235 |         void answer(struct io_uring * ring) {
 | 
|---|
| 236 |                 state = ANSWERING;
 | 
|---|
| 237 |                 struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
| 238 |                 io_uring_prep_send(sqe, fd, iterator, to_send, 0);
 | 
|---|
| 239 |                 submit(ring, sqe);
 | 
|---|
| 240 |         }
 | 
|---|
| 241 | 
 | 
|---|
| 242 |         //--------------------------------------------------
 | 
|---|
| 243 |         // Handle a new connection, results for getting an cqe while in the ACCEPTING state
 | 
|---|
| 244 |         void newconn(struct io_uring * ring, int ret) {
 | 
|---|
| 245 |                 // Check errors
 | 
|---|
| 246 |                 if( ret < 0 ) {
 | 
|---|
| 247 |                         int err = -ret;
 | 
|---|
| 248 |                         if( err == ECONNABORTED ) {
 | 
|---|
| 249 |                                 ::stats.errors.conns++;
 | 
|---|
| 250 |                                 this->close(err);
 | 
|---|
| 251 |                                 return;
 | 
|---|
| 252 |                         }
 | 
|---|
| 253 |                         std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 254 |                         exit(EXIT_FAILURE);
 | 
|---|
| 255 |                 }
 | 
|---|
| 256 | 
 | 
|---|
| 257 |                 // Count the connections
 | 
|---|
| 258 |                 ::stats.completions.conns++;
 | 
|---|
| 259 |                 ::stats.conns.current++;
 | 
|---|
| 260 |                 if(::stats.conns.current > ::stats.conns.max) {
 | 
|---|
| 261 |                         ::stats.conns.max = ::stats.conns.current;
 | 
|---|
| 262 |                 }
 | 
|---|
| 263 | 
 | 
|---|
| 264 |                 // Read on the data
 | 
|---|
| 265 |                 fd = ret;
 | 
|---|
| 266 |                 request(ring);
 | 
|---|
| 267 | 
 | 
|---|
| 268 |                 // check the max fd so we know if we exceeded the array
 | 
|---|
| 269 |                 for(;;) {
 | 
|---|
| 270 |                         int expected = max_fd;
 | 
|---|
| 271 |                         if(expected >= fd) return;
 | 
|---|
| 272 |                         if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
 | 
|---|
| 273 |                 }
 | 
|---|
| 274 | 
 | 
|---|
| 275 |                 // check if we have enough space to fit inside the array
 | 
|---|
| 276 |                 if(fd >= array_max) {
 | 
|---|
| 277 |                         std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
 | 
|---|
| 278 |                         return;
 | 
|---|
| 279 |                 }
 | 
|---|
| 280 | 
 | 
|---|
| 281 |                 // Put our connection into the global array
 | 
|---|
| 282 |                 // No one else should be using it so if they are that's a bug
 | 
|---|
| 283 |                 auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
 | 
|---|
| 284 |                 if( exist ) {
 | 
|---|
| 285 |                         size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
 | 
|---|
| 286 |                         if( first == 0 ) {
 | 
|---|
| 287 |                                 std::cerr << "First: accept has existing connection " << std::endl;
 | 
|---|
| 288 |                         }
 | 
|---|
| 289 |                 }
 | 
|---|
| 290 |         }
 | 
|---|
| 291 | 
 | 
|---|
| 292 |         // Handle a new request, results for getting an cqe while in the REQUESTING state
 | 
|---|
| 293 |         void newrequest(struct io_uring * ring, int res) {
 | 
|---|
| 294 |                 // Check errors
 | 
|---|
| 295 |                 if( res < 0 ) {
 | 
|---|
| 296 |                         int err = -res;
 | 
|---|
| 297 |                         switch(err) {
 | 
|---|
| 298 |                                 case EPIPE:
 | 
|---|
| 299 |                                         ::stats.errors.requests.pipes++;
 | 
|---|
| 300 |                                         break;
 | 
|---|
| 301 |                                         // Don't fall through the get better stats
 | 
|---|
| 302 |                                 case ECONNRESET:
 | 
|---|
| 303 |                                         ::stats.errors.requests.reset++;
 | 
|---|
| 304 |                                         break;
 | 
|---|
| 305 |                                 default:
 | 
|---|
| 306 |                                         ::stats.errors.requests.other++;
 | 
|---|
| 307 |                                         std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
 | 
|---|
| 308 |                                         exit(EXIT_FAILURE);
 | 
|---|
| 309 |                         }
 | 
|---|
| 310 | 
 | 
|---|
| 311 |                         // Connection failed, close it
 | 
|---|
| 312 |                         this->close(err);
 | 
|---|
| 313 |                         return;
 | 
|---|
| 314 |                 }
 | 
|---|
| 315 | 
 | 
|---|
| 316 |                 // Update stats
 | 
|---|
| 317 |                 ::stats.completions.reads++;
 | 
|---|
| 318 | 
 | 
|---|
| 319 |                 // Is this an EOF
 | 
|---|
| 320 |                 if(res == 0) {
 | 
|---|
| 321 |                         // Yes, close the connection
 | 
|---|
| 322 |                         this->close(0);
 | 
|---|
| 323 |                         return;
 | 
|---|
| 324 |                 }
 | 
|---|
| 325 | 
 | 
|---|
| 326 |                 // Find the end of the request header
 | 
|---|
| 327 |                 const char * it = buffer;
 | 
|---|
| 328 |                 if( !strstr( it, "\r\n\r\n" ) ) {
 | 
|---|
| 329 |                         // This state machine doesn't support incomplete reads
 | 
|---|
| 330 |                         // Print them to output so it's clear there is an issue
 | 
|---|
| 331 |                         std::cout << "Incomplete request" << std::endl;
 | 
|---|
| 332 |                         this->close(EBADR);
 | 
|---|
| 333 |                         return;
 | 
|---|
| 334 |                 }
 | 
|---|
| 335 | 
 | 
|---|
| 336 |                 // Find the method to use
 | 
|---|
| 337 |                 it = buffer;
 | 
|---|
| 338 |                 int ret = memcmp(it, "GET ", 4);
 | 
|---|
| 339 |                 if( ret != 0 ) {
 | 
|---|
| 340 |                         // We only support get, answer with an error
 | 
|---|
| 341 |                         answer(ring, E400);
 | 
|---|
| 342 |                         return;
 | 
|---|
| 343 |                 }
 | 
|---|
| 344 | 
 | 
|---|
| 345 |                 // Find the target
 | 
|---|
| 346 |                 it += 4;
 | 
|---|
| 347 |                 ret = memcmp(it, "/plaintext", 10);
 | 
|---|
| 348 |                 if( ret != 0 ) {
 | 
|---|
| 349 |                         // We only support /plaintext, answer with an error
 | 
|---|
| 350 |                         answer(ring, E404);
 | 
|---|
| 351 |                         return;
 | 
|---|
| 352 |                 }
 | 
|---|
| 353 | 
 | 
|---|
| 354 |                 // Correct request, answer with the payload
 | 
|---|
| 355 |                 this->stats.requests++;
 | 
|---|
| 356 |                 answer(ring, OK200);
 | 
|---|
| 357 |         }
 | 
|---|
| 358 | 
 | 
|---|
| 359 |         // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state
 | 
|---|
| 360 |         void writedone(struct io_uring * ring, int res) {
 | 
|---|
| 361 |                 // Check errors
 | 
|---|
| 362 |                 if( res < 0 ) {
 | 
|---|
| 363 |                         int err = -res;
 | 
|---|
| 364 |                         switch(err) {
 | 
|---|
| 365 |                                 case EPIPE:
 | 
|---|
| 366 |                                         ::stats.errors.answers.pipes++;
 | 
|---|
| 367 |                                         break;
 | 
|---|
| 368 |                                         // Don't fall through the get better stats
 | 
|---|
| 369 |                                 case ECONNRESET:
 | 
|---|
| 370 |                                         ::stats.errors.answers.reset++;
 | 
|---|
| 371 |                                         break;
 | 
|---|
| 372 |                                 default:
 | 
|---|
| 373 |                                         ::stats.errors.answers.other++;
 | 
|---|
| 374 |                                         std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
 | 
|---|
| 375 |                                         exit(EXIT_FAILURE);
 | 
|---|
| 376 |                         }
 | 
|---|
| 377 | 
 | 
|---|
| 378 |                         this->close(err);
 | 
|---|
| 379 |                         return;
 | 
|---|
| 380 |                 }
 | 
|---|
| 381 | 
 | 
|---|
| 382 |                 // Update stats
 | 
|---|
| 383 |                 ::stats.completions.writes++;
 | 
|---|
| 384 |                 if(res == 124) ::stats.completions.full_writes++;
 | 
|---|
| 385 | 
 | 
|---|
| 386 |                 // Is this write completed
 | 
|---|
| 387 |                 if( res == to_send ) {
 | 
|---|
| 388 |                         // Yes, more stats
 | 
|---|
| 389 |                         this->stats.answers++;
 | 
|---|
| 390 |                         if(this->stats.answers == 1) ::stats.conns.used++;
 | 
|---|
| 391 |                         // Then read a new request
 | 
|---|
| 392 |                         request(ring);
 | 
|---|
| 393 |                         return;
 | 
|---|
| 394 |                 }
 | 
|---|
| 395 | 
 | 
|---|
| 396 |                 // Not a completed read, push the rest
 | 
|---|
| 397 |                 to_send -= res;
 | 
|---|
| 398 |                 iterator += res;
 | 
|---|
| 399 |                 answer(ring);
 | 
|---|
| 400 |         }
 | 
|---|
| 401 | public:
 | 
|---|
| 402 |         // Submit a call to accept and create a new connection object
 | 
|---|
| 403 |         static void accept(struct io_uring * ring, const struct options_t & opt) {
 | 
|---|
| 404 |                 struct io_uring_sqe * sqe = get_sqe(ring);
 | 
|---|
| 405 |                 io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
 | 
|---|
| 406 |                 submit(ring, sqe, new connection());
 | 
|---|
| 407 |                 // std::cout << "Submitted accept: " << req << std::endl;
 | 
|---|
| 408 |         }
 | 
|---|
| 409 | 
 | 
|---|
| 410 |         // Handle a new cqe
 | 
|---|
| 411 |         void handle(struct io_uring * ring, int res, const struct options_t & opt) {
 | 
|---|
| 412 |                 switch(state) {
 | 
|---|
| 413 |                 case ACCEPTING:
 | 
|---|
| 414 |                         // connection::accept(ring, opt);
 | 
|---|
| 415 |                         newconn(ring, res);
 | 
|---|
| 416 |                         break;
 | 
|---|
| 417 |                 case REQUESTING:
 | 
|---|
| 418 |                         newrequest(ring, res);
 | 
|---|
| 419 |                         break;
 | 
|---|
| 420 |                 case ANSWERING:
 | 
|---|
| 421 |                         writedone(ring, res);
 | 
|---|
| 422 |                         break;
 | 
|---|
| 423 |                 }
 | 
|---|
| 424 |         }
 | 
|---|
| 425 | };
 | 
|---|
| 426 | 
 | 
|---|
| 427 | //=========================================================
 | 
|---|
| 428 | extern "C" {
 | 
|---|
| 429 |         #include <sys/eventfd.h>  // use for termination
 | 
|---|
| 430 | }
 | 
|---|
| 431 | 
 | 
|---|
| 432 | // Main loop of the WebServer
 | 
|---|
| 433 | // Effectively uses one thread_local copy of everything per kernel thread
 | 
|---|
| 434 | void * proc_loop(void * arg) {
 | 
|---|
| 435 |         // Get the thread local argument
 | 
|---|
| 436 |         struct options_t & opt = *(struct options_t *)arg;
 | 
|---|
| 437 |         struct io_uring * ring = opt.ring;
 | 
|---|
| 438 | 
 | 
|---|
| 439 |         int blockfd = eventfd(0, 0);
 | 
|---|
| 440 |         if (blockfd < 0) {
 | 
|---|
| 441 |                 fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) );
 | 
|---|
| 442 |                 exit(EXIT_FAILURE);
 | 
|---|
| 443 |         }
 | 
|---|
| 444 | 
 | 
|---|
| 445 |         int ret = io_uring_register_eventfd(ring, blockfd);
 | 
|---|
| 446 |         if (ret < 0) {
 | 
|---|
| 447 |                 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
 | 
|---|
| 448 |                 exit(EXIT_FAILURE);
 | 
|---|
| 449 |         }
 | 
|---|
| 450 | 
 | 
|---|
| 451 |         // Track the shutdown using a event_fd
 | 
|---|
| 452 |         char endfd_buf[8];
 | 
|---|
| 453 |         ring_end(ring, opt.endfd, endfd_buf, 8);
 | 
|---|
| 454 | 
 | 
|---|
| 455 |         // Accept our first connection
 | 
|---|
| 456 |         // May not take effect until io_uring_submit_and_wait
 | 
|---|
| 457 |         for(unsigned i = 0; i < opt.acpt.cnt; i++) {
 | 
|---|
| 458 |                 connection::accept(ring, opt);
 | 
|---|
| 459 |         }
 | 
|---|
| 460 | 
 | 
|---|
| 461 |         int reset = 1;       // Counter to print stats once in a while
 | 
|---|
| 462 |         bool done = false;   // Are we done
 | 
|---|
| 463 |         size_t sqes = 0;     // Number of sqes we submitted
 | 
|---|
| 464 |         size_t call = 0;     // Number of submits we made
 | 
|---|
| 465 |         while(!done) {
 | 
|---|
| 466 |                 // Submit all the answers we have and wait for responses
 | 
|---|
| 467 |                 int ret = io_uring_submit(ring);
 | 
|---|
| 468 |                 local.to_submit = 0;
 | 
|---|
| 469 | 
 | 
|---|
| 470 |                 // check errors
 | 
|---|
| 471 |                 if (ret < 0) {
 | 
|---|
| 472 |                         fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
 | 
|---|
| 473 |                         exit(EXIT_FAILURE);
 | 
|---|
| 474 |                 }
 | 
|---|
| 475 | 
 | 
|---|
| 476 |                 // Check how good we are at batching sqes
 | 
|---|
| 477 |                 sqes += ret;
 | 
|---|
| 478 |                 call++;
 | 
|---|
| 479 | 
 | 
|---|
| 480 | 
 | 
|---|
| 481 |                 eventfd_t val;
 | 
|---|
| 482 |                 ret = eventfd_read(blockfd, &val);
 | 
|---|
| 483 | 
 | 
|---|
| 484 |                 // check errors
 | 
|---|
| 485 |                 if (ret < 0) {
 | 
|---|
| 486 |                         fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) );
 | 
|---|
| 487 |                         exit(EXIT_FAILURE);
 | 
|---|
| 488 |                 }
 | 
|---|
| 489 | 
 | 
|---|
| 490 |                 struct io_uring_cqe *cqe;
 | 
|---|
| 491 |                 unsigned head;
 | 
|---|
| 492 |                 unsigned count = 0;
 | 
|---|
| 493 | 
 | 
|---|
| 494 |                 // go through all cqes
 | 
|---|
| 495 |                 io_uring_for_each_cqe(ring, head, cqe) {
 | 
|---|
| 496 |                         if (0 == cqe->user_data) {
 | 
|---|
| 497 |                                 done = true;
 | 
|---|
| 498 |                                 break;
 | 
|---|
| 499 |                         }
 | 
|---|
| 500 | 
 | 
|---|
| 501 |                         if(local.to_submit > 30) break;
 | 
|---|
| 502 | 
 | 
|---|
| 503 |                         auto req = (class connection *)cqe->user_data;
 | 
|---|
| 504 |                         req->handle( ring, cqe->res, opt );
 | 
|---|
| 505 | 
 | 
|---|
| 506 |                         // Every now and then, print some stats
 | 
|---|
| 507 |                         reset--;
 | 
|---|
| 508 |                         if(reset == 0) {
 | 
|---|
| 509 |                                 std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
 | 
|---|
| 510 |                                 // Reset to some random number of completions
 | 
|---|
| 511 |                                 // use the ring_fd in the number of threads don't all print at once
 | 
|---|
| 512 |                                 reset = 100000 + (100000 * (ring->ring_fd % 5));
 | 
|---|
| 513 |                         }
 | 
|---|
| 514 | 
 | 
|---|
| 515 |                         // Keep track of how many cqes we have seen
 | 
|---|
| 516 |                         count++;
 | 
|---|
| 517 |                 }
 | 
|---|
| 518 | 
 | 
|---|
| 519 |                 // Mark the cqes as seen
 | 
|---|
| 520 |                 io_uring_cq_advance(ring, count);
 | 
|---|
| 521 |         }
 | 
|---|
| 522 | 
 | 
|---|
| 523 |         // Tally all the thread local statistics
 | 
|---|
| 524 |         __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
 | 
|---|
| 525 |         __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
 | 
|---|
| 526 |         __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
 | 
|---|
| 527 |         __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
 | 
|---|
| 528 |         __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
 | 
|---|
| 529 |         __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
 | 
|---|
| 530 |         __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
 | 
|---|
| 531 |         __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
 | 
|---|
| 532 |         __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
 | 
|---|
| 533 |         __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
 | 
|---|
| 534 |         __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
 | 
|---|
| 535 |         __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
 | 
|---|
| 536 |         __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
 | 
|---|
| 537 |         __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
 | 
|---|
| 538 | 
 | 
|---|
| 539 |         return nullptr;
 | 
|---|
| 540 | }
 | 
|---|
| 541 | 
 | 
|---|
| 542 | //=========================================================
 | 
|---|
| 543 | #include <bit> // for ispow2
 | 
|---|
| 544 | 
 | 
|---|
| 545 | extern "C" {
 | 
|---|
| 546 |         #include <pthread.h>      // for pthreads
 | 
|---|
| 547 |         #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
 | 
|---|
| 548 |         #include <sys/socket.h>   // for sockets in general
 | 
|---|
| 549 |         #include <netinet/in.h>   // for sockaddr_in, AF_INET
 | 
|---|
| 550 | }
 | 
|---|
| 551 | 
 | 
|---|
| 552 | int main(int argc, char * argv[]) {
 | 
|---|
| 553 |         // Initialize the array of connection-fd associations
 | 
|---|
| 554 |         for(int i = 0; i < array_max; i++) {
 | 
|---|
| 555 |                 conns[i] = nullptr;
 | 
|---|
| 556 |         }
 | 
|---|
| 557 | 
 | 
|---|
| 558 |         // Make sure we ignore all sigpipes
 | 
|---|
| 559 |         signal(SIGPIPE, SIG_IGN);
 | 
|---|
| 560 | 
 | 
|---|
| 561 |         // Default command line arguments
 | 
|---|
| 562 |         unsigned nthreads = 1;      // number of kernel threads
 | 
|---|
| 563 |         unsigned port = 8800;       // which port to listen on
 | 
|---|
| 564 |         unsigned entries = 256;     // number of entries per ring/kernel thread
 | 
|---|
| 565 |         unsigned backlog = 262144;  // backlog argument to listen
 | 
|---|
| 566 |         unsigned preaccept = 1;     // start by accepting X per threads
 | 
|---|
| 567 |         bool attach = false;        // Whether or not to attach all the rings
 | 
|---|
| 568 |         bool sqpoll = false;        // Whether or not to use SQ Polling
 | 
|---|
| 569 | 
 | 
|---|
| 570 |         //===================
 | 
|---|
| 571 |         // Arguments Parsing
 | 
|---|
| 572 |         int c;
 | 
|---|
| 573 |         while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
 | 
|---|
| 574 |                 switch (c)
 | 
|---|
| 575 |                 {
 | 
|---|
| 576 |                 case 't':
 | 
|---|
| 577 |                         nthreads = atoi(optarg);
 | 
|---|
| 578 |                         break;
 | 
|---|
| 579 |                 case 'p':
 | 
|---|
| 580 |                         port = atoi(optarg);
 | 
|---|
| 581 |                         break;
 | 
|---|
| 582 |                 case 'e':
 | 
|---|
| 583 |                         entries = atoi(optarg);
 | 
|---|
| 584 |                         break;
 | 
|---|
| 585 |                 case 'b':
 | 
|---|
| 586 |                         backlog = atoi(optarg);
 | 
|---|
| 587 |                         break;
 | 
|---|
| 588 |                 case 'c':
 | 
|---|
| 589 |                         preaccept = atoi(optarg);
 | 
|---|
| 590 |                         break;
 | 
|---|
| 591 |                 case 'a':
 | 
|---|
| 592 |                         attach = true;
 | 
|---|
| 593 |                         break;
 | 
|---|
| 594 |                 case 'S':
 | 
|---|
| 595 |                         sqpoll = true;
 | 
|---|
| 596 |                         break;
 | 
|---|
| 597 |                 case '?':
 | 
|---|
| 598 |                 default:
 | 
|---|
| 599 |                         std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
 | 
|---|
| 600 |                         return EXIT_FAILURE;
 | 
|---|
| 601 |                 }
 | 
|---|
| 602 |         }
 | 
|---|
| 603 | 
 | 
|---|
| 604 |         if( !std::ispow2(entries) ) {
 | 
|---|
| 605 |                 unsigned v = entries;
 | 
|---|
| 606 |                 v--;
 | 
|---|
| 607 |                 v |= v >> 1;
 | 
|---|
| 608 |                 v |= v >> 2;
 | 
|---|
| 609 |                 v |= v >> 4;
 | 
|---|
| 610 |                 v |= v >> 8;
 | 
|---|
| 611 |                 v |= v >> 16;
 | 
|---|
| 612 |                 v++;
 | 
|---|
| 613 |                 std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
 | 
|---|
| 614 |                 entries = v;
 | 
|---|
| 615 |         }
 | 
|---|
| 616 | 
 | 
|---|
| 617 |         //===================
 | 
|---|
| 618 |         // End FD
 | 
|---|
| 619 |         // Create a single event fd to notify the kernel threads when the server shutsdown
 | 
|---|
| 620 |         int efd = eventfd(0, EFD_SEMAPHORE);
 | 
|---|
| 621 |         if (efd < 0) {
 | 
|---|
| 622 |                 std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 623 |                 exit(EXIT_FAILURE);
 | 
|---|
| 624 |         }
 | 
|---|
| 625 | 
 | 
|---|
| 626 |         //===================
 | 
|---|
| 627 |         // Open Socket
 | 
|---|
| 628 |         // Listen on specified port
 | 
|---|
| 629 |         std::cout << getpid() << " : Listening on port " << port << std::endl;
 | 
|---|
| 630 |         int server_fd = socket(AF_INET, SOCK_STREAM, 0);
 | 
|---|
| 631 |         if(server_fd < 0) {
 | 
|---|
| 632 |                 std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 633 |                 exit(EXIT_FAILURE);
 | 
|---|
| 634 |         }
 | 
|---|
| 635 | 
 | 
|---|
| 636 |         int ret = 0;
 | 
|---|
| 637 |         struct sockaddr_in address;
 | 
|---|
| 638 |         int addrlen = sizeof(address);
 | 
|---|
| 639 |         memset( (char *)&address, '\0', addrlen );
 | 
|---|
| 640 |         address.sin_family = AF_INET;
 | 
|---|
| 641 |         address.sin_addr.s_addr = htonl(INADDR_ANY);
 | 
|---|
| 642 |         address.sin_port = htons( port );
 | 
|---|
| 643 | 
 | 
|---|
| 644 |         // In case the port is already in use, don't just return an error
 | 
|---|
| 645 |         // Linux is very slow at reclaiming port so just retry regularly
 | 
|---|
| 646 |         int waited = 0;
 | 
|---|
| 647 |         while(true) {
 | 
|---|
| 648 |                 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
 | 
|---|
| 649 |                 if(ret < 0) {
 | 
|---|
| 650 |                         if(errno == EADDRINUSE) {
 | 
|---|
| 651 |                                 // Port is in used let's retry later
 | 
|---|
| 652 |                                 if(waited == 0) {
 | 
|---|
| 653 |                                         std::cerr << "Waiting for port" << std::endl;
 | 
|---|
| 654 |                                 } else {
 | 
|---|
| 655 |                                         // To be cure, print how long we have been waiting
 | 
|---|
| 656 |                                         std::cerr << "\r" << waited;
 | 
|---|
| 657 |                                         std::cerr.flush();
 | 
|---|
| 658 |                                 }
 | 
|---|
| 659 |                                 waited ++;
 | 
|---|
| 660 |                                 usleep( 1000000 ); // Wait and retry
 | 
|---|
| 661 |                                 continue;
 | 
|---|
| 662 |                         }
 | 
|---|
| 663 |                         // Some other error occured, this is a real error
 | 
|---|
| 664 |                         std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 665 |                         exit(EXIT_FAILURE);
 | 
|---|
| 666 |                 }
 | 
|---|
| 667 |                 break;
 | 
|---|
| 668 |         }
 | 
|---|
| 669 | 
 | 
|---|
| 670 |         ret = listen( server_fd, backlog );
 | 
|---|
| 671 |         if(ret < 0) {
 | 
|---|
| 672 |                 std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 673 |                 exit(EXIT_FAILURE);
 | 
|---|
| 674 |         }
 | 
|---|
| 675 | 
 | 
|---|
| 676 |         //===================
 | 
|---|
| 677 |         // Run Server Threads
 | 
|---|
| 678 |         std::cout << "Starting " << nthreads << " Threads";
 | 
|---|
| 679 |         if(attach) {
 | 
|---|
| 680 |                 std::cout << " with attached Rings";
 | 
|---|
| 681 |         }
 | 
|---|
| 682 |         std::cout << std::endl;
 | 
|---|
| 683 | 
 | 
|---|
| 684 |         // Create the desired number of kernel-threads and for each
 | 
|---|
| 685 |         // create a ring. Create the rings in the main so we can attach them
 | 
|---|
| 686 |         // Since the rings are all in a dense VLA, aligned them so we don't get false sharing
 | 
|---|
| 687 |         // it's unlikely but better safe than sorry
 | 
|---|
| 688 |         struct __attribute__((aligned(128))) aligned_ring {
 | 
|---|
| 689 |                 struct io_uring storage;
 | 
|---|
| 690 |         };
 | 
|---|
| 691 |         aligned_ring thrd_rings[nthreads];
 | 
|---|
| 692 |         pthread_t    thrd_hdls[nthreads];
 | 
|---|
| 693 |         options_t    thrd_opts[nthreads];
 | 
|---|
| 694 |         bool no_drops  = true;
 | 
|---|
| 695 |         bool fast_poll = true;
 | 
|---|
| 696 |         bool nfix_sqpl = true;
 | 
|---|
| 697 |         for(unsigned i = 0; i < nthreads; i++) {
 | 
|---|
| 698 |                 struct io_uring_params p = { };
 | 
|---|
| 699 | 
 | 
|---|
| 700 |                 if(sqpoll) { // If sqpoll is on, add the flag
 | 
|---|
| 701 |                         p.flags |= IORING_SETUP_SQPOLL;
 | 
|---|
| 702 |                         p.sq_thread_idle = 100;
 | 
|---|
| 703 |                 }
 | 
|---|
| 704 | 
 | 
|---|
| 705 |                 if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
 | 
|---|
| 706 |                         p.flags |= IORING_SETUP_ATTACH_WQ;
 | 
|---|
| 707 |                         p.wq_fd = thrd_rings[0].storage.ring_fd;
 | 
|---|
| 708 |                 }
 | 
|---|
| 709 | 
 | 
|---|
| 710 |                 // Create the ring
 | 
|---|
| 711 |                 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
 | 
|---|
| 712 | 
 | 
|---|
| 713 |                 // Check if some of the note-worthy features are there
 | 
|---|
| 714 |                 if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
 | 
|---|
| 715 |                 if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
 | 
|---|
| 716 |                 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
 | 
|---|
| 717 | 
 | 
|---|
| 718 |                 // Write the socket options we want to the options we pass to the threads
 | 
|---|
| 719 |                 thrd_opts[i].acpt.sockfd  = server_fd;
 | 
|---|
| 720 |                 thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
 | 
|---|
| 721 |                 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
 | 
|---|
| 722 |                 thrd_opts[i].acpt.flags   = 0;
 | 
|---|
| 723 |                 thrd_opts[i].acpt.cnt     = preaccept;
 | 
|---|
| 724 |                 thrd_opts[i].endfd        = efd;
 | 
|---|
| 725 |                 thrd_opts[i].ring         = &thrd_rings[i].storage;
 | 
|---|
| 726 | 
 | 
|---|
| 727 |                 int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
 | 
|---|
| 728 |                 if (ret < 0) {
 | 
|---|
| 729 |                         std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 730 |                         exit(EXIT_FAILURE);
 | 
|---|
| 731 |                 }
 | 
|---|
| 732 |         }
 | 
|---|
| 733 | 
 | 
|---|
| 734 |         // Tell the user if the features are present
 | 
|---|
| 735 |         if( no_drops ) std::cout << "No Drop Present" << std::endl;
 | 
|---|
| 736 |         if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
 | 
|---|
| 737 |         if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
 | 
|---|
| 738 | 
 | 
|---|
| 739 |         //===================
 | 
|---|
| 740 |         // Server Started
 | 
|---|
| 741 |         std::cout << "Server Started" << std::endl;
 | 
|---|
| 742 |         {
 | 
|---|
| 743 |                 char buffer[128];
 | 
|---|
| 744 |                 int ret;
 | 
|---|
| 745 |                 do {
 | 
|---|
| 746 |                         // Wait for a Ctrl-D to close the server
 | 
|---|
| 747 |                         ret = read(STDIN_FILENO, buffer, 128);
 | 
|---|
| 748 |                         if(ret < 0) {
 | 
|---|
| 749 |                                 std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 750 |                                 exit(EXIT_FAILURE);
 | 
|---|
| 751 |                         }
 | 
|---|
| 752 |                         else if(ret > 0) {
 | 
|---|
| 753 |                                 std::cout << "User inputed '";
 | 
|---|
| 754 |                                 std::cout.write(buffer, ret);
 | 
|---|
| 755 |                                 std::cout << "'" << std::endl;
 | 
|---|
| 756 |                         }
 | 
|---|
| 757 |                 } while(ret != 0);
 | 
|---|
| 758 | 
 | 
|---|
| 759 |                 std::cout << "Shutdown received" << std::endl;
 | 
|---|
| 760 |         }
 | 
|---|
| 761 | 
 | 
|---|
| 762 |         //===================
 | 
|---|
| 763 |         // Use eventfd_write to tell the threads we are closing
 | 
|---|
| 764 |         (std::cout << "Sending Shutdown to Threads... ").flush();
 | 
|---|
| 765 |         ret = eventfd_write(efd, nthreads);
 | 
|---|
| 766 |         if (ret < 0) {
 | 
|---|
| 767 |                 std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 768 |                 exit(EXIT_FAILURE);
 | 
|---|
| 769 |         }
 | 
|---|
| 770 |         std::cout << "done" << std::endl;
 | 
|---|
| 771 | 
 | 
|---|
| 772 |         //===================
 | 
|---|
| 773 |         // Join all the threads and close the rings
 | 
|---|
| 774 |         (std::cout << "Stopping Threads Done... ").flush();
 | 
|---|
| 775 |         for(unsigned i = 0; i < nthreads; i++) {
 | 
|---|
| 776 |                 void * retval;
 | 
|---|
| 777 |                 int ret = pthread_join(thrd_hdls[i], &retval);
 | 
|---|
| 778 |                 if (ret < 0) {
 | 
|---|
| 779 |                         std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 780 |                         exit(EXIT_FAILURE);
 | 
|---|
| 781 |                 }
 | 
|---|
| 782 | 
 | 
|---|
| 783 |                 io_uring_queue_exit(thrd_opts[i].ring);
 | 
|---|
| 784 |         }
 | 
|---|
| 785 |         std::cout << "done" << std::endl;
 | 
|---|
| 786 | 
 | 
|---|
| 787 |         //===================
 | 
|---|
| 788 |         // Close the sockets
 | 
|---|
| 789 |         (std::cout << "Closing Socket... ").flush();
 | 
|---|
| 790 |         ret = shutdown( server_fd, SHUT_RD );
 | 
|---|
| 791 |         if( ret < 0 ) {
 | 
|---|
| 792 |                 std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 793 |                 exit(EXIT_FAILURE);
 | 
|---|
| 794 |         }
 | 
|---|
| 795 | 
 | 
|---|
| 796 |         ret = close(server_fd);
 | 
|---|
| 797 |         if (ret < 0) {
 | 
|---|
| 798 |                 std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
 | 
|---|
| 799 |                 exit(EXIT_FAILURE);
 | 
|---|
| 800 |         }
 | 
|---|
| 801 |         std::cout << "done" << std::endl << std::endl;
 | 
|---|
| 802 | 
 | 
|---|
| 803 |         // Print stats and exit
 | 
|---|
| 804 |         std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
 | 
|---|
| 805 |         std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
 | 
|---|
| 806 |         std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
 | 
|---|
| 807 |         std::cout << "Max FD: " << max_fd << std::endl;
 | 
|---|
| 808 |         std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
 | 
|---|
| 809 |         std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
 | 
|---|
| 810 |         std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
 | 
|---|
| 811 |         std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
 | 
|---|
| 812 | }
 | 
|---|
| 813 | 
 | 
|---|
| 814 | // compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
 | 
|---|