#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <iostream>

#include <signal.h>
#include <unistd.h>
#include <liburing.h>

// #define NOBATCHING
// #define USE_ASYNC

// Options passed to each threads
// Options passed to each threads
// One instance per worker thread (dense array in main); aligned to 128 bytes
// so adjacent instances do not share a cache line.
struct __attribute__((aligned(128))) options_t {
	// Data passed to accept
	struct {
		int sockfd;              // listening socket, shared by all threads
		struct sockaddr *addr;   // out-param handed to io_uring_prep_accept
		socklen_t *addrlen;      // in/out length of *addr
		int flags;               // accept4-style flags (0 in main)
		unsigned cnt;            // number of accepts queued at thread startup
	} acpt;

	// Termination notification
	// eventfd read through the ring; its completion carries nullptr user_data
	// which proc_loop treats as the shutdown signal
	int endfd;

	// The ring to use for io
	struct io_uring * ring;
};

//=========================================================
// General statistics
// Counters for one thread (the thread_local `stats`) or the whole process
// (`global_stats`). Aligned to 128 bytes so per-thread blocks do not share
// cache lines.
// NOTE(review): `volatile` provides neither atomicity nor ordering; this only
// works because each worker writes exclusively to its own thread_local block
// and the final tally uses __atomic_fetch_add (see end of proc_loop) —
// confirm no cross-thread writes exist.
struct __attribute__((aligned(128))) stats_block_t {
	// Successful completions, one increment per cqe handled
	struct {
		volatile size_t conns = 0;        // accepts that completed
		volatile size_t reads = 0;        // recvs that completed
		volatile size_t writes = 0;       // sends that completed (full or partial)
		volatile size_t full_writes = 0;  // sends whose result matched the full 200 answer
	} completions;

	// Error completions, split by the operation that failed
	struct {
		volatile size_t conns = 0;        // aborted accepts (ECONNABORTED)
		struct {
			volatile size_t pipes = 0;    // EPIPE on recv
			volatile size_t reset = 0;    // ECONNRESET on recv
			volatile size_t other = 0;    // anything else (fatal)
		} requests;

		struct {
			volatile size_t pipes = 0;    // EPIPE on send
			volatile size_t reset = 0;    // ECONNRESET on send
			volatile size_t other = 0;    // anything else (fatal)
		} answers;
	} errors;

	// Connection-object accounting
	struct {
		volatile size_t current = 0;      // live connections right now
		volatile size_t max = 0;          // high-water mark of `current`
		volatile size_t used = 0;         // connections that completed at least one answer
	} conns;

	// Accepts that landed on an fd slot still owned by another connection
	volatile size_t recycle_errors = 0;
};

// Each thread gets its own block of stats
// and there is a global block for tallying at the end
// (workers only touch their thread_local copy; proc_loop folds it into
// global_stats with __atomic_fetch_add when it exits)
thread_local stats_block_t stats;
stats_block_t global_stats;

// Per-thread submission bookkeeping: how many sqes were prepared since the
// last io_uring_submit (reset each iteration of proc_loop)
thread_local struct __attribute__((aligned(128))) {
	size_t to_submit = 0;
} local;

// Get an array of current connections
// This is just for debugging, to make sure
// no two state-machines get the same fd
const size_t array_max = 25000;
class connection * volatile conns[array_max] = { 0 };

// Max fd we've seen, keep track so it's convenient to adjust the array size after
// (updated with a CAS loop in connection::newconn)
volatile int max_fd = 0;

//=========================================================
// Some small wrappers for ring operations used outside the connection state machine
// get sqe + error handling
static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
	// Fetch a submission slot from the ring, the common case
	struct io_uring_sqe * const sqe = io_uring_get_sqe(ring);
	if(sqe) return sqe;

	// The submission queue is full: this server never queues more sqes than
	// the ring holds, so treat exhaustion as fatal
	std::cerr << "Insufficient entries in ring" << std::endl;
	exit(EXIT_FAILURE);
}

// read of the event fd is not done by a connection
// use nullptr as the user data
static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
	// Queue a read of the termination eventfd; tagging it with nullptr
	// user data lets the completion loop recognize it as the shutdown signal
	struct io_uring_sqe * const entry = get_sqe(ring);
	io_uring_prep_read(entry, fd, buffer, len, 0);
	io_uring_sqe_set_data(entry, nullptr);

	// Push it to the kernel immediately, outside the normal batching path
	io_uring_submit(ring);
}

