Index: benchmark/io/http/http_ring.cpp
===================================================================
--- benchmark/io/http/http_ring.cpp	(revision c05c58f5be1c7fe6b4c0b065b52021f075f2638c)
+++ benchmark/io/http/http_ring.cpp	(revision 761a2467835175a5da87d3c9dbcec0dfda257378)
@@ -9,32 +9,7 @@
 #include <liburing.h>
 
-typedef enum {
-	EVENT_END,
-	EVENT_ACCEPT,
-	EVENT_REQUEST,
-	EVENT_ANSWER
-} event_t;
-
-struct __attribute__((aligned(128))) request_t {
-	event_t type;
-	int fd;
-	size_t length;
-	char * buff;
-	char data[0];
-
-	static struct request_t * create(event_t type, size_t extra) {
-		auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra);
-		ret->type = type;
-		ret->length = extra;
-		ret->buff = ret->data;
-		return ret;
-	}
-
-	static struct request_t * create(event_t type) {
-		return create(type, 0);
-	}
-};
-
+// Options passed to each thread
 struct __attribute__((aligned(128))) options_t {
+	// Data passed to accept
 	struct {
 		int sockfd;
@@ -44,17 +19,62 @@
 	} acpt;
 
+	// Termination notification
 	int endfd;
+
+	// The ring to use for io
 	struct io_uring * ring;
-
+};
+
+//=========================================================
+// General statistics
+struct __attribute__((aligned(128))) stats_block_t {
 	struct {
-		size_t subs = 0;
-		size_t cnts = 0;
-	} result;
+		volatile size_t conns = 0;
+		volatile size_t reads = 0;
+		volatile size_t writes = 0;
+		volatile size_t full_writes = 0;
+	} completions;
+
+	struct {
+		volatile size_t conns = 0;
+		struct {
+			volatile size_t pipes = 0;
+			volatile size_t reset = 0;
+			volatile size_t other = 0;
+		} requests;
+
+		struct {
+			volatile size_t pipes = 0;
+			volatile size_t reset = 0;
+			volatile size_t other = 0;
+		} answers;
+	} errors;
+
+	struct {
+		volatile size_t current = 0;
+		volatile size_t max = 0;
+		volatile size_t used = 0;
+	} conns;
+
+	volatile size_t recycle_errors = 0;
 };
 
-volatile size_t total = 0;
-volatile size_t count = 0;
+// Each thread gets its own block of stats
+// and there is a global block for tallying at the end
+thread_local stats_block_t stats;
+stats_block_t global_stats;
+
+// Get an array of current connections
+// This is just for debugging, to make sure
+// no two state-machines get the same fd
+const size_t array_max = 25000;
+class connection * volatile conns[array_max] = { 0 };
+
+// Max fd we've seen, keep track so it's convenient to adjust the array size after
+volatile int max_fd = 0;
 
 //=========================================================
+// Some small wrappers for ring operations used outside the connection state machine
+// get sqe + error handling
 static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
 	struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
@@ -66,39 +86,15 @@
 }
 
-static void submit(struct io_uring * ) {
-	// io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
-	// io_uring_submit(ring);
-}
-
-//=========================================================
+// read of the event fd is not done by a connection
+// use nullptr as the user data
 static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
 	struct io_uring_sqe * sqe = get_sqe(ring);
 	io_uring_prep_read(sqe, fd, buffer, len, 0);
-	io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
-	submit(ring);
+	io_uring_sqe_set_data(sqe, nullptr);
+	io_uring_submit(ring);
 }
 
-static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
-	auto req = request_t::create(EVENT_ACCEPT);
-	struct io_uring_sqe * sqe = get_sqe(ring);
-	io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
-	io_uring_sqe_set_data(sqe, req);
-	submit(ring);
-	// std::cout << "Submitted accept: " << req << std::endl;
-}
-
-static void ring_request(struct io_uring * ring, int fd) {
-	size_t size = 1024;
-	auto req = request_t::create(EVENT_REQUEST, size);
-	req->fd = fd;
-
-	struct io_uring_sqe * sqe = get_sqe(ring);
-	io_uring_prep_read(sqe, fd, req->buff, size, 0);
-	io_uring_sqe_set_data(sqe, req);
-	submit(ring);
-	// std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
-}
-
 //=========================================================
