Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/protocol.cfa

    r8bee858 r32d1383  
    1818extern "C" {
    1919      int snprintf ( char * s, size_t n, const char * format, ... );
     20        ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
    2021        // #include <linux/io_uring.h>
    2122}
     
    3031#define PLAINTEXT_NOCOPY
    3132// #define LINKED_IO
     33#define TRUE_SENDFILE
    3234
    3335static inline __s32 wait_res( io_future_t & this ) {
     
    7173                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
    7274                if( ret < 0 ) {
    73                         if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
     75                        if( errno == ECONNRESET || errno == EPIPE || errno == EBADF ) {
     76                                ret = close(fd);
     77                                if( ret != 0 ) abort( "close in 'answer' error: (%d) %s\n", (int)errno, strerror(errno) );
     78                                return -ECONNRESET;
     79                        }
    7480
    7581                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
     
    8389}
    8490
    85 int answer_error( int fd, HttpCode code ) {
    86         /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
    87         int idx = (int)code;
    88         return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
    89 }
     91// int answer_error( int fd, HttpCode code ) {
     92//      /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
     93//      int idx = (int)code;
     94//      return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
     95// }
    9096
    9197static int fill_header(char * it, size_t size) {
     
    103109}
    104110
    105 #if defined(PLAINTEXT_NOCOPY)
    106 int answer_plaintext( int fd ) {
    107         return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
    108 }
    109 #elif defined(PLAINTEXT_MEMCPY)
    110 #define TEXTSIZE 15
    111 int answer_plaintext( int fd ) {
    112         char text[] = "Hello, World!\n\n";
    113         char ts[] = xstr(TEXTSIZE) " \n\n";
    114         _Static_assert(sizeof(text) - 1 == TEXTSIZE);
    115         char buffer[512 + TEXTSIZE];
    116         char * it = buffer;
    117         memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
    118         it += http_msgs[OK200]->len;
    119         int len = http_msgs[OK200]->len;
    120         memcpy(it, ts, sizeof(ts) - 1);
    121         it += sizeof(ts) - 1;
    122         len += sizeof(ts) - 1;
    123         memcpy(it, text, TEXTSIZE);
    124         return answer(fd, buffer, len + TEXTSIZE);
    125 }
    126 #elif defined(PLAINTEXT_1WRITE)
    127 int answer_plaintext( int fd ) {
    128         char text[] = "Hello, World!\n\n";
    129         char buffer[512 + sizeof(text)];
    130         char * it = buffer;
    131         memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
    132         it += http_msgs[OK200]->len;
    133         int len = http_msgs[OK200]->len;
    134         int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
    135         it += r;
    136         len += r;
    137         memcpy(it, text, sizeof(text));
    138         return answer(fd, buffer, len + sizeof(text));
    139 }
    140 #else
    141 int answer_plaintext( int fd ) {
    142         char text[] = "Hello, World!\n\n";
    143         int ret = answer_header(fd, sizeof(text));
    144         if( ret < 0 ) return ret;
    145         return answer(fd, text, sizeof(text));
    146 }
    147 #endif
    148 
    149 int answer_empty( int fd ) {
    150         return answer_header(fd, 0);
    151 }
     111// #if defined(PLAINTEXT_NOCOPY)
     112// int answer_plaintext( int fd ) {
     113//      return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator
     114// }
     115// #elif defined(PLAINTEXT_MEMCPY)
     116// #define TEXTSIZE 15
     117// int answer_plaintext( int fd ) {
     118//      char text[] = "Hello, World!\n\n";
     119//      char ts[] = xstr(TEXTSIZE) " \n\n";
     120//      _Static_assert(sizeof(text) - 1 == TEXTSIZE);
     121//      char buffer[512 + TEXTSIZE];
     122//      char * it = buffer;
     123//      memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     124//      it += http_msgs[OK200]->len;
     125//      int len = http_msgs[OK200]->len;
     126//      memcpy(it, ts, sizeof(ts) - 1);
     127//      it += sizeof(ts) - 1;
     128//      len += sizeof(ts) - 1;
     129//      memcpy(it, text, TEXTSIZE);
     130//      return answer(fd, buffer, len + TEXTSIZE);
     131// }
     132// #elif defined(PLAINTEXT_1WRITE)
     133// int answer_plaintext( int fd ) {
     134//      char text[] = "Hello, World!\n\n";
     135//      char buffer[512 + sizeof(text)];
     136//      char * it = buffer;
     137//      memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     138//      it += http_msgs[OK200]->len;
     139//      int len = http_msgs[OK200]->len;
     140//      int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
     141//      it += r;
     142//      len += r;
     143//      memcpy(it, text, sizeof(text));
     144//      return answer(fd, buffer, len + sizeof(text));
     145// }
     146// #else
     147// int answer_plaintext( int fd ) {
     148//      char text[] = "Hello, World!\n\n";
     149//      int ret = answer_header(fd, sizeof(text));
     150//      if( ret < 0 ) return ret;
     151//      return answer(fd, text, sizeof(text));
     152// }
     153// #endif
     154
     155// int answer_empty( int fd ) {
     156//      return answer_header(fd, 0);
     157// }
    152158
    153159static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) {
     
    161167        if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file";
    162168
    163 
    164         unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    165         off_t offset = 0;
    166         ssize_t ret;
    167         SPLICE1: while(count > 0) {
    168                 stats.tries++;
    169                 // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
    170                 ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
    171                 if( ret <= 0 ) {
    172                         if( errno == ECONNRESET ) return -ECONNRESET;
    173                         if( errno == EPIPE ) return -EPIPE;
    174                         abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    175                 }
    176                 count -= ret;
    177                 stats.splcin++;
    178                 if(count > 0) stats.avgrd[zipf_idx].calls++;
    179                 stats.avgrd[zipf_idx].bytes += ret;
    180 
    181                 size_t in_pipe = ret;
    182                 SPLICE2: while(in_pipe > 0) {
    183                         // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
    184                         ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
     169        #if defined(TRUE_SENDFILE)
     170                off_t offset = 0;
     171                ssize_t ret;
     172                int flags = fcntl(fd, F_GETFL);
     173                if(flags < 0) abort("getfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
     174                ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
     175                if(ret < 0) abort("setfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
     176
     177                while(count) {
     178                        ret = sendfile(fd, ans_fd, &offset, count);
    185179                        if( ret <= 0 ) {
    186                                 if( errno == ECONNRESET ) return -ECONNRESET;
    187                                 if( errno == EPIPE ) return -EPIPE;
    188                                 abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     180                                if( errno == EAGAIN || errno == EWOULDBLOCK ) {
     181                                        stats.eagain++;
     182                                        yield();
     183                                        continue;
     184                                }
     185                                if( errno == ECONNRESET || errno == EPIPE ) {
     186                                        ret = close(fd);
     187                                        if( ret != 0 ) abort( "close in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
     188                                        return -ECONNRESET;
     189                                }
     190                                abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    189191                        }
    190                         stats.splcot++;
    191                         in_pipe -= ret;
    192                 }
    193 
    194         }
     192                        count -= ret;
     193                        stats.splcin++;
     194                        if(count > 0) stats.avgrd[zipf_idx].calls++;
     195                        stats.avgrd[zipf_idx].bytes += ret;
     196                }
     197
     198                ret = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
     199                if(ret < 0) abort("resetfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
     200        #else
     201                #error not implemented
     202                // unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
     203                // off_t offset = 0;
     204                // ssize_t ret;
     205                // SPLICE1: while(count > 0) {
     206                //      stats.tries++;
     207                //      // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
     208                //      ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
     209                //      if( ret <= 0 ) {
     210                //              if( errno == ECONNRESET || errno == EPIPE ) {
     211                //                      ret = close(fd);
     212                //                      if( ret != 0 ) abort( "close in 'sendfile splice in' error: (%d) %s\n", (int)errno, strerror(errno) );
     213                //                      return -ECONNRESET;
     214                //              }
     215                //              abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     216                //      }
     217                //      count -= ret;
     218                //      stats.splcin++;
     219                //      if(count > 0) stats.avgrd[zipf_idx].calls++;
     220                //      stats.avgrd[zipf_idx].bytes += ret;
     221
     222                //      size_t in_pipe = ret;
     223                //      SPLICE2: while(in_pipe > 0) {
     224                //              ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
     225                //              // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
     226                //              if( ret <= 0 ) {
     227                //                      if( errno == ECONNRESET || errno == EPIPE ) {
     228                //                              ret = close(fd);
     229                //                              if( ret != 0 ) abort( "close in 'sendfile splice out' error: (%d) %s\n", (int)errno, strerror(errno) );
     230                //                              return -ECONNRESET;
     231                //                      }
     232                //                      abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     233                //              }
     234                //              stats.splcot++;
     235                //              in_pipe -= ret;
     236                //      }
     237
     238                // }
     239        #endif
     240
    195241        return count;
    196242}
    197243
    198 enum FSM_STATE {
    199         Initial,
    200         Retry,
    201         Error,
    202         Done,
    203 };
    204 
    205 struct FSM_Result {
    206         FSM_STATE state;
    207         int error;
    208 };
    209 
    210 static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
    211 static inline bool is_error(FSM_Result & this) { return Error == this.state; }
    212 static inline bool is_done(FSM_Result & this) { return Done == this.state; }
    213 
    214 static inline int error(FSM_Result & this, int error) {
    215         this.error = error;
    216         this.state = Error;
    217         return error;
    218 }
    219 
    220 static inline int done(FSM_Result & this) {
    221         this.state = Done;
    222         return 0;
    223 }
    224 
    225 static inline int retry(FSM_Result & this) {
    226         this.state = Retry;
    227         return 0;
    228 }
    229 
    230 static inline int need(FSM_Result & this) {
    231         switch(this.state) {
    232                 case Initial:
    233                 case Retry:
    234                         return 1;
    235                 case Error:
    236                         if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
    237                 case Done:
    238                         return 0;
    239         }
    240 }
    241 
    242 // Generator that handles sending the header
    243 generator header_g {
    244         io_future_t f;
    245         const char * next;
    246         int fd; size_t len;
    247         FSM_Result res;
    248 };
    249 
    250 static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
    251         this.next = it;
    252         this.fd = fd;
    253         this.len = len;
    254 }
    255 
    256 static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
    257         zero_sqe(sqe);
    258         sqe->opcode = IORING_OP_SEND;
    259         sqe->user_data = (uintptr_t)&this.f;
    260         sqe->flags = IOSQE_IO_LINK;
    261         sqe->fd = this.fd;
    262         sqe->addr = (uintptr_t)this.next;
    263         sqe->len = this.len;
    264 }
    265 
    266 static inline int error(header_g & this, int error) {
    267         int ret = close(this.fd);
    268         if( ret != 0 ) {
    269                 mutex(serr) serr | "Failed to close fd" | errno;
    270         }
    271         return error(this.res, error);
    272 }
    273 
    274 static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {
    275         wait(this.f);
    276 
    277         // Did something crazy happen?
    278         if(this.f.result > this.len) {
    279                 mutex(serr) serr | "HEADER sent too much!";
    280                 return error(this, -ERANGE);
    281         }
    282 
    283         // Something failed?
    284         if(this.f.result < 0) {
    285                 int error = -this.f.result;
    286                 if( error == ECONNRESET ) return error(this, -ECONNRESET);
    287                 if( error == EPIPE ) return error(this, -EPIPE);
    288                 if( error == ECANCELED ) {
    289                         mutex(serr) serr | "HEADER was cancelled, WTF!";
    290                         return error(this, -ECONNRESET);
    291                 }
    292                 if( error == EAGAIN || error == EWOULDBLOCK) {
    293                         mutex(serr) serr | "HEADER got eagain, WTF!";
    294                         return error(this, -ECONNRESET);
    295                 }
    296         }
    297 
    298         // Done?
    299         if(this.f.result == this.len) {
    300                 return done(this.res);
    301         }
    302 
    303         stats.header++;
    304 
    305         // It must be a Short read
    306         this.len  -= this.f.result;
    307         this.next += this.f.result;
    308         reset(this.f);
    309         return retry(this.res);
    310 }
    311 
    312 // Generator that handles splicing in a file
    313 struct splice_in_t {
    314         io_future_t f;
    315         int fd; int pipe; size_t len; off_t off;
    316         short zipf_idx;
    317         FSM_Result res;
    318 };
    319 
    320 static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
    321         this.fd = fd;
    322         this.pipe = pipe;
    323         this.len = len;
    324         this.off = 0;
    325         this.zipf_idx = -1;
    326         STATS: for(i; zipf_cnts) {
    327                 if(len <= zipf_sizes[i]) {
    328                         this.zipf_idx = i;
    329                         break STATS;
    330                 }
    331         }
    332         if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";
    333 }
    334 
    335 static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
    336         zero_sqe(sqe);
    337         sqe->opcode = IORING_OP_SPLICE;
    338         sqe->user_data = (uintptr_t)&this.f;
    339         sqe->flags = 0;
    340         sqe->splice_fd_in = this.fd;
    341         sqe->splice_off_in = this.off;
    342         sqe->fd = this.pipe;
    343         sqe->off = (__u64)-1;
    344         sqe->len = this.len;
    345         sqe->splice_flags = SPLICE_F_MOVE;
    346 }
    347 
    348 static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {
    349         wait(this.f);
    350 
    351         // Something failed?
    352         if(this.f.result < 0) {
    353                 int error = -this.f.result;
    354                 if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
    355                 if( error == EPIPE ) return error(this.res, -EPIPE);
    356                 if( error == ECANCELED ) {
    357                         mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
    358                         return error(this.res, -ECONNRESET);
    359                 }
    360                 if( error == EAGAIN || error == EWOULDBLOCK) {
    361                         mutex(serr) serr | "SPLICE IN got eagain, WTF!";
    362                         return error(this.res, -ECONNRESET);
    363                 }
    364                 mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";
    365                 return error(this.res, -ECONNRESET);
    366         }
    367 
    368         // Did something crazy happen?
    369         if(this.f.result > this.len) {
    370                 mutex(serr) serr | "SPLICE IN spliced too much!";
    371                 return error(this.res, -ERANGE);
    372         }
    373 
    374         // Done?
    375         if(this.f.result == this.len) {
    376                 return done(this.res);
    377         }
    378 
    379         stats.splcin++;
    380         stats.avgrd[this.zipf_idx].calls++;
    381         stats.avgrd[this.zipf_idx].bytes += this.f.result;
    382 
    383         // It must be a Short read
    384         this.len -= this.f.result;
    385         this.off += this.f.result;
    386         reset(this.f);
    387         return retry(this.res);
    388 }
    389 
    390 generator splice_out_g {
    391         io_future_t f;
    392         int pipe; int fd; size_t len;
    393         FSM_Result res;
    394 };
    395 
    396 static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
    397         this.pipe = pipe;
    398         this.fd = fd;
    399         this.len = len;
    400 }
    401 
    402 static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
    403         zero_sqe(sqe);
    404         sqe->opcode = IORING_OP_SPLICE;
    405         sqe->user_data = (uintptr_t)&this.f;
    406         sqe->flags = 0;
    407         sqe->splice_fd_in = this.pipe;
    408         sqe->splice_off_in = (__u64)-1;
    409         sqe->fd = this.fd;
    410         sqe->off = (__u64)-1;
    411         sqe->len = this.len;
    412         sqe->splice_flags = SPLICE_F_MOVE;
    413 }
    414 
    415 static inline int error(splice_out_g & this, int error) {
    416         int ret = close(this.fd);
    417         if( ret != 0 ) {
    418                 mutex(serr) serr | "Failed to close fd" | errno;
    419         }
    420         return error(this.res, error);
    421 }
    422 
    423 static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {
    424         wait(this.f);
    425 
    426         // Something failed?
    427         if(this.f.result < 0) {
    428                 int error = -this.f.result;
    429                 if( error == ECONNRESET ) return error(this, -ECONNRESET);
    430                 if( error == EPIPE ) return error(this, -EPIPE);
    431                 if( error == ECANCELED ) {
    432                         this.f.result = 0;
    433                         goto SHORT_WRITE;
    434                 }
    435                 if( error == EAGAIN || error == EWOULDBLOCK) {
    436                         mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
    437                         return error(this, -ECONNRESET);
    438                 }
    439                 mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";
    440                 return error(this, -ECONNRESET);
    441         }
    442 
    443         // Did something crazy happen?
    444         if(this.f.result > this.len) {
    445                 mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
    446                 return error(this.res, -ERANGE);
    447         }
    448 
    449         // Done?
    450         if(this.f.result == this.len) {
    451                 return done(this.res);
    452         }
    453 
    454 SHORT_WRITE:
    455         stats.splcot++;
    456 
    457         // It must be a Short Write
    458         this.len -= this.f.result;
    459         reset(this.f);
    460         return retry(this.res);
    461 }
     244// enum FSM_STATE {
     245//      Initial,
     246//      Retry,
     247//      Error,
     248//      Done,
     249// };
     250
     251// struct FSM_Result {
     252//      FSM_STATE state;
     253//      int error;
     254// };
     255
     256// static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }
     257// static inline bool is_error(FSM_Result & this) { return Error == this.state; }
     258// static inline bool is_done(FSM_Result & this) { return Done == this.state; }
     259
     260// static inline int error(FSM_Result & this, int error) {
     261//      this.error = error;
     262//      this.state = Error;
     263//      return error;
     264// }
     265
     266// static inline int done(FSM_Result & this) {
     267//      this.state = Done;
     268//      return 0;
     269// }
     270
     271// static inline int retry(FSM_Result & this) {
     272//      this.state = Retry;
     273//      return 0;
     274// }
     275
     276// static inline int need(FSM_Result & this) {
     277//      switch(this.state) {
     278//              case Initial:
     279//              case Retry:
     280//                      return 1;
     281//              case Error:
     282//                      if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
     283//              case Done:
     284//                      return 0;
     285//      }
     286// }
     287
     288// // Generator that handles sending the header
     289// generator header_g {
     290//      io_future_t f;
     291//      const char * next;
     292//      int fd; size_t len;
     293//      FSM_Result res;
     294// };
     295
     296// static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
     297//      this.next = it;
     298//      this.fd = fd;
     299//      this.len = len;
     300// }
     301
     302// static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
     303//      zero_sqe(sqe);
     304//      sqe->opcode = IORING_OP_SEND;
     305//      sqe->user_data = (uintptr_t)&this.f;
     306//      sqe->flags = IOSQE_IO_LINK;
     307//      sqe->fd = this.fd;
     308//      sqe->addr = (uintptr_t)this.next;
     309//      sqe->len = this.len;
     310// }
     311
     312// static inline int error(header_g & this, int error) {
     313//      int ret = close(this.fd);
     314//      if( ret != 0 ) {
     315//              mutex(serr) serr | "Failed to close fd" | errno;
     316//      }
     317//      return error(this.res, error);
     318// }
     319
     320// static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {
     321//      wait(this.f);
     322
     323//      // Did something crazy happen?
     324//      if(this.f.result > this.len) {
     325//              mutex(serr) serr | "HEADER sent too much!";
     326//              return error(this, -ERANGE);
     327//      }
     328
     329//      // Something failed?
     330//      if(this.f.result < 0) {
     331//              int error = -this.f.result;
     332//              if( error == ECONNRESET ) return error(this, -ECONNRESET);
     333//              if( error == EPIPE ) return error(this, -EPIPE);
     334//              if( error == ECANCELED ) {
     335//                      mutex(serr) serr | "HEADER was cancelled, WTF!";
     336//                      return error(this, -ECONNRESET);
     337//              }
     338//              if( error == EAGAIN || error == EWOULDBLOCK) {
     339//                      mutex(serr) serr | "HEADER got eagain, WTF!";
     340//                      return error(this, -ECONNRESET);
     341//              }
     342//      }
     343
     344//      // Done?
     345//      if(this.f.result == this.len) {
     346//              return done(this.res);
     347//      }
     348
     349//      stats.header++;
     350
     351//      // It must be a Short read
     352//      this.len  -= this.f.result;
     353//      this.next += this.f.result;
     354//      reset(this.f);
     355//      return retry(this.res);
     356// }
     357
     358// // Generator that handles splicing in a file
     359// struct splice_in_t {
     360//      io_future_t f;
     361//      int fd; int pipe; size_t len; off_t off;
     362//      short zipf_idx;
     363//      FSM_Result res;
     364// };
     365
     366// static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
     367//      this.fd = fd;
     368//      this.pipe = pipe;
     369//      this.len = len;
     370//      this.off = 0;
     371//      this.zipf_idx = -1;
     372//      STATS: for(i; zipf_cnts) {
     373//              if(len <= zipf_sizes[i]) {
     374//                      this.zipf_idx = i;
     375//                      break STATS;
     376//              }
     377//      }
     378//      if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";
     379// }
     380
     381// static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
     382//      zero_sqe(sqe);
     383//      sqe->opcode = IORING_OP_SPLICE;
     384//      sqe->user_data = (uintptr_t)&this.f;
     385//      sqe->flags = 0;
     386//      sqe->splice_fd_in = this.fd;
     387//      sqe->splice_off_in = this.off;
     388//      sqe->fd = this.pipe;
     389//      sqe->off = (__u64)-1;
     390//      sqe->len = this.len;
     391//      sqe->splice_flags = SPLICE_F_MOVE;
     392// }
     393
     394// static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {
     395//      wait(this.f);
     396
     397//      // Something failed?
     398//      if(this.f.result < 0) {
     399//              int error = -this.f.result;
     400//              if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
     401//              if( error == EPIPE ) return error(this.res, -EPIPE);
     402//              if( error == ECANCELED ) {
     403//                      mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
     404//                      return error(this.res, -ECONNRESET);
     405//              }
     406//              if( error == EAGAIN || error == EWOULDBLOCK) {
     407//                      mutex(serr) serr | "SPLICE IN got eagain, WTF!";
     408//                      return error(this.res, -ECONNRESET);
     409//              }
     410//              mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";
     411//              return error(this.res, -ECONNRESET);
     412//      }
     413
     414//      // Did something crazy happen?
     415//      if(this.f.result > this.len) {
     416//              mutex(serr) serr | "SPLICE IN spliced too much!";
     417//              return error(this.res, -ERANGE);
     418//      }
     419
     420//      // Done?
     421//      if(this.f.result == this.len) {
     422//              return done(this.res);
     423//      }
     424
     425//      stats.splcin++;
     426//      stats.avgrd[this.zipf_idx].calls++;
     427//      stats.avgrd[this.zipf_idx].bytes += this.f.result;
     428
     429//      // It must be a Short read
     430//      this.len -= this.f.result;
     431//      this.off += this.f.result;
     432//      reset(this.f);
     433//      return retry(this.res);
     434// }
     435
     436// generator splice_out_g {
     437//      io_future_t f;
     438//      int pipe; int fd; size_t len;
     439//      FSM_Result res;
     440// };
     441
     442// static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
     443//      this.pipe = pipe;
     444//      this.fd = fd;
     445//      this.len = len;
     446// }
     447
     448// static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
     449//      zero_sqe(sqe);
     450//      sqe->opcode = IORING_OP_SPLICE;
     451//      sqe->user_data = (uintptr_t)&this.f;
     452//      sqe->flags = 0;
     453//      sqe->splice_fd_in = this.pipe;
     454//      sqe->splice_off_in = (__u64)-1;
     455//      sqe->fd = this.fd;
     456//      sqe->off = (__u64)-1;
     457//      sqe->len = this.len;
     458//      sqe->splice_flags = SPLICE_F_MOVE;
     459// }
     460
     461// static inline int error(splice_out_g & this, int error) {
     462//      int ret = close(this.fd);
     463//      if( ret != 0 ) {
     464//              mutex(serr) serr | "Failed to close fd" | errno;
     465//      }
     466//      return error(this.res, error);
     467// }
     468
     469// static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {
     470//      wait(this.f);
     471
     472//      // Something failed?
     473//      if(this.f.result < 0) {
     474//              int error = -this.f.result;
     475//              if( error == ECONNRESET ) return error(this, -ECONNRESET);
     476//              if( error == EPIPE ) return error(this, -EPIPE);
     477//              if( error == ECANCELED ) {
     478//                      this.f.result = 0;
     479//                      goto SHORT_WRITE;
     480//              }
     481//              if( error == EAGAIN || error == EWOULDBLOCK) {
     482//                      mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
     483//                      return error(this, -ECONNRESET);
     484//              }
     485//              mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";
     486//              return error(this, -ECONNRESET);
     487//      }
     488
     489//      // Did something crazy happen?
     490//      if(this.f.result > this.len) {
     491//              mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;
     492//              return error(this.res, -ERANGE);
     493//      }
     494
     495//      // Done?
     496//      if(this.f.result == this.len) {
     497//              return done(this.res);
     498//      }
     499
     500// SHORT_WRITE:
     501//      stats.splcot++;
     502
     503//      // It must be a Short Write
     504//      this.len -= this.f.result;
     505//      reset(this.f);
     506//      return retry(this.res);
     507// }
    462508
    463509int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) {
    464510        stats.calls++;
    465511        #if defined(LINKED_IO)
    466                 char buffer[512];
    467                 int len = fill_header(buffer, fsize);
    468                 header_g header = { fd, buffer, len };
    469                 splice_in_t splice_in = { ans_fd, pipe[1], fsize };
    470                 splice_out_g splice_out = { pipe[0], fd, fsize };
    471 
    472                 RETRY_LOOP: for() {
    473                         stats.tries++;
    474                         int have = need(header.res) + need(splice_in.res) + 1;
    475                         int idx = 0;
    476                         struct io_uring_sqe * sqes[3];
    477                         __u32 idxs[3];
    478                         struct io_context$ * ctx = cfa_io_allocate(sqes, idxs, have);
    479 
    480                         if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
    481                         if(need(   header.res)) { fill(header   , sqes[idx++]); }
    482                         fill(splice_out, sqes[idx]);
    483 
    484                         // Submit everything
    485                         asm volatile("": : :"memory");
    486                         cfa_io_submit( ctx, idxs, have, false );
    487 
    488                         // wait for the results
    489                         // Always wait for splice-in to complete as
    490                         // we may need to kill the connection if it fails
    491                         // If it already completed, this is a no-op
    492                         wait_and_process(splice_in, stats);
    493 
    494                         if(is_error(splice_in.res)) {
    495                                 if(splice_in.res.error == -EPIPE) return -ECONNRESET;
    496                                 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
    497                                 close(fd);
    498                         }
    499 
    500                         // Process the other 2
    501                         wait_and_process(header, stats);
    502                         wait_and_process(splice_out, stats);
    503 
    504                         if(is_done(splice_out.res)) {
    505                                 break RETRY_LOOP;
    506                         }
    507 
    508                         // We need to wait for the completion if
    509                         // - both completed
    510                         // - the header failed
    511                         // -
    512 
    513                         if(  is_error(header.res)
    514                           || is_error(splice_in.res)
    515                           || is_error(splice_out.res)) {
    516                                 return -ECONNRESET;
    517                         }
    518                 }
    519 
    520                 return len + fsize;
     512                // char buffer[512];
     513                // int len = fill_header(buffer, fsize);
     514                // header_g header = { fd, buffer, len };
     515                // splice_in_t splice_in = { ans_fd, pipe[1], fsize };
     516                // splice_out_g splice_out = { pipe[0], fd, fsize };
     517
     518                // RETRY_LOOP: for() {
     519                //      stats.tries++;
     520                //      int have = need(header.res) + need(splice_in.res) + 1;
     521                //      int idx = 0;
     522                //      struct io_uring_sqe * sqes[3];
     523                //      __u32 idxs[3];
     524                //      struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);
     525
     526                //      if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
     527                //      if(need(   header.res)) { fill(header   , sqes[idx++]); }
     528                //      fill(splice_out, sqes[idx]);
     529
     530                //      // Submit everything
     531                //      asm volatile("": : :"memory");
     532                //      cfa_io_submit( ctx, idxs, have, false );
     533
     534                //      // wait for the results
     535                //      // Always wait for splice-in to complete as
     536                //      // we may need to kill the connection if it fails
     537                //      // If it already completed, this is a no-op
     538                //      wait_and_process(splice_in, stats);
     539
     540                //      if(is_error(splice_in.res)) {
     541                //              if(splice_in.res.error == -EPIPE) return -ECONNRESET;
     542                //              mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
     543                //              int ret = close(fd);
     544                //              if( ret != 0 ) abort( "close in 'answer sendfile' error: (%d) %s\n", (int)errno, strerror(errno) );
     545                //      }
     546
     547                //      // Process the other 2
     548                //      wait_and_process(header, stats);
     549                //      wait_and_process(splice_out, stats);
     550
     551                //      if(is_done(splice_out.res)) {
     552                //              break RETRY_LOOP;
     553                //      }
     554
     555                //      // We need to wait for the completion if
     556                //      // - both completed
     557                //      // - the header failed
     558                //      // -
     559
     560                //      if(  is_error(header.res)
     561                //        || is_error(splice_in.res)
     562                //        || is_error(splice_out.res)) {
     563                //              return -ECONNRESET;
     564                //      }
     565                // }
     566
     567                // return len + fsize;
    521568        #else
    522569                int ret = answer_header(fd, fsize);
    523                 if( ret < 0 ) { close(fd); return ret; }
     570                if( ret < 0 ) { return ret; }
    524571                return sendfile(pipe, fd, ans_fd, fsize, stats);
    525572        #endif
     
    541588                }
    542589                // int ret = read(fd, (void*)it, count);
    543                 if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; }
     590                if(ret == 0 ) {
     591                        ret = close(fd);
     592                        if( ret != 0 ) abort( "close in 'http read good' error: (%d) %s\n", (int)errno, strerror(errno) );
     593                        return [OK200, true, 0, 0];
     594                }
    544595                if(ret < 0 ) {
    545                         if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
    546                         if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
     596                        if( errno == ECONNRESET || errno == EPIPE ) {
     597                                ret = close(fd);
     598                                if( ret != 0 ) abort( "close in 'http read bad' error: (%d) %s\n", (int)errno, strerror(errno) );
     599                                return [E408, true, 0, 0];
     600                        }
    547601                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
    548602                }
Note: See TracChangeset for help on using the changeset viewer.