//=========================================================
// All answers are fixed and determined by the return code
// Indexes into http_msgs / http_lens below
enum HttpCode {
	OK200 = 0, // 200 OK — the only success path (GET /plaintext)
	E400,      // 400 Bad Request — method other than GET
	E404,      // 404 Not Found — target other than /plaintext
	E405,      // 405 Method Not Allowed — declared but unused in this file
	E408,      // 408 Request Timeout — declared but unused in this file
	E413,      // 413 Payload Too Large — declared but unused in this file
	E414,      // 414 URI Too Long — declared but unused in this file
	KNOWN_CODES // count of codes; must stay in sync with the message arrays
};

// Get a fix reply based on the return code
// Canned replies indexed by HttpCode; only the 200 reply carries a body.
// NOTE(review): the "Content-Length: 0 " headers carry a trailing space —
// valid as optional whitespace but worth tidying.
const char * http_msgs[] = {
	"HTTP/1.1 200 OK\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
	"HTTP/1.1 400 Bad Request\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 404 Not Found\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	// BUG FIX: the reason phrase was truncated to "Method Not " — the
	// standard phrase is "Method Not Allowed" (RFC 9110 §15.5.6)
	"HTTP/1.1 405 Method Not Allowed\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 408 Request Timeout\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 413 Payload Too Large\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
	"HTTP/1.1 414 URI Too Long\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
};
static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );

// Pre-compute the length of these replies so sends don't re-run strlen
const size_t http_lens[] = {
	strlen(http_msgs[0]),
	strlen(http_msgs[1]),
	strlen(http_msgs[2]),
	strlen(http_msgs[3]),
	strlen(http_msgs[4]),
	strlen(http_msgs[5]),
	strlen(http_msgs[6]),
};
static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );

//=========================================================
// Finite state machine responsible for handling each connection
class __attribute__((aligned(128))) connection {
private:
	// The state of the machine
	enum {
		ACCEPTING,  // Accept sent waiting for connection
		REQUESTING, // Waiting for new request
		ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
	} state;

	// The file descriptor of the connection
	// Stays 0 until the accept completes (a socket fd is never 0 here)
	int fd;

	// request data
	static const size_t buffer_size = 1024;	// Size of the read buffer
	const char * buffer;                      // Buffer into which requests are read

	// send data
	size_t to_send;		// Data left to send
	const char * iterator;	// Pointer to rest of the message to send

	// stats
	// how many requests/answers were complete, that is, a valid cqe was obtained
	struct {
		size_t requests = 0;
		size_t answers = 0;
	} stats;

private:
	connection()
		: state(ACCEPTING)
		, fd(0)
		, buffer( new char[buffer_size])
		, iterator(nullptr)
	{}

	~connection() {
		delete [] buffer;
		// BUG FIX: only connections that reached newconn() incremented the
		// counter, so only those may decrement it — an accept failure used
		// to underflow the unsigned counter
		if(fd != 0) ::stats.conns.current--;
	}

	// Close the current connection and destroy this state machine
	void close(int err) {
		// std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
		(void)err;

		if(fd != 0) {
			// BUG FIX: guard the debug-array access; fd 0 means "never
			// accepted" and fds >= array_max were never registered
			if(fd < static_cast<int>(array_max)) conns[fd] = nullptr;
			::close(fd);
		}
		delete this;
	}

