- Timestamp:
- Jan 28, 2021, 9:46:29 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 9715567
- Parents:
- c05c58f
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/http_ring.cpp
rc05c58f r761a246 9 9 #include <liburing.h> 10 10 11 typedef enum { 12 EVENT_END, 13 EVENT_ACCEPT, 14 EVENT_REQUEST, 15 EVENT_ANSWER 16 } event_t; 17 18 struct __attribute__((aligned(128))) request_t { 19 event_t type; 20 int fd; 21 size_t length; 22 char * buff; 23 char data[0]; 24 25 static struct request_t * create(event_t type, size_t extra) { 26 auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra); 27 ret->type = type; 28 ret->length = extra; 29 ret->buff = ret->data; 30 return ret; 31 } 32 33 static struct request_t * create(event_t type) { 34 return create(type, 0); 35 } 36 }; 37 11 // Options passed to each threads 38 12 struct __attribute__((aligned(128))) options_t { 13 // Data passed to accept 39 14 struct { 40 15 int sockfd; … … 44 19 } acpt; 45 20 21 // Termination notification 46 22 int endfd; 23 24 // The ring to use for io 47 25 struct io_uring * ring; 48 26 }; 27 28 //========================================================= 29 // General statistics 30 struct __attribute__((aligned(128))) stats_block_t { 49 31 struct { 50 size_t subs = 0; 51 size_t cnts = 0; 52 } result; 32 volatile size_t conns = 0; 33 volatile size_t reads = 0; 34 volatile size_t writes = 0; 35 volatile size_t full_writes = 0; 36 } completions; 37 38 struct { 39 volatile size_t conns = 0; 40 struct { 41 volatile size_t pipes = 0; 42 volatile size_t reset = 0; 43 volatile size_t other = 0; 44 } requests; 45 46 struct { 47 volatile size_t pipes = 0; 48 volatile size_t reset = 0; 49 volatile size_t other = 0; 50 } answers; 51 } errors; 52 53 struct { 54 volatile size_t current = 0; 55 volatile size_t max = 0; 56 volatile size_t used = 0; 57 } conns; 58 59 volatile size_t recycle_errors = 0; 53 60 }; 54 61 55 volatile size_t total = 0; 56 volatile size_t count = 0; 62 // Each thread gets its own block of stats 63 // and there is a global block for tallying at the end 64 thread_local stats_block_t stats; 65 stats_block_t global_stats; 66 67 // Get an array of current connections 68 // This is just for debugging, to make sure 69 // no two state-machines get the same fd 70 const size_t array_max = 25000; 71 class connection * volatile conns[array_max] = { 0 }; 72 73 // Max fd we've seen, keep track so it's convenient to adjust the array size after 74 volatile int max_fd = 0; 57 75 58 76 //========================================================= 77 // Some small wrappers for ring operations used outside the connection state machine 78 // get sqe + error handling 59 79 static struct io_uring_sqe * get_sqe(struct io_uring * ring) { 60 80 struct io_uring_sqe * sqe = io_uring_get_sqe(ring); … … 66 86 } 67 87 68 static void submit(struct io_uring * ) { 69 // io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); 70 // io_uring_submit(ring); 71 } 72 73 //========================================================= 88 // read of the event fd is not done by a connection 89 // use nullptr as the user data 74 90 static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) { 75 91 struct io_uring_sqe * sqe = get_sqe(ring); 76 92 io_uring_prep_read(sqe, fd, buffer, len, 0); 77 io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));78 submit(ring);93 io_uring_sqe_set_data(sqe, nullptr); 94 io_uring_submit(ring); 79 95 } 80 96 81 static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {82 auto req = request_t::create(EVENT_ACCEPT);83 struct io_uring_sqe * sqe = get_sqe(ring);84 io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);85 io_uring_sqe_set_data(sqe, req);86 submit(ring);87 // std::cout << "Submitted accept: " << req << std::endl;88 }89 90 static void ring_request(struct io_uring * ring, int fd) {91 size_t size = 1024;92 auto req = request_t::create(EVENT_REQUEST, size);93 req->fd = fd;94 95 struct io_uring_sqe * sqe = get_sqe(ring);96 io_uring_prep_read(sqe, fd, req->buff, size, 0);97 io_uring_sqe_set_data(sqe, req);98 submit(ring);99 // std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;100 }101 102 97 //========================================================= 98 // All answers are fixed and determined by the return code 103 99 enum HttpCode { 104 100 OK200 = 0, … … 112 108 }; 113 109 110 // Get a fix reply based on the return code 114 111 const char * http_msgs[] = { 115 "HTTP/1.1 200 OK\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",116 "HTTP/1.1 400 Bad Request\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",117 "HTTP/1.1 404 Not Found\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",118 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",119 "HTTP/1.1 408 Request Timeout\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",120 "HTTP/1.1 413 Payload Too Large\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",121 "HTTP/1.1 414 URI Too Long\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",112 "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n", 113 "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 114 "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 115 "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 116 "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 117 "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 118 "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 122 119 }; 123 124 static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 125 126 const int http_codes[] = {127 200,128 400,129 404,130 405,131 408,132 413,133 414,120 static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) ); 121 122 // Pre-compute the length of these replys 123 const size_t http_lens[] = { 124 strlen(http_msgs[0]), 125 strlen(http_msgs[1]), 126 strlen(http_msgs[2]), 127 strlen(http_msgs[3]), 128 strlen(http_msgs[4]), 129 strlen(http_msgs[5]), 130 strlen(http_msgs[6]), 134 131 }; 135 136 static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0]))); 137 138 int code_val(HttpCode code) { 139 return http_codes[code]; 140 } 141 142 static void ring_answer(struct io_uring * ring, int fd, HttpCode code) { 143 size_t size = 256; 144 auto req = request_t::create(EVENT_ANSWER, size); 145 req->fd = fd; 146 147 const char * fmt = http_msgs[code]; 148 const char * date = ""; 149 size = snprintf(req->buff, size, fmt, date, size); 150 151 struct io_uring_sqe * sqe = get_sqe(ring); 152 io_uring_prep_write(sqe, fd, req->buff, size, 0); 153 io_uring_sqe_set_data(sqe, req); 154 submit(ring); 155 // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl; 156 } 157 158 static void ring_answer(struct io_uring * ring, int fd, const std::string &) { 159 // size_t size = 256; 160 // auto req = request_t::create(EVENT_ANSWER, size); 161 // req->fd = fd; 162 163 // const char * fmt = http_msgs[OK200]; 164 // const char * date = ""; 165 // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str()); 166 // req->length = len; 167 168 // struct io_uring_sqe * sqe = get_sqe(ring); 169 // io_uring_prep_write(sqe, fd, req->buffer, len, 0); 170 // io_uring_sqe_set_data(sqe, req); 171 // submit(ring); 172 // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl; 173 174 175 static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \ 176 "Content-Length: 15\r\n" \ 177 "Content-Type: text/html\r\n" \ 178 "Connection: keep-alive\r\n" \ 179 "Server: testserver\r\n" \ 180 "\r\n" \ 181 "Hello, World!\r\n"; 182 183 static const size_t RLEN = strlen(RESPONSE); 184 185 size_t size = 256; 186 auto req = request_t::create(EVENT_ANSWER, size); 187 req->fd = fd; 188 req->buff = (char*)RESPONSE; 189 req->length = RLEN; 190 191 // const char * fmt = http_msgs[OK200]; 192 // const char * date = ""; 193 // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str()); 194 // req->length = len; 195 196 struct io_uring_sqe * sqe = get_sqe(ring); 197 io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0); 198 io_uring_sqe_set_data(sqe, req); 199 submit(ring); 200 } 132 static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) ); 201 133 202 134 //========================================================= 203 static void handle_new_conn(struct io_uring * ring, int fd) { 204 if( fd < 0 ) { 205 int err = -fd; 206 if( err == ECONNABORTED ) return; 207 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl; 208 exit(EXIT_FAILURE); 209 } 210 211 ring_request(ring, fd); 212 } 213 214 static void handle_request(struct io_uring * ring, struct request_t * in, int res) { 215 if( res < 0 ) { 216 int err = -res; 217 switch(err) { 218 case EPIPE: 219 case ECONNRESET: 220 close(in->fd); 221 free(in); 135 // Finate state machine responsible for handling each connection 136 class __attribute__((aligned(128))) connection { 137 private: 138 // The state of the machine 139 enum { 140 ACCEPTING, // Accept sent waiting for connection 141 REQUESTING, // Waiting for new request 142 ANSWERING, // Either request received submitting answer or short answer sent, need to submit rest 143 } state; 144 145 // The file descriptor of the connection 146 int fd; 147 148 // request data 149 static const size_t buffer_size = 1024; // Size of the read buffer 150 const char * buffer; // Buffer into which requests are read 151 152 // send data 153 size_t to_send; // Data left to send 154 const char * iterator; // Pointer to rest of the message to send 155 156 // stats 157 // how many requests/answers were complete, that is, a valid cqe was obtained 158 struct { 159 size_t requests = 0; 160 size_t answers = 0; 161 } stats; 162 163 private: 164 connection() 165 : state(ACCEPTING) 166 , fd(0) 167 , buffer( new char[buffer_size]) 168 , iterator(nullptr) 169 { 170 ::stats.conns.max++; 171 ::stats.conns.current++; 172 } 173 174 ~connection() { 175 delete [] buffer; 176 ::stats.conns.current--; 177 } 178 179 // Close the current connection 180 void close(int err) { 181 // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") "; 182 conns[fd] = nullptr; 183 184 if(fd != 0) { 185 ::close(fd); 186 } 187 delete this; 188 } 189 190 //-------------------------------------------------- 191 // Wrappers for submit so we can tweak it more easily 192 static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) { 193 (void)ring; 194 // io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); 195 io_uring_sqe_set_data(sqe, conn); 196 // io_uring_submit(ring); 197 } 198 199 void submit(struct io_uring * ring, struct io_uring_sqe * sqe) { 200 submit(ring, sqe, this); 201 } 202 203 //-------------------------------------------------- 204 // get a new request from the client 205 void request(struct io_uring * ring) { 206 state = REQUESTING; 207 struct io_uring_sqe * sqe = get_sqe(ring); 208 io_uring_prep_read(sqe, fd, (void*)buffer, buffer_size, 0); 209 submit(ring, sqe); 210 } 211 212 //-------------------------------------------------- 213 // Send a new answer based on a return code 214 void answer(struct io_uring * ring, HttpCode code) { 215 iterator = http_msgs[code]; 216 to_send = http_lens[code]; 217 if(to_send != 124) { 218 std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl; 219 } 220 answer(ring); 221 } 222 223 // send a new answer to the client 224 // Reused for incomplete writes 225 void answer(struct io_uring * ring) { 226 state = ANSWERING; 227 struct io_uring_sqe * sqe = get_sqe(ring); 228 io_uring_prep_write(sqe, fd, iterator, to_send, 0); 229 submit(ring, sqe); 230 } 231 232 //-------------------------------------------------- 233 // Handle a new connection, results for getting an cqe while in the ACCEPTING state 234 void newconn(struct io_uring * ring, int ret) { 235 // Check errors 236 if( ret < 0 ) { 237 int err = -ret; 238 if( err == ECONNABORTED ) { 239 ::stats.errors.conns++; 240 this->close(err); 222 241 return; 223 default: 224 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl; 225 exit(EXIT_FAILURE); 226 } 227 } 228 229 if(res == 0) { 230 close(in->fd); 231 free(in); 232 return; 233 } 234 235 const char * it = in->buff; 236 if( !strstr( it, "\r\n\r\n" ) ) { 237 std::cout << "Incomplete request" << std::endl; 238 close(in->fd); 239 free(in); 240 return; 241 } 242 243 it = in->buff; 244 const std::string reply = "Hello, World!\n"; 245 int ret = memcmp(it, "GET ", 4); 246 if( ret != 0 ) { 247 ring_answer(ring, in->fd, E400); 248 goto NEXT; 249 } 250 251 it += 4; 252 ret = memcmp(it, "/plaintext", 10); 253 if( ret != 0 ) { 254 ring_answer(ring, in->fd, E404); 255 goto NEXT; 256 } 257 258 ring_answer(ring, in->fd, reply); 259 260 NEXT: 261 ring_request(ring, in->fd); 262 free(in); 263 return; 264 } 265 266 static void handle_answer(struct io_uring * ring, struct request_t * in, int res) { 267 if( res < 0 ) { 268 int err = -res; 269 switch(err) { 270 case EPIPE: 271 case ECONNRESET: 272 close(in->fd); 273 free(in); 274 return; 275 default: 276 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl; 277 exit(EXIT_FAILURE); 278 } 279 } 280 281 if( res >= in->length ) { 282 free(in); 283 return; 284 } 285 286 struct io_uring_sqe * sqe = get_sqe(ring); 287 io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0); 288 io_uring_sqe_set_data(sqe, in); 289 submit(ring); 290 // std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl; 291 292 ring_request(ring, in->fd); 293 } 242 } 243 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl; 244 exit(EXIT_FAILURE); 245 } 246 247 // Count the connections 248 ::stats.completions.conns++; 249 250 // Read on the data 251 fd = ret; 252 request(ring); 253 254 // check the max fd so we know if we exceeded the array 255 for(;;) { 256 int expected = max_fd; 257 if(expected >= fd) return; 258 if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return; 259 } 260 261 // check if we have enough space to fit inside the array 262 if(fd >= array_max) { 263 std::cerr << "accept error: fd " << fd << " is too high" << std::endl; 264 return; 265 } 266 267 // Put our connection into the global array 268 // No one else should be using it so if they are that's a bug 269 auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST); 270 if( exist ) { 271 size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST); 272 if( first == 0 ) { 273 std::cerr << "First: accept has existing connection " << std::endl; 274 } 275 } 276 } 277 278 // Handle a new request, results for getting an cqe while in the REQUESTING state 279 void newrequest(struct io_uring * ring, int ret) { 280 // Check errors 281 if( ret < 0 ) { 282 int err = -ret; 283 switch(err) { 284 case EPIPE: 285 ::stats.errors.requests.pipes++; 286 break; 287 // Don't fall through the get better stats 288 case ECONNRESET: 289 ::stats.errors.requests.reset++; 290 break; 291 default: 292 ::stats.errors.requests.other++; 293 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl; 294 exit(EXIT_FAILURE); 295 } 296 297 // Connection failed, close it 298 this->close(err); 299 return; 300 } 301 302 // Update stats 303 ::stats.completions.reads++; 304 305 // Is this an EOF 306 if(ret == 0) { 307 // Yes, close the connection 308 this->close(0); 309 return; 310 } 311 312 // Find the end of the request header 313 const char * it = buffer; 314 if( !strstr( it, "\r\n\r\n" ) ) { 315 // This state machine doesn't support incomplete reads 316 // Print them to output so it's clear there is an issue 317 std::cout << "Incomplete request" << std::endl; 318 this->close(EBADR); 319 return; 320 } 321 322 // Find the method to use 323 it = buffer; 324 int ret = memcmp(it, "GET ", 4); 325 if( ret != 0 ) { 326 // We only support get, answer with an error 327 answer(ring, E400); 328 return; 329 } 330 331 // Find the target 332 it += 4; 333 ret = memcmp(it, "/plaintext", 10); 334 if( ret != 0 ) { 335 // We only support /plaintext, answer with an error 336 answer(ring, E404); 337 return; 338 } 339 340 // Correct request, answer with the payload 341 this->stats.requests++; 342 answer(ring, OK200); 343 } 344 345 // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state 346 void writedone(struct io_uring * ring, int res) { 347 // Check errors 348 if( res < 0 ) { 349 int err = -res; 350 switch(err) { 351 case EPIPE: 352 ::stats.errors.answers.pipes++; 353 break; 354 // Don't fall through the get better stats 355 case ECONNRESET: 356 ::stats.errors.answers.reset++; 357 break; 358 default: 359 ::stats.errors.answers.other++; 360 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl; 361 exit(EXIT_FAILURE); 362 } 363 364 this->close(err); 365 return; 366 } 367 368 // Update stats 369 ::stats.completions.writes++; 370 if(res == 124) ::stats.completions.full_writes++; 371 372 // Is this write completed 373 if( res == to_send ) { 374 // Yes, more stats 375 stats.answers++; 376 if(stats.answers == 2) ::stats.conns.used++; 377 // Then read a new request 378 request(ring); 379 return; 380 } 381 382 // Not a completed read, push the rest 383 to_send -= res; 384 iterator += res; 385 answer(ring); 386 } 387 public: 388 // Submit a call to accept and create a new connection object 389 static void accept(struct io_uring * ring, const struct options_t & opt) { 390 struct io_uring_sqe * sqe = get_sqe(ring); 391 io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); 392 submit(ring, sqe, new connection()); 393 // std::cout << "Submitted accept: " << req << std::endl; 394 } 395 396 // Handle a new cqe 397 void handle(struct io_uring * ring, int res, const struct options_t & opt) { 398 switch(state) { 399 case ACCEPTING: 400 connection::accept(ring, opt); 401 newconn(ring, res); 402 break; 403 case REQUESTING: 404 newrequest(ring, res); 405 break; 406 case ANSWERING: 407 writedone(ring, res); 408 break; 409 } 410 } 411 }; 294 412 295 413 //========================================================= 296 extern "C" { 297 extern int __io_uring_flush_sq(struct io_uring *ring); 298 } 299 414 // Main loop of the WebServer 415 // Effectively uses one thread_local copy of everything per kernel thread 300 416 void * proc_loop(void * arg) { 301 size_t count = 0;417 // Get the thread local argument 302 418 struct options_t & opt = *(struct options_t *)arg; 303 304 419 struct io_uring * ring = opt.ring; 305 420 421 // Track the shutdown using a event_fd 306 422 char endfd_buf[8]; 307 423 ring_end(ring, opt.endfd, endfd_buf, 8); 308 424 309 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); 310 311 int reset = 1; 312 bool done = false; 425 // Accept our first connection 426 // May not take effect until io_uring_submit_and_wait 427 connection::accept(ring, opt); 428 429 int reset = 1; // Counter to print stats once in a while 430 bool done = false; // Are we done 431 size_t sqes = 0; // Number of sqes we submitted 432 size_t call = 0; // Number of submits we made 313 433 while(!done) { 314 struct io_uring_cqe *cqe; 315 int ret; 316 while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) { 317 ret = io_uring_submit_and_wait(ring, 1); 318 if (ret < 0) { 319 fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) ); 320 exit(EXIT_FAILURE); 321 } 322 opt.result.subs += ret; 323 opt.result.cnts++; 324 } 325 326 if (ret < 0 && -EAGAIN != ret) { 327 fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) ); 434 // Submit all the answers we have and wait for responses 435 int ret = io_uring_submit_and_wait(ring, 1); 436 437 // check errors 438 if (ret < 0) { 439 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) ); 328 440 exit(EXIT_FAILURE); 329 441 } 330 442 331 auto req = (struct request_t *)cqe->user_data; 332 // std::cout << req << " completed with " << cqe->res << std::endl; 333 334 switch(req->type) { 335 case EVENT_END: 443 // Check how good we are at batching sqes 444 sqes += ret; 445 call++; 446 447 struct io_uring_cqe *cqe; 448 unsigned head; 449 unsigned count = 0; 450 451 // go through all cqes 452 io_uring_for_each_cqe(ring, head, cqe) { 453 if (0 == cqe->user_data) { 336 454 done = true; 337 455 break; 338 case EVENT_ACCEPT: 339 handle_new_conn(ring, cqe->res); 340 free(req); 341 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); 342 break; 343 case EVENT_REQUEST: 344 handle_request(ring, req, cqe->res); 345 break; 346 case EVENT_ANSWER: 347 handle_answer(ring, req, cqe->res); 348 break; 349 } 350 351 io_uring_cqe_seen(ring, cqe); 352 reset--; 353 if(reset == 0) { 354 size_t ltotal = opt.result.subs; 355 size_t lcount = opt.result.cnts; 356 357 std::cout << "Submit average: " << ltotal << "/" << lcount << "(" << (((double)ltotal) / lcount) << ")" << std::endl; 358 reset = 100000 + (100000 * (ring->ring_fd % 5)); 359 } 360 } 361 362 return (void*)count; 456 } 457 458 auto req = (class connection *)cqe->user_data; 459 req->handle( ring, cqe->res, opt ); 460 461 reset--; 462 if(reset == 0) { 463 std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl; 464 reset = 100000 + (100000 * (ring->ring_fd % 5)); 465 } 466 467 // Keep track of how many cqes we have seen 468 count++; 469 } 470 471 // Mark the cqes as seen 472 io_uring_cq_advance(ring, count); 473 } 474 475 // Tally all the thread local statistics 476 __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST ); 477 __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST ); 478 __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST ); 479 __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST ); 480 __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST ); 481 __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST ); 482 __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST ); 483 __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST ); 484 __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST ); 485 __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST ); 486 __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST ); 487 __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST ); 488 __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST ); 489 __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST ); 490 491 return nullptr; 363 492 } 364 493 365 494 //========================================================= 366 struct __attribute__((aligned(128))) aligned_ring { 367 struct io_uring storage; 368 }; 369 370 #include <bit> 371 372 #include <pthread.h> 495 #include <bit> // for ispow2 496 373 497 extern "C" { 374 #include <signal.h> 375 #include <sys/eventfd.h> 376 #include <sys/socket.h> 377 #include <netinet/in.h> 498 #include <pthread.h> // for pthreads 499 #include <signal.h> // for signal(SIGPIPE, SIG_IGN); 500 #include <sys/eventfd.h> // use for termination 501 #include <sys/socket.h> // for sockets in general 502 #include <netinet/in.h> // for sockaddr_in, AF_INET 378 503 } 379 504 380 505 int main(int argc, char * argv[]) { 506 // Initialize the array of connection-fd associations 507 for(int i = 0; i < array_max; i++) { 508 conns[i] = nullptr; 509 } 510 511 // Make sure we ignore all sigpipes 381 512 signal(SIGPIPE, SIG_IGN); 382 513 383 unsigned nthreads = 1; 384 unsigned port = 8800; 385 unsigned entries = 256; 386 unsigned backlog = 10; 387 bool attach = false; 388 bool sqpoll = false; 514 // Default command line arguments 515 unsigned nthreads = 1; // number of kernel threads 516 unsigned port = 8800; // which port to listen on 517 unsigned entries = 256; // number of entries per ring/kernel thread 518 unsigned backlog = 262144; // backlog argument to listen 519 bool attach = false; // Whether or not to attach all the rings 520 bool sqpoll = false; // Whether or not to use SQ Polling 389 521 390 522 //=================== 391 // Arguments 523 // Arguments Parsing 392 524 int c; 393 525 while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) { … … 434 566 //=================== 435 567 // End FD 568 // Create a single event fd to notify the kernel threads when the server shutsdown 436 569 int efd = eventfd(0, EFD_SEMAPHORE); 437 570 if (efd < 0) { … … 442 575 //=================== 443 576 // Open Socket 577 // Listen on specified port 444 578 std::cout << getpid() << " : Listening on port " << port << std::endl; 445 579 int server_fd = socket(AF_INET, SOCK_STREAM, 0); … … 457 591 address.sin_port = htons( port ); 458 592 593 // In case the port is already in use, don't just return an error 594 // Linux is very slow at reclaiming port so just retry regularly 459 595 int waited = 0; 460 596 while(true) { … … 462 598 if(ret < 0) { 463 599 if(errno == EADDRINUSE) { 600 // Port is in used let's retry later 464 601 if(waited == 0) { 465 602 std::cerr << "Waiting for port" << std::endl; 466 603 } else { 604 // To be cure, print how long we have been waiting 467 605 std::cerr << "\r" << waited; 468 606 std::cerr.flush(); 469 607 } 470 608 waited ++; 471 usleep( 1000000 ); 609 usleep( 1000000 ); // Wait and retry 472 610 continue; 473 611 } 612 // Some other error occured, this is a real error 474 613 std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl; 475 614 exit(EXIT_FAILURE); … … 492 631 std::cout << std::endl; 493 632 633 // Create the desired number of kernel-threads and for each 634 // create a ring. Create the rings in the main so we can attach them 635 // Since the rings are all in a dense VLA, aligned them so we don't get false sharing 636 // it's unlikely but better safe than sorry 637 struct __attribute__((aligned(128))) aligned_ring { 638 struct io_uring storage; 639 }; 494 640 aligned_ring thrd_rings[nthreads]; 495 641 pthread_t thrd_hdls[nthreads]; 496 642 options_t thrd_opts[nthreads]; 643 bool no_drops = true; 497 644 bool fast_poll = true; 498 645 bool nfix_sqpl = true; 499 646 for(unsigned i = 0; i < nthreads; i++) { 500 647 struct io_uring_params p = { }; 501 if(sqpoll) { 648 649 if(sqpoll) { // If sqpoll is on, add the flag 502 650 p.flags |= IORING_SETUP_SQPOLL; 503 651 p.sq_thread_idle = 100; 504 652 } 505 653 506 if (attach && i != 0) { 654 if (attach && i != 0) { // If attach is on, add the flag, except for the first ring 507 655 p.flags |= IORING_SETUP_ATTACH_WQ; 508 656 p.wq_fd = thrd_rings[0].storage.ring_fd; 509 657 } 658 659 // Create the ring 510 660 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p); 511 661 662 // Check if some of the note-worthy features are there 663 if(0 == (p.features & IORING_FEAT_NODROP )) { no_drops = false; } 512 664 if(0 == (p.features & IORING_FEAT_FAST_POLL )) { fast_poll = false; } 513 665 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; } 514 666 667 // Write the socket options we want to the options we pass to the threads 515 668 thrd_opts[i].acpt.sockfd = server_fd; 516 669 thrd_opts[i].acpt.addr = (struct sockaddr *)&address; … … 527 680 } 528 681 682 // Tell the user if the features are present 683 if( no_drops ) std::cout << "No Drop Present" << std::endl; 529 684 if( fast_poll) std::cout << "Fast Poll Present" << std::endl; 530 685 if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl; … … 537 692 int ret; 538 693 do { 694 // Wait for a Ctrl-D to close the server 539 695 ret = read(STDIN_FILENO, buffer, 128); 540 696 if(ret < 0) { … … 553 709 554 710 //=================== 711 // Use eventfd_write to tell the threads we are closing 555 712 (std::cout << "Sending Shutdown to Threads... ").flush(); 556 713 ret = eventfd_write(efd, nthreads); … … 562 719 563 720 //=================== 721 // Join all the threads and close the rings 564 722 (std::cout << "Stopping Threads Done... ").flush(); 565 723 for(unsigned i = 0; i < nthreads; i++) { … … 576 734 577 735 //=================== 736 // Close the sockets 578 737 (std::cout << "Closing Socket... ").flush(); 579 738 ret = shutdown( server_fd, SHUT_RD ); … … 588 747 exit(EXIT_FAILURE); 589 748 } 590 std::cout << "done" << std::endl; 749 std::cout << "done" << std::endl << std::endl; 750 751 // Print stats and exit 752 std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl; 753 std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl; 754 std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl; 755 std::cout << "Max FD: " << max_fd << std::endl; 756 std::cout << "Successful connections: " << global_stats.conns.used << std::endl; 757 std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl; 758 std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl; 591 759 }
Note: See TracChangeset
for help on using the changeset viewer.