Changeset 3b402339 for benchmark/io/http
- Timestamp: Feb 2, 2021, 9:56:41 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 3d67f83
- Parents: 144fa5c (diff), 1c1c180 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: benchmark/io/http
- Files: 2 added, 1 edited
  - http_ring.cpp (modified) (16 diffs)
  - parhttperf (added)
  - parse-httperf.py (added)
benchmark/io/http/http_ring.cpp
r144fa5c r3b402339 9 9 #include <liburing.h> 10 10 11 typedef enum { 12 EVENT_END, 13 EVENT_ACCEPT, 14 EVENT_REQUEST, 15 EVENT_ANSWER 16 } event_t; 17 18 struct __attribute__((aligned(128))) request_t { 19 event_t type; 20 int fd; 21 size_t length; 22 char * buff; 23 char data[0]; 24 25 static struct request_t * create(event_t type, size_t extra) { 26 auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra); 27 ret->type = type; 28 ret->length = extra; 29 ret->buff = ret->data; 30 return ret; 31 } 32 33 static struct request_t * create(event_t type) { 34 return create(type, 0); 35 } 36 }; 37 11 // #define NOBATCHING 12 // #define USE_ASYNC 13 14 // Options passed to each threads 38 15 struct __attribute__((aligned(128))) options_t { 16 // Data passed to accept 39 17 struct { 40 18 int sockfd; … … 44 22 } acpt; 45 23 24 // Termination notification 46 25 int endfd; 26 27 // The ring to use for io 47 28 struct io_uring * ring; 48 29 }; 30 31 //========================================================= 32 // General statistics 33 struct __attribute__((aligned(128))) stats_block_t { 49 34 struct { 50 size_t subs = 0; 51 size_t cnts = 0; 52 } result; 35 volatile size_t conns = 0; 36 volatile size_t reads = 0; 37 volatile size_t writes = 0; 38 volatile size_t full_writes = 0; 39 } completions; 40 41 struct { 42 volatile size_t conns = 0; 43 struct { 44 volatile size_t pipes = 0; 45 volatile size_t reset = 0; 46 volatile size_t other = 0; 47 } requests; 48 49 struct { 50 volatile size_t pipes = 0; 51 volatile size_t reset = 0; 52 volatile size_t other = 0; 53 } answers; 54 } errors; 55 56 struct { 57 volatile size_t current = 0; 58 volatile size_t max = 0; 59 volatile size_t used = 0; 60 } conns; 61 62 volatile size_t recycle_errors = 0; 53 63 }; 54 64 65 // Each thread gets its own block of stats 66 // and there is a global block for tallying at the end 67 thread_local stats_block_t stats; 68 stats_block_t global_stats; 69 70 // Get an array of current connections 71 // This is just for debugging, to make sure 72 // no two state-machines get the same fd 73 const size_t array_max = 25000; 74 class connection * volatile conns[array_max] = { 0 }; 75 76 // Max fd we've seen, keep track so it's convenient to adjust the array size after 77 volatile int max_fd = 0; 78 55 79 //========================================================= 80 // Some small wrappers for ring operations used outside the connection state machine 81 // get sqe + error handling 56 82 static struct io_uring_sqe * get_sqe(struct io_uring * ring) { 57 83 struct io_uring_sqe * sqe = io_uring_get_sqe(ring); … … 63 89 } 64 90 65 static void submit(struct io_uring * ) { 66 // io_uring_submit(ring); 67 } 68 69 //========================================================= 91 // read of the event fd is not done by a connection 92 // use nullptr as the user data 70 93 static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) { 71 94 struct io_uring_sqe * sqe = get_sqe(ring); 72 95 io_uring_prep_read(sqe, fd, buffer, len, 0); 73 io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));74 submit(ring);96 io_uring_sqe_set_data(sqe, nullptr); 97 io_uring_submit(ring); 75 98 } 76 99 77 static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {78 auto req = request_t::create(EVENT_ACCEPT);79 struct io_uring_sqe * sqe = get_sqe(ring);80 io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);81 io_uring_sqe_set_data(sqe, req);82 submit(ring);83 // std::cout << 
"Submitted accept: " << req << std::endl;84 }85 86 static void ring_request(struct io_uring * ring, int fd) {87 size_t size = 1024;88 auto req = request_t::create(EVENT_REQUEST, size);89 req->fd = fd;90 91 struct io_uring_sqe * sqe = get_sqe(ring);92 io_uring_prep_read(sqe, fd, req->buff, size, 0);93 io_uring_sqe_set_data(sqe, req);94 submit(ring);95 // std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;96 }97 98 100 //========================================================= 101 // All answers are fixed and determined by the return code 99 102 enum HttpCode { 100 103 OK200 = 0, … … 108 111 }; 109 112 113 // Get a fix reply based on the return code 110 114 const char * http_msgs[] = { 111 "HTTP/1.1 200 OK\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",112 "HTTP/1.1 400 Bad Request\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",113 "HTTP/1.1 404 Not Found\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",114 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",115 "HTTP/1.1 408 Request Timeout\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",116 "HTTP/1.1 413 Payload Too Large\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",117 "HTTP/1.1 414 URI Too Long\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",115 "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n", 116 "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 117 "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 118 "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 119 "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 120 "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 121 "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 118 122 }; 119 120 static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 121 122 const int http_codes[] = {123 200,124 400,125 404,126 405,127 408,128 413,129 414,123 static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) ); 124 125 // Pre-compute the length of these replys 126 const size_t http_lens[] = { 127 strlen(http_msgs[0]), 128 strlen(http_msgs[1]), 129 strlen(http_msgs[2]), 130 strlen(http_msgs[3]), 131 strlen(http_msgs[4]), 132 strlen(http_msgs[5]), 133 strlen(http_msgs[6]), 130 134 }; 131 132 static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0]))); 133 134 int code_val(HttpCode code) { 135 return http_codes[code]; 136 } 137 138 static void ring_answer(struct io_uring * ring, int fd, HttpCode code) { 139 size_t size = 256; 140 auto req = request_t::create(EVENT_ANSWER, size); 141 req->fd = fd; 142 143 const char * fmt = http_msgs[code]; 144 const char * date = ""; 145 size = snprintf(req->buff, size, fmt, date, size); 146 147 struct io_uring_sqe * sqe = get_sqe(ring); 148 io_uring_prep_write(sqe, fd, req->buff, size, 0); 149 io_uring_sqe_set_data(sqe, req); 150 submit(ring); 151 // std::cout << "Submitted 
good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl; 152 } 153 154 static void ring_answer(struct io_uring * ring, int fd, const std::string &) { 155 // size_t size = 256; 156 // auto req = request_t::create(EVENT_ANSWER, size); 157 // req->fd = fd; 158 159 // const char * fmt = http_msgs[OK200]; 160 // const char * date = ""; 161 // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str()); 162 // req->length = len; 163 164 // struct io_uring_sqe * sqe = get_sqe(ring); 165 // io_uring_prep_write(sqe, fd, req->buffer, len, 0); 166 // io_uring_sqe_set_data(sqe, req); 167 // submit(ring); 168 // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl; 169 170 171 static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \ 172 "Content-Length: 15\r\n" \ 173 "Content-Type: text/html\r\n" \ 174 "Connection: keep-alive\r\n" \ 175 "Server: testserver\r\n" \ 176 "\r\n" \ 177 "Hello, World!\r\n"; 178 179 static const size_t RLEN = strlen(RESPONSE); 180 181 size_t size = 256; 182 auto req = request_t::create(EVENT_ANSWER, size); 183 req->fd = fd; 184 req->buff = (char*)RESPONSE; 185 req->length = RLEN; 186 187 // const char * fmt = http_msgs[OK200]; 188 // const char * date = ""; 189 // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str()); 190 // req->length = len; 191 192 struct io_uring_sqe * sqe = get_sqe(ring); 193 io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0); 194 io_uring_sqe_set_data(sqe, req); 195 submit(ring); 196 } 135 static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) ); 197 136 198 137 //========================================================= 199 static void handle_new_conn(struct io_uring * ring, int fd) { 200 if( fd < 0 ) { 201 int err = -fd; 202 if( err == ECONNABORTED ) return; 203 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl; 204 exit(EXIT_FAILURE); 205 } 206 207 ring_request(ring, fd); 208 } 209 210 static void handle_request(struct io_uring * ring, struct request_t * in, int res) { 211 if( res < 0 ) { 212 int err = -res; 213 switch(err) { 214 case EPIPE: 215 case ECONNRESET: 216 close(in->fd); 217 free(in); 138 // Finate state machine responsible for handling each connection 139 class __attribute__((aligned(128))) connection { 140 private: 141 // The state of the machine 142 enum { 143 ACCEPTING, // Accept sent waiting for connection 144 REQUESTING, // Waiting for new request 145 ANSWERING, // Either request received submitting answer or short answer sent, need to submit rest 146 } state; 147 148 // The file descriptor of the connection 149 int fd; 150 151 // request data 152 static const size_t buffer_size = 1024; // Size of the read buffer 153 const char * buffer; // Buffer into which requests are read 154 155 // send data 156 size_t to_send; // Data left to send 157 const char * iterator; // Pointer to rest of the message to send 158 159 // stats 160 // how many requests/answers were complete, that is, a valid cqe was obtained 161 struct { 162 size_t requests = 0; 163 size_t answers = 0; 164 } stats; 165 166 private: 167 connection() 168 : state(ACCEPTING) 169 , fd(0) 170 , buffer( new char[buffer_size]) 171 , iterator(nullptr) 172 {} 173 174 ~connection() { 175 delete [] buffer; 176 ::stats.conns.current--; 177 } 178 179 // Close the current connection 180 void close(int err) { 181 // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") "; 182 conns[fd] = nullptr; 183 184 if(fd != 0) { 
185 ::close(fd); 186 } 187 delete this; 188 } 189 190 //-------------------------------------------------- 191 // Wrappers for submit so we can tweak it more easily 192 static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) { 193 (void)ring; 194 #ifdef USE_ASYNC 195 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); 196 #endif 197 io_uring_sqe_set_data(sqe, conn); 198 #ifdef NOBATCHING 199 io_uring_submit(ring); 200 #endif 201 } 202 203 void submit(struct io_uring * ring, struct io_uring_sqe * sqe) { 204 submit(ring, sqe, this); 205 } 206 207 //-------------------------------------------------- 208 // get a new request from the client 209 void request(struct io_uring * ring) { 210 state = REQUESTING; 211 struct io_uring_sqe * sqe = get_sqe(ring); 212 io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0); 213 submit(ring, sqe); 214 } 215 216 //-------------------------------------------------- 217 // Send a new answer based on a return code 218 void answer(struct io_uring * ring, HttpCode code) { 219 iterator = http_msgs[code]; 220 to_send = http_lens[code]; 221 if(to_send != 124) { 222 std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl; 223 } 224 answer(ring); 225 } 226 227 // send a new answer to the client 228 // Reused for incomplete writes 229 void answer(struct io_uring * ring) { 230 state = ANSWERING; 231 struct io_uring_sqe * sqe = get_sqe(ring); 232 io_uring_prep_send(sqe, fd, iterator, to_send, 0); 233 submit(ring, sqe); 234 } 235 236 //-------------------------------------------------- 237 // Handle a new connection, results for getting an cqe while in the ACCEPTING state 238 void newconn(struct io_uring * ring, int ret) { 239 // Check errors 240 if( ret < 0 ) { 241 int err = -ret; 242 if( err == ECONNABORTED ) { 243 ::stats.errors.conns++; 244 this->close(err); 218 245 return; 219 default: 220 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl; 221 exit(EXIT_FAILURE); 222 } 223 } 224 225 if(res == 0) { 226 close(in->fd); 227 free(in); 228 return; 229 } 230 231 const char * it = in->buff; 232 if( !strstr( it, "\r\n\r\n" ) ) { 233 std::cout << "Incomplete request" << std::endl; 234 close(in->fd); 235 free(in); 236 return; 237 } 238 239 it = in->buff; 240 const std::string reply = "Hello, World!\n"; 241 int ret = memcmp(it, "GET ", 4); 242 if( ret != 0 ) { 243 ring_answer(ring, in->fd, E400); 244 goto NEXT; 245 } 246 247 it += 4; 248 ret = memcmp(it, "/plaintext", 10); 249 if( ret != 0 ) { 250 ring_answer(ring, in->fd, E404); 251 goto NEXT; 252 } 253 254 ring_answer(ring, in->fd, reply); 255 256 NEXT: 257 ring_request(ring, in->fd); 258 return; 259 } 260 261 static void handle_answer(struct io_uring * ring, struct request_t * in, int res) { 262 if( res < 0 ) { 263 int err = -res; 264 switch(err) { 265 case EPIPE: 266 case ECONNRESET: 267 close(in->fd); 268 free(in); 269 return; 270 default: 271 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl; 272 exit(EXIT_FAILURE); 273 } 274 } 275 276 if( res >= in->length ) { 277 free(in); 278 return; 279 } 280 281 struct io_uring_sqe * sqe = get_sqe(ring); 282 io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0); 283 io_uring_sqe_set_data(sqe, in); 284 submit(ring); 285 // std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl; 286 287 ring_request(ring, in->fd); 288 } 246 } 247 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl; 248 
exit(EXIT_FAILURE); 249 } 250 251 // Count the connections 252 ::stats.completions.conns++; 253 ::stats.conns.current++; 254 if(::stats.conns.current > ::stats.conns.max) { 255 ::stats.conns.max = ::stats.conns.current; 256 } 257 258 // Read on the data 259 fd = ret; 260 request(ring); 261 262 // check the max fd so we know if we exceeded the array 263 for(;;) { 264 int expected = max_fd; 265 if(expected >= fd) return; 266 if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return; 267 } 268 269 // check if we have enough space to fit inside the array 270 if(fd >= array_max) { 271 std::cerr << "accept error: fd " << fd << " is too high" << std::endl; 272 return; 273 } 274 275 // Put our connection into the global array 276 // No one else should be using it so if they are that's a bug 277 auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST); 278 if( exist ) { 279 size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST); 280 if( first == 0 ) { 281 std::cerr << "First: accept has existing connection " << std::endl; 282 } 283 } 284 } 285 286 // Handle a new request, results for getting an cqe while in the REQUESTING state 287 void newrequest(struct io_uring * ring, int res) { 288 // Check errors 289 if( res < 0 ) { 290 int err = -res; 291 switch(err) { 292 case EPIPE: 293 ::stats.errors.requests.pipes++; 294 break; 295 // Don't fall through the get better stats 296 case ECONNRESET: 297 ::stats.errors.requests.reset++; 298 break; 299 default: 300 ::stats.errors.requests.other++; 301 std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl; 302 exit(EXIT_FAILURE); 303 } 304 305 // Connection failed, close it 306 this->close(err); 307 return; 308 } 309 310 // Update stats 311 ::stats.completions.reads++; 312 313 // Is this an EOF 314 if(res == 0) { 315 // Yes, close the connection 316 this->close(0); 317 return; 318 } 319 320 // Find the end of the request header 321 const char * it = buffer; 322 if( !strstr( it, "\r\n\r\n" ) ) { 323 // This state machine doesn't support incomplete reads 324 // Print them to output so it's clear there is an issue 325 std::cout << "Incomplete request" << std::endl; 326 this->close(EBADR); 327 return; 328 } 329 330 // Find the method to use 331 it = buffer; 332 int ret = memcmp(it, "GET ", 4); 333 if( ret != 0 ) { 334 // We only support get, answer with an error 335 answer(ring, E400); 336 return; 337 } 338 339 // Find the target 340 it += 4; 341 ret = memcmp(it, "/plaintext", 10); 342 if( ret != 0 ) { 343 // We only support /plaintext, answer with an error 344 answer(ring, E404); 345 return; 346 } 347 348 // Correct request, answer with the payload 349 this->stats.requests++; 350 answer(ring, OK200); 351 } 352 353 // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state 354 void writedone(struct io_uring * ring, int res) { 355 // Check errors 356 if( res < 0 ) { 357 int err = -res; 358 switch(err) { 359 case EPIPE: 360 ::stats.errors.answers.pipes++; 361 break; 362 // Don't fall through the get better stats 363 case ECONNRESET: 364 ::stats.errors.answers.reset++; 365 break; 366 default: 367 ::stats.errors.answers.other++; 368 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl; 369 exit(EXIT_FAILURE); 370 } 371 372 this->close(err); 373 return; 374 } 375 376 // Update stats 377 ::stats.completions.writes++; 378 if(res == 124) ::stats.completions.full_writes++; 379 380 // Is this write 
completed 381 if( res == to_send ) { 382 // Yes, more stats 383 this->stats.answers++; 384 if(this->stats.answers == 1) ::stats.conns.used++; 385 // Then read a new request 386 request(ring); 387 return; 388 } 389 390 // Not a completed read, push the rest 391 to_send -= res; 392 iterator += res; 393 answer(ring); 394 } 395 public: 396 // Submit a call to accept and create a new connection object 397 static void accept(struct io_uring * ring, const struct options_t & opt) { 398 struct io_uring_sqe * sqe = get_sqe(ring); 399 io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); 400 submit(ring, sqe, new connection()); 401 // std::cout << "Submitted accept: " << req << std::endl; 402 } 403 404 // Handle a new cqe 405 void handle(struct io_uring * ring, int res, const struct options_t & opt) { 406 switch(state) { 407 case ACCEPTING: 408 connection::accept(ring, opt); 409 newconn(ring, res); 410 break; 411 case REQUESTING: 412 newrequest(ring, res); 413 break; 414 case ANSWERING: 415 writedone(ring, res); 416 break; 417 } 418 } 419 }; 289 420 290 421 //========================================================= 291 extern "C" { 292 extern int __io_uring_flush_sq(struct io_uring *ring); 293 } 294 422 // Main loop of the WebServer 423 // Effectively uses one thread_local copy of everything per kernel thread 295 424 void * proc_loop(void * arg) { 296 size_t count = 0;425 // Get the thread local argument 297 426 struct options_t & opt = *(struct options_t *)arg; 298 299 427 struct io_uring * ring = opt.ring; 300 428 429 // Track the shutdown using a event_fd 301 430 char endfd_buf[8]; 302 431 ring_end(ring, opt.endfd, endfd_buf, 8); 303 432 304 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); 305 306 bool done = false; 433 // Accept our first connection 434 // May not take effect until io_uring_submit_and_wait 435 connection::accept(ring, opt); 436 437 int reset = 1; // Counter to print stats once in a while 438 bool done = false; // Are we done 439 size_t sqes = 0; // Number of sqes we submitted 440 size_t call = 0; // Number of submits we made 307 441 while(!done) { 308 struct io_uring_cqe *cqe; 309 int ret; 310 while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) { 311 ret = io_uring_submit_and_wait(ring, 1); 312 if (ret < 0) { 313 fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) ); 314 exit(EXIT_FAILURE); 315 } 316 opt.result.subs += ret; 317 opt.result.cnts++; 318 } 319 320 if (ret < 0 && -EAGAIN != ret) { 321 fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) ); 442 // Submit all the answers we have and wait for responses 443 int ret = io_uring_submit_and_wait(ring, 1); 444 445 // check errors 446 if (ret < 0) { 447 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) ); 322 448 exit(EXIT_FAILURE); 323 449 } 324 450 325 auto req = (struct request_t *)cqe->user_data; 326 // std::cout << req << " completed with " << cqe->res << std::endl; 327 328 switch(req->type) { 329 case EVENT_END: 451 // Check how good we are at batching sqes 452 sqes += ret; 453 call++; 454 455 struct io_uring_cqe *cqe; 456 unsigned head; 457 unsigned count = 0; 458 459 // go through all cqes 460 io_uring_for_each_cqe(ring, head, cqe) { 461 if (0 == cqe->user_data) { 330 462 done = true; 331 463 break; 332 case EVENT_ACCEPT: 333 handle_new_conn(ring, cqe->res); 334 free(req); 335 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); 
336 break; 337 case EVENT_REQUEST: 338 handle_request(ring, req, cqe->res); 339 break; 340 case EVENT_ANSWER: 341 handle_answer(ring, req, cqe->res); 342 break; 343 } 344 345 io_uring_cqe_seen(ring, cqe); 346 } 347 348 return (void*)count; 464 } 465 466 auto req = (class connection *)cqe->user_data; 467 req->handle( ring, cqe->res, opt ); 468 469 // Every now and then, print some stats 470 reset--; 471 if(reset == 0) { 472 std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl; 473 // Reset to some random number of completions 474 // use the ring_fd in the number of threads don't all print at once 475 reset = 100000 + (100000 * (ring->ring_fd % 5)); 476 } 477 478 // Keep track of how many cqes we have seen 479 count++; 480 } 481 482 // Mark the cqes as seen 483 io_uring_cq_advance(ring, count); 484 } 485 486 // Tally all the thread local statistics 487 __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST ); 488 __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST ); 489 __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST ); 490 __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST ); 491 __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST ); 492 __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST ); 493 __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST ); 494 __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST ); 495 __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST ); 496 __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST ); 497 __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST ); 498 __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST ); 499 __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST ); 500 __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST ); 501 502 return nullptr; 349 503 } 350 504 351 505 //========================================================= 352 struct __attribute__((aligned(128))) aligned_ring { 353 struct io_uring storage; 354 }; 355 356 #include <bit> 357 358 #include <pthread.h> 506 #include <bit> // for ispow2 507 359 508 extern "C" { 360 #include <signal.h> 361 #include <sys/eventfd.h> 362 #include <sys/socket.h> 363 #include <netinet/in.h> 509 #include <pthread.h> // for pthreads 510 #include <signal.h> // for signal(SIGPIPE, SIG_IGN); 511 #include <sys/eventfd.h> // use for termination 512 #include <sys/socket.h> // for sockets in general 513 #include <netinet/in.h> // for sockaddr_in, AF_INET 364 514 } 365 515 366 516 int main(int argc, char * argv[]) { 517 // Initialize the array of connection-fd associations 518 for(int i = 0; i < array_max; i++) { 519 conns[i] = nullptr; 520 } 521 522 // Make sure we ignore all sigpipes 367 523 signal(SIGPIPE, SIG_IGN); 368 524 369 unsigned nthreads = 1; 370 unsigned port = 8800; 371 unsigned entries = 256; 372 unsigned backlog = 10; 373 bool attach = false; 525 // Default command line arguments 526 unsigned nthreads = 1; // number of kernel 
threads 527 unsigned port = 8800; // which port to listen on 528 unsigned entries = 256; // number of entries per ring/kernel thread 529 unsigned backlog = 262144; // backlog argument to listen 530 bool attach = false; // Whether or not to attach all the rings 531 bool sqpoll = false; // Whether or not to use SQ Polling 374 532 375 533 //=================== 376 // Arguments 534 // Arguments Parsing 377 535 int c; 378 while ((c = getopt (argc, argv, "t:p:e:b:a ")) != -1) {536 while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) { 379 537 switch (c) 380 538 { … … 394 552 attach = true; 395 553 break; 554 case 'S': 555 sqpoll = true; 556 break; 396 557 case '?': 397 558 default: 398 std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -a " << std::endl;559 std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl; 399 560 return EXIT_FAILURE; 400 561 } … … 416 577 //=================== 417 578 // End FD 579 // Create a single event fd to notify the kernel threads when the server shutsdown 418 580 int efd = eventfd(0, EFD_SEMAPHORE); 419 581 if (efd < 0) { … … 424 586 //=================== 425 587 // Open Socket 588 // Listen on specified port 426 589 std::cout << getpid() << " : Listening on port " << port << std::endl; 427 590 int server_fd = socket(AF_INET, SOCK_STREAM, 0); … … 439 602 address.sin_port = htons( port ); 440 603 604 // In case the port is already in use, don't just return an error 605 // Linux is very slow at reclaiming port so just retry regularly 441 606 int waited = 0; 442 607 while(true) { … … 444 609 if(ret < 0) { 445 610 if(errno == EADDRINUSE) { 611 // Port is in used let's retry later 446 612 if(waited == 0) { 447 613 std::cerr << "Waiting for port" << std::endl; 448 614 } else { 615 // To be cure, print how long we have been waiting 449 616 std::cerr << "\r" << waited; 450 617 std::cerr.flush(); 451 618 } 452 619 waited ++; 453 usleep( 1000000 ); 620 usleep( 1000000 ); // Wait and retry 454 621 continue; 455 622 } 623 // Some other error occured, this is a real error 456 624 std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl; 457 625 exit(EXIT_FAILURE); … … 474 642 std::cout << std::endl; 475 643 644 // Create the desired number of kernel-threads and for each 645 // create a ring. 
Create the rings in the main so we can attach them 646 // Since the rings are all in a dense VLA, aligned them so we don't get false sharing 647 // it's unlikely but better safe than sorry 648 struct __attribute__((aligned(128))) aligned_ring { 649 struct io_uring storage; 650 }; 476 651 aligned_ring thrd_rings[nthreads]; 477 652 pthread_t thrd_hdls[nthreads]; 478 653 options_t thrd_opts[nthreads]; 654 bool no_drops = true; 655 bool fast_poll = true; 656 bool nfix_sqpl = true; 479 657 for(unsigned i = 0; i < nthreads; i++) { 480 if(!attach || i == 0) { 481 io_uring_queue_init(entries, &thrd_rings[i].storage, 0); 482 } 483 else { 484 struct io_uring_params p; 485 memset(&p, 0, sizeof(p)); 486 p.flags = IORING_SETUP_ATTACH_WQ; 658 struct io_uring_params p = { }; 659 660 if(sqpoll) { // If sqpoll is on, add the flag 661 p.flags |= IORING_SETUP_SQPOLL; 662 p.sq_thread_idle = 100; 663 } 664 665 if (attach && i != 0) { // If attach is on, add the flag, except for the first ring 666 p.flags |= IORING_SETUP_ATTACH_WQ; 487 667 p.wq_fd = thrd_rings[0].storage.ring_fd; 488 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p); 489 } 490 668 } 669 670 // Create the ring 671 io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p); 672 673 // Check if some of the note-worthy features are there 674 if(0 == (p.features & IORING_FEAT_NODROP )) { no_drops = false; } 675 if(0 == (p.features & IORING_FEAT_FAST_POLL )) { fast_poll = false; } 676 if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; } 677 678 // Write the socket options we want to the options we pass to the threads 491 679 thrd_opts[i].acpt.sockfd = server_fd; 492 680 thrd_opts[i].acpt.addr = (struct sockaddr *)&address; … … 502 690 } 503 691 } 692 693 // Tell the user if the features are present 694 if( no_drops ) std::cout << "No Drop Present" << std::endl; 695 if( fast_poll) std::cout << "Fast Poll Present" << std::endl; 696 if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl; 504 697 505 698 //=================== … … 510 703 int ret; 511 704 do { 705 // Wait for a Ctrl-D to close the server 512 706 ret = read(STDIN_FILENO, buffer, 128); 513 707 if(ret < 0) { … … 526 720 527 721 //=================== 722 // Use eventfd_write to tell the threads we are closing 528 723 (std::cout << "Sending Shutdown to Threads... ").flush(); 529 724 ret = eventfd_write(efd, nthreads); … … 535 730 536 731 //=================== 732 // Join all the threads and close the rings 537 733 (std::cout << "Stopping Threads Done... ").flush(); 538 size_t total = 0;539 size_t count = 0;540 734 for(unsigned i = 0; i < nthreads; i++) { 541 735 void * retval; … … 545 739 exit(EXIT_FAILURE); 546 740 } 547 // total += (size_t)retval;548 total += thrd_opts[i].result.subs;549 count += thrd_opts[i].result.cnts;550 741 551 742 io_uring_queue_exit(thrd_opts[i].ring); 552 743 } 553 744 std::cout << "done" << std::endl; 554 std::cout << "Submit average: " << total << "/" << count << "(" << (((double)total) / count) << ")" << std::endl;555 745 556 746 //=================== 747 // Close the sockets 557 748 (std::cout << "Closing Socket... 
").flush(); 558 749 ret = shutdown( server_fd, SHUT_RD ); … … 567 758 exit(EXIT_FAILURE); 568 759 } 569 std::cout << "done" << std::endl; 760 std::cout << "done" << std::endl << std::endl; 761 762 // Print stats and exit 763 std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl; 764 std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl; 765 std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl; 766 std::cout << "Max FD: " << max_fd << std::endl; 767 std::cout << "Successful connections: " << global_stats.conns.used << std::endl; 768 std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl; 769 std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl; 770 std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl; 570 771 } 772 773 // compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //
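The edited http_ring.cpp replaces the per-request malloc/free scheme with per-connection state machines and batches submissions: sqes are queued without an immediate io_uring_submit, and each thread's main loop flushes them with io_uring_submit_and_wait before draining the completion queue in a single pass. A minimal sketch of that loop follows, assuming a stand-in connection type with a handle() method as in the patch; stats, the periodic printout, and the options argument are omitted.

#include <cstdio>
#include <cstring>
#include <liburing.h>

// Hypothetical stand-in for the per-connection state machine in the patch.
struct connection {
    void handle(struct io_uring * ring, int res) { (void)ring; (void)res; /* advance the state machine */ }
};

// Drain-style event loop: one submit syscall per batch, not per operation.
void event_loop(struct io_uring * ring) {
    bool done = false;
    while(!done) {
        // Flush every queued sqe and block until at least one cqe is available.
        int ret = io_uring_submit_and_wait(ring, 1);
        if(ret < 0) {
            fprintf(stderr, "io_uring submit error: (%d) %s\n", -ret, strerror(-ret));
            return;
        }

        struct io_uring_cqe * cqe;
        unsigned head;
        unsigned seen = 0;

        // Walk the completion ring; entries are not consumed yet.
        io_uring_for_each_cqe(ring, head, cqe) {
            if(cqe->user_data == 0) { done = true; break; } // eventfd read => shutdown
            ((connection *)cqe->user_data)->handle(ring, cqe->res);
            seen++;
        }

        // Mark the whole batch as seen in one step.
        io_uring_cq_advance(ring, seen);
    }
}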
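The new -a and -S command-line flags map directly onto io_uring_params: IORING_SETUP_ATTACH_WQ shares the async work-queue of the first ring with every subsequent ring, and IORING_SETUP_SQPOLL lets a kernel thread poll the submission queue. A sketch of that setup using the same liburing field names; make_ring and first_ring_fd are illustrative placeholders and error handling is elided.

#include <cstring>
#include <liburing.h>

// Create one ring per kernel thread, optionally sharing the backend and enabling SQ polling.
static int make_ring(struct io_uring * ring, unsigned entries,
                     bool sqpoll, bool attach, int first_ring_fd) {
    struct io_uring_params p;
    memset(&p, 0, sizeof(p));

    if(sqpoll) {
        p.flags |= IORING_SETUP_SQPOLL;    // a kernel thread polls the submission queue
        p.sq_thread_idle = 100;            // idle time (ms) before that thread sleeps
    }
    if(attach && first_ring_fd >= 0) {
        p.flags |= IORING_SETUP_ATTACH_WQ; // share the async work-queue
        p.wq_fd = first_ring_fd;           // ... with the first ring created
    }

    int ret = io_uring_queue_init_params(entries, ring, &p);
    // On success p.features reports kernel support, e.g. IORING_FEAT_NODROP,
    // IORING_FEAT_FAST_POLL and IORING_FEAT_SQPOLL_NONFIXED, as checked in the patch.
    return ret;
}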