	//--------------------------------------------------
	// Wrappers for submit so we can tweak it more easily
	// By default sqes are only tagged here and batched; the actual
	// io_uring_submit happens once per loop iteration in proc_loop
	static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
		(void)ring;
		local.to_submit++;
		#ifdef USE_ASYNC
			io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
		#endif
		io_uring_sqe_set_data(sqe, conn);
		#ifdef NOBATCHING
			io_uring_submit(ring);
		#endif
	}

	void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
		submit(ring, sqe, this);
	}

	//--------------------------------------------------
	// get a new request from the client
	void request(struct io_uring * ring) {
		state = REQUESTING;
		struct io_uring_sqe * sqe = get_sqe(ring);
		// BUG FIX: read at most buffer_size - 1 bytes so newrequest() can
		// always null-terminate before scanning the buffer with strstr
		io_uring_prep_recv(sqe, fd, (void*)buffer, buffer_size - 1, 0);
		submit(ring, sqe);
	}

	//--------------------------------------------------
	// Send a new answer based on a return code
	// (the former "weird size" warning compared every answer against the
	// 124-byte 200 reply and fired spuriously for every error answer)
	void answer(struct io_uring * ring, HttpCode code) {
		iterator = http_msgs[code];
		to_send  = http_lens[code];
		answer(ring);
	}

	// send a new answer to the client
	// Reused for incomplete writes
	void answer(struct io_uring * ring) {
		state = ANSWERING;
		struct io_uring_sqe * sqe = get_sqe(ring);
		io_uring_prep_send(sqe, fd, iterator, to_send, 0);
		submit(ring, sqe);
	}

	//--------------------------------------------------
	// Handle a new connection, results from getting a cqe while in the ACCEPTING state
	void newconn(struct io_uring * ring, int ret) {
		// Check errors
		if( ret < 0 ) {
			int err = -ret;
			if( err == ECONNABORTED ) {
				::stats.errors.conns++;
				this->close(err);
				return;
			}
			// BUG FIX: report the error carried by the cqe, not the
			// unrelated thread-local errno
			std::cerr << "accept error: (" << err << ") " << strerror(err) << std::endl;
			exit(EXIT_FAILURE);
		}

		// Count the connections
		::stats.completions.conns++;
		::stats.conns.current++;
		if(::stats.conns.current > ::stats.conns.max) {
			::stats.conns.max = ::stats.conns.current;
		}

		// Read on the data
		fd = ret;
		request(ring);

		// check the max fd so we know if we exceeded the array
		// BUG FIX: this loop used to `return` on both exits, which made all
		// the registration code below unreachable dead code
		for(;;) {
			int expected = max_fd;
			if(expected >= fd) break;
			if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
		}

		// check if we have enough space to fit inside the array
		if(fd >= static_cast<int>(array_max)) {
			std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
			return;
		}

		// Put our connection into the global array
		// No one else should be using it so if they are that's a bug
		auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
		if( exist ) {
			size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
			if( first == 0 ) {
				std::cerr << "First: accept has existing connection " << std::endl;
			}
		}
	}

	// Handle a new request, results from getting a cqe while in the REQUESTING state
	void newrequest(struct io_uring * ring, int res) {
		// Check errors
		if( res < 0 ) {
			int err = -res;
			switch(err) {
				case EPIPE:
					::stats.errors.requests.pipes++;
					break;
				case ECONNRESET:
					::stats.errors.requests.reset++;
					break;
				default:
					::stats.errors.requests.other++;
					std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
					exit(EXIT_FAILURE);
			}

			// Connection failed, close it
			this->close(err);
			return;
		}

		// Update stats
		::stats.completions.reads++;

		// Is this an EOF
		if(res == 0) {
			// Yes, close the connection
			this->close(0);
			return;
		}

		// BUG FIX: recv does not null-terminate, so terminate the buffer
		// ourselves before handing it to strstr; request() reads at most
		// buffer_size - 1 bytes so index `res` is always in range
		const_cast<char *>(buffer)[res] = '\0';

		// Find the end of the request header
		const char * it = buffer;
		if( !strstr( it, "\r\n\r\n" ) ) {
			// This state machine doesn't support incomplete reads
			// Print them to output so it's clear there is an issue
			std::cout << "Incomplete request" << std::endl;
			this->close(EBADR);
			return;
		}

		// Find the method to use
		it = buffer;
		int ret = memcmp(it, "GET ", 4);
		if( ret != 0 ) {
			// We only support get, answer with an error
			answer(ring, E400);
			return;
		}

		// Find the target
		it += 4;
		ret = memcmp(it, "/plaintext", 10);
		if( ret != 0 ) {
			// We only support /plaintext, answer with an error
			answer(ring, E404);
			return;
		}

		// Correct request, answer with the payload
		this->stats.requests++;
		answer(ring, OK200);
	}

	// Handle a partial or full answer sent, results from getting a cqe while in the ANSWERING state
	void writedone(struct io_uring * ring, int res) {
		// Check errors
		if( res < 0 ) {
			int err = -res;
			switch(err) {
				case EPIPE:
					::stats.errors.answers.pipes++;
					break;
				case ECONNRESET:
					::stats.errors.answers.reset++;
					break;
				default:
					::stats.errors.answers.other++;
					std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
					exit(EXIT_FAILURE);
			}

			this->close(err);
			return;
		}

		// Update stats
		// full_writes counts sends that pushed an entire 200 answer in one
		// shot (was a magic `124` == length of http_msgs[OK200])
		::stats.completions.writes++;
		if(static_cast<size_t>(res) == http_lens[OK200]) ::stats.completions.full_writes++;

		// Is this write completed
		if( static_cast<size_t>(res) == to_send ) {
			// Yes, more stats
			this->stats.answers++;
			if(this->stats.answers == 1) ::stats.conns.used++;
			// Then read a new request
			request(ring);
			return;
		}

		// Not a completed write, push the rest
		to_send -= res;
		iterator += res;
		answer(ring);
	}
public:
	// Submit a call to accept and create a new connection object
	// NOTE(review): once accepted, this object becomes the request handler
	// and no replacement accept is queued, so at most opt.acpt.cnt accepts
	// are ever in flight per thread — confirm this is intended
	static void accept(struct io_uring * ring, const struct options_t & opt) {
		struct io_uring_sqe * sqe = get_sqe(ring);
		io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
		submit(ring, sqe, new connection());
		// std::cout << "Submitted accept: " << req << std::endl;
	}

	// Handle a new cqe: dispatch on the state we were in when it was queued
	void handle(struct io_uring * ring, int res, const struct options_t & opt) {
		(void)opt;
		switch(state) {
		case ACCEPTING:
			// connection::accept(ring, opt);
			newconn(ring, res);
			break;
		case REQUESTING:
			newrequest(ring, res);
			break;
		case ANSWERING:
			writedone(ring, res);
			break;
		}
	}
};

//=========================================================
extern "C" {
	#include <sys/eventfd.h>  // use for termination
}

// Main loop of the WebServer
// Effectively uses one thread_local copy of everything per kernel thread
// Worker thread body: one ring per thread, everything runs through io_uring.
// `arg` points at this thread's options_t; returns nullptr after folding the
// thread_local stats into global_stats.
void * proc_loop(void * arg) {
	// Get the thread local argument
	struct options_t & opt = *(struct options_t *)arg;
	struct io_uring * ring = opt.ring;

	// eventfd the kernel will signal whenever a cqe is posted on this ring
	int blockfd = eventfd(0, 0);
	if (blockfd < 0) {
		fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) );
		exit(EXIT_FAILURE);
	}

	// NOTE(review): this is register_eventfd, but the error text below says
	// "S&W" — looks copy-pasted from the submit path
	int ret = io_uring_register_eventfd(ring, blockfd);
	if (ret < 0) {
		fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
		exit(EXIT_FAILURE);
	}

	// Track the shutdown using a event_fd
	// (its completion carries nullptr user_data, which ends the loop below)
	char endfd_buf[8];
	ring_end(ring, opt.endfd, endfd_buf, 8);

	// Accept our first connection
	// May not take effect until io_uring_submit_and_wait
	for(unsigned i = 0; i < opt.acpt.cnt; i++) {
		connection::accept(ring, opt);
	}

	int reset = 1;       // Counter to print stats once in a while
	bool done = false;   // Are we done
	size_t sqes = 0;     // Number of sqes we submitted
	size_t call = 0;     // Number of submits we made
	while(!done) {
		// Submit all the answers we have and wait for responses
		int ret = io_uring_submit(ring);
		local.to_submit = 0;

		// check errors
		if (ret < 0) {
			fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
			exit(EXIT_FAILURE);
		}

		// Check how good we are at batching sqes
		sqes += ret;
		call++;

		// Block until the ring signals at least one completion
		eventfd_t val;
		ret = eventfd_read(blockfd, &val);

		// check errors
		if (ret < 0) {
			fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) );
			exit(EXIT_FAILURE);
		}

		struct io_uring_cqe *cqe;
		unsigned head;
		unsigned count = 0;

        	// go through all cqes without consuming them yet
        	io_uring_for_each_cqe(ring, head, cqe) {
			// nullptr user_data is the shutdown eventfd read (see ring_end)
			if (0 == cqe->user_data) {
				done = true;
				break;
			}

			// Stop draining once enough new sqes have piled up; unseen
			// cqes stay queued for the next iteration
			if(local.to_submit > 30) break;

			// user_data is the connection state machine for this cqe
			auto req = (class connection *)cqe->user_data;
			req->handle( ring, cqe->res, opt );

			// Every now and then, print some stats
			reset--;
			if(reset == 0) {
				std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
				// Reset to some random number of completions
				// use the ring_fd in the number so threads don't all print at once
				reset = 100000 + (100000 * (ring->ring_fd % 5));
			}

			// Keep track of how many cqes we have seen
			count++;
		}

		// Mark the cqes as seen
		io_uring_cq_advance(ring, count);
	}

	// Tally all the thread local statistics into the shared global block
	__atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );

	return nullptr;
}