+// All answers are fixed and determined by the return code
 enum HttpCode {
 	OK200 = 0,
@@ -112,282 +108,418 @@
 };
 
+// Get a fixed reply based on the return code
 const char * http_msgs[] = {
-	"HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",
-	"HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
-	"HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
-	"HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
-	"HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
-	"HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
-	"HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
+	"HTTP/1.1 200 OK\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",
+	"HTTP/1.1 400 Bad Request\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
+	"HTTP/1.1 404 Not Found\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
+	"HTTP/1.1 405 Method Not \r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
+	"HTTP/1.1 408 Request Timeout\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
+	"HTTP/1.1 413 Payload Too Large\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
+	"HTTP/1.1 414 URI Too Long\r\nServer: HttoForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",
 };
-
-static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
-
-const int http_codes[] = {
-	200,
-	400,
-	404,
-	405,
-	408,
-	413,
-	414,
+static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) );
+
+// Pre-compute the lengths of these replies
+const size_t http_lens[] = {
+	strlen(http_msgs[0]),
+	strlen(http_msgs[1]),
+	strlen(http_msgs[2]),
+	strlen(http_msgs[3]),
+	strlen(http_msgs[4]),
+	strlen(http_msgs[5]),
+	strlen(http_msgs[6]),
 };
-
-static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
-
-int code_val(HttpCode code) {
-	return http_codes[code];
-}
-
-static void ring_answer(struct io_uring * ring, int fd, HttpCode code) {
-	size_t size = 256;
-	auto req = request_t::create(EVENT_ANSWER, size);
-	req->fd = fd;
-
-	const char * fmt = http_msgs[code];
-	const char * date = "";
-	size = snprintf(req->buff, size, fmt, date, size);
-
-	struct io_uring_sqe * sqe = get_sqe(ring);
-	io_uring_prep_write(sqe, fd, req->buff, size, 0);
-	io_uring_sqe_set_data(sqe, req);
-	submit(ring);
-	// std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
-}
-
-static void ring_answer(struct io_uring * ring, int fd, const std::string &) {
-	// size_t size = 256;
-	// auto req = request_t::create(EVENT_ANSWER, size);
-	// req->fd = fd;
-
-	// const char * fmt = http_msgs[OK200];
-	// const char * date = "";
-	// size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
-	// req->length = len;
-
-	// struct io_uring_sqe * sqe = get_sqe(ring);
-	// io_uring_prep_write(sqe, fd, req->buffer, len, 0);
-	// io_uring_sqe_set_data(sqe, req);
-	// submit(ring);
-	// std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
-
-
-	static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \
-						"Content-Length: 15\r\n" \
-						"Content-Type: text/html\r\n" \
-						"Connection: keep-alive\r\n" \
-						"Server: testserver\r\n" \
-						"\r\n" \
-						"Hello, World!\r\n";
-
-	static const size_t RLEN = strlen(RESPONSE);
-
-	size_t size = 256;
-	auto req = request_t::create(EVENT_ANSWER, size);
-	req->fd = fd;
-	req->buff = (char*)RESPONSE;
-	req->length = RLEN;
-
-	// const char * fmt = http_msgs[OK200];
-	// const char * date = "";
-	// size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
-	// req->length = len;
-
-	struct io_uring_sqe * sqe = get_sqe(ring);
-	io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0);
-	io_uring_sqe_set_data(sqe, req);
-	submit(ring);
-}
+static_assert( KNOWN_CODES == (sizeof(http_lens) / sizeof(http_lens[0])) );
 
 //=========================================================
