Changeset 3f39009


Ignore:
Timestamp:
Oct 29, 2021, 5:21:35 PM (8 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
enum, forall-pointer-decay, master
Children:
42b7fa5
Parents:
4087baf
Message:

Reimplemnted how splice is handled.

Location:
benchmark/io/http
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/protocol.cfa

    r4087baf r3f39009  
    1111#include <fstream.hfa>
    1212#include <iofwd.hfa>
     13#include <io/types.hfa>
     14#include <mutex_stmt.hfa>
    1315
    1416#include <assert.h>
     
    2628#define PLAINTEXT_MEMCPY
    2729#define PLAINTEXT_NOCOPY
     30#define LINKED_IO
    2831
    2932struct https_msg_str {
     
    5356}
    5457
    55 static inline int answer( int fd, const char * it, int len) {
     58static inline int answer( int fd, const char * it, int len ) {
    5659        while(len > 0) {
    5760                // Call write
    5861                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
    5962                if( ret < 0 ) {
    60                         if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET;
     63                        if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
    6164                        if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
    6265
     
    7780}
    7881
    79 int answer_header( int fd, size_t size ) {
    80         char buffer[512];
    81         char * it = buffer;
     82static int fill_header(char * it, size_t size) {
    8283        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
    8384        it += http_msgs[OK200]->len;
    8485        int len = http_msgs[OK200]->len;
    8586        len += snprintf(it, 512 - len, "%d \n\n", size);
     87        return len;
     88}
     89
     90static int answer_header( int fd, size_t size ) {
     91        char buffer[512];
     92        int len = fill_header(buffer, size);
    8693        return answer( fd, buffer, len );
    8794}
     
    135142}
    136143
    137 
    138 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
    139         char * it = buffer;
    140         size_t count = len - 1;
    141         int rlen = 0;
    142         READ:
    143         for() {
    144                 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
    145                 // int ret = read(fd, (void*)it, count);
    146                 if(ret == 0 ) return [OK200, true, 0, 0];
    147                 if(ret < 0 ) {
    148                         if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
    149                         if( errno == ECONNRESET ) return [E408, true, 0, 0];
    150                         if( errno == EPIPE ) return [E408, true, 0, 0];
    151                         abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
    152                 }
    153                 it[ret + 1] = '\0';
    154                 rlen += ret;
    155 
    156                 if( strstr( it, "\r\n\r\n" ) ) break;
    157 
    158                 it += ret;
    159                 count -= ret;
    160 
    161                 if( count < 1 ) return [E414, false, 0, 0];
    162         }
    163 
    164         if( options.log ) {
    165                 write(sout, buffer, rlen);
    166                 sout | nl;
    167         }
    168 
    169         it = buffer;
    170         int ret = memcmp(it, "GET /", 5);
    171         if( ret != 0 ) return [E400, false, 0, 0];
    172         it += 5;
    173 
    174         char * end = strstr( it, " " );
    175         return [OK200, false, it, end - it];
    176 }
    177 
    178 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
     144static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
    179145        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    180146        off_t offset = 0;
     
    207173}
    208174
     175static void zero_sqe(struct io_uring_sqe * sqe) {
     176        sqe->flags = 0;
     177        sqe->ioprio = 0;
     178        sqe->fd = 0;
     179        sqe->off = 0;
     180        sqe->addr = 0;
     181        sqe->len = 0;
     182        sqe->fsync_flags = 0;
     183        sqe->__pad2[0] = 0;
     184        sqe->__pad2[1] = 0;
     185        sqe->__pad2[2] = 0;
     186        sqe->fd = 0;
     187        sqe->off = 0;
     188        sqe->addr = 0;
     189        sqe->len = 0;
     190}
     191
     192enum FSM_STATE {
     193        Initial,
     194        Retry,
     195        Error,
     196        Done,
     197};
     198
     199struct FSM_Result {
     200        FSM_STATE state;
     201        int error;
     202};
     203
     204static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
     205static inline bool is_error(FSM_Result & this) { return Error == this.state; }
     206static inline bool is_done(FSM_Result & this) { return Done == this.state; }
     207
     208static inline int error(FSM_Result & this, int error) {
     209        this.error = error;
     210        this.state = Error;
     211        return error;
     212}
     213
     214static inline int done(FSM_Result & this) {
     215        this.state = Done;
     216        return 0;
     217}
     218
     219static inline int retry(FSM_Result & this) {
     220        this.state = Retry;
     221        return 0;
     222}
     223
     224static inline int need(FSM_Result & this) {
     225        switch(this.state) {
     226                case Initial:
     227                case Retry:
     228                        return 1;
     229                case Error:
     230                        if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
     231                case Done:
     232                        return 0;
     233        }
     234}
     235
     236// Generator that handles sending the header
     237generator header_g {
     238        io_future_t f;
     239        const char * next;
     240        int fd; size_t len;
     241        FSM_Result res;
     242};
     243
     244static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
     245        this.next = it;
     246        this.fd = fd;
     247        this.len = len;
     248}
     249
     250static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
     251        zero_sqe(sqe);
     252        sqe->opcode = IORING_OP_SEND;
     253        sqe->user_data = (uintptr_t)&this.f;
     254        sqe->flags = IOSQE_IO_LINK;
     255        sqe->fd = this.fd;
     256        sqe->addr = (uintptr_t)this.next;
     257        sqe->len = this.len;
     258}
     259
     260static inline int error(header_g & this, int error) {
     261        int ret = close(this.fd);
     262        if( ret != 0 ) {
     263                mutex(serr) serr | "Failed to close fd" | errno;
     264        }
     265        return error(this.res, error);
     266}
     267
     268static inline int wait_and_process(header_g & this) {
     269        wait(this.f);
     270
     271        // Did something crazy happen?
     272        if(this.f.result > this.len) {
     273                mutex(serr) serr | "HEADER sent too much!";
     274                return error(this, -ERANGE);
     275        }
     276
     277        // Something failed?
     278        if(this.f.result < 0) {
     279                int error = -this.f.result;
     280                if( error == ECONNRESET ) return error(this, -ECONNRESET);
     281                if( error == EPIPE ) return error(this, -EPIPE);
     282                if( error == ECANCELED ) {
     283                        mutex(serr) serr | "HEADER was cancelled, WTF!";
     284                        return error(this, -ECONNRESET);
     285                }
     286                if( error == EAGAIN || error == EWOULDBLOCK) {
     287                        mutex(serr) serr | "HEADER got eagain, WTF!";
     288                        return error(this, -ECONNRESET);
     289                }
     290        }
     291
     292        // Done?
     293        if(this.f.result == this.len) {
     294                return done(this.res);
     295        }
     296
     297        // It must be a Short read
     298        this.len  -= this.f.result;
     299        this.next += this.f.result;
     300        reset(this.f);
     301        return retry(this.res);
     302}
     303
     304// Generator that handles splicing in a file
     305struct splice_in_t {
     306        io_future_t f;
     307        int fd; int pipe; size_t len; off_t off;
     308        FSM_Result res;
     309};
     310
     311static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
     312        this.fd = fd;
     313        this.pipe = pipe;
     314        this.len = len;
     315        this.off = 0;
     316}
     317
     318static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
     319        zero_sqe(sqe);
     320        sqe->opcode = IORING_OP_SPLICE;
     321        sqe->user_data = (uintptr_t)&this.f;
     322        sqe->flags = 0;
     323        sqe->splice_fd_in = this.fd;
     324        sqe->splice_off_in = this.off;
     325        sqe->fd = this.pipe;
     326        sqe->off = (__u64)-1;
     327        sqe->len = this.len;
     328        sqe->splice_flags = SPLICE_F_MOVE;
     329}
     330
     331static inline int wait_and_process(splice_in_t & this) {
     332        wait(this.f);
     333
     334        // Did something crazy happen?
     335        if(this.f.result > this.len) {
     336                mutex(serr) serr | "SPLICE IN spliced too much!";
     337                return error(this.res, -ERANGE);
     338        }
     339
     340        // Something failed?
     341        if(this.f.result < 0) {
     342                int error = -this.f.result;
     343                if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
     344                if( error == EPIPE ) return error(this.res, -EPIPE);
     345                if( error == ECANCELED ) {
     346                        mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
     347                        return error(this.res, -ECONNRESET);
     348                }
     349                if( error == EAGAIN || error == EWOULDBLOCK) {
     350                        mutex(serr) serr | "SPLICE IN got eagain, WTF!";
     351                        return error(this.res, -ECONNRESET);
     352                }
     353        }
     354
     355        // Done?
     356        if(this.f.result == this.len) {
     357                return done(this.res);
     358        }
     359
     360        // It must be a Short read
     361        this.len -= this.f.result;
     362        this.off += this.f.result;
     363        reset(this.f);
     364        return retry(this.res);
     365}
     366
     367generator splice_out_g {
     368        io_future_t f;
     369        int pipe; int fd; size_t len;
     370        FSM_Result res;
     371};
     372
     373static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
     374        this.pipe = pipe;
     375        this.fd = fd;
     376        this.len = len;
     377}
     378
     379static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
     380        zero_sqe(sqe);
     381        sqe->opcode = IORING_OP_SPLICE;
     382        sqe->user_data = (uintptr_t)&this.f;
     383        sqe->flags = 0;
     384        sqe->splice_fd_in = this.pipe;
     385        sqe->splice_off_in = (__u64)-1;
     386        sqe->fd = this.fd;
     387        sqe->off = (__u64)-1;
     388        sqe->len = this.len;
     389        sqe->splice_flags = SPLICE_F_MOVE;
     390}
     391
     392static inline int error(splice_out_g & this, int error) {
     393        int ret = close(this.fd);
     394        if( ret != 0 ) {
     395                mutex(serr) serr | "Failed to close fd" | errno;
     396        }
     397        return error(this.res, error);
     398}
     399
     400static inline void wait_and_process(splice_out_g & this) {
     401        wait(this.f);
     402
     403        // Did something crazy happen?
     404        if(this.f.result > this.len) {
     405                mutex(serr) serr | "SPLICE OUT spliced too much!";
     406                return error(this.res, -ERANGE);
     407        }
     408
     409        // Something failed?
     410        if(this.f.result < 0) {
     411                int error = -this.f.result;
     412                if( error == ECONNRESET ) return error(this, -ECONNRESET);
     413                if( error == EPIPE ) return error(this, -EPIPE);
     414                if( error == ECANCELED ) {
     415                        this.f.result = 0;
     416                        goto SHORT_WRITE;
     417                }
     418                if( error == EAGAIN || error == EWOULDBLOCK) {
     419                        mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
     420                        return error(this, -ECONNRESET);
     421                }
     422        }
     423
     424        // Done?
     425        if(this.f.result == this.len) {
     426                return done(this.res);
     427        }
     428
     429SHORT_WRITE:
     430        // It must be a Short Write
     431        this.len -= this.f.result;
     432        reset(this.f);
     433        return retry(this.res);
     434}
     435
     436int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) {
     437        #if defined(LINKED_IO)
     438                char buffer[512];
     439                int len = fill_header(buffer, fsize);
     440                header_g header = { fd, buffer, len };
     441                splice_in_t splice_in = { ans_fd, pipe[1], fsize };
     442                splice_out_g splice_out = { pipe[0], fd, fsize };
     443
     444                RETRY_LOOP: for() {
     445                        int have = need(header.res) + need(splice_in.res) + 1;
     446                        int idx = 0;
     447                        struct io_uring_sqe * sqes[3];
     448                        __u32 idxs[3];
     449                        struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
     450
     451                        if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
     452                        if(need(   header.res)) { fill(header   , sqes[idx++]); }
     453                        fill(splice_out, sqes[idx]);
     454
     455                        // Submit everything
     456                        asm volatile("": : :"memory");
     457                        cfa_io_submit( ctx, idxs, have, false );
     458
     459                        // wait for the results
     460                        // Always wait for splice-in to complete as
     461                        // we may need to kill the connection if it fails
     462                        // If it already completed, this is a no-op
     463                        wait_and_process(splice_in);
     464
     465                        if(is_error(splice_in.res)) {
     466                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
     467                                close(fd);
     468                        }
     469
     470                        // Process the other 2
     471                        wait_and_process(header);
     472                        wait_and_process(splice_out);
     473
     474                        if(is_done(splice_out.res)) {
     475                                break RETRY_LOOP;
     476                        }
     477
     478                        // We need to wait for the completion if
     479                        // - both completed
     480                        // - the header failed
     481                        // -
     482
     483                        if(  is_error(header.res)
     484                          || is_error(splice_in.res)
     485                          || is_error(splice_out.res)) {
     486                                return -ECONNRESET;
     487                        }
     488                }
     489
     490                return len + fsize;
     491        #else
     492                int ret = answer_header(fd, fsize);
     493                if( ret < 0 ) return ret;
     494                return sendfile(pipe, fd, ans_fd, fsize);
     495        #endif
     496}
     497
     498[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     499        char * it = buffer;
     500        size_t count = len - 1;
     501        int rlen = 0;
     502        READ:
     503        for() {
     504                int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     505                // int ret = read(fd, (void*)it, count);
     506                if(ret == 0 ) return [OK200, true, 0, 0];
     507                if(ret < 0 ) {
     508                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
     509                        if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
     510                        if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
     511                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
     512                }
     513                it[ret + 1] = '\0';
     514                rlen += ret;
     515
     516                if( strstr( it, "\r\n\r\n" ) ) break;
     517
     518                it += ret;
     519                count -= ret;
     520
     521                if( count < 1 ) return [E414, false, 0, 0];
     522        }
     523
     524        if( options.log ) {
     525                write(sout, buffer, rlen);
     526                sout | nl;
     527        }
     528
     529        it = buffer;
     530        int ret = memcmp(it, "GET /", 5);
     531        if( ret != 0 ) return [E400, false, 0, 0];
     532        it += 5;
     533
     534        char * end = strstr( it, " " );
     535        return [OK200, false, it, end - it];
     536}
     537
    209538//=============================================================================================
    210539
     
    251580                Time now = timeHiRes();
    252581                strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
    253                 if( options.log ) sout | "Updated date to '" | buff | "'";
     582                // if( options.log ) sout | "Updated date to '" | buff | "'";
    254583
    255584                for(i; KNOWN_CODES) {
     
    264593                this.idx = (this.idx + 1) % 2;
    265594
    266                 if( options.log ) sout | "Date thread sleeping";
     595                // if( options.log ) sout | "Date thread sleeping";
    267596
    268597                sleep(1`s);
  • benchmark/io/http/protocol.hfa

    r4087baf r3f39009  
    1616
    1717int answer_error( int fd, HttpCode code );
    18 int answer_header( int fd, size_t size );
    1918int answer_plaintext( int fd );
    2019int answer_empty( int fd );
     20int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count );
    2121
    2222[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
    23 
    24 int sendfile( int pipe[2], int fd, int ans_fd, size_t count );
  • benchmark/io/http/worker.cfa

    r4087baf r3f39009  
    122122                        }
    123123
    124                         // Send the header
    125                         int ret = answer_header(fd, count);
    126                         if( ret == -ECONNRESET ) break REQUEST;
    127 
    128124                        // Send the desired file
    129                         ret = sendfile( this.pipe, fd, ans_fd, count);
     125                        int ret = answer_sendfile( this.pipe, fd, ans_fd, count);
    130126                        if( ret == -ECONNRESET ) break REQUEST;
    131127
     
    134130
    135131                if( options.log ) sout | "=== Connection closed ===";
    136                 close(fd);
    137132                continue CONNECTION;
    138133        }
Note: See TracChangeset for help on using the changeset viewer.