Ignore:
Timestamp:
Jan 19, 2021, 2:40:19 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
f3e87af
Parents:
efdfdee
Message:

Tentative improvement to batch more requests together.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/http_ring.cpp

    refdfdee r3acbf89  
    1616} event_t;
    1717
    18 struct request_t {
     18struct __attribute__((aligned(128))) request_t {
    1919        event_t type;
    2020        int fd;
    2121        size_t length;
    22         char buffer[0];
     22        char * buff;
     23        char data[0];
    2324
    2425        static struct request_t * create(event_t type, size_t extra) {
     
    2627                ret->type = type;
    2728                ret->length = extra;
     29                ret->buff = ret->data;
    2830                return ret;
    2931        }
     
    3436};
    3537
    36 struct options_t {
     38struct __attribute__((aligned(128))) options_t {
    3739        struct {
    3840                int sockfd;
     
    4446        int endfd;
    4547        unsigned entries;
     48
     49        struct {
     50                size_t subs = 0;
     51                size_t cnts = 0;
     52        } result;
    4653};
     54
     55//=========================================================
     56static struct io_uring_sqe * get_sqe(struct io_uring * ring) {
     57        struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
     58        if(!sqe) {
     59                std::cerr << "Insufficient entries in ring" << std::endl;
     60                exit(EXIT_FAILURE);
     61        }
     62        return sqe;
     63}
     64
     65static void submit(struct io_uring * ) {
     66        // io_uring_submit(ring);
     67}
    4768
    4869//=========================================================
    4970static void ring_end(struct io_uring * ring, int fd, char * buffer, size_t len) {
    50         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
     71        struct io_uring_sqe * sqe = get_sqe(ring);
    5172        io_uring_prep_read(sqe, fd, buffer, len, 0);
    5273        io_uring_sqe_set_data(sqe, request_t::create(EVENT_END));
    53         io_uring_submit(ring);
     74        submit(ring);
    5475}
    5576
    5677static void ring_accept(struct io_uring * ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
    5778        auto req = request_t::create(EVENT_ACCEPT);
    58         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
     79        struct io_uring_sqe * sqe = get_sqe(ring);
    5980        io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    6081        io_uring_sqe_set_data(sqe, req);
    61         io_uring_submit(ring);
    62         std::cout << "Submitted accept: " << req << std::endl;
     82        submit(ring);
     83        // std::cout << "Submitted accept: " << req << std::endl;
    6384}
    6485
     
    6889        req->fd = fd;
    6990
    70         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
    71         io_uring_prep_read(sqe, fd, req->buffer, size, 0);
     91        struct io_uring_sqe * sqe = get_sqe(ring);
     92        io_uring_prep_read(sqe, fd, req->buff, size, 0);
    7293        io_uring_sqe_set_data(sqe, req);
    73         io_uring_submit(ring);
    74         std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
     94        submit(ring);
     95        // std::cout << "Submitted request: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    7596}
    7697
     
    122143        const char * fmt = http_msgs[code];
    123144        const char * date = "";
    124         size = snprintf(req->buffer, size, fmt, date, size);
    125 
    126         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
    127         io_uring_prep_write(sqe, fd, req->buffer, size, 0);
     145        size = snprintf(req->buff, size, fmt, date, size);
     146
     147        struct io_uring_sqe * sqe = get_sqe(ring);
     148        io_uring_prep_write(sqe, fd, req->buff, size, 0);
    128149        io_uring_sqe_set_data(sqe, req);
    129         io_uring_submit(ring);
    130         std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
    131 }
    132 
    133 static void ring_answer(struct io_uring * ring, int fd, const std::string & ans) {
     150        submit(ring);
     151        // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
     152}
     153
     154static void ring_answer(struct io_uring * ring, int fd, const std::string &) {
     155        // size_t size = 256;
     156        // auto req = request_t::create(EVENT_ANSWER, size);
     157        // req->fd = fd;
     158
     159        // const char * fmt = http_msgs[OK200];
     160        // const char * date = "";
     161        // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
     162        // req->length = len;
     163
     164        // struct io_uring_sqe * sqe = get_sqe(ring);
     165        // io_uring_prep_write(sqe, fd, req->buffer, len, 0);
     166        // io_uring_sqe_set_data(sqe, req);
     167        // submit(ring);
     168        // std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
     169
     170
     171        static const char* RESPONSE = "HTTP/1.1 200 OK\r\n" \
     172                                                "Content-Length: 15\r\n" \
     173                                                "Content-Type: text/html\r\n" \
     174                                                "Connection: keep-alive\r\n" \
     175                                                "Server: testserver\r\n" \
     176                                                "\r\n" \
     177                                                "Hello, World!\r\n";
     178
     179        static const size_t RLEN = strlen(RESPONSE);
     180
    134181        size_t size = 256;
    135182        auto req = request_t::create(EVENT_ANSWER, size);
    136183        req->fd = fd;
    137 
    138         const char * fmt = http_msgs[OK200];
    139         const char * date = "";
    140         size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
    141         req->length = len;
    142 
    143         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
    144         io_uring_prep_write(sqe, fd, req->buffer, len, 0);
     184        req->buff = (char*)RESPONSE;
     185        req->length = RLEN;
     186
     187        // const char * fmt = http_msgs[OK200];
     188        // const char * date = "";
     189        // size_t len = snprintf(req->buffer, size, fmt, date, ans.size(), ans.c_str());
     190        // req->length = len;
     191
     192        struct io_uring_sqe * sqe = get_sqe(ring);
     193        io_uring_prep_write(sqe, fd, RESPONSE, RLEN, 0);
    145194        io_uring_sqe_set_data(sqe, req);
    146         io_uring_submit(ring);
    147         std::cout << "Submitted good answer: " << req << " (" << (void*)req->buffer << ")"<<std::endl;
     195        submit(ring);
    148196}
    149197
     
    170218                                return;
    171219                        default:
    172                                 std::cerr << "answer error: (" << err << ") " << strerror(err) << std::endl;
     220                                std::cerr << "request error: (" << err << ") " << strerror(err) << std::endl;
    173221                                exit(EXIT_FAILURE);
    174222                }
     
    181229        }
    182230
    183         char * it = in->buffer;
     231        const char * it = in->buff;
    184232        if( !strstr( it, "\r\n\r\n" ) ) {
    185233                std::cout << "Incomplete request" << std::endl;
     
    189237        }
    190238
    191         it = in->buffer;
     239        it = in->buff;
    192240        const std::string reply = "Hello, World!\n";
    193241        int ret = memcmp(it, "GET ", 4);
     
    231279        }
    232280
    233         struct io_uring_sqe * sqe = io_uring_get_sqe(ring);
    234         io_uring_prep_write(sqe, in->fd, in->buffer + res, in->length - res, 0);
     281        struct io_uring_sqe * sqe = get_sqe(ring);
     282        io_uring_prep_write(sqe, in->fd, in->buff + res, in->length - res, 0);
    235283        io_uring_sqe_set_data(sqe, in);
    236         io_uring_submit(ring);
    237         std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
     284        submit(ring);
     285        // std::cout << "Re-Submitted request: " << in << " (" << (void*)in->buffer << ")"<<std::endl;
    238286
    239287        ring_request(ring, in->fd);
     
    241289
    242290//=========================================================
     291extern "C" {
     292extern int __io_uring_flush_sq(struct io_uring *ring);
     293}
     294
    243295void * proc_loop(void * arg) {
    244         const struct options_t & opt = *(const struct options_t *)arg;
     296        size_t count = 0;
     297        struct options_t & opt = *(struct options_t *)arg;
    245298
    246299        struct io_uring ring_storage;
     
    256309        while(!done) {
    257310                struct io_uring_cqe *cqe;
    258                 int ret = io_uring_wait_cqe(ring, &cqe);
    259                 if (ret < 0) {
    260                         fprintf( stderr, "io_uring error: (%d) %s\n", (int)-ret, strerror(-ret) );
     311                int ret;
     312                while(-EAGAIN == (ret = io_uring_wait_cqe_nr(ring, &cqe, 0))) {
     313                        ret = io_uring_submit_and_wait(ring, 1);
     314                        if (ret < 0) {
     315                                fprintf( stderr, "io_uring get error: (%d) %s\n", (int)-ret, strerror(-ret) );
     316                                exit(EXIT_FAILURE);
     317                        }
     318                        opt.result.subs += ret;
     319                        opt.result.cnts++;
     320                }
     321
     322                if (ret < 0 && -EAGAIN != ret) {
     323                        fprintf( stderr, "io_uring peek error: (%d) %s\n", (int)-ret, strerror(-ret) );
    261324                        exit(EXIT_FAILURE);
    262325                }
    263326
    264327                auto req = (struct request_t *)cqe->user_data;
    265                 std::cout << req << " completed with " << cqe->res << std::endl;
     328                // std::cout << req << " completed with " << cqe->res << std::endl;
    266329
    267330                switch(req->type) {
     
    287350        io_uring_queue_exit(ring);
    288351
    289         return NULL;
     352        return (void*)count;
    290353}
    291354
    292355//=========================================================
     356#include <bit>
     357
    293358#include <pthread.h>
    294359extern "C" {
     
    299364}
    300365
    301 int main() {
     366int main(int argc, char * argv[]) {
    302367        signal(SIGPIPE, SIG_IGN);
    303368
     
    306371        unsigned entries = 256;
    307372        unsigned backlog = 10;
     373
     374        //===================
     375        // Arguments
     376        int c;
     377        while ((c = getopt (argc, argv, "t:p:e:b:")) != -1) {
     378                switch (c)
     379                {
     380                case 't':
     381                        nthreads = atoi(optarg);
     382                        break;
     383                case 'p':
     384                        port = atoi(optarg);
     385                        break;
     386                case 'e':
     387                        entries = atoi(optarg);
     388                        break;
     389                case 'b':
     390                        backlog = atoi(optarg);
     391                        break;
     392                case '?':
     393                default:
     394                        std::cerr << "Usage: -t <threads> -p <port> -e <entries> -b <backlog>" << std::endl;
     395                        return EXIT_FAILURE;
     396                }
     397        }
     398
     399        if( !std::ispow2(entries) ) {
     400                unsigned v = entries;
     401                v--;
     402                v |= v >> 1;
     403                v |= v >> 2;
     404                v |= v >> 4;
     405                v |= v >> 8;
     406                v |= v >> 16;
     407                v++;
     408                std::cerr << "Warning: num_entries not a power of 2 (" << entries << ") raising to " << v << std::endl;
     409                entries = v;
     410        }
    308411
    309412        //===================
     
    412515        //===================
    413516        (std::cout << "Stopping Threads Done... ").flush();
     517        size_t total = 0;
     518        size_t count = 0;
    414519        for(unsigned i = 0; i < nthreads; i++) {
    415520                void * retval;
     
    419524                        exit(EXIT_FAILURE);
    420525                }
     526                // total += (size_t)retval;
     527                total += thrd_opts[i].result.subs;
     528                count += thrd_opts[i].result.cnts;
    421529        }
    422530        std::cout << "done" << std::endl;
     531        std::cout << "Submit average: " << total << "/" << count << "(" << (((double)total) / count) << ")" << std::endl;
    423532
    424533        //===================
Note: See TracChangeset for help on using the changeset viewer.