-static void handle_new_conn(struct io_uring * ring, int fd) {
-	if( fd < 0 ) {
-		int err = -fd;
-		if( err == ECONNABORTED ) return;
-		std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
-		exit(EXIT_FAILURE);
-	}
-
-	ring_request(ring, fd);
-}
-
-static void handle_request(struct io_uring * ring, struct request_t * in, int res) {
-	if( res < 0 ) {
-		int err = -res;
-		switch(err) {
-			case EPIPE:
-			case ECONNRESET:
-				close(in->fd);
-				free(in);
+// Finite state machine responsible for handling each connection
+class __attribute__((aligned(128))) connection {
+private:
+	// The state of the machine
+	enum {
+		ACCEPTING,  // Accept sent waiting for connection
+		REQUESTING, // Waiting for new request
+		ANSWERING,  // Either request received submitting answer or short answer sent, need to submit rest
+	} state;
+
+	// The file descriptor of the connection
+	int fd;
+
+	// request data
+	static const size_t buffer_size = 1024;	// Size of the read buffer
+	const char * buffer;                      // Buffer into which requests are read
+
+	// send data
+	size_t to_send;		// Data left to send
+	const char * iterator;	// Pointer to rest of the message to send
+
+	// stats
+	// how many requests/answers were complete, that is, a valid cqe was obtained
+	struct {
+		size_t requests = 0;
+		size_t answers = 0;
+	} stats;
+
+private:
+	connection()
+		: state(ACCEPTING)
+		, fd(0)
+		, buffer( new char[buffer_size])
+		, iterator(nullptr)
+	{
+		::stats.conns.max++;
+		::stats.conns.current++;
+	}
+
+	~connection() {
+		delete [] buffer;
+		::stats.conns.current--;
+	}
+
+	// Close the current connection
+	void close(int err) {
+		// std::cout << "(" << this->stats.requests << "," << this->stats.answers << ", e" << err << ") ";
+		conns[fd] = nullptr;
+
+		if(fd != 0) {
+			::close(fd);
+		}
+		delete this;
+	}
+
+	//--------------------------------------------------
+	// Wrappers for submit so we can tweak it more easily
+	static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
+		(void)ring;
+		// io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
+		io_uring_sqe_set_data(sqe, conn);
+		// io_uring_submit(ring);
+	}
+
+	void submit(struct io_uring * ring, struct io_uring_sqe * sqe) {
+		submit(ring, sqe, this);
+	}
+
+	//--------------------------------------------------
+	// get a new request from the client
+	void request(struct io_uring * ring) {
+		state = REQUESTING;
+		struct io_uring_sqe * sqe = get_sqe(ring);
+		io_uring_prep_read(sqe, fd, (void*)buffer, buffer_size, 0);
+		submit(ring, sqe);
+	}
+
+	//--------------------------------------------------
+	// Send a new answer based on a return code
+	void answer(struct io_uring * ring, HttpCode code) {
+		iterator = http_msgs[code];
+		to_send  = http_lens[code];
+		if(to_send != 124) {
+			std::cerr << "Answer has weird size: " << to_send << " (" << (int)code << ")" << std::endl;
+		}
+		answer(ring);
+	}
+
+	// send a new answer to the client
+	// Reused for incomplete writes
+	void answer(struct io_uring * ring) {
+		state = ANSWERING;
+		struct io_uring_sqe * sqe = get_sqe(ring);
+		io_uring_prep_write(sqe, fd, iterator, to_send, 0);
+		submit(ring, sqe);
+	}
+
+	//--------------------------------------------------
+	// Handle a new connection, i.e. the result of getting a cqe while in the ACCEPTING state
+	void newconn(struct io_uring * ring, int ret) {
+		// Check errors
+		if( ret < 0 ) {
+			int err = -ret;
+			if( err == ECONNABORTED ) {
+				::stats.errors.conns++;
+				this->close(err);
 				return;
-			default:
-				std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
-				exit(EXIT_FAILURE);
-		}
-	}
-
-	if(res == 0) {
-		close(in->fd);
-		free(in);
-		return;
-	}
-
-	const char * it = in->buff;
-	if( !strstr( it, "\r\n\r\n" ) ) {
-		std::cout << "Incomplete request" << std::endl;
-		close(in->fd);
-		free(in);
-		return;
-	}
-
-	it = in->buff;
-	const std::string reply = "Hello, World!\n";
-	int ret = memcmp(it, "GET ", 4);
-	if( ret != 0 ) {
-		ring_answer(ring, in->fd, E400);
-		goto NEXT;
-	}
-
-	it += 4;
-	ret = memcmp(it, "/plaintext", 10);
-	if( ret != 0 ) {
-		ring_answer(ring, in->fd, E404);
-		goto NEXT;
-	}
-
-	ring_answer(ring, in->fd, reply);
-
-	NEXT:
-		ring_request(ring, in->fd);
-		free(in);
-		return;
-}
-
-static void handle_answer(struct io_uring * ring, struct request_t * in, int res) {
-	if( res < 0 ) {
-		int err = -res;
-		switch(err) {
-			case EPIPE:
-			case ECONNRESET:
-				close(in->fd);
-				free(in);
-				return;
-			default:
-				std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
-				exit(EXIT_FAILURE);
-		}
-	}
-
-	if( res >= in->length ) {
-		free(in);
-		return;
-	}
-
-	struct io_uring_sqe * sqe = get_sqe(ring);
-	io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0);
-	io_uring_sqe_set_data(sqe, in);
-	submit(ring);
-	// std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
-
-	ring_request(ring, in->fd);
-}
+			}
+			std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
+			exit(EXIT_FAILURE);
+		}
+
+		// Count the connections
+		::stats.completions.conns++;
+
+		// Record the new fd and start reading requests on it
+		fd = ret;
+		request(ring);
+
+		// check the max fd so we know if we exceeded the array
+		for(;;) {
+			int expected = max_fd;
+			if(expected >= fd) return;
+			if( __atomic_compare_exchange_n(&max_fd, &expected, fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) return;
+		}
+
+		// check if we have enough space to fit inside the array
+		if(fd >= array_max) {
+			std::cerr << "accept error: fd " << fd << " is too high" << std::endl;
+			return;
+		}
+
+		// Put our connection into the global array
+		// No one else should be using it so if they are that's a bug
+		auto exist = __atomic_exchange_n( &conns[fd], this, __ATOMIC_SEQ_CST);
+		if( exist ) {
+			size_t first = __atomic_fetch_add(&global_stats.recycle_errors, 1, __ATOMIC_SEQ_CST);
+			if( first == 0 ) {
+				std::cerr << "First: accept has existing connection " << std::endl;
+			}
+		}
+	}
+
+	// Handle a new request, i.e. the result of getting a cqe while in the REQUESTING state
+	void newrequest(struct io_uring * ring, int ret) {
+		// Check errors
+		if( ret < 0 ) {
+			int err = -ret;
+			switch(err) {
+				case EPIPE:
+					::stats.errors.requests.pipes++;
+					break;
+					// Don't fall through, to get better stats
+				case ECONNRESET:
+					::stats.errors.requests.reset++;
+					break;
+				default:
+					::stats.errors.requests.other++;
+					std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
+					exit(EXIT_FAILURE);
+			}
+
+			// Connection failed, close it
+			this->close(err);
+			return;
+		}
+
+		// Update stats
+		::stats.completions.reads++;
+
+		// Is this an EOF
+		if(ret == 0) {
+			// Yes, close the connection
+			this->close(0);
+			return;
+		}
+
+		// Find the end of the request header
+		const char * it = buffer;
+		if( !strstr( it, "\r\n\r\n" ) ) {
+			// This state machine doesn't support incomplete reads
+			// Print them to output so it's clear there is an issue
+			std::cout << "Incomplete request" << std::endl;
+			this->close(EBADR);
+			return;
+		}
+
+		// Find the method to use
+		it = buffer;
+		int ret = memcmp(it, "GET ", 4);
+		if( ret != 0 ) {
+			// We only support get, answer with an error
+			answer(ring, E400);
+			return;
+		}
+
+		// Find the target
+		it += 4;
+		ret = memcmp(it, "/plaintext", 10);
+		if( ret != 0 ) {
+			// We only support /plaintext, answer with an error
+			answer(ring, E404);
+			return;
+		}
+
+		// Correct request, answer with the payload
+		this->stats.requests++;
+		answer(ring, OK200);
+	}
+
+	// Handle a partial or full answer sent, i.e. the result of getting a cqe while in the ANSWERING state
+	void writedone(struct io_uring * ring, int res) {
+		// Check errors
+		if( res < 0 ) {
+			int err = -res;
+			switch(err) {
+				case EPIPE:
+					::stats.errors.answers.pipes++;
+					break;
+					// Don't fall through, to get better stats
+				case ECONNRESET:
+					::stats.errors.answers.reset++;
+					break;
+				default:
+					::stats.errors.answers.other++;
+					std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
+					exit(EXIT_FAILURE);
+			}
+
+			this->close(err);
+			return;
+		}
+
+		// Update stats
+		::stats.completions.writes++;
+		if(res == 124) ::stats.completions.full_writes++;
+
+		// Is this write completed
+		if( res == to_send ) {
+			// Yes, more stats
+			stats.answers++;
+			if(stats.answers == 2) ::stats.conns.used++;
+			// Then read a new request
+			request(ring);
+			return;
+		}
+
+		// Not a completed write, push the rest
+		to_send -= res;
+		iterator += res;
+		answer(ring);
+	}
+public:
+	// Submit a call to accept and create a new connection object
+	static void accept(struct io_uring * ring, const struct options_t & opt) {
+		struct io_uring_sqe * sqe = get_sqe(ring);
+		io_uring_prep_accept(sqe, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
+		submit(ring, sqe, new connection());
+		// std::cout << "Submitted accept: " << req << std::endl;
+	}
+
+	// Handle a new cqe
+	void handle(struct io_uring * ring, int res, const struct options_t & opt) {
+		switch(state) {
+		case ACCEPTING:
+			connection::accept(ring, opt);
+			newconn(ring, res);
+			break;
+		case REQUESTING:
+			newrequest(ring, res);
+			break;
+		case ANSWERING:
+			writedone(ring, res);
+			break;
+		}
+	}
+};
 
 //=========================================================
