source: benchmark/io/http/http_ring.cpp@ efdfdee

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since efdfdee was efdfdee, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Prototype webserver using C and io_uring directly.

  • Property mode set to 100644
File size: 11.2 KB
Line 
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <iostream>
#include <string>
#include <string_view>

#include <signal.h>
#include <unistd.h>
#include <liburing.h>
10
/// Tag stored in every request_t; proc_loop switches on it to pick the
/// completion handler.
enum event_t {
	EVENT_END     = 0,  ///< read on the shutdown descriptor completed
	EVENT_ACCEPT  = 1,  ///< accept on the listening socket completed
	EVENT_REQUEST = 2,  ///< read of a client request completed
	EVENT_ANSWER  = 3   ///< write of a response completed
};
17
18struct request_t {
19 event_t type;
20 int fd;
21 size_t length;
22 char buffer[0];
23
24 static struct request_t * create(event_t type, size_t extra) {
25 auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra);
26 ret->type = type;
27 ret->length = extra;
28 return ret;
29 }
30
31 static struct request_t * create(event_t type) {
32 return create(type, 0);
33 }
34};
35
/// Settings handed to each proc_loop thread through its void* argument.
struct options_t {
	/// Arguments forwarded unchanged to every accept submission.
	struct {
		int               sockfd;   // listening socket
		struct sockaddr * addr;     // where the peer address is stored
		socklen_t *       addrlen;  // in/out size of *addr
		int               flags;    // accept flags
	} acpt;

