#include #include #include #include #include #include #include // #define NOBATCHING // #define USE_ASYNC // Options passed to each threads struct __attribute__((aligned(128))) options_t { // Data passed to accept struct { int sockfd; struct sockaddr *addr; socklen_t *addrlen; int flags; } acpt; // Termination notification int endfd; // The ring to use for io struct io_uring * ring; }; //========================================================= // General statistics struct __attribute__((aligned(128))) stats_block_t { struct { volatile size_t conns = 0; volatile size_t reads = 0; volatile size_t writes = 0; volatile size_t full_writes = 0; } completions; struct { volatile size_t conns = 0; struct { volatile size_t pipes = 0; volatile size_t reset = 0; volatile size_t other = 0; } requests; struct { volatile size_t pipes = 0; volatile size_t reset = 0; volatile size_t other = 0; } answers; } errors; struct { volatile size_t current = 0; volatile size_t max = 0; volatile size_t used = 0; } conns; volatile size_t recycle_errors = 0; }; // Each thread gets its own block of stats // and there is a global block for tallying at the end thread_local stats_block_t stats; stats_block_t global_stats; // Get an array of current connections // This is just for debugging, to make sure // no two state-machines get the same fd const size_t array_max = 25000; class connection * volatile conns[array_max] = { 0 }; // Max fd we've seen, keep track so it's convenient to adjust the array size after volatile int max_fd = 0; //========================================================= // Some small wrappers for ring operations used outside the connection state machine // get sqe + error handling static struct io_uring_sqe * get_sqe(struct io_uring * ring) { struct io_uring_sqe * sqe = io_uring_get_sqe(ring); if(!sqe) { std::cerr << "Insufficient entries in ring" << std::endl; exit(EXIT_FAILURE); } return sqe; } // read of the event fd is not done by a connection // use nullptr as the user data static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) { struct io_uring_sqe * sqe = get_sqe(ring); io_uring_prep_read(sqe, fd, buffer, len, 0); io_uring_sqe_set_data(sqe, nullptr); io_uring_submit(ring); } //========================================================= // All answers are fixed and determined by the return code enum HttpCode { OK200 = 0, E400, E404, E405, E408, E413, E414, KNOWN_CODES }; // Get a fix reply based on the return code const char * http_msgs[] = { "HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n", "HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", "HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", "HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", "HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", "HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", "HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", }; static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) ); // Pre-compute the length of these replys const size_t http_lens[] = { strlen(http_msgs[0]), strlen(http_msgs[1]), strlen(http_msgs[2]), strlen(http_msgs[3]), strlen(http_msgs[4]), strlen(http_msgs[5]), strlen(http_msgs[6]), }; static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) ); //========================================================= // Finate state machine responsible for handling each connection class __attribute__((aligned(128))) connection { private: // The state of the machine enum { ACCEPTING, // Accept sent waiting for connection REQUESTING, // Waiting for new request ANSWERING, // Either request received submitting answer or short answer sent, need to submit rest } state; // The file descriptor of the connection int fd; // request data static const size_t buffer_size = 1024; // Size of the read buffer const char * buffer; // Buffer into which requests are read // send data size_t to_send; // Data left to send const char * iterator; // Pointer to rest of the message to send // stats // how many requests/answers were complete, that is, a valid cqe was obtained struct { size_t requests = 0; size_t answers = 0; } stats; private: connection() : state(ACCEPTING) , fd(0) , buffer( new char[buffer_size]) , iterator(nullptr) {} ~connection() { delete [] buffer; ::stats.conns.current--; } // Close the current connection void close(int err) { // std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") "; conns[fd] = nullptr; if(fd != 0) { ::close(fd); } delete this; } //-------------------------------------------------- // Wrappers for submit so we can tweak it more easily static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) { (void)ring; #ifdef USE_ASYNC io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); #endif io_uring_sqe_set_data(sqe, conn); #ifdef NOBATCHING io_uring_submit(ring); #endif } void submit(struct io_uring * ring, struct io_uring_sqe * sqe) { submit(ring, sqe, this); } //-------------------------------------------------- // get a new request from the client void request(struct io_uring * ring) { state = REQUESTING; struct io_uring_sqe * sqe = get_sqe(ring); io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size, 0); submit(ring, sqe); } //-------------------------------------------------- // Send a new answer based on a return code void answer(struct io_uring * ring, HttpCode code) { iterator = http_msgs[code]; to_send = http_lens[code]; if(to_send != 124) { std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl; } answer(ring); } // send a new answer to the client // Reused for incomplete writes void answer(struct io_uring * ring) { state = ANSWERING; struct io_uring_sqe * sqe = get_sqe(ring); io_uring_prep_send(sqe, fd, iterator, to_send, 0); submit(ring, sqe); } //-------------------------------------------------- // Handle a new connection, results for getting an cqe while in the ACCEPTING state void newconn(struct io_uring * ring, int ret) { // Check errors if( ret < 0 ) { int err = -ret; if( err == ECONNABORTED ) { ::stats.errors.conns++; this->close(err); return; } std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } // Count the connections ::stats.completions.conns++; ::stats.conns.current++; if(::stats.conns.current > ::stats.conns.max) { ::stats.conns.max = ::stats.conns.current; } // Read on the data fd = ret; request(ring); // check the max fd so we know if we exceeded the array for(;;) { int expected = max_fd; if(expected >= fd) return; if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return; } // check if we have enough space to fit inside the array if(fd >= array_max) { std::cerr << "accept error: fd " << fd << " is too high" << std::endl; return; } // Put our connection into the global array // No one else should be using it so if they are that's a bug auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST); if( exist ) { size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST); if( first == 0 ) { std::cerr << "First: accept has existing connection " << std::endl; } } } // Handle a new request, results for getting an cqe while in the REQUESTING state void newrequest(struct io_uring * ring, int res) { // Check errors if( res < 0 ) { int err = -res; switch(err) { case EPIPE: ::stats.errors.requests.pipes++; break; // Don't fall through the get better stats case ECONNRESET: ::stats.errors.requests.reset++; break; default: ::stats.errors.requests.other++; std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl; exit(EXIT_FAILURE); } // Connection failed, close it this->close(err); return; } // Update stats ::stats.completions.reads++; // Is this an EOF if(res == 0) { // Yes, close the connection this->close(0); return; } // Find the end of the request header const char * it = buffer; if( !strstr( it, "\r\n\r\n" ) ) { // This state machine doesn't support incomplete reads // Print them to output so it's clear there is an issue std::cout << "Incomplete request" << std::endl; this->close(EBADR); return; } // Find the method to use it = buffer; int ret = memcmp(it, "GET ", 4); if( ret != 0 ) { // We only support get, answer with an error answer(ring, E400); return; } // Find the target it += 4; ret = memcmp(it, "/plaintext", 10); if( ret != 0 ) { // We only support /plaintext, answer with an error answer(ring, E404); return; } // Correct request, answer with the payload this->stats.requests++; answer(ring, OK200); } // Handle a partial or full answer sent, results for getting an cqe while in the ANSWERING state void writedone(struct io_uring * ring, int res) { // Check errors if( res < 0 ) { int err = -res; switch(err) { case EPIPE: ::stats.errors.answers.pipes++; break; // Don't fall through the get better stats case ECONNRESET: ::stats.errors.answers.reset++; break; default: ::stats.errors.answers.other++; std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl; exit(EXIT_FAILURE); } this->close(err); return; } // Update stats ::stats.completions.writes++; if(res == 124) ::stats.completions.full_writes++; // Is this write completed if( res == to_send ) { // Yes, more stats this->stats.answers++; if(this->stats.answers == 1) ::stats.conns.used++; // Then read a new request request(ring); return; } // Not a completed read, push the rest to_send -= res; iterator += res; answer(ring); } public: // Submit a call to accept and create a new connection object static void accept(struct io_uring * ring, const struct options_t & opt) { struct io_uring_sqe * sqe = get_sqe(ring); io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); submit(ring, sqe, new connection()); // std::cout << "Submitted accept: " << req << std::endl; } // Handle a new cqe void handle(struct io_uring * ring, int res, const struct options_t & opt) { switch(state) { case ACCEPTING: connection::accept(ring, opt); newconn(ring, res); break; case REQUESTING: newrequest(ring, res); break; case ANSWERING: writedone(ring, res); break; } } }; //========================================================= // Main loop of the WebServer // Effectively uses one thread_local copy of everything per kernel thread void * proc_loop(void * arg) { // Get the thread local argument struct options_t & opt = *(struct options_t *)arg; struct io_uring * ring = opt.ring; // Track the shutdown using a event_fd char endfd_buf[8]; ring_end(ring, opt.endfd, endfd_buf, 8); // Accept our first connection // May not take effect until io_uring_submit_and_wait connection::accept(ring, opt); int reset = 1; // Counter to print stats once in a while bool done = false; // Are we done size_t sqes = 0; // Number of sqes we submitted size_t call = 0; // Number of submits we made while(!done) { // Submit all the answers we have and wait for responses int ret = io_uring_submit_and_wait(ring, 1); // check errors if (ret < 0) { fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) ); exit(EXIT_FAILURE); } // Check how good we are at batching sqes sqes += ret; call++; struct io_uring_cqe *cqe; unsigned head; unsigned count = 0; // go through all cqes io_uring_for_each_cqe(ring, head, cqe) { if (0 == cqe->user_data) { done = true; break; } auto req = (class connection *)cqe->user_data; req->handle( ring, cqe->res, opt ); // Every now and then, print some stats reset--; if(reset == 0) { std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl; // Reset to some random number of completions // use the ring_fd in the number of threads don't all print at once reset = 100000 + (100000 * (ring->ring_fd % 5)); } // Keep track of how many cqes we have seen count++; } // Mark the cqes as seen io_uring_cq_advance(ring, count); } // Tally all the thread local statistics __atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST ); __atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST ); return nullptr; } //========================================================= #include // for ispow2 extern "C" { #include // for pthreads #include // for signal(SIGPIPE, SIG_IGN); #include // use for termination #include // for sockets in general #include // for sockaddr_in, AF_INET } int main(int argc, char * argv[]) { // Initialize the array of connection-fd associations for(int i = 0; i < array_max; i++) { conns[i] = nullptr; } // Make sure we ignore all sigpipes signal(SIGPIPE, SIG_IGN); // Default command line arguments unsigned nthreads = 1; // number of kernel threads unsigned port = 8800; // which port to listen on unsigned entries = 256; // number of entries per ring/kernel thread unsigned backlog = 262144; // backlog argument to listen bool attach = false; // Whether or not to attach all the rings bool sqpoll = false; // Whether or not to use SQ Polling //=================== // Arguments Parsing int c; while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) { switch (c) { case 't': nthreads = atoi(optarg); break; case 'p': port = atoi(optarg); break; case 'e': entries = atoi(optarg); break; case 'b': backlog = atoi(optarg); break; case 'a': attach = true; break; case 'S': sqpoll = true; break; case '?': default: std::cerr << "Usage: -t -p -e -b -aS" << std::endl; return EXIT_FAILURE; } } if( !std::ispow2(entries) ) { unsigned v = entries; v--; v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16; v++; std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl; entries = v; } //=================== // End FD // Create a single event fd to notify the kernel threads when the server shutsdown int efd = eventfd(0, EFD_SEMAPHORE); if (efd < 0) { std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } //=================== // Open Socket // Listen on specified port std::cout << getpid() << " : Listening on port " << port << std::endl; int server_fd = socket(AF_INET, SOCK_STREAM, 0); if(server_fd < 0) { std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } int ret = 0; struct sockaddr_in address; int addrlen = sizeof(address); memset( (char *)&address, '\0', addrlen ); address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons( port ); // In case the port is already in use, don't just return an error // Linux is very slow at reclaiming port so just retry regularly int waited = 0; while(true) { ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) ); if(ret < 0) { if(errno == EADDRINUSE) { // Port is in used let's retry later if(waited == 0) { std::cerr << "Waiting for port" << std::endl; } else { // To be cure, print how long we have been waiting std::cerr << "\r" << waited; std::cerr.flush(); } waited ++; usleep( 1000000 ); // Wait and retry continue; } // Some other error occured, this is a real error std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } break; } ret = listen( server_fd, backlog ); if(ret < 0) { std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } //=================== // Run Server Threads std::cout << "Starting " << nthreads << " Threads"; if(attach) { std::cout << " with attached Rings"; } std::cout << std::endl; // Create the desired number of kernel-threads and for each // create a ring. Create the rings in the main so we can attach them // Since the rings are all in a dense VLA, aligned them so we don't get false sharing // it's unlikely but better safe than sorry struct __attribute__((aligned(128))) aligned_ring { struct io_uring storage; }; aligned_ring thrd_rings[nthreads]; pthread_t thrd_hdls[nthreads]; options_t thrd_opts[nthreads]; bool no_drops = true; bool fast_poll = true; bool nfix_sqpl = true; for(unsigned i = 0; i < nthreads; i++) { struct io_uring_params p = { }; if(sqpoll) { // If sqpoll is on, add the flag p.flags |= IORING_SETUP_SQPOLL; p.sq_thread_idle = 100; } if (attach && i != 0) { // If attach is on, add the flag, except for the first ring p.flags |= IORING_SETUP_ATTACH_WQ; p.wq_fd = thrd_rings[0].storage.ring_fd; } // Create the ring io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p); // Check if some of the note-worthy features are there if(0 == (p.features & IORING_FEAT_NODROP )) { no_drops = false; } if(0 == (p.features & IORING_FEAT_FAST_POLL )) { fast_poll = false; } if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; } // Write the socket options we want to the options we pass to the threads thrd_opts[i].acpt.sockfd = server_fd; thrd_opts[i].acpt.addr = (struct sockaddr *)&address; thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen; thrd_opts[i].acpt.flags = 0; thrd_opts[i].endfd = efd; thrd_opts[i].ring = &thrd_rings[i].storage; int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]); if (ret < 0) { std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } } // Tell the user if the features are present if( no_drops ) std::cout << "No Drop Present" << std::endl; if( fast_poll) std::cout << "Fast Poll Present" << std::endl; if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl; //=================== // Server Started std::cout << "Server Started" << std::endl; { char buffer[128]; int ret; do { // Wait for a Ctrl-D to close the server ret = read(STDIN_FILENO, buffer, 128); if(ret < 0) { std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } else if(ret > 0) { std::cout << "User inputed '"; std::cout.write(buffer, ret); std::cout << "'" << std::endl; } } while(ret != 0); std::cout << "Shutdown received" << std::endl; } //=================== // Use eventfd_write to tell the threads we are closing (std::cout << "Sending Shutdown to Threads... ").flush(); ret = eventfd_write(efd, nthreads); if (ret < 0) { std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } std::cout << "done" << std::endl; //=================== // Join all the threads and close the rings (std::cout << "Stopping Threads Done... ").flush(); for(unsigned i = 0; i < nthreads; i++) { void * retval; int ret = pthread_join(thrd_hdls[i], &retval); if (ret < 0) { std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } io_uring_queue_exit(thrd_opts[i].ring); } std::cout << "done" << std::endl; //=================== // Close the sockets (std::cout << "Closing Socket... ").flush(); ret = shutdown( server_fd, SHUT_RD ); if( ret < 0 ) { std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } ret = close(server_fd); if (ret < 0) { std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } std::cout << "done" << std::endl << std::endl; // Print stats and exit std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl; std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl; std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl; std::cout << "Max FD: " << max_fd << std::endl; std::cout << "Successful connections: " << global_stats.conns.used << std::endl; std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl; std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl; std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl; } // compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //