source: benchmark/io/http/http_ring.cpp @ efdfdee

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since efdfdee was efdfdee, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Prototype webserver using C and io_uring directly.

  • Property mode set to 100644
File size: 11.2 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11typedef enum {
12        EVENT_END,
13        EVENT_ACCEPT,
14        EVENT_REQUEST,
15        EVENT_ANSWER
16} event_t;
17
18struct request_t {
19        event_t type;
20        int fd;
21        size_t length;
22        char buffer[0];
23
24        static struct request_t * create(event_t type, size_t extra) {
25                auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra);
26                ret->type = type;
27                ret->length = extra;
28                return ret;
29        }
30
31        static struct request_t * create(event_t type) {
32                return create(type, 0);
33        }
34};
35
36struct options_t {
37        struct {
38                int sockfd;
39                struct sockaddr *addr;
40                socklen_t *addrlen;
41                int flags;
42        } acpt;
43
44        int endfd;
45        unsigned entries;
46};
47
48//=========================================================
49static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
50        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
51        io_uring_prep_read(sqe, fd, buffer, len, 0);
52        io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
53        io_uring_submit(ring);
54}
55
56static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
57        auto req = request_t::create(EVENT_ACCEPT);
58        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
59        io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
60        io_uring_sqe_set_data(sqe, req);
61        io_uring_submit(ring);
62        std::cout << "Submitted accept: " << req << std::endl;
63}
64
65static void ring_request(struct io_uring * ring, int fd) {
66        size_t size = 1024;
67        auto req = request_t::create(EVENT_REQUEST, size);
68        req->fd = fd;
69
70        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
71        io_uring_prep_read(sqe, fd, req->buffer, size, 0);
72        io_uring_sqe_set_data(sqe, req);
73        io_uring_submit(ring);
74        std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
75}
76
77//=========================================================
78enum HttpCode {
79        OK200 = 0,
80        E400,
81        E404,
82        E405,
83        E408,
84        E413,
85        E414,
86        KNOWN_CODES
87};
88
89const char * http_msgs[] = {
90        "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",
91        "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
92        "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
93        "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
94        "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
95        "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
96        "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
97};
98
99static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
100
101const int http_codes[] = {
102        200,
103        400,
104        404,
105        405,
106        408,
107        413,
108        414,
109};
110
111static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
112
113int code_val(HttpCode code) {
114        return http_codes[code];
115}
116
117static void ring_answer(struct io_uring * ring, int fd, HttpCode code) {
118        size_t size = 256;
119        auto req = request_t::create(EVENT_ANSWER, size);
120        req->fd = fd;
121
122        const char * fmt = http_msgs[code];
123        const char * date = "";
124        size = snprintf(req->buffer, size, fmt, date, size);
125
126        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
127        io_uring_prep_write(sqe, fd, req->buffer, size, 0);
128        io_uring_sqe_set_data(sqe, req);
129        io_uring_submit(ring);
130        std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
131}
132
133static void ring_answer(struct io_uring * ring, int fd, const std::string & ans) {
134        size_t size = 256;
135        auto req = request_t::create(EVENT_ANSWER, size);
136        req->fd = fd;
137
138        const char * fmt = http_msgs[OK200];
139        const char * date = "";
140        size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
141        req->length = len;
142
143        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
144        io_uring_prep_write(sqe, fd, req->buffer, len, 0);
145        io_uring_sqe_set_data(sqe, req);
146        io_uring_submit(ring);
147        std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
148}
149
150//=========================================================
151static void handle_new_conn(struct io_uring * ring, int fd) {
152        if( fd < 0 ) {
153                int err = -fd;
154                if( err == ECONNABORTED ) return;
155                std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
156                exit(EXIT_FAILURE);
157        }
158
159        ring_request(ring, fd);
160}
161
162static void handle_request(struct io_uring * ring, struct request_t * in, int res) {
163        if( res < 0 ) {
164                int err = -res;
165                switch(err) {
166                        case EPIPE:
167                        case ECONNRESET:
168                                close(in->fd);
169                                free(in);
170                                return;
171                        default:
172                                std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
173                                exit(EXIT_FAILURE);
174                }
175        }
176
177        if(res == 0) {
178                close(in->fd);
179                free(in);
180                return;
181        }
182
183        char * it = in->buffer;
184        if( !strstr( it, "\r\n\r\n" ) ) {
185                std::cout << "Incomplete request" << std::endl;
186                close(in->fd);
187                free(in);
188                return;
189        }
190
191        it = in->buffer;
192        const std::string reply = "Hello, World!\n";
193        int ret = memcmp(it, "GET ", 4);
194        if( ret != 0 ) {
195                ring_answer(ring, in->fd, E400);
196                goto NEXT;
197        }
198
199        it += 4;
200        ret = memcmp(it, "/plaintext", 10);
201        if( ret != 0 ) {
202                ring_answer(ring, in->fd, E404);
203                goto NEXT;
204        }
205
206        ring_answer(ring, in->fd, reply);
207
208        NEXT:
209                ring_request(ring, in->fd);
210                return;
211}
212
213static void handle_answer(struct io_uring * ring, struct request_t * in, int res) {
214        if( res < 0 ) {
215                int err = -res;
216                switch(err) {
217                        case EPIPE:
218                        case ECONNRESET:
219                                close(in->fd);
220                                free(in);
221                                return;
222                        default:
223                                std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
224                                exit(EXIT_FAILURE);
225                }
226        }
227
228        if( res >= in->length ) {
229                free(in);
230                return;
231        }
232
233        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
234        io_uring_prep_write(sqe, in->fd, in->buffer + res, in->length - res, 0);
235        io_uring_sqe_set_data(sqe, in);
236        io_uring_submit(ring);
237        std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
238
239        ring_request(ring, in->fd);
240}
241
242//=========================================================
243void * proc_loop(void * arg) {
244        const struct options_t & opt = *(const struct options_t *)arg;
245
246        struct io_uring ring_storage;
247        struct io_uring * ring = &ring_storage;
248        io_uring_queue_init(opt.entries, ring, 0);
249
250        char endfd_buf[8];
251        ring_end(ring, opt.endfd, endfd_buf, 8);
252
253        ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
254
255        bool done = false;
256        while(!done) {
257                struct io_uring_cqe *cqe;
258                int ret = io_uring_wait_cqe(ring, &cqe);
259                if (ret < 0) {
260                        fprintf( stderr, "io_uring error: (%d) %s\n", (int)-ret, strerror(-ret) );
261                        exit(EXIT_FAILURE);
262                }
263
264                auto req = (struct request_t *)cqe->user_data;
265                std::cout << req << " completed with " << cqe->res << std::endl;
266
267                switch(req->type) {
268                        case EVENT_END:
269                                done = true;
270                                break;
271                        case EVENT_ACCEPT:
272                                handle_new_conn(ring, cqe->res);
273                                free(req);
274                                ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
275                                break;
276                        case EVENT_REQUEST:
277                                handle_request(ring, req, cqe->res);
278                                break;
279                        case EVENT_ANSWER:
280                                handle_answer(ring, req, cqe->res);
281                                break;
282                }
283
284                io_uring_cqe_seen(ring, cqe);
285        }
286
287        io_uring_queue_exit(ring);
288
289        return NULL;
290}
291
292//=========================================================
293#include <pthread.h>
294extern "C" {
295        #include <signal.h>
296        #include <sys/eventfd.h>
297        #include <sys/socket.h>
298        #include <netinet/in.h>
299}
300
301int main() {
302        signal(SIGPIPE, SIG_IGN);
303
304        unsigned nthreads = 1;
305        unsigned port = 8800;
306        unsigned entries = 256;
307        unsigned backlog = 10;
308
309        //===================
310        // End FD
311        int efd = eventfd(0, EFD_SEMAPHORE);
312        if (efd < 0) {
313                std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
314                exit(EXIT_FAILURE);
315        }
316
317        //===================
318        // Open Socket
319        std::cout << getpid() << " : Listening on port " << port << std::endl;
320        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
321        if(server_fd < 0) {
322                std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
323                exit(EXIT_FAILURE);
324        }
325
326        int ret = 0;
327        struct sockaddr_in address;
328        int addrlen = sizeof(address);
329        memset( (char *)&address, '\0', addrlen );
330        address.sin_family = AF_INET;
331        address.sin_addr.s_addr = htonl(INADDR_ANY);
332        address.sin_port = htons( port );
333
334        int waited = 0;
335        while(true) {
336                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
337                if(ret < 0) {
338                        if(errno == EADDRINUSE) {
339                                if(waited == 0) {
340                                        std::cerr << "Waiting for port" << std::endl;
341                                } else {
342                                        std::cerr << "\r" << waited;
343                                        std::cerr.flush();
344                                }
345                                waited ++;
346                                usleep( 1000000 );
347                                continue;
348                        }
349                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
350                        exit(EXIT_FAILURE);
351                }
352                break;
353        }
354
355        ret = listen( server_fd, backlog );
356        if(ret < 0) {
357                std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
358                exit(EXIT_FAILURE);
359        }
360
361        //===================
362        // Run Server Threads
363        std::cout << "Starting " << nthreads << " Threads" << std::endl;
364        pthread_t thrd_hdls[nthreads];
365        options_t thrd_opts[nthreads];
366        for(unsigned i = 0; i < nthreads; i++) {
367                thrd_opts[i].acpt.sockfd  = server_fd;
368                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
369                thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
370                thrd_opts[i].acpt.flags   = 0;
371                thrd_opts[i].endfd   = efd;
372                thrd_opts[i].entries = entries;
373
374                int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
375                if (ret < 0) {
376                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
377                        exit(EXIT_FAILURE);
378                }
379        }
380
381        //===================
382        // Server Started
383        std::cout << "Server Started" << std::endl;
384        {
385                char buffer[128];
386                int ret;
387                do {
388                        ret = read(STDIN_FILENO, buffer, 128);
389                        if(ret < 0) {
390                                std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
391                                exit(EXIT_FAILURE);
392                        }
393                        else if(ret > 0) {
394                                std::cout << "User inputed '";
395                                std::cout.write(buffer, ret);
396                                std::cout << "'" << std::endl;
397                        }
398                } while(ret != 0);
399
400                std::cout << "Shutdown received" << std::endl;
401        }
402
403        //===================
404        (std::cout << "Sending Shutdown to Threads... ").flush();
405        ret = eventfd_write(efd, nthreads);
406        if (ret < 0) {
407                std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
408                exit(EXIT_FAILURE);
409        }
410        std::cout << "done" << std::endl;
411
412        //===================
413        (std::cout << "Stopping Threads Done... ").flush();
414        for(unsigned i = 0; i < nthreads; i++) {
415                void * retval;
416                int ret = pthread_join(thrd_hdls[i], &retval);
417                if (ret < 0) {
418                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
419                        exit(EXIT_FAILURE);
420                }
421        }
422        std::cout << "done" << std::endl;
423
424        //===================
425        (std::cout << "Closing Socket... ").flush();
426        ret = shutdown( server_fd, SHUT_RD );
427        if( ret < 0 ) {
428                std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
429                exit(EXIT_FAILURE);
430        }
431
432        ret = close(server_fd);
433        if (ret < 0) {
434                std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
435                exit(EXIT_FAILURE);
436        }
437        std::cout << "done" << std::endl;
438}
Note: See TracBrowser for help on using the repository browser.