//=========================================================
#include <bit> // for ispow2

extern "C" {
	#include <pthread.h>      // for pthreads
	#include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
	#include <sys/socket.h>   // for sockets in general
	#include <netinet/in.h>   // for sockaddr_in, AF_INET
}

int main(int argc, char * argv[]) {
	// Initialize the array of connection-fd associations
	// (size_t index: array_max is a size_t, avoids a signed/unsigned compare)
	for(size_t i = 0; i < array_max; i++) {
		conns[i] = nullptr;
	}

	// Make sure we ignore all sigpipes; failed sends surface as EPIPE cqes
	signal(SIGPIPE, SIG_IGN);

	// Default command line arguments
	unsigned nthreads = 1;      // number of kernel threads
	unsigned port = 8800;       // which port to listen on
	unsigned entries = 256;     // number of entries per ring/kernel thread
	unsigned backlog = 262144;  // backlog argument to listen
	unsigned preaccept = 1;     // start by accepting X per threads
	bool attach = false;        // Whether or not to attach all the rings
	bool sqpoll = false;        // Whether or not to use SQ Polling

	//===================
	// Arguments Parsing
	int c;
	while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
		switch (c)
		{
		case 't':
			nthreads = atoi(optarg);
			break;
		case 'p':
			port = atoi(optarg);
			break;
		case 'e':
			entries = atoi(optarg);
			break;
		case 'b':
			backlog = atoi(optarg);
			break;
		case 'c':
			preaccept = atoi(optarg);
			break;
		case 'a':
			attach = true;
			break;
		case 'S':
			sqpoll = true;
			break;
		case '?':
		default:
			std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
			return EXIT_FAILURE;
		}
	}

	// Rings want a power-of-2 entry count: round up if needed
	// NOTE(review): std::ispow2 was renamed std::has_single_bit in final
	// C++20 — newer toolchains may need that spelling
	if( !std::ispow2(entries) ) {
		// Classic round-up-to-next-power-of-2 bit twiddle
		unsigned v = entries;
		v--;
		v |= v >> 1;
		v |= v >> 2;
		v |= v >> 4;
		v |= v >> 8;
		v |= v >> 16;
		v++;
		std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
		entries = v;
	}

	//===================
	// End FD
	// Create a single event fd to notify the kernel threads when the server shutsdown
	// EFD_SEMAPHORE: writing nthreads lets each thread's read consume one unit
	int efd = eventfd(0, EFD_SEMAPHORE);
	if (efd < 0) {
		std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	//===================
	// Open Socket
	// Listen on specified port
	std::cout << getpid() << " : Listening on port " << port << std::endl;
	int server_fd = socket(AF_INET, SOCK_STREAM, 0);
	if(server_fd < 0) {
		std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	int ret = 0;
	struct sockaddr_in address;
	int addrlen = sizeof(address);
	memset( (char *)&address, '\0', addrlen );
	address.sin_family = AF_INET;
	address.sin_addr.s_addr = htonl(INADDR_ANY);
	address.sin_port = htons( port );

	// In case the port is already in use, don't just return an error
	// Linux is very slow at reclaiming port so just retry regularly
	int waited = 0;
	while(true) {
		ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
		if(ret < 0) {
			if(errno == EADDRINUSE) {
				// Port is in use, let's retry later
				if(waited == 0) {
					std::cerr << "Waiting for port" << std::endl;
				} else {
					// To be clear, print how long we have been waiting
					std::cerr << "\r" << waited;
					std::cerr.flush();
				}
				waited ++;
				usleep( 1000000 ); // Wait and retry
				continue;
			}
			// Some other error occured, this is a real error
			std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
			exit(EXIT_FAILURE);
		}
		break;
	}

	ret = listen( server_fd, backlog );
	if(ret < 0) {
		std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	//===================
	// Run Server Threads
	std::cout << "Starting " << nthreads << " Threads";
	if(attach) {
		std::cout << " with attached Rings";
	}
	std::cout << std::endl;

	// Create the desired number of kernel-threads and for each
	// create a ring. Create the rings in the main so we can attach them
	// Since the rings are all in a dense VLA, aligned them so we don't get false sharing
	// it's unlikely but better safe than sorry
	// NOTE(review): runtime-sized arrays are a GCC extension in C++
	struct __attribute__((aligned(128))) aligned_ring {
		struct io_uring storage;
	};
	aligned_ring thrd_rings[nthreads];
	pthread_t    thrd_hdls[nthreads];
	options_t    thrd_opts[nthreads];
	bool no_drops  = true;
	bool fast_poll = true;
	bool nfix_sqpl = true;
	for(unsigned i = 0; i < nthreads; i++) {
		struct io_uring_params p = { };

		if(sqpoll) { // If sqpoll is on, add the flag
			p.flags |= IORING_SETUP_SQPOLL;
			p.sq_thread_idle = 100;
		}

		if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
			p.flags |= IORING_SETUP_ATTACH_WQ;
			p.wq_fd = thrd_rings[0].storage.ring_fd;
		}

		// Create the ring
		// BUG FIX: the return value was silently ignored; a failed setup
		// used to crash later inside the worker thread instead of here
		int ret = io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
		if (ret < 0) {
			std::cerr << "io_uring setup error: (" << -ret << ") " << strerror(-ret) << std::endl;
			exit(EXIT_FAILURE);
		}

		// Check if some of the note-worthy features are there
		if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
		if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
		if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }

		// Write the socket options we want to the options we pass to the threads
		thrd_opts[i].acpt.sockfd  = server_fd;
		thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
		thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
		thrd_opts[i].acpt.flags   = 0;
		thrd_opts[i].acpt.cnt     = preaccept;
		thrd_opts[i].endfd        = efd;
		thrd_opts[i].ring         = &thrd_rings[i].storage;

		// BUG FIX: pthread_create returns the error code directly (it never
		// sets errno and never returns a negative), so check != 0 and
		// report `ret`, not errno
		ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
		if (ret != 0) {
			std::cerr << "pthread create error: (" << ret << ") " << strerror(ret) << std::endl;
			exit(EXIT_FAILURE);
		}
	}

	// Tell the user if the features are present
	if( no_drops ) std::cout << "No Drop Present" << std::endl;
	if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
	if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;

	//===================
	// Server Started
	std::cout << "Server Started" << std::endl;
	{
		char buffer[128];
		int ret;
		do {
			// Wait for a Ctrl-D to close the server
			ret = read(STDIN_FILENO, buffer, 128);
			if(ret < 0) {
				std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
				exit(EXIT_FAILURE);
			}
			else if(ret > 0) {
				std::cout << "User inputed '";
				std::cout.write(buffer, ret);
				std::cout << "'" << std::endl;
			}
		} while(ret != 0);

		std::cout << "Shutdown received" << std::endl;
	}

	//===================
	// Use eventfd_write to tell the threads we are closing
	// (semaphore semantics: each thread's pending read consumes one count)
	(std::cout << "Sending Shutdown to Threads... ").flush();
	ret = eventfd_write(efd, nthreads);
	if (ret < 0) {
		std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}
	std::cout << "done" << std::endl;

	//===================
	// Join all the threads and close the rings
	(std::cout << "Stopping Threads Done... ").flush();
	for(unsigned i = 0; i < nthreads; i++) {
		void * retval;
		// BUG FIX: pthread_join also returns the error code directly, and
		// the old message said "create" instead of "join"
		int ret = pthread_join(thrd_hdls[i], &retval);
		if (ret != 0) {
			std::cerr << "pthread join error: (" << ret << ") " << strerror(ret) << std::endl;
			exit(EXIT_FAILURE);
		}

		io_uring_queue_exit(thrd_opts[i].ring);
	}
	std::cout << "done" << std::endl;

	//===================
	// Close the sockets
	(std::cout << "Closing Socket... ").flush();
	ret = shutdown( server_fd, SHUT_RD );
	if( ret < 0 ) {
		std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}

	ret = close(server_fd);
	if (ret < 0) {
		std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
		exit(EXIT_FAILURE);
	}
	std::cout << "done" << std::endl << std::endl;

	// Print stats and exit
	std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
	std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
	std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
	std::cout << "Max FD: " << max_fd << std::endl;
	std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
	std::cout << "Max concurrent connections: " << global_stats.conns.max << std::endl;
	std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
	std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
}

// compile-command: "g++ http_ring.cpp -std=c++2a -pthread -luring -O3" //