-extern "C" {
-extern int __io_uring_flush_sq(struct io_uring *ring);
-}
-
+// Main loop of the WebServer
+// Effectively uses one thread_local copy of everything per kernel thread
 void * proc_loop(void * arg) {
-	size_t count = 0;
+	// Get the thread local argument
 	struct options_t & opt = *(struct options_t *)arg;
-
 	struct io_uring * ring = opt.ring;
 
+	// Track the shutdown using an eventfd
 	char endfd_buf[8];
 	ring_end(ring, opt.endfd, endfd_buf, 8);
 
-	ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
-
-	int reset = 1;
-	bool done = false;
+	// Accept our first connection
+	// May not take effect until io_uring_submit_and_wait
+	connection::accept(ring, opt);
+
+	int reset = 1;       // Counter to print stats once in a while
+	bool done = false;   // Are we done
+	size_t sqes = 0;     // Number of sqes we submitted
+	size_t call = 0;     // Number of submits we made
 	while(!done) {
-    		struct io_uring_cqe *cqe;
-		int ret;
-		while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) {
-			ret = io_uring_submit_and_wait(ring, 1);
-			if (ret < 0) {
-				fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) );
-				exit(EXIT_FAILURE);
-			}
-			opt.result.subs += ret;
-			opt.result.cnts++;
-		}
-
-		if (ret < 0 && -EAGAIN != ret) {
-			fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) );
+		// Submit all the answers we have and wait for responses
+		int ret = io_uring_submit_and_wait(ring, 1);
+
+		// check errors
+		if (ret < 0) {
+			fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
 			exit(EXIT_FAILURE);
 		}
 
