source: benchmark/io/http/http_ring.cpp @ c05c58f

arm-ehenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since c05c58f was c05c58f, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Minor improvements to http_ring

  • Property mode set to 100644
File size: 14.7 KB
Line 
1#include <cstdio>
2#include <cstdlib>
3#include <cstring>
4
5#include <iostream>
6
7#include <signal.h>
8#include <unistd.h>
9#include <liburing.h>
10
11typedef enum {
12        EVENT_END,
13        EVENT_ACCEPT,
14        EVENT_REQUEST,
15        EVENT_ANSWER
16} event_t;
17
18struct __attribute__((aligned(128))) request_t {
19        event_t type;
20        int fd;
21        size_t length;
22        char * buff;
23        char data[0];
24
25        static struct request_t * create(event_t type, size_t extra) {
26                auto ret = (struct request_t *)malloc(sizeof(struct request_t) + extra);
27                ret->type = type;
28                ret->length = extra;
29                ret->buff = ret->data;
30                return ret;
31        }
32
33        static struct request_t * create(event_t type) {
34                return create(type, 0);
35        }
36};
37
38struct __attribute__((aligned(128))) options_t {
39        struct {
40                int sockfd;
41                struct sockaddr *addr;
42                socklen_t *addrlen;
43                int flags;
44        } acpt;
45
46        int endfd;
47        struct io_uring * ring;
48
49        struct {
50                size_t subs = 0;
51                size_t cnts = 0;
52        } result;
53};
54
55volatile size_t total = 0;
56volatile size_t count = 0;
57
58//=========================================================
59static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
60        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
61        if(!sqe) {
62                std::cerr << "Insufficient entries in ring" << std::endl;
63                exit(EXIT_FAILURE);
64        }
65        return sqe;
66}
67
68static void submit(struct io_uring * ) {
69        // io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
70        // io_uring_submit(ring);
71}
72
73//=========================================================
74static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
75        struct io_uring_sqe * sqe = get_sqe(ring);
76        io_uring_prep_read(sqe, fd, buffer, len, 0);
77        io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
78        submit(ring);
79}
80
81static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
82        auto req = request_t::create(EVENT_ACCEPT);
83        struct io_uring_sqe * sqe = get_sqe(ring);
84        io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
85        io_uring_sqe_set_data(sqe, req);
86        submit(ring);
87        // std::cout << "Submitted accept: " << req << std::endl;
88}
89
90static void ring_request(struct io_uring * ring, int fd) {
91        size_t size = 1024;
92        auto req = request_t::create(EVENT_REQUEST, size);
93        req->fd = fd;
94
95        struct io_uring_sqe * sqe = get_sqe(ring);
96        io_uring_prep_read(sqe, fd, req->buff, size, 0);
97        io_uring_sqe_set_data(sqe, req);
98        submit(ring);
99        // std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
100}
101
102//=========================================================
103enum HttpCode {
104        OK200 = 0,
105        E400,
106        E404,
107        E405,
108        E408,
109        E413,
110        E414,
111        KNOWN_CODES
112};
113
114const char * http_msgs[] = {
115        "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n%s",
116        "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
117        "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
118        "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
119        "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
120        "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
121        "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
122};
123
124static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
125
126const int http_codes[] = {
127        200,
128        400,
129        404,
130        405,
131        408,
132        413,
133        414,
134};
135
136static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));
137
138int code_val(HttpCode code) {
139        return http_codes[code];
140}
141
142static void ring_answer(struct io_uring * ring, int fd, HttpCode code) {
143        size_t size = 256;
144        auto req = request_t::create(EVENT_ANSWER, size);
145        req->fd = fd;
146
147        const char * fmt = http_msgs[code];
148        const char * date = "";
149        size = snprintf(req->buff, size, fmt, date, size);
150
151        struct io_uring_sqe * sqe = get_sqe(ring);
152        io_uring_prep_write(sqe, fd, req->buff, size, 0);
153        io_uring_sqe_set_data(sqe, req);
154        submit(ring);
155        // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
156}
157
158static void ring_answer(struct io_uring * ring, int fd, const std::string &) {
159        // size_t size = 256;
160        // auto req = request_t::create(EVENT_ANSWER, size);
161        // req->fd = fd;
162
163        // const char * fmt = http_msgs[OK200];
164        // const char * date = "";
165        // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
166        // req->length = len;
167
168        // struct io_uring_sqe * sqe = get_sqe(ring);
169        // io_uring_prep_write(sqe, fd, req->buffer, len, 0);
170        // io_uring_sqe_set_data(sqe, req);
171        // submit(ring);
172        // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
173
174
175        static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \
176                                                "Content-Length: 15\r\n" \
177                                                "Content-Type: text/html\r\n" \
178                                                "Connection: keep-alive\r\n" \
179                                                "Server: testserver\r\n" \
180                                                "\r\n" \
181                                                "Hello, World!\r\n";
182
183        static const size_t RLEN = strlen(RESPONSE);
184
185        size_t size = 256;
186        auto req = request_t::create(EVENT_ANSWER, size);
187        req->fd = fd;
188        req->buff = (char*)RESPONSE;
189        req->length = RLEN;
190
191        // const char * fmt = http_msgs[OK200];
192        // const char * date = "";
193        // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
194        // req->length = len;
195
196        struct io_uring_sqe * sqe = get_sqe(ring);
197        io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0);
198        io_uring_sqe_set_data(sqe, req);
199        submit(ring);
200}
201
202//=========================================================
203static void handle_new_conn(struct io_uring * ring, int fd) {
204        if( fd < 0 ) {
205                int err = -fd;
206                if( err == ECONNABORTED ) return;
207                std::cerr << "accept error: (" << errno << ") " << strerror(errno) << std::endl;
208                exit(EXIT_FAILURE);
209        }
210
211        ring_request(ring, fd);
212}
213
214static void handle_request(struct io_uring * ring, struct request_t * in, int res) {
215        if( res < 0 ) {
216                int err = -res;
217                switch(err) {
218                        case EPIPE:
219                        case ECONNRESET:
220                                close(in->fd);
221                                free(in);
222                                return;
223                        default:
224                                std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
225                                exit(EXIT_FAILURE);
226                }
227        }
228
229        if(res == 0) {
230                close(in->fd);
231                free(in);
232                return;
233        }
234
235        const char * it = in->buff;
236        if( !strstr( it, "\r\n\r\n" ) ) {
237                std::cout << "Incomplete request" << std::endl;
238                close(in->fd);
239                free(in);
240                return;
241        }
242
243        it = in->buff;
244        const std::string reply = "Hello, World!\n";
245        int ret = memcmp(it, "GET ", 4);
246        if( ret != 0 ) {
247                ring_answer(ring, in->fd, E400);
248                goto NEXT;
249        }
250
251        it += 4;
252        ret = memcmp(it, "/plaintext", 10);
253        if( ret != 0 ) {
254                ring_answer(ring, in->fd, E404);
255                goto NEXT;
256        }
257
258        ring_answer(ring, in->fd, reply);
259
260        NEXT:
261                ring_request(ring, in->fd);
262                free(in);
263                return;
264}
265
266static void handle_answer(struct io_uring * ring, struct request_t * in, int res) {
267        if( res < 0 ) {
268                int err = -res;
269                switch(err) {
270                        case EPIPE:
271                        case ECONNRESET:
272                                close(in->fd);
273                                free(in);
274                                return;
275                        default:
276                                std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
277                                exit(EXIT_FAILURE);
278                }
279        }
280
281        if( res >= in->length ) {
282                free(in);
283                return;
284        }
285
286        struct io_uring_sqe * sqe = get_sqe(ring);
287        io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0);
288        io_uring_sqe_set_data(sqe, in);
289        submit(ring);
290        // std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
291
292        ring_request(ring, in->fd);
293}
294
295//=========================================================
296extern "C" {
297extern int __io_uring_flush_sq(struct io_uring *ring);
298}
299
300void * proc_loop(void * arg) {
301        size_t count = 0;
302        struct options_t & opt = *(struct options_t *)arg;
303
304        struct io_uring * ring = opt.ring;
305
306        char endfd_buf[8];
307        ring_end(ring, opt.endfd, endfd_buf, 8);
308
309        ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
310
311        int reset = 1;
312        bool done = false;
313        while(!done) {
314                struct io_uring_cqe *cqe;
315                int ret;
316                while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) {
317                        ret = io_uring_submit_and_wait(ring, 1);
318                        if (ret < 0) {
319                                fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) );
320                                exit(EXIT_FAILURE);
321                        }
322                        opt.result.subs += ret;
323                        opt.result.cnts++;
324                }
325
326                if (ret < 0 && -EAGAIN != ret) {
327                        fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) );
328                        exit(EXIT_FAILURE);
329                }
330
331                auto req = (struct request_t *)cqe->user_data;
332                // std::cout << req << " completed with " << cqe->res << std::endl;
333
334                switch(req->type) {
335                        case EVENT_END:
336                                done = true;
337                                break;
338                        case EVENT_ACCEPT:
339                                handle_new_conn(ring, cqe->res);
340                                free(req);
341                                ring_accept(ring, opt.acpt.sockfd, opt.acpt.addr, opt.acpt.addrlen, opt.acpt.flags);
342                                break;
343                        case EVENT_REQUEST:
344                                handle_request(ring, req, cqe->res);
345                                break;
346                        case EVENT_ANSWER:
347                                handle_answer(ring, req, cqe->res);
348                                break;
349                }
350
351                io_uring_cqe_seen(ring, cqe);
352                reset--;
353                if(reset == 0) {
354                        size_t ltotal = opt.result.subs;
355                        size_t lcount = opt.result.cnts;
356
357                        std::cout << "Submit average: " << ltotal << "/" << lcount << "(" << (((double)ltotal) / lcount) << ")" << std::endl;
358                        reset = 100000 + (100000 * (ring->ring_fd % 5));
359                }
360        }
361
362        return (void*)count;
363}
364
365//=========================================================
366struct __attribute__((aligned(128))) aligned_ring {
367        struct io_uring storage;
368};
369
370#include <bit>
371
372#include <pthread.h>
373extern "C" {
374        #include <signal.h>
375        #include <sys/eventfd.h>
376        #include <sys/socket.h>
377        #include <netinet/in.h>
378}
379
380int main(int argc, char * argv[]) {
381        signal(SIGPIPE, SIG_IGN);
382
383        unsigned nthreads = 1;
384        unsigned port = 8800;
385        unsigned entries = 256;
386        unsigned backlog = 10;
387        bool attach = false;
388        bool sqpoll = false;
389
390        //===================
391        // Arguments
392        int c;
393        while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
394                switch (c)
395                {
396                case 't':
397                        nthreads = atoi(optarg);
398                        break;
399                case 'p':
400                        port = atoi(optarg);
401                        break;
402                case 'e':
403                        entries = atoi(optarg);
404                        break;
405                case 'b':
406                        backlog = atoi(optarg);
407                        break;
408                case 'a':
409                        attach = true;
410                        break;
411                case 'S':
412                        sqpoll = true;
413                        break;
414                case '?':
415                default:
416                        std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog> -aS" << std::endl;
417                        return EXIT_FAILURE;
418                }
419        }
420
421        if( !std::ispow2(entries) ) {
422                unsigned v = entries;
423                v--;
424                v |= v >> 1;
425                v |= v >> 2;
426                v |= v >> 4;
427                v |= v >> 8;
428                v |= v >> 16;
429                v++;
430                std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
431                entries = v;
432        }
433
434        //===================
435        // End FD
436        int efd = eventfd(0, EFD_SEMAPHORE);
437        if (efd < 0) {
438                std::cerr << "eventfd error: (" << errno << ") " << strerror(errno) << std::endl;
439                exit(EXIT_FAILURE);
440        }
441
442        //===================
443        // Open Socket
444        std::cout << getpid() << " : Listening on port " << port << std::endl;
445        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
446        if(server_fd < 0) {
447                std::cerr << "socket error: (" << errno << ") " << strerror(errno) << std::endl;
448                exit(EXIT_FAILURE);
449        }
450
451        int ret = 0;
452        struct sockaddr_in address;
453        int addrlen = sizeof(address);
454        memset( (char *)&address, '\0', addrlen );
455        address.sin_family = AF_INET;
456        address.sin_addr.s_addr = htonl(INADDR_ANY);
457        address.sin_port = htons( port );
458
459        int waited = 0;
460        while(true) {
461                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
462                if(ret < 0) {
463                        if(errno == EADDRINUSE) {
464                                if(waited == 0) {
465                                        std::cerr << "Waiting for port" << std::endl;
466                                } else {
467                                        std::cerr << "\r" << waited;
468                                        std::cerr.flush();
469                                }
470                                waited ++;
471                                usleep( 1000000 );
472                                continue;
473                        }
474                        std::cerr << "bind error: (" << errno << ") " << strerror(errno) << std::endl;
475                        exit(EXIT_FAILURE);
476                }
477                break;
478        }
479
480        ret = listen( server_fd, backlog );
481        if(ret < 0) {
482                std::cerr << "listen error: (" << errno << ") " << strerror(errno) << std::endl;
483                exit(EXIT_FAILURE);
484        }
485
486        //===================
487        // Run Server Threads
488        std::cout << "Starting " << nthreads << " Threads";
489        if(attach) {
490                std::cout << " with attached Rings";
491        }
492        std::cout << std::endl;
493
494        aligned_ring thrd_rings[nthreads];
495        pthread_t    thrd_hdls[nthreads];
496        options_t    thrd_opts[nthreads];
497        bool fast_poll = true;
498        bool nfix_sqpl = true;
499        for(unsigned i = 0; i < nthreads; i++) {
500                struct io_uring_params p = { };
501                if(sqpoll) {
502                        p.flags |= IORING_SETUP_SQPOLL;
503                        p.sq_thread_idle = 100;
504                }
505
506                if (attach && i != 0) {
507                        p.flags |= IORING_SETUP_ATTACH_WQ;
508                        p.wq_fd = thrd_rings[0].storage.ring_fd;
509                }
510                io_uring_queue_init_params(entries, &thrd_rings[i].storage, &p);
511
512                if(0 == (p.features & IORING_FEAT_FAST_POLL      )) { fast_poll = false; }
513                if(0 == (p.features & IORING_FEAT_SQPOLL_NONFIXED)) { nfix_sqpl = false; }
514
515                thrd_opts[i].acpt.sockfd  = server_fd;
516                thrd_opts[i].acpt.addr    = (struct sockaddr *)&address;
517                thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
518                thrd_opts[i].acpt.flags   = 0;
519                thrd_opts[i].endfd        = efd;
520                thrd_opts[i].ring         = &thrd_rings[i].storage;
521
522                int ret = pthread_create(&thrd_hdls[i], nullptr, proc_loop, &thrd_opts[i]);
523                if (ret < 0) {
524                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
525                        exit(EXIT_FAILURE);
526                }
527        }
528
529        if( fast_poll) std::cout << "Fast Poll Present" << std::endl;
530        if(!nfix_sqpl) std::cout << "Non-Fixed SQ Poll not Present" << std::endl;
531
532        //===================
533        // Server Started
534        std::cout << "Server Started" << std::endl;
535        {
536                char buffer[128];
537                int ret;
538                do {
539                        ret = read(STDIN_FILENO, buffer, 128);
540                        if(ret < 0) {
541                                std::cerr << "main read error: (" << errno << ") " << strerror(errno) << std::endl;
542                                exit(EXIT_FAILURE);
543                        }
544                        else if(ret > 0) {
545                                std::cout << "User inputed '";
546                                std::cout.write(buffer, ret);
547                                std::cout << "'" << std::endl;
548                        }
549                } while(ret != 0);
550
551                std::cout << "Shutdown received" << std::endl;
552        }
553
554        //===================
555        (std::cout << "Sending Shutdown to Threads... ").flush();
556        ret = eventfd_write(efd, nthreads);
557        if (ret < 0) {
558                std::cerr << "eventfd close error: (" << errno << ") " << strerror(errno) << std::endl;
559                exit(EXIT_FAILURE);
560        }
561        std::cout << "done" << std::endl;
562
563        //===================
564        (std::cout << "Stopping Threads Done... ").flush();
565        for(unsigned i = 0; i < nthreads; i++) {
566                void * retval;
567                int ret = pthread_join(thrd_hdls[i], &retval);
568                if (ret < 0) {
569                        std::cerr << "pthread create error: (" << errno << ") " << strerror(errno) << std::endl;
570                        exit(EXIT_FAILURE);
571                }
572
573                io_uring_queue_exit(thrd_opts[i].ring);
574        }
575        std::cout << "done" << std::endl;
576
577        //===================
578        (std::cout << "Closing Socket... ").flush();
579        ret = shutdown( server_fd, SHUT_RD );
580        if( ret < 0 ) {
581                std::cerr << "shutdown socket error: (" << errno << ") " << strerror(errno) << std::endl;
582                exit(EXIT_FAILURE);
583        }
584
585        ret = close(server_fd);
586        if (ret < 0) {
587                std::cerr << "close socket error: (" << errno << ") " << strerror(errno) << std::endl;
588                exit(EXIT_FAILURE);
589        }
590        std::cout << "done" << std::endl;
591}
Note: See TracBrowser for help on using the repository browser.