	int      endfd;    // descriptor whose readability ends the loop
	unsigned entries;  // queue depth passed to io_uring_queue_init
};
47
48//=========================================================
49static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
50 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
51 io_uring_prep_read(sqe, fd, buffer, len, 0);
52 io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
53 io_uring_submit(ring);
54}
55
56static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
57 auto req = request_t::create(EVENT_ACCEPT);
58 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
59 io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
60 io_uring_sqe_set_data(sqe, req);
61 io_uring_submit(ring);
62 std::cout << "Submitted accept: " << req << std::endl;
63}
64
65static void ring_request(struct io_uring * ring, int fd) {
66 size_t size = 1024;
67 auto req = request_t::create(EVENT_REQUEST, size);
68 req->fd = fd;
69
70 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
71 io_uring_prep_read(sqe, fd, req->buffer, size, 0);
72 io_uring_sqe_set_data(sqe, req);
73 io_uring_submit(ring);
74 std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
75}
76
77//=========================================================
/// Index into the http_msgs and http_codes tables; KNOWN_CODES is the number
/// of real entries and is used to size-check both tables.
enum HttpCode {
	OK200       = 0,
	E400        = 1,
	E404        = 2,
	E405        = 3,
	E408        = 4,
	E413        = 5,
	E414        = 6,
	KNOWN_CODES = 7
};
88
89const char * http_msgs[] = {
90 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",
91 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
92 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
93 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
94 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
95 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
96 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
97};
98
99static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
100
101const int http_codes[] = {
102 200,
103 400,
104 404,
105 405,
106 408,
107 413,
108 414,
109};
110
111static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
112
113int code_val(HttpCode code) {
114 return http_codes[code];
115}
116
117static void ring_answer(struct io_uring * ring, int fd, HttpCode code) {
118 size_t size = 256;
119 auto req = request_t::create(EVENT_ANSWER, size);
120 req->fd = fd;
121
122 const char * fmt = http_msgs[code];
123 const char * date = "";
124 size = snprintf(req->buffer, size, fmt, date, size);
125
126 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
127 io_uring_prep_write(sqe, fd, req->buffer, size, 0);
128 io_uring_sqe_set_data(sqe, req);
129 io_uring_submit(ring);
130 std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
131}
132
133static void ring_answer(struct io_uring * ring, int fd, const std::string & ans) {
134 size_t size = 256;
135 auto req = request_t::create(EVENT_ANSWER, size);
136 req->fd = fd;
137
138 const char * fmt = http_msgs[OK200];
139 const char * date = "";
140 size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
141 req->length = len;
142
143 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
144 io_uring_prep_write(sqe, fd, req->buffer, len, 0);
145 io_uring_sqe_set_data(sqe, req);
146 io_uring_submit(ring);
147 std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
148}
149
150//=========================================================
151static void handle_new_conn(struct io_uring * ring, int fd) {
152 if( fd < 0 ) {
153 int err = -fd;
154 if( err == ECONNABORTED ) return;
155 std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
156 exit(EXIT_FAILURE);
157 }
158
159 ring_request(ring, fd);
160}
161
162static void handle_request(struct io_uring * ring, struct request_t * in, int res) {
163 if( res < 0 ) {
164 int err = -res;
165 switch(err) {
166 case EPIPE:
167 case ECONNRESET:
168 close(in->fd);
169 free(in);
170 return;
171 default:
172 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
173 exit(EXIT_FAILURE);
174 }
175 }
176
177 if(res == 0) {
178 close(in->fd);
179 free(in);
180 return;
181 }
182
183 char * it = in->buffer;
184 if( !strstr( it, "\r\n\r\n" ) ) {
185 std::cout << "Incomplete request" << std::endl;
186 close(in->fd);
187 free(in);
188 return;
189 }
190
191 it = in->buffer;
192 const std::string reply = "Hello, World!\n";
193 int ret = memcmp(it, "GET ", 4);
194 if( ret != 0 ) {
195 ring_answer(ring, in->fd, E400);
196 goto NEXT;
197 }
198
199 it += 4;
200 ret = memcmp(it, "/plaintext", 10);
201 if( ret != 0 ) {
202 ring_answer(ring, in->fd, E404);
203 goto NEXT;
204 }
205
206 ring_answer(ring, in->fd, reply);
207
208 NEXT:
209 ring_request(ring, in->fd);
210 return;
211}
212
213static void handle_answer(struct io_uring * ring, struct request_t * in, int res) {
214 if( res < 0 ) {
215 int err = -res;
216 switch(err) {
217 case EPIPE:
218 case ECONNRESET:
219 close(in->fd);
220 free(in);
221 return;
222 default:
223 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
224 exit(EXIT_FAILURE);
225 }
226 }
227
228 if( res >= in->length ) {
229 free(in);
230 return;
231 }
232
233 struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
234 io_uring_prep_write(sqe, in->fd, in->buffer + res, in->length - res, 0);
235 io_uring_sqe_set_data(sqe, in);
236 io_uring_submit(ring);
237 std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
238
239 ring_request(ring, in->fd);
240}
241
242//=========================================================
243void * proc_loop(void * arg) {
244 const struct options_t & opt = *(const struct options_t *)arg;
245
246 struct io_uring ring_storage;
247 struct io_uring * ring = &ring_storage;
248 io_uring_queue_init(opt.entries, ring, 0);
249
250 char endfd_buf[8];
251 ring_end(ring, opt.endfd, endfd_buf, 8);
252
253 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
254
255 bool done = false;
256 while(!done) {
257 struct io_uring_cqe *cqe;
258 int ret = io_uring_wait_cqe(ring, &cqe);
259 if (ret < 0) {
260 fprintf( stderr, "io_uring error: (%d) %s\n", (int)-ret, strerror(-ret) );
261 exit(EXIT_FAILURE);
262 }
263
264 auto req = (struct request_t *)cqe->user_data;
265 std::cout << req << " completed with " << cqe->res << std::endl;
266
267 switch(req->type) {
268 case EVENT_END:
269 done = true;
270 break;
271 case EVENT_ACCEPT:
272 handle_new_conn(ring, cqe->res);
273 free(req);
274 ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
275 break;
276 case EVENT_REQUEST:
277 handle_request(ring, req, cqe->res);
278 break;
279 case EVENT_ANSWER:
280 handle_answer(ring, req, cqe->res);
281 break;
282 }
283
284 io_uring_cqe_seen(ring, cqe);
285 }
286
287 io_uring_queue_exit(ring);
288
289 return NULL;
290}
291
292//=========================================================
293#include <pthread.h>
294extern "C" {
295 #include <signal.h>
296 #include <sys/eventfd.h>
297 #include <sys/socket.h>
298 #include <netinet/in.h>
299}
300
301int main() {
302 signal(SIGPIPE, SIG_IGN);
303
304 unsigned nthreads = 1;
305 unsigned port = 8800;
306 unsigned entries = 256;
307 unsigned backlog = 10;
308
309 //===================
310 // End FD
311 int efd = eventfd(0, EFD_SEMAPHORE);
312 if (efd < 0) {
313 std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
314 exit(EXIT_FAILURE);
315 }
316
317 //===================
318 // Open Socket
319 std::cout << getpid() << " : Listening on port " << port << std::endl;
320 int server_fd = socket(AF_INET, SOCK_STREAM, 0);
321 if(server_fd < 0) {
322 std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
323 exit(EXIT_FAILURE);
324 }
325
326 int ret = 0;
327 struct sockaddr_in address;
328 int addrlen = sizeof(address);
329 memset( (char *)&address, '\0', addrlen );
330 address.sin_family = AF_INET;
331 address.sin_addr.s_addr = htonl(INADDR_ANY);
332 address.sin_port = htons( port );
333
334 int waited = 0;
335 while(true) {
336 ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
337 if(ret < 0) {
338 if(errno == EADDRINUSE) {
339 if(waited == 0) {
340 std::cerr << "Waiting for port" << std::endl;
341 } else {
342 std::cerr << "\r" << waited;
343 std::cerr.flush();
344 }
345 waited ++;
346 usleep( 1000000 );
347 continue;
348 }
349 std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
350 exit(EXIT_FAILURE);
351 }
352 break;
353 }
354
355 ret = listen( server_fd, backlog );
356 if(ret < 0) {
357 std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
358 exit(EXIT_FAILURE);
359 }
360
361 //===================
362 // Run Server Threads
363 std::cout << "Starting " << nthreads << " Threads" << std::endl;
364 pthread_t thrd_hdls[nthreads];
365 options_t thrd_opts[nthreads];
366 for(unsigned i = 0; i < nthreads; i++) {
367 thrd_opts[i].acpt.sockfd = server_fd;
368 thrd_opts[i].acpt.addr = (struct sockaddr *)&address;
369 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
370 thrd_opts[i].acpt.flags = 0;
371 thrd_opts[i].endfd = efd;
372 thrd_opts[i].entries = entries;
373
374 int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
375 if (ret < 0) {
376 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
377 exit(EXIT_FAILURE);
378 }
379 }
380
381 //===================
382 // Server Started
383 std::cout << "Server Started" << std::endl;
384 {
385 char buffer[128];
386 int ret;
387 do {
388 ret = read(STDIN_FILENO, buffer, 128);
389 if(ret < 0) {
390 std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
391 exit(EXIT_FAILURE);
392 }
393 else if(ret > 0) {
394 std::cout << "User inputed '";
395 std::cout.write(buffer, ret);
396 std::cout << "'" << std::endl;
397 }
398 } while(ret != 0);
399
400 std::cout << "Shutdown received" << std::endl;
401 }
402
403 //===================
404 (std::cout << "Sending Shutdown to Threads... ").flush();
405 ret = eventfd_write(efd, nthreads);
406 if (ret < 0) {
407 std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
408 exit(EXIT_FAILURE);
409 }
410 std::cout << "done" << std::endl;
411
412 //===================
413 (std::cout << "Stopping Threads Done... ").flush();
414 for(unsigned i = 0; i < nthreads; i++) {
415 void * retval;
416 int ret = pthread_join(thrd_hdls[i], &retval);
417 if (ret < 0) {
418 std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
419 exit(EXIT_FAILURE);
420 }
421 }
422 std::cout << "done" << std::endl;
423
424 //===================
425 (std::cout << "Closing Socket... ").flush();
426 ret = shutdown( server_fd, SHUT_RD );
427 if( ret < 0 ) {
428 std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
429 exit(EXIT_FAILURE);
430 }
431
432 ret = close(server_fd);
433 if (ret < 0) {
434 std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
435 exit(EXIT_FAILURE);
436 }
437 std::cout << "done" << std::endl;
438}
Note: See TracBrowser for help on using the repository browser.