-		auto req = (struct request_t *)cqe->user_data;
-		// std::cout << req << " completed with " << cqe->res << std::endl;
-
-		switch(req->type) {
-			case EVENT_END:
+		// Check how good we are at batching sqes
+		sqes += ret;
+		call++;
+
+		struct io_uring_cqe *cqe;
+		unsigned head;
+		unsigned count = 0;
+
+        	// go through all cqes
+        	io_uring_for_each_cqe(ring, head, cqe) {
+			if (0 == cqe->user_data) {
 				done = true;
 				break;
-			case EVENT_ACCEPT:
-				handle_new_conn(ring, cqe->res);
-				free(req);
-				ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
-				break;
-			case EVENT_REQUEST:
-				handle_request(ring, req, cqe->res);
-				break;
-			case EVENT_ANSWER:
-				handle_answer(ring, req, cqe->res);
-				break;
-		}
-
-		io_uring_cqe_seen(ring, cqe);
-		reset--;
-		if(reset == 0) {
-			size_t ltotal = opt.result.subs;
-			size_t lcount = opt.result.cnts;
-
-			std::cout << "Submit average: " << ltotal << "/" << lcount << "(" << (((double)ltotal) / lcount) << ")" << std::endl;
-			reset = 100000 + (100000 * (ring->ring_fd % 5));
-		}
-	}
-
-	return (void*)count;
+			}
+
+			auto req = (class connection *)cqe->user_data;
+			req->handle( ring, cqe->res, opt );
+
+			reset--;
+			if(reset == 0) {
+				std::cout << "Submit average: " << sqes << "/" << call << "(" << (((double)sqes) / call) << ")" << std::endl;
+				reset = 100000 + (100000 * (ring->ring_fd % 5));
+			}
+
+			// Keep track of how many cqes we have seen
+			count++;
+		}
+
+		// Mark the cqes as seen
+		io_uring_cq_advance(ring, count);
+	}
+
+	// Tally all the thread local statistics
+	__atomic_fetch_add( &global_stats.completions.conns, ::stats.completions.conns, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.completions.reads, ::stats.completions.reads, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.completions.writes, ::stats.completions.writes, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.completions.full_writes, ::stats.completions.full_writes, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.errors.conns, ::stats.errors.conns, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.errors.requests.pipes, ::stats.errors.requests.pipes, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.errors.requests.reset, ::stats.errors.requests.reset, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.errors.requests.other, ::stats.errors.requests.other, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.errors.answers.pipes, ::stats.errors.answers.pipes, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.errors.answers.reset, ::stats.errors.answers.reset, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.errors.answers.other, ::stats.errors.answers.other, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.conns.current, ::stats.conns.current, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.conns.max, ::stats.conns.max, __ATOMIC_SEQ_CST );
+	__atomic_fetch_add( &global_stats.conns.used, ::stats.conns.used, __ATOMIC_SEQ_CST );
+
+	return nullptr;
 }
 
 //=========================================================
