Ignore:
Timestamp:
Feb 15, 2022, 4:28:29 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
Children:
3a40df6, 5614a191
Parents:
6dc17a3d
Message:

Producer now has multiple io_uring implementations.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/sendfile/producer.c

    r6dc17a3d r3263e2a4  
    1010
    1111#include <errno.h>
     12#include <locale.h>
    1213#include <time.h>
    1314#include <unistd.h>
     
    2425#include <netdb.h>
    2526
     27#include <liburing.h>
    2628
    2729enum {
     
    3436        SENDFILE_ERROR,
    3537        SPLICEIN_ERROR,
    36         SPLICEOUT_ERROR
     38        SPLICEOUT_ERROR,
     39        URINGWAIT_ERROR
    3740};
    3841
     
    4346
    4447int pipefd[2];
     48struct io_uring ring;
    4549
    4650struct stats {
     
    5660static void my_sendfile(int out, int in, size_t size, struct stats *);
    5761static void my_splice  (int out, int in, size_t size, struct stats *);
     62static void my_iouring (int out, int in, size_t size, struct stats *);
     63static void my_ringlink(int out, int in, size_t size, struct stats *);
    5864typedef void (*sender_t)(int out, int in, size_t size, struct stats *);
    5965
     
    6167
    6268int main(int argc, char * argv[]) {
     69        setlocale(LC_ALL, "");
    6370        const char * file_path;
    6471        struct addrinfo * addr;
     
    167174        DONE:
    168175
     176        io_uring_queue_init(16, &ring, 0);
     177
    169178        {
    170179                char addr_str[INET_ADDRSTRLEN];
     
    211220        printf("--- sendfile ---\n");
    212221        run(my_sendfile, addr, file_fd, file_size);
     222        printf("--- io_uring ---\n");
     223        run(my_iouring, addr, file_fd, file_size);
     224        printf("--- io_uring + link ---\n");
     225        run(my_ringlink, addr, file_fd, file_size);
    213226
    214227        close(pipefd[0]);
     
    258271        printf("Sent %'zu bytes in %'zu files, %f seconds\n", st.bytes, st.calls, secs);
    259272        printf(" - %'3.3f bytes per second\n", (((double)st.bytes) / secs));
     273        printf(" - %'f seconds per file\n", secs / st.calls);
    260274        printf(" - %'3.3f bytes per calls\n", (((double)st.bytes) / st.calls));
    261275        if(st.shorts.r.cnt ){
     
    323337        st->bytes += writes;
    324338}
     339
     340static ssize_t naive_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
     341        struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
     342
     343        io_uring_prep_splice(sqe, fd_in, NULL != off_in ? *off_in: -1, fd_out, NULL != off_out ? *off_out: -1, len, flags);
     344
     345        io_uring_submit(&ring);
     346
     347        struct io_uring_cqe * cqe = NULL;
     348        /* wait for the sqe to complete */
     349        int ret = io_uring_wait_cqe_nr(&ring, &cqe, 1);
     350
     351        /* read and process cqe event */
     352        switch(ret) {
     353        case 0:
     354                {
     355                        ssize_t val = cqe->res;
     356                        if( cqe->res < 0 ) {
     357                                printf("Completion Error : %s\n", strerror( -cqe->res ));
     358                                return EXIT_FAILURE;
     359                        }
     360                        io_uring_cqe_seen(&ring, cqe);
     361                        return val;
     362                }
     363        default:
     364                fprintf( stderr, "io_uring_wait error: (%d) %s\n\n", (int)-ret, strerror(-ret) );
     365                exit( URINGWAIT_ERROR );
     366        }
     367}
     368
     369static void my_iouring (int out, int in, size_t size, struct stats * st) {
     370        unsigned flags = 0; //SPLICE_F_MOVE; // | SPLICE_F_MORE;
     371        off_t offset = 0;
     372        size_t writes = 0;
     373        for(;;) {
     374                ssize_t reti = 0;
     375                reti = naive_splice(in, &offset, pipefd[1], NULL, size, flags);
     376                if( reti < 0 ) {
     377                        fprintf( stderr, "splice in error: (%d) %s\n\n", (int)errno, strerror(errno) );
     378                        exit( SPLICEIN_ERROR );
     379                }
     380
     381                size -= reti;
     382                size_t in_pipe = reti;
     383                for(;;) {
     384                        ssize_t reto = 0;
     385                        reto = naive_splice(pipefd[0], NULL, out, NULL, in_pipe, flags);
     386                        if( reto < 0 ) {
     387                                fprintf( stderr, "splice out error: (%d) %s\n\n", (int)errno, strerror(errno) );
     388                                exit( SPLICEOUT_ERROR );
     389                        }
     390                        in_pipe -= reto;
     391                        writes += reto;
     392                        if(0 == in_pipe) break;
     393                        st->shorts.w.cnt++;
     394                        st->shorts.w.bytes += reto;
     395                }
     396                if(0 == size) break;
     397                st->shorts.r.cnt++;
     398                st->shorts.r.bytes += reti;
     399        }
     400        st->calls++;
     401        st->bytes += writes;
     402}
     403
     404static void my_ringlink(int out, int in, size_t size, struct stats * st) {
     405        enum { SPLICE_IN, SPLICE_OUT };
     406
     407        size_t in_pipe = size;
     408        off_t offset = 0;
     409        bool has_in = false;
     410        bool has_out = false;
     411        while(true) {
     412                if(!has_in && size > 0) {
     413                        struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
     414                        io_uring_prep_splice(sqe, in, offset, pipefd[1], -1, size, 0);
     415                        sqe->user_data = SPLICE_IN;
     416                        has_in = true;
     417                }
     418                if(!has_out) {
     419                        struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
     420                        io_uring_prep_splice(sqe, pipefd[0], -1, out, -1, in_pipe, 0);
     421                        sqe->user_data = SPLICE_OUT;
     422                        if(has_in) sqe->flags = IOSQE_IO_LINK;
     423                        has_out = true;
     424                }
     425
     426                int ret = io_uring_submit_and_wait(&ring, 1);
     427                if(ret < 0) {
     428                        fprintf( stderr, "io_uring_submit error: (%d) %s\n\n", (int)-ret, strerror(-ret) );
     429                        exit( URINGWAIT_ERROR );
     430                }
     431
     432                /* poll the cq and count how much polling we did */
     433                while(true) {
     434                        struct io_uring_cqe * cqe = NULL;
     435                        /* wait for the sqe to complete */
     436                        int ret = io_uring_wait_cqe_nr(&ring, &cqe, 0);
     437
     438                        /* read and process cqe event */
     439                        switch(ret) {
     440                        case 0:
     441                                if( cqe->res < 0 ) {
     442                                        printf("Completion Error : %s\n", strerror( -cqe->res ));
     443                                        exit( URINGWAIT_ERROR );
     444                                }
     445
     446                                ssize_t write = cqe->res;
     447                                int which = cqe->user_data;
     448                                io_uring_cqe_seen(&ring, cqe);
     449                                switch( which ) {
     450                                case SPLICE_IN:
     451                                        has_in = false;
     452                                        size -= write;
     453                                        offset += write;
     454                                        if(0 == size) break;
     455                                        st->shorts.r.cnt++;
     456                                        st->shorts.r.bytes += write;
     457                                        break;
     458                                case SPLICE_OUT:
     459                                        has_out = false;
     460                                        in_pipe -= write;
     461                                        st->bytes += write;
     462                                        if(0 == in_pipe) break;
     463                                        st->shorts.w.cnt++;
     464                                        st->shorts.w.bytes += write;
     465                                        break;
     466                                default:
     467                                        printf("Completion Error : unknown user data\n");
     468                                        exit( URINGWAIT_ERROR );
     469                                }
     470                                continue;
     471                        case -EAGAIN:
     472                                goto OUTER;
     473                        default:
     474                                fprintf( stderr, "io_uring_get_cqe error: (%d) %s\n\n", (int)-ret, strerror(-ret) );
     475                                exit( URINGWAIT_ERROR );
     476                        }
     477                }
     478                OUTER:
     479                if(0 == in_pipe) break;
     480        }
     481        st->calls++;
     482}
Note: See TracChangeset for help on using the changeset viewer.