#include #include #include #include #include #include #include typedef enum { EVENT_END, EVENT_ACCEPT, EVENT_REQUEST, EVENT_ANSWER } event_t; struct request_t { event_t type; int fd; size_t length; char buffer[0]; static struct request_t * create(event_t type, size_t extra) { auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra); ret->type = type; ret->length = extra; return ret; } static struct request_t * create(event_t type) { return create(type, 0); } }; struct options_t { struct { int sockfd; struct sockaddr *addr; socklen_t *addrlen; int flags; } acpt; int endfd; unsigned entries; }; //========================================================= static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) { struct io_uring_sqe * sqe = io_uring_get_sqe(ring); io_uring_prep_read(sqe, fd, buffer, len, 0); io_uring_sqe_set_data(sqe, request_t::create(EVENT_END)); io_uring_submit(ring); } static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) { auto req = request_t::create(EVENT_ACCEPT); struct io_uring_sqe * sqe = io_uring_get_sqe(ring); io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags); io_uring_sqe_set_data(sqe, req); io_uring_submit(ring); std::cout << "Submitted accept: " << req << std::endl; } static void ring_request(struct io_uring * ring, int fd) { size_t size = 1024; auto req = request_t::create(EVENT_REQUEST, size); req->fd = fd; struct io_uring_sqe * sqe = io_uring_get_sqe(ring); io_uring_prep_read(sqe, fd, req->buffer, size, 0); io_uring_sqe_set_data(sqe, req); io_uring_submit(ring); std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<fd = fd; const char * fmt = http_msgs[code]; const char * date = ""; size = snprintf(req->buffer, size, fmt, date, size); struct io_uring_sqe * sqe = io_uring_get_sqe(ring); io_uring_prep_write(sqe, fd, req->buffer, size, 0); io_uring_sqe_set_data(sqe, req); io_uring_submit(ring); std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<fd = fd; const char * fmt = http_msgs[OK200]; const char * date = ""; size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str()); req->length = len; struct io_uring_sqe * sqe = io_uring_get_sqe(ring); io_uring_prep_write(sqe, fd, req->buffer, len, 0); io_uring_sqe_set_data(sqe, req); io_uring_submit(ring); std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<fd); free(in); return; default: std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl; exit(EXIT_FAILURE); } } if(res == 0) { close(in->fd); free(in); return; } char * it = in->buffer; if( !strstr( it, "\r\n\r\n" ) ) { std::cout << "Incomplete request" << std::endl; close(in->fd); free(in); return; } it = in->buffer; const std::string reply = "Hello, World!\n"; int ret = memcmp(it, "GET ", 4); if( ret != 0 ) { ring_answer(ring, in->fd, E400); goto NEXT; } it += 4; ret = memcmp(it, "/plaintext", 10); if( ret != 0 ) { ring_answer(ring, in->fd, E404); goto NEXT; } ring_answer(ring, in->fd, reply); NEXT: ring_request(ring, in->fd); return; } static void handle_answer(struct io_uring * ring, struct request_t * in, int res) { if( res < 0 ) { int err = -res; switch(err) { case EPIPE: case ECONNRESET: close(in->fd); free(in); return; default: std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl; exit(EXIT_FAILURE); } } if( res >= in->length ) { free(in); return; } struct io_uring_sqe * sqe = io_uring_get_sqe(ring); io_uring_prep_write(sqe, in->fd, in->buffer + res, in->length - res, 0); io_uring_sqe_set_data(sqe, in); io_uring_submit(ring); std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<fd); } //========================================================= void * proc_loop(void * arg) { const struct options_t & opt = *(const struct options_t *)arg; struct io_uring ring_storage; struct io_uring * ring = &ring_storage; io_uring_queue_init(opt.entries, ring, 0); char endfd_buf[8]; ring_end(ring, opt.endfd, endfd_buf, 8); ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); bool done = false; while(!done) { struct io_uring_cqe *cqe; int ret = io_uring_wait_cqe(ring, &cqe); if (ret < 0) { fprintf( stderr, "io_uring error: (%d) %s\n", (int)-ret, strerror(-ret) ); exit(EXIT_FAILURE); } auto req = (struct request_t *)cqe->user_data; std::cout << req << " completed with " << cqe->res << std::endl; switch(req->type) { case EVENT_END: done = true; break; case EVENT_ACCEPT: handle_new_conn(ring, cqe->res); free(req); ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags); break; case EVENT_REQUEST: handle_request(ring, req, cqe->res); break; case EVENT_ANSWER: handle_answer(ring, req, cqe->res); break; } io_uring_cqe_seen(ring, cqe); } io_uring_queue_exit(ring); return NULL; } //========================================================= #include extern "C" { #include #include #include #include } int main() { signal(SIGPIPE, SIG_IGN); unsigned nthreads = 1; unsigned port = 8800; unsigned entries = 256; unsigned backlog = 10; //=================== // End FD int efd = eventfd(0, EFD_SEMAPHORE); if (efd < 0) { std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } //=================== // Open Socket std::cout << getpid() << " : Listening on port " << port << std::endl; int server_fd = socket(AF_INET, SOCK_STREAM, 0); if(server_fd < 0) { std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } int ret = 0; struct sockaddr_in address; int addrlen = sizeof(address); memset( (char *)&address, '\0', addrlen ); address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons( port ); int waited = 0; while(true) { ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) ); if(ret < 0) { if(errno == EADDRINUSE) { if(waited == 0) { std::cerr << "Waiting for port" << std::endl; } else { std::cerr << "\r" << waited; std::cerr.flush(); } waited ++; usleep( 1000000 ); continue; } std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } break; } ret = listen( server_fd, backlog ); if(ret < 0) { std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } //=================== // Run Server Threads std::cout << "Starting " << nthreads << " Threads" << std::endl; pthread_t thrd_hdls[nthreads]; options_t thrd_opts[nthreads]; for(unsigned i = 0; i < nthreads; i++) { thrd_opts[i].acpt.sockfd = server_fd; thrd_opts[i].acpt.addr = (struct sockaddr *)&address; thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen; thrd_opts[i].acpt.flags = 0; thrd_opts[i].endfd = efd; thrd_opts[i].entries = entries; int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]); if (ret < 0) { std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } } //=================== // Server Started std::cout << "Server Started" << std::endl; { char buffer[128]; int ret; do { ret = read(STDIN_FILENO, buffer, 128); if(ret < 0) { std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } else if(ret > 0) { std::cout << "User inputed '"; std::cout.write(buffer, ret); std::cout << "'" << std::endl; } } while(ret != 0); std::cout << "Shutdown received" << std::endl; } //=================== (std::cout << "Sending Shutdown to Threads... ").flush(); ret = eventfd_write(efd, nthreads); if (ret < 0) { std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } std::cout << "done" << std::endl; //=================== (std::cout << "Stopping Threads Done... ").flush(); for(unsigned i = 0; i < nthreads; i++) { void * retval; int ret = pthread_join(thrd_hdls[i], &retval); if (ret < 0) { std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } } std::cout << "done" << std::endl; //=================== (std::cout << "Closing Socket... ").flush(); ret = shutdown( server_fd, SHUT_RD ); if( ret < 0 ) { std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } ret = close(server_fd); if (ret < 0) { std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl; exit(EXIT_FAILURE); } std::cout << "done" << std::endl; }