Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/protocol.cfa

    r32d1383 r8bee858  
    1818extern "C" {
    1919      int snprintf ( char * s, size_t n, const char * format, ... );
    20         ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
    2120        // #include <linux/io_uring.h>
    2221}
     
    3130#define PLAINTEXT_NOCOPY
    3231// #define LINKED_IO
    33 #define TRUE_SENDFILE
    3432
    3533static inline __s32 wait_res( io_future_t & this ) {
     
    7371                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
    7472                if( ret < 0 ) {
    75                         if( errno == ECONNRESET || errno == EPIPE || errno == EBADF ) {
    76                                 ret = close(fd);
    77                                 if( ret != 0 ) abort( "close in 'answer' error: (%d) %s\n", (int)errno, strerror(errno) );
    78                                 return -ECONNRESET;
    79                         }
     73                        if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
    8074
    8175                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
     
    8983}
    9084
    91 // int answer_error( int fd, HttpCode code ) {
    92 //      /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
    93 //      int idx = (int)code;
    94 //      return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
    95 // }
     85int answer_error( int fd, HttpCode code ) {
     86        /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
     87        int idx = (int)code;
     88        return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
     89}
    9690
    9791static int fill_header(char * it, size_t size) {
     
    109103}
    110104
    111 // #if defined(PLAINTEXT_NOCOPY)
    112 // int answer_plaintext( int fd ) {
    113 //      return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
    114 // }
    115 // #elif defined(PLAINTEXT_MEMCPY)
    116 // #define TEXTSIZE 15
    117 // int answer_plaintext( int fd ) {
    118 //      char text[] = "Hello, World!\n\n";
    119 //      char ts[] = xstr(TEXTSIZE) " \n\n";
    120 //      _Static_assert(sizeof(text) - 1 == TEXTSIZE);
    121 //      char buffer[512 + TEXTSIZE];
    122 //      char * it = buffer;
    123 //      memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
    124 //      it += http_msgs[OK200]->len;
    125 //      int len = http_msgs[OK200]->len;
    126 //      memcpy(it, ts, sizeof(ts) - 1);
    127 //      it += sizeof(ts) - 1;
    128 //      len += sizeof(ts) - 1;
    129 //      memcpy(it, text, TEXTSIZE);
    130 //      return answer(fd, buffer, len + TEXTSIZE);
    131 // }
    132 // #elif defined(PLAINTEXT_1WRITE)
    133 // int answer_plaintext( int fd ) {
    134 //      char text[] = "Hello, World!\n\n";
    135 //      char buffer[512 + sizeof(text)];
    136 //      char * it = buffer;
    137 //      memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
    138 //      it += http_msgs[OK200]->len;
    139 //      int len = http_msgs[OK200]->len;
    140 //      int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
    141 //      it += r;
    142 //      len += r;
    143 //      memcpy(it, text, sizeof(text));
    144 //      return answer(fd, buffer, len + sizeof(text));
    145 // }
    146 // #else
    147 // int answer_plaintext( int fd ) {
    148 //      char text[] = "Hello, World!\n\n";
    149 //      int ret = answer_header(fd, sizeof(text));
    150 //      if( ret < 0 ) return ret;
    151 //      return answer(fd, text, sizeof(text));
    152 // }
    153 // #endif
    154 
    155 // int answer_empty( int fd ) {
    156 //      return answer_header(fd, 0);
    157 // }
     105#if defined(PLAINTEXT_NOCOPY)
     106int answer_plaintext( int fd ) {
     107        return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
     108}
     109#elif defined(PLAINTEXT_MEMCPY)
     110#define TEXTSIZE 15
     111int answer_plaintext( int fd ) {
     112        char text[] = "Hello, World!\n\n";
     113        char ts[] = xstr(TEXTSIZE) " \n\n";
     114        _Static_assert(sizeof(text) - 1 == TEXTSIZE);
     115        char buffer[512 + TEXTSIZE];
     116        char * it = buffer;
     117        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     118        it += http_msgs[OK200]->len;
     119        int len = http_msgs[OK200]->len;
     120        memcpy(it, ts, sizeof(ts) - 1);
     121        it += sizeof(ts) - 1;
     122        len += sizeof(ts) - 1;
     123        memcpy(it, text, TEXTSIZE);
     124        return answer(fd, buffer, len + TEXTSIZE);
     125}
     126#elif defined(PLAINTEXT_1WRITE)
     127int answer_plaintext( int fd ) {
     128        char text[] = "Hello, World!\n\n";
     129        char buffer[512 + sizeof(text)];
     130        char * it = buffer;
     131        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     132        it += http_msgs[OK200]->len;
     133        int len = http_msgs[OK200]->len;
     134        int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
     135        it += r;
     136        len += r;
     137        memcpy(it, text, sizeof(text));
     138        return answer(fd, buffer, len + sizeof(text));
     139}
     140#else
     141int answer_plaintext( int fd ) {
     142        char text[] = "Hello, World!\n\n";
     143        int ret = answer_header(fd, sizeof(text));
     144        if( ret < 0 ) return ret;
     145        return answer(fd, text, sizeof(text));
     146}
     147#endif
     148
     149int answer_empty( int fd ) {
     150        return answer_header(fd, 0);
     151}
    158152
    159153static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) {
     
    167161        if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file";
    168162
    169         #if defined(TRUE_SENDFILE)
    170                 off_t offset = 0;
    171                 ssize_t ret;
    172                 int flags = fcntl(fd, F_GETFL);
    173                 if(flags < 0) abort("getfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
    174                 ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    175                 if(ret < 0) abort("setfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
    176 
    177                 while(count) {
    178                         ret = sendfile(fd, ans_fd, &offset, count);
     163
     164        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
     165        off_t offset = 0;
     166        ssize_t ret;
     167        SPLICE1: while(count > 0) {
     168                stats.tries++;
     169                // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
     170                ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
     171                if( ret <= 0 ) {
     172                        if( errno == ECONNRESET ) return -ECONNRESET;
     173                        if( errno == EPIPE ) return -EPIPE;
     174                        abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     175                }
     176                count -= ret;
     177                stats.splcin++;
     178                if(count > 0) stats.avgrd[zipf_idx].calls++;
     179                stats.avgrd[zipf_idx].bytes += ret;
     180
     181                size_t in_pipe = ret;
     182                SPLICE2: while(in_pipe > 0) {
     183                        // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
     184                        ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
    179185                        if( ret <= 0 ) {
    180                                 if( errno == EAGAIN || errno == EWOULDBLOCK ) {
    181                                         stats.eagain++;
    182                                         yield();
    183                                         continue;
    184                                 }
    185                                 if( errno == ECONNRESET || errno == EPIPE ) {
    186                                         ret = close(fd);
    187                                         if( ret != 0 ) abort( "close in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
    188                                         return -ECONNRESET;
    189                                 }
    190                                 abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     186                                if( errno == ECONNRESET ) return -ECONNRESET;
     187                                if( errno == EPIPE ) return -EPIPE;
     188                                abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    191189                        }
    192                         count -= ret;
    193                         stats.splcin++;
    194                         if(count > 0) stats.avgrd[zipf_idx].calls++;
    195                         stats.avgrd[zipf_idx].bytes += ret;
    196                 }
    197 
    198                 ret = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
    199                 if(ret < 0) abort("resetfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
    200         #else
    201                 #error not implemented
    202                 // unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    203                 // off_t offset = 0;
    204                 // ssize_t ret;
    205                 // SPLICE1: while(count > 0) {
    206                 //      stats.tries++;
    207                 //      // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
    208                 //      ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
    209                 //      if( ret <= 0 ) {
    210                 //              if( errno == ECONNRESET || errno == EPIPE ) {
    211                 //                      ret = close(fd);
    212                 //                      if( ret != 0 ) abort( "close in 'sendfile splice in' error: (%d) %s\n", (int)errno, strerror(errno) );
    213                 //                      return -ECONNRESET;
    214                 //              }
    215                 //              abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    216                 //      }
    217                 //      count -= ret;
    218                 //      stats.splcin++;
    219                 //      if(count > 0) stats.avgrd[zipf_idx].calls++;
    220                 //      stats.avgrd[zipf_idx].bytes += ret;
    221 
    222                 //      size_t in_pipe = ret;
    223                 //      SPLICE2: while(in_pipe > 0) {
    224                 //              ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
    225                 //              // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
    226                 //              if( ret <= 0 ) {
    227                 //                      if( errno == ECONNRESET || errno == EPIPE ) {
    228                 //                              ret = close(fd);
    229                 //                              if( ret != 0 ) abort( "close in 'sendfile splice out' error: (%d) %s\n", (int)errno, strerror(errno) );
    230                 //                              return -ECONNRESET;
    231                 //                      }
    232                 //                      abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    233                 //              }
    234                 //              stats.splcot++;
    235                 //              in_pipe -= ret;
    236                 //      }
    237 
    238                 // }
    239         #endif
    240 
     190                        stats.splcot++;
     191                        in_pipe -= ret;
     192                }
     193
     194        }
    241195        return count;
    242196}
    243197
    244 // enum FSM_STATE {
    245 //      Initial,
    246 //      Retry,
    247 //      Error,
    248 //      Done,
    249 // };
    250 
    251 // struct FSM_Result {
    252 //      FSM_STATE state;
    253 //      int error;
    254 // };
    255 
    256 // static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
    257 // static inline bool is_error(FSM_Result & this) { return Error == this.state; }
    258 // static inline bool is_done(FSM_Result & this) { return Done == this.state; }
    259 
    260 // static inline int error(FSM_Result & this, int error) {
    261 //      this.error = error;
    262 //      this.state = Error;
    263 //      return error;
    264 // }
    265 
    266 // static inline int done(FSM_Result & this) {
    267 //      this.state = Done;
    268 //      return 0;
    269 // }
    270 
    271 // static inline int retry(FSM_Result & this) {
    272 //      this.state = Retry;
    273 //      return 0;
    274 // }
    275 
    276 // static inline int need(FSM_Result & this) {
    277 //      switch(this.state) {
    278 //              case Initial:
    279 //              case Retry:
    280 //                      return 1;
    281 //              case Error:
    282 //                      if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
    283 //              case Done:
    284 //                      return 0;
    285 //      }
    286 // }
    287 
    288 // // Generator that handles sending the header
    289 // generator header_g {
    290 //      io_future_t f;
    291 //      const char * next;
    292 //      int fd; size_t len;
    293 //      FSM_Result res;
    294 // };
    295 
    296 // static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
    297 //      this.next = it;
    298 //      this.fd = fd;
    299 //      this.len = len;
    300 // }
    301 
    302 // static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
    303 //      zero_sqe(sqe);
    304 //      sqe->opcode = IORING_OP_SEND;
    305 //      sqe->user_data = (uintptr_t)&this.f;
    306 //      sqe->flags = IOSQE_IO_LINK;
    307 //      sqe->fd = this.fd;
    308 //      sqe->addr = (uintptr_t)this.next;
    309 //      sqe->len = this.len;
    310 // }
    311 
    312 // static inline int error(header_g & this, int error) {
    313 //      int ret = close(this.fd);
    314 //      if( ret != 0 ) {
    315 //              mutex(serr) serr | "Failed to close fd" | errno;
    316 //      }
    317 //      return error(this.res, error);
    318 // }
    319 
    320 // static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {
    321 //      wait(this.f);
    322 
    323 //      // Did something crazy happen?
    324 //      if(this.f.result > this.len) {
    325 //              mutex(serr) serr | "HEADER sent too much!";
    326 //              return error(this, -ERANGE);
    327 //      }
    328 
    329 //      // Something failed?
    330 //      if(this.f.result < 0) {
    331 //              int error = -this.f.result;
    332 //              if( error == ECONNRESET ) return error(this, -ECONNRESET);
    333 //              if( error == EPIPE ) return error(this, -EPIPE);
    334 //              if( error == ECANCELED ) {
    335 //                      mutex(serr) serr | "HEADER was cancelled, WTF!";
    336 //                      return error(this, -ECONNRESET);
    337 //              }
    338 //              if( error == EAGAIN || error == EWOULDBLOCK) {
    339 //                      mutex(serr) serr | "HEADER got eagain, WTF!";
    340 //                      return error(this, -ECONNRESET);
    341 //              }
    342 //      }
    343 
    344 //      // Done?
    345 //      if(this.f.result == this.len) {
    346 //              return done(this.res);
    347 //      }
    348 
    349 //      stats.header++;
    350 
    351 //      // It must be a Short read
    352 //      this.len  -= this.f.result;
    353 //      this.next += this.f.result;
    354 //      reset(this.f);
    355 //      return retry(this.res);
    356 // }
    357 
    358 // // Generator that handles splicing in a file
    359 // struct splice_in_t {
    360 //      io_future_t f;
    361 //      int fd; int pipe; size_t len; off_t off;
    362 //      short zipf_idx;
    363 //      FSM_Result res;
    364 // };
    365 
    366 // static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
    367 //      this.fd = fd;
    368 //      this.pipe = pipe;
    369 //      this.len = len;
    370 //      this.off = 0;
    371 //      this.zipf_idx = -1;
    372 //      STATS: for(i; zipf_cnts) {
    373 //              if(len <= zipf_sizes[i]) {
    374 //                      this.zipf_idx = i;
    375 //                      break STATS;
    376 //              }
    377 //      }
    378 //      if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";
    379 // }
    380 
    381 // static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
    382 //      zero_sqe(sqe);
    383 //      sqe->opcode = IORING_OP_SPLICE;
    384 //      sqe->user_data = (uintptr_t)&this.f;
    385 //      sqe->flags = 0;
    386 //      sqe->splice_fd_in = this.fd;
    387 //      sqe->splice_off_in = this.off;
    388 //      sqe->fd = this.pipe;
    389 //      sqe->off = (__u64)-1;
    390 //      sqe->len = this.len;
    391 //      sqe->splice_flags = SPLICE_F_MOVE;
    392 // }
    393 
    394 // static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {
    395 //      wait(this.f);
    396 
    397 //      // Something failed?
    398 //      if(this.f.result < 0) {
    399 //              int error = -this.f.result;
    400 //              if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
    401 //              if( error == EPIPE ) return error(this.res, -EPIPE);
    402 //              if( error == ECANCELED ) {
    403 //                      mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
    404 //                      return error(this.res, -ECONNRESET);
    405 //              }
    406 //              if( error == EAGAIN || error == EWOULDBLOCK) {
    407 //                      mutex(serr) serr | "SPLICE IN got eagain, WTF!";
    408 //                      return error(this.res, -ECONNRESET);
    409 //              }
    410 //              mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";
    411 //              return error(this.res, -ECONNRESET);
    412 //      }
    413 
    414 //      // Did something crazy happen?
    415 //      if(this.f.result > this.len) {
    416 //              mutex(serr) serr | "SPLICE IN spliced too much!";
    417 //              return error(this.res, -ERANGE);
    418 //      }
    419 
    420 //      // Done?
    421 //      if(this.f.result == this.len) {
    422 //              return done(this.res);
    423 //      }
    424 
    425 //      stats.splcin++;
    426 //      stats.avgrd[this.zipf_idx].calls++;
    427 //      stats.avgrd[this.zipf_idx].bytes += this.f.result;
    428 
    429 //      // It must be a Short read
    430 //      this.len -= this.f.result;
    431 //      this.off += this.f.result;
    432 //      reset(this.f);
    433 //      return retry(this.res);
    434 // }
    435 
    436 // generator splice_out_g {
    437 //      io_future_t f;
    438 //      int pipe; int fd; size_t len;
    439 //      FSM_Result res;
    440 // };
    441 
    442 // static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
    443 //      this.pipe = pipe;
    444 //      this.fd = fd;
    445 //      this.len = len;
    446 // }
    447 
    448 // static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
    449 //      zero_sqe(sqe);
    450 //      sqe->opcode = IORING_OP_SPLICE;
    451 //      sqe->user_data = (uintptr_t)&this.f;
    452 //      sqe->flags = 0;
    453 //      sqe->splice_fd_in = this.pipe;
    454 //      sqe->splice_off_in = (__u64)-1;
    455 //      sqe->fd = this.fd;
    456 //      sqe->off = (__u64)-1;
    457 //      sqe->len = this.len;
    458 //      sqe->splice_flags = SPLICE_F_MOVE;
    459 // }
    460 
    461 // static inline int error(splice_out_g & this, int error) {
    462 //      int ret = close(this.fd);
    463 //      if( ret != 0 ) {
    464 //              mutex(serr) serr | "Failed to close fd" | errno;
    465 //      }
    466 //      return error(this.res, error);
    467 // }
    468 
    469 // static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {
    470 //      wait(this.f);
    471 
    472 //      // Something failed?
    473 //      if(this.f.result < 0) {
    474 //              int error = -this.f.result;
    475 //              if( error == ECONNRESET ) return error(this, -ECONNRESET);
    476 //              if( error == EPIPE ) return error(this, -EPIPE);
    477 //              if( error == ECANCELED ) {
    478 //                      this.f.result = 0;
    479 //                      goto SHORT_WRITE;
    480 //              }
    481 //              if( error == EAGAIN || error == EWOULDBLOCK) {
    482 //                      mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
    483 //                      return error(this, -ECONNRESET);
    484 //              }
    485 //              mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";
    486 //              return error(this, -ECONNRESET);
    487 //      }
    488 
    489 //      // Did something crazy happen?
    490 //      if(this.f.result > this.len) {
    491 //              mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
    492 //              return error(this.res, -ERANGE);
    493 //      }
    494 
    495 //      // Done?
    496 //      if(this.f.result == this.len) {
    497 //              return done(this.res);
    498 //      }
    499 
    500 // SHORT_WRITE:
    501 //      stats.splcot++;
    502 
    503 //      // It must be a Short Write
    504 //      this.len -= this.f.result;
    505 //      reset(this.f);
    506 //      return retry(this.res);
    507 // }
     198enum FSM_STATE {
     199        Initial,
     200        Retry,
     201        Error,
     202        Done,
     203};
     204
     205struct FSM_Result {
     206        FSM_STATE state;
     207        int error;
     208};
     209
     210static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
     211static inline bool is_error(FSM_Result & this) { return Error == this.state; }
     212static inline bool is_done(FSM_Result & this) { return Done == this.state; }
     213
     214static inline int error(FSM_Result & this, int error) {
     215        this.error = error;
     216        this.state = Error;
     217        return error;
     218}
     219
     220static inline int done(FSM_Result & this) {
     221        this.state = Done;
     222        return 0;
     223}
     224
     225static inline int retry(FSM_Result & this) {
     226        this.state = Retry;
     227        return 0;
     228}
     229
     230static inline int need(FSM_Result & this) {
     231        switch(this.state) {
     232                case Initial:
     233                case Retry:
     234                        return 1;
     235                case Error:
     236                        if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
     237                case Done:
     238                        return 0;
     239        }
     240}
     241
     242// Generator that handles sending the header
     243generator header_g {
     244        io_future_t f;
     245        const char * next;
     246        int fd; size_t len;
     247        FSM_Result res;
     248};
     249
     250static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
     251        this.next = it;
     252        this.fd = fd;
     253        this.len = len;
     254}
     255
     256static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
     257        zero_sqe(sqe);
     258        sqe->opcode = IORING_OP_SEND;
     259        sqe->user_data = (uintptr_t)&this.f;
     260        sqe->flags = IOSQE_IO_LINK;
     261        sqe->fd = this.fd;
     262        sqe->addr = (uintptr_t)this.next;
     263        sqe->len = this.len;
     264}
     265
     266static inline int error(header_g & this, int error) {
     267        int ret = close(this.fd);
     268        if( ret != 0 ) {
     269                mutex(serr) serr | "Failed to close fd" | errno;
     270        }
     271        return error(this.res, error);
     272}
     273
     274static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {
     275        wait(this.f);
     276
     277        // Did something crazy happen?
     278        if(this.f.result > this.len) {
     279                mutex(serr) serr | "HEADER sent too much!";
     280                return error(this, -ERANGE);
     281        }
     282
     283        // Something failed?
     284        if(this.f.result < 0) {
     285                int error = -this.f.result;
     286                if( error == ECONNRESET ) return error(this, -ECONNRESET);
     287                if( error == EPIPE ) return error(this, -EPIPE);
     288                if( error == ECANCELED ) {
     289                        mutex(serr) serr | "HEADER was cancelled, WTF!";
     290                        return error(this, -ECONNRESET);
     291                }
     292                if( error == EAGAIN || error == EWOULDBLOCK) {
     293                        mutex(serr) serr | "HEADER got eagain, WTF!";
     294                        return error(this, -ECONNRESET);
     295                }
     296        }
     297
     298        // Done?
     299        if(this.f.result == this.len) {
     300                return done(this.res);
     301        }
     302
     303        stats.header++;
     304
     305        // It must be a Short read
     306        this.len  -= this.f.result;
     307        this.next += this.f.result;
     308        reset(this.f);
     309        return retry(this.res);
     310}
     311
     312// Generator that handles splicing in a file
     313struct splice_in_t {
     314        io_future_t f;
     315        int fd; int pipe; size_t len; off_t off;
     316        short zipf_idx;
     317        FSM_Result res;
     318};
     319
     320static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
     321        this.fd = fd;
     322        this.pipe = pipe;
     323        this.len = len;
     324        this.off = 0;
     325        this.zipf_idx = -1;
     326        STATS: for(i; zipf_cnts) {
     327                if(len <= zipf_sizes[i]) {
     328                        this.zipf_idx = i;
     329                        break STATS;
     330                }
     331        }
     332        if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";
     333}
     334
     335static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
     336        zero_sqe(sqe);
     337        sqe->opcode = IORING_OP_SPLICE;
     338        sqe->user_data = (uintptr_t)&this.f;
     339        sqe->flags = 0;
     340        sqe->splice_fd_in = this.fd;
     341        sqe->splice_off_in = this.off;
     342        sqe->fd = this.pipe;
     343        sqe->off = (__u64)-1;
     344        sqe->len = this.len;
     345        sqe->splice_flags = SPLICE_F_MOVE;
     346}
     347
     348static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {
     349        wait(this.f);
     350
     351        // Something failed?
     352        if(this.f.result < 0) {
     353                int error = -this.f.result;
     354                if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
     355                if( error == EPIPE ) return error(this.res, -EPIPE);
     356                if( error == ECANCELED ) {
     357                        mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
     358                        return error(this.res, -ECONNRESET);
     359                }
     360                if( error == EAGAIN || error == EWOULDBLOCK) {
     361                        mutex(serr) serr | "SPLICE IN got eagain, WTF!";
     362                        return error(this.res, -ECONNRESET);
     363                }
     364                mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";
     365                return error(this.res, -ECONNRESET);
     366        }
     367
     368        // Did something crazy happen?
     369        if(this.f.result > this.len) {
     370                mutex(serr) serr | "SPLICE IN spliced too much!";
     371                return error(this.res, -ERANGE);
     372        }
     373
     374        // Done?
     375        if(this.f.result == this.len) {
     376                return done(this.res);
     377        }
     378
     379        stats.splcin++;
     380        stats.avgrd[this.zipf_idx].calls++;
     381        stats.avgrd[this.zipf_idx].bytes += this.f.result;
     382
     383        // It must be a Short read
     384        this.len -= this.f.result;
     385        this.off += this.f.result;
     386        reset(this.f);
     387        return retry(this.res);
     388}
     389
     390generator splice_out_g {
     391        io_future_t f;
     392        int pipe; int fd; size_t len;
     393        FSM_Result res;
     394};
     395
     396static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
     397        this.pipe = pipe;
     398        this.fd = fd;
     399        this.len = len;
     400}
     401
     402static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
     403        zero_sqe(sqe);
     404        sqe->opcode = IORING_OP_SPLICE;
     405        sqe->user_data = (uintptr_t)&this.f;
     406        sqe->flags = 0;
     407        sqe->splice_fd_in = this.pipe;
     408        sqe->splice_off_in = (__u64)-1;
     409        sqe->fd = this.fd;
     410        sqe->off = (__u64)-1;
     411        sqe->len = this.len;
     412        sqe->splice_flags = SPLICE_F_MOVE;
     413}
     414
     415static inline int error(splice_out_g & this, int error) {
     416        int ret = close(this.fd);
     417        if( ret != 0 ) {
     418                mutex(serr) serr | "Failed to close fd" | errno;
     419        }
     420        return error(this.res, error);
     421}
     422
     423static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {
     424        wait(this.f);
     425
     426        // Something failed?
     427        if(this.f.result < 0) {
     428                int error = -this.f.result;
     429                if( error == ECONNRESET ) return error(this, -ECONNRESET);
     430                if( error == EPIPE ) return error(this, -EPIPE);
     431                if( error == ECANCELED ) {
     432                        this.f.result = 0;
     433                        goto SHORT_WRITE;
     434                }
     435                if( error == EAGAIN || error == EWOULDBLOCK) {
     436                        mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
     437                        return error(this, -ECONNRESET);
     438                }
     439                mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";
     440                return error(this, -ECONNRESET);
     441        }
     442
     443        // Did something crazy happen?
     444        if(this.f.result > this.len) {
     445                mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
     446                return error(this.res, -ERANGE);
     447        }
     448
     449        // Done?
     450        if(this.f.result == this.len) {
     451                return done(this.res);
     452        }
     453
     454SHORT_WRITE:
     455        stats.splcot++;
     456
     457        // It must be a Short Write
     458        this.len -= this.f.result;
     459        reset(this.f);
     460        return retry(this.res);
     461}
    508462
    509463int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) {
    510464        stats.calls++;
    511465        #if defined(LINKED_IO)
    512                 // char buffer[512];
    513                 // int len = fill_header(buffer, fsize);
    514                 // header_g header = { fd, buffer, len };
    515                 // splice_in_t splice_in = { ans_fd, pipe[1], fsize };
    516                 // splice_out_g splice_out = { pipe[0], fd, fsize };
    517 
    518                 // RETRY_LOOP: for() {
    519                 //      stats.tries++;
    520                 //      int have = need(header.res) + need(splice_in.res) + 1;
    521                 //      int idx = 0;
    522                 //      struct io_uring_sqe * sqes[3];
    523                 //      __u32 idxs[3];
    524                 //      struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
    525 
    526                 //      if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
    527                 //      if(need(   header.res)) { fill(header   , sqes[idx++]); }
    528                 //      fill(splice_out, sqes[idx]);
    529 
    530                 //      // Submit everything
    531                 //      asm volatile("": : :"memory");
    532                 //      cfa_io_submit( ctx, idxs, have, false );
    533 
    534                 //      // wait for the results
    535                 //      // Always wait for splice-in to complete as
    536                 //      // we may need to kill the connection if it fails
    537                 //      // If it already completed, this is a no-op
    538                 //      wait_and_process(splice_in, stats);
    539 
    540                 //      if(is_error(splice_in.res)) {
    541                 //              if(splice_in.res.error == -EPIPE) return -ECONNRESET;
    542                 //              mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
    543                 //              int ret = close(fd);
    544                 //              if( ret != 0 ) abort( "close in 'answer sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
    545                 //      }
    546 
    547                 //      // Process the other 2
    548                 //      wait_and_process(header, stats);
    549                 //      wait_and_process(splice_out, stats);
    550 
    551                 //      if(is_done(splice_out.res)) {
    552                 //              break RETRY_LOOP;
    553                 //      }
    554 
    555                 //      // We need to wait for the completion if
    556                 //      // - both completed
    557                 //      // - the header failed
    558                 //      // -
    559 
    560                 //      if(  is_error(header.res)
    561                 //        || is_error(splice_in.res)
    562                 //        || is_error(splice_out.res)) {
    563                 //              return -ECONNRESET;
    564                 //      }
    565                 // }
    566 
    567                 // return len + fsize;
     466                char buffer[512];
     467                int len = fill_header(buffer, fsize);
     468                header_g header = { fd, buffer, len };
     469                splice_in_t splice_in = { ans_fd, pipe[1], fsize };
     470                splice_out_g splice_out = { pipe[0], fd, fsize };
     471
     472                RETRY_LOOP: for() {
     473                        stats.tries++;
     474                        int have = need(header.res) + need(splice_in.res) + 1;
     475                        int idx = 0;
     476                        struct io_uring_sqe * sqes[3];
     477                        __u32 idxs[3];
     478                        struct io_context$ * ctx = cfa_io_allocate(sqes, idxs, have);
     479
     480                        if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
     481                        if(need(   header.res)) { fill(header   , sqes[idx++]); }
     482                        fill(splice_out, sqes[idx]);
     483
     484                        // Submit everything
     485                        asm volatile("": : :"memory");
     486                        cfa_io_submit( ctx, idxs, have, false );
     487
     488                        // wait for the results
     489                        // Always wait for splice-in to complete as
     490                        // we may need to kill the connection if it fails
     491                        // If it already completed, this is a no-op
     492                        wait_and_process(splice_in, stats);
     493
     494                        if(is_error(splice_in.res)) {
     495                                if(splice_in.res.error == -EPIPE) return -ECONNRESET;
     496                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
     497                                close(fd);
     498                        }
     499
     500                        // Process the other 2
     501                        wait_and_process(header, stats);
     502                        wait_and_process(splice_out, stats);
     503
     504                        if(is_done(splice_out.res)) {
     505                                break RETRY_LOOP;
     506                        }
     507
     508                        // We need to wait for the completion if
     509                        // - both completed
     510                        // - the header failed
     511                        // -
     512
     513                        if(  is_error(header.res)
     514                          || is_error(splice_in.res)
     515                          || is_error(splice_out.res)) {
     516                                return -ECONNRESET;
     517                        }
     518                }
     519
     520                return len + fsize;
    568521        #else
    569522                int ret = answer_header(fd, fsize);
    570                 if( ret < 0 ) { return ret; }
     523                if( ret < 0 ) { close(fd); return ret; }
    571524                return sendfile(pipe, fd, ans_fd, fsize, stats);
    572525        #endif
     
    588541                }
    589542                // int ret = read(fd, (void*)it, count);
    590                 if(ret == 0 ) {
    591                         ret = close(fd);
    592                         if( ret != 0 ) abort( "close in 'http read good' error: (%d) %s\n", (int)errno, strerror(errno) );
    593                         return [OK200, true, 0, 0];
    594                 }
     543                if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; }
    595544                if(ret < 0 ) {
    596                         if( errno == ECONNRESET || errno == EPIPE ) {
    597                                 ret = close(fd);
    598                                 if( ret != 0 ) abort( "close in 'http read bad' error: (%d) %s\n", (int)errno, strerror(errno) );
    599                                 return [E408, true, 0, 0];
    600                         }
     545                        if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
     546                        if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
    601547                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
    602548                }
Note: See TracChangeset for help on using the changeset viewer.