-struct __attribute__((aligned(128))) aligned_ring {
-	struct io_uring storage;
-};
-
-#include <bit>
-
-#include <pthread.h>
+#include <bit> // for ispow2
+
 extern "C" {
-	#include <signal.h>
-	#include <sys/eventfd.h>
-	#include <sys/socket.h>
-	#include <netinet/in.h>
+	#include <pthread.h>      // for pthreads
+	#include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
+	#include <sys/eventfd.h>  // use for termination
+	#include <sys/socket.h>   // for sockets in general
+	#include <netinet/in.h>   // for sockaddr_in, AF_INET
 }
 
 int main(int argc, char * argv[]) {
+	// Initialize the array of connection-fd associations
+	for(int i = 0; i < array_max; i++) {
+		conns[i] = nullptr;
+	}
+
+	// Make sure we ignore all sigpipes
 	signal(SIGPIPE, SIG_IGN);
 
-	unsigned nthreads = 1;
-	unsigned port = 8800;
-	unsigned entries = 256;
-	unsigned backlog = 10;
-	bool attach = false;
-	bool sqpoll = false;
+	// Default command line arguments
+	unsigned nthreads = 1;      // number of kernel threads
+	unsigned port = 8800;       // which port to listen on
+	unsigned entries = 256;     // number of entries per ring/kernel thread
+	unsigned backlog = 262144;  // backlog argument to listen
+	bool attach = false;        // Whether or not to attach all the rings
+	bool sqpoll = false;        // Whether or not to use SQ Polling
 
 	//===================
-	// Arguments
+	// Arguments Parsing
 	int c;
 	while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
@@ -434,4 +566,5 @@
 	//===================
 	// End FD
+	// Create a single event fd to notify the kernel threads when the server shuts down
 	int efd = eventfd(0, EFD_SEMAPHORE);
 	if (efd < 0) {
@@ -442,4 +575,5 @@
 	//===================
 	// Open Socket
+	// Listen on specified port
 	std::cout << getpid() << " : Listening on port " << port << std::endl;
 	int server_fd = socket(AF_INET, SOCK_STREAM, 0);
@@ -457,4 +591,6 @@
 	address.sin_port = htons( port );
 
+	// In case the port is already in use, don't just return an error
+	// Linux is very slow at reclaiming ports so just retry regularly
 	int waited = 0;
 	while(true) {
@@ -462,14 +598,17 @@
 		if(ret < 0) {
 			if(errno == EADDRINUSE) {
+				// Port is in use, let's retry later
 				if(waited == 0) {
 					std::cerr << "Waiting for port" << std::endl;
 				} else {
+					// To be cute, print how long we have been waiting
 					std::cerr << "\r" << waited;
 					std::cerr.flush();
 				}
 				waited ++;
-				usleep( 1000000 );
+				usleep( 1000000 ); // Wait and retry
 				continue;
 			}
+			// Some other error occurred, this is a real error
 			std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
 			exit(EXIT_FAILURE);
@@ -492,25 +631,39 @@
 	std::cout << std::endl;
 
+	// Create the desired number of kernel-threads and for each
+	// create a ring. Create the rings in the main so we can attach them
+	// Since the rings are all in a dense VLA, align them so we don't get false sharing
+	// it's unlikely but better safe than sorry
+	struct __attribute__((aligned(128))) aligned_ring {
+		struct io_uring storage;
+	};
 	aligned_ring thrd_rings[nthreads];
 	pthread_t    thrd_hdls[nthreads];
 	options_t    thrd_opts[nthreads];
+	bool no_drops  = true;
 	bool fast_poll = true;
 	bool nfix_sqpl = true;
 	for(unsigned i = 0; i < nthreads; i++) {
 		struct io_uring_params p = { };
-		if(sqpoll) {
+
+		if(sqpoll) { // If sqpoll is on, add the flag
 			p.flags |= IORING_SETUP_SQPOLL;
 			p.sq_thread_idle = 100;
 		}
 
-		if (attach && i != 0) {
+		if (attach && i != 0) { // If attach is on, add the flag, except for the first ring
 			p.flags |= IORING_SETUP_ATTACH_WQ;
 			p.wq_fd = thrd_rings[0].storage.ring_fd;
 		}
+
+		// Create the ring
 		io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
 
+		// Check if some of the note-worthy features are there
+		if(0 == (p.features & IORING_FEAT_NODROP         )) { no_drops  = false; }
 		if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
 		if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
 
+		// Write the socket options we want to the options we pass to the threads
 		thrd_opts[i].acpt.sockfd  = server_fd;
 		thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
@@ -527,4 +680,6 @@
 	}
 
+	// Tell the user if the features are present
+	if( no_drops ) std::cout << "No Drop Present" << std::endl;
 	if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
 	if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
@@ -537,4 +692,5 @@
 		int ret;
 		do {
+			// Wait for a Ctrl-D to close the server
 			ret = read(STDIN_FILENO, buffer, 128);
 			if(ret < 0) {
@@ -553,4 +709,5 @@
 
 	//===================
+	// Use eventfd_write to tell the threads we are closing
 	(std::cout << "Sending Shutdown to Threads... ").flush();
 	ret = eventfd_write(efd, nthreads);
@@ -562,4 +719,5 @@
 
 	//===================
+	// Join all the threads and close the rings
 	(std::cout << "Stopping Threads Done... ").flush();
 	for(unsigned i = 0; i < nthreads; i++) {
@@ -576,4 +734,5 @@
 
 	//===================
+	// Close the sockets
 	(std::cout << "Closing Socket... ").flush();
 	ret = shutdown( server_fd, SHUT_RD );
@@ -588,4 +747,13 @@
 		exit(EXIT_FAILURE);
 	}
-	std::cout << "done" << std::endl;
+	std::cout << "done" << std::endl << std::endl;
+
+	// Print stats and exit
+	std::cout << "Errors: " << global_stats.errors.conns << "c, (" << global_stats.errors.requests.pipes << "p, " << global_stats.errors.requests.reset << "r, " << global_stats.errors.requests.other << "o" << ")r, (" << global_stats.errors.answers.pipes << "p, " << global_stats.errors.answers.reset << "r, " << global_stats.errors.answers.other << "o" << ")a" << std::endl;
+	std::cout << "Completions: " << global_stats.completions.conns << "c, " << global_stats.completions.reads << "r, " << global_stats.completions.writes << "w" << std::endl;
+	std::cout << "Full Writes: " << global_stats.completions.full_writes << std::endl;
+	std::cout << "Max FD: " << max_fd << std::endl;
+	std::cout << "Successful connections: " << global_stats.conns.used << std::endl;
+	std::cout << "Accepts on non-zeros: " << global_stats.recycle_errors << std::endl;
+	std::cout << "Leaked conn objects: " << global_stats.conns.current << std::endl;
 }
