Changes in benchmark/io/http/protocol.cfa [8bee858:32d1383]
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/protocol.cfa
r8bee858 r32d1383 18 18 extern "C" { 19 19 int snprintf ( char * s, size_t n, const char * format, ... ); 20 ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count); 20 21 // #include <linux/io_uring.h> 21 22 } … … 30 31 #define PLAINTEXT_NOCOPY 31 32 // #define LINKED_IO 33 #define TRUE_SENDFILE 32 34 33 35 static inline __s32 wait_res( io_future_t & this ) { … … 71 73 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 72 74 if( ret < 0 ) { 73 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; } 75 if( errno == ECONNRESET || errno == EPIPE || errno == EBADF ) { 76 ret = close(fd); 77 if( ret != 0 ) abort( "close in 'answer' error: (%d) %s\n", (int)errno, strerror(errno) ); 78 return -ECONNRESET; 79 } 74 80 75 81 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); … … 83 89 } 84 90 85 int answer_error( int fd, HttpCode code ) {86 /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );87 int idx = (int)code;88 return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );89 }91 // int answer_error( int fd, HttpCode code ) { 92 // /* paranoid */ assert( code < KNOWN_CODES && code != OK200 ); 93 // int idx = (int)code; 94 // return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len ); 95 // } 90 96 91 97 static int fill_header(char * it, size_t size) { … … 103 109 } 104 110 105 #if defined(PLAINTEXT_NOCOPY)106 int answer_plaintext( int fd ) {107 return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator108 }109 #elif defined(PLAINTEXT_MEMCPY)110 #define TEXTSIZE 15111 int answer_plaintext( int fd ) {112 char text[] = "Hello, World!\n\n";113 char ts[] = xstr(TEXTSIZE) " \n\n";114 _Static_assert(sizeof(text) - 1 == TEXTSIZE);115 char buffer[512 + TEXTSIZE];116 char * it = buffer;117 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);118 it += http_msgs[OK200]->len;119 int len = http_msgs[OK200]->len;120 memcpy(it, ts, sizeof(ts) - 1);121 it += sizeof(ts) - 1;122 len += sizeof(ts) - 1;123 memcpy(it, text, TEXTSIZE);124 return answer(fd, buffer, len + TEXTSIZE);125 }126 #elif defined(PLAINTEXT_1WRITE)127 int answer_plaintext( int fd ) {128 char text[] = "Hello, World!\n\n";129 char buffer[512 + sizeof(text)];130 char * it = buffer;131 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);132 it += http_msgs[OK200]->len;133 int len = http_msgs[OK200]->len;134 int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));135 it += r;136 len += r;137 memcpy(it, text, sizeof(text));138 return answer(fd, buffer, len + sizeof(text));139 }140 #else141 int answer_plaintext( int fd ) {142 char text[] = "Hello, World!\n\n";143 int ret = answer_header(fd, sizeof(text));144 if( ret < 0 ) return ret;145 return answer(fd, text, sizeof(text));146 }147 #endif148 149 int answer_empty( int fd ) {150 return answer_header(fd, 0);151 }111 // #if defined(PLAINTEXT_NOCOPY) 112 // int answer_plaintext( int fd ) { 113 // return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator 114 // } 115 // #elif defined(PLAINTEXT_MEMCPY) 116 // #define TEXTSIZE 15 117 // int answer_plaintext( int fd ) { 118 // char text[] = "Hello, World!\n\n"; 119 // char ts[] = xstr(TEXTSIZE) " \n\n"; 120 // _Static_assert(sizeof(text) - 1 == TEXTSIZE); 121 // char buffer[512 + TEXTSIZE]; 122 // char * it = buffer; 123 // memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 124 // it += http_msgs[OK200]->len; 125 // int len = http_msgs[OK200]->len; 126 // memcpy(it, ts, sizeof(ts) - 1); 127 // it += sizeof(ts) - 1; 128 // len += sizeof(ts) - 1; 129 // memcpy(it, text, TEXTSIZE); 130 // return answer(fd, buffer, len + TEXTSIZE); 131 // } 132 // #elif defined(PLAINTEXT_1WRITE) 133 // int answer_plaintext( int fd ) { 134 // char text[] = "Hello, World!\n\n"; 135 // char buffer[512 + sizeof(text)]; 136 // char * it = buffer; 137 // memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 138 // it += http_msgs[OK200]->len; 139 // int len = http_msgs[OK200]->len; 140 // int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text)); 141 // it += r; 142 // len += r; 143 // memcpy(it, text, sizeof(text)); 144 // return answer(fd, buffer, len + sizeof(text)); 145 // } 146 // #else 147 // int answer_plaintext( int fd ) { 148 // char text[] = "Hello, World!\n\n"; 149 // int ret = answer_header(fd, sizeof(text)); 150 // if( ret < 0 ) return ret; 151 // return answer(fd, text, sizeof(text)); 152 // } 153 // #endif 154 155 // int answer_empty( int fd ) { 156 // return answer_header(fd, 0); 157 // } 152 158 153 159 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) { … … 161 167 if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file"; 162 168 163 164 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 165 off_t offset = 0; 166 ssize_t ret; 167 SPLICE1: while(count > 0) { 168 stats.tries++; 169 // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 170 ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 171 if( ret <= 0 ) { 172 if( errno == ECONNRESET ) return -ECONNRESET; 173 if( errno == EPIPE ) return -EPIPE; 174 abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 175 } 176 count -= ret; 177 stats.splcin++; 178 if(count > 0) stats.avgrd[zipf_idx].calls++; 179 stats.avgrd[zipf_idx].bytes += ret; 180 181 size_t in_pipe = ret; 182 SPLICE2: while(in_pipe > 0) { 183 // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 184 ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 169 #if defined(TRUE_SENDFILE) 170 off_t offset = 0; 171 ssize_t ret; 172 int flags = fcntl(fd, F_GETFL); 173 if(flags < 0) abort("getfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 174 ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK); 175 if(ret < 0) abort("setfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 176 177 while(count) { 178 ret = sendfile(fd, ans_fd, &offset, count); 185 179 if( ret <= 0 ) { 186 if( errno == ECONNRESET ) return -ECONNRESET; 187 if( errno == EPIPE ) return -EPIPE; 188 abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 180 if( errno == EAGAIN || errno == EWOULDBLOCK ) { 181 stats.eagain++; 182 yield(); 183 continue; 184 } 185 if( errno == ECONNRESET || errno == EPIPE ) { 186 ret = close(fd); 187 if( ret != 0 ) abort( "close in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 188 return -ECONNRESET; 189 } 190 abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 189 191 } 190 stats.splcot++; 191 in_pipe -= ret; 192 } 193 194 } 192 count -= ret; 193 stats.splcin++; 194 if(count > 0) stats.avgrd[zipf_idx].calls++; 195 stats.avgrd[zipf_idx].bytes += ret; 196 } 197 198 ret = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); 199 if(ret < 0) abort("resetfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 200 #else 201 #error not implemented 202 // unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 203 // off_t offset = 0; 204 // ssize_t ret; 205 // SPLICE1: while(count > 0) { 206 // stats.tries++; 207 // // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 208 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 209 // if( ret <= 0 ) { 210 // if( errno == ECONNRESET || errno == EPIPE ) { 211 // ret = close(fd); 212 // if( ret != 0 ) abort( "close in 'sendfile splice in' error: (%d) %s\n", (int)errno, strerror(errno) ); 213 // return -ECONNRESET; 214 // } 215 // abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 216 // } 217 // count -= ret; 218 // stats.splcin++; 219 // if(count > 0) stats.avgrd[zipf_idx].calls++; 220 // stats.avgrd[zipf_idx].bytes += ret; 221 222 // size_t in_pipe = ret; 223 // SPLICE2: while(in_pipe > 0) { 224 // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 225 // // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 226 // if( ret <= 0 ) { 227 // if( errno == ECONNRESET || errno == EPIPE ) { 228 // ret = close(fd); 229 // if( ret != 0 ) abort( "close in 'sendfile splice out' error: (%d) %s\n", (int)errno, strerror(errno) ); 230 // return -ECONNRESET; 231 // } 232 // abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 233 // } 234 // stats.splcot++; 235 // in_pipe -= ret; 236 // } 237 238 // } 239 #endif 240 195 241 return count; 196 242 } 197 243 198 enum FSM_STATE {199 Initial,200 Retry,201 Error,202 Done,203 };204 205 struct FSM_Result {206 FSM_STATE state;207 int error;208 };209 210 static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }211 static inline bool is_error(FSM_Result & this) { return Error == this.state; }212 static inline bool is_done(FSM_Result & this) { return Done == this.state; }213 214 static inline int error(FSM_Result & this, int error) {215 this.error = error;216 this.state = Error;217 return error;218 }219 220 static inline int done(FSM_Result & this) {221 this.state = Done;222 return 0;223 }224 225 static inline int retry(FSM_Result & this) {226 this.state = Retry;227 return 0;228 }229 230 static inline int need(FSM_Result & this) {231 switch(this.state) {232 case Initial:233 case Retry:234 return 1;235 case Error:236 if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";237 case Done:238 return 0;239 }240 }241 242 // Generator that handles sending the header243 generator header_g {244 io_future_t f;245 const char * next;246 int fd; size_t len;247 FSM_Result res;248 };249 250 static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {251 this.next = it;252 this.fd = fd;253 this.len = len;254 }255 256 static inline void fill(header_g & this, struct io_uring_sqe * sqe) {257 zero_sqe(sqe);258 sqe->opcode = IORING_OP_SEND;259 sqe->user_data = (uintptr_t)&this.f;260 sqe->flags = IOSQE_IO_LINK;261 sqe->fd = this.fd;262 sqe->addr = (uintptr_t)this.next;263 sqe->len = this.len;264 }265 266 static inline int error(header_g & this, int error) {267 int ret = close(this.fd);268 if( ret != 0 ) {269 mutex(serr) serr | "Failed to close fd" | errno;270 }271 return error(this.res, error);272 }273 274 static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {275 wait(this.f);276 277 // Did something crazy happen?278 if(this.f.result > this.len) {279 mutex(serr) serr | "HEADER sent too much!";280 return error(this, -ERANGE);281 }282 283 // Something failed?284 if(this.f.result < 0) {285 int error = -this.f.result;286 if( error == ECONNRESET ) return error(this, -ECONNRESET);287 if( error == EPIPE ) return error(this, -EPIPE);288 if( error == ECANCELED ) {289 mutex(serr) serr | "HEADER was cancelled, WTF!";290 return error(this, -ECONNRESET);291 }292 if( error == EAGAIN || error == EWOULDBLOCK) {293 mutex(serr) serr | "HEADER got eagain, WTF!";294 return error(this, -ECONNRESET);295 }296 }297 298 // Done?299 if(this.f.result == this.len) {300 return done(this.res);301 }302 303 stats.header++;304 305 // It must be a Short read306 this.len -= this.f.result;307 this.next += this.f.result;308 reset(this.f);309 return retry(this.res);310 }311 312 // Generator that handles splicing in a file313 struct splice_in_t {314 io_future_t f;315 int fd; int pipe; size_t len; off_t off;316 short zipf_idx;317 FSM_Result res;318 };319 320 static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {321 this.fd = fd;322 this.pipe = pipe;323 this.len = len;324 this.off = 0;325 this.zipf_idx = -1;326 STATS: for(i; zipf_cnts) {327 if(len <= zipf_sizes[i]) {328 this.zipf_idx = i;329 break STATS;330 }331 }332 if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";333 }334 335 static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {336 zero_sqe(sqe);337 sqe->opcode = IORING_OP_SPLICE;338 sqe->user_data = (uintptr_t)&this.f;339 sqe->flags = 0;340 sqe->splice_fd_in = this.fd;341 sqe->splice_off_in = this.off;342 sqe->fd = this.pipe;343 sqe->off = (__u64)-1;344 sqe->len = this.len;345 sqe->splice_flags = SPLICE_F_MOVE;346 }347 348 static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {349 wait(this.f);350 351 // Something failed?352 if(this.f.result < 0) {353 int error = -this.f.result;354 if( error == ECONNRESET ) return error(this.res, -ECONNRESET);355 if( error == EPIPE ) return error(this.res, -EPIPE);356 if( error == ECANCELED ) {357 mutex(serr) serr | "SPLICE IN was cancelled, WTF!";358 return error(this.res, -ECONNRESET);359 }360 if( error == EAGAIN || error == EWOULDBLOCK) {361 mutex(serr) serr | "SPLICE IN got eagain, WTF!";362 return error(this.res, -ECONNRESET);363 }364 mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";365 return error(this.res, -ECONNRESET);366 }367 368 // Did something crazy happen?369 if(this.f.result > this.len) {370 mutex(serr) serr | "SPLICE IN spliced too much!";371 return error(this.res, -ERANGE);372 }373 374 // Done?375 if(this.f.result == this.len) {376 return done(this.res);377 }378 379 stats.splcin++;380 stats.avgrd[this.zipf_idx].calls++;381 stats.avgrd[this.zipf_idx].bytes += this.f.result;382 383 // It must be a Short read384 this.len -= this.f.result;385 this.off += this.f.result;386 reset(this.f);387 return retry(this.res);388 }389 390 generator splice_out_g {391 io_future_t f;392 int pipe; int fd; size_t len;393 FSM_Result res;394 };395 396 static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {397 this.pipe = pipe;398 this.fd = fd;399 this.len = len;400 }401 402 static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {403 zero_sqe(sqe);404 sqe->opcode = IORING_OP_SPLICE;405 sqe->user_data = (uintptr_t)&this.f;406 sqe->flags = 0;407 sqe->splice_fd_in = this.pipe;408 sqe->splice_off_in = (__u64)-1;409 sqe->fd = this.fd;410 sqe->off = (__u64)-1;411 sqe->len = this.len;412 sqe->splice_flags = SPLICE_F_MOVE;413 }414 415 static inline int error(splice_out_g & this, int error) {416 int ret = close(this.fd);417 if( ret != 0 ) {418 mutex(serr) serr | "Failed to close fd" | errno;419 }420 return error(this.res, error);421 }422 423 static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {424 wait(this.f);425 426 // Something failed?427 if(this.f.result < 0) {428 int error = -this.f.result;429 if( error == ECONNRESET ) return error(this, -ECONNRESET);430 if( error == EPIPE ) return error(this, -EPIPE);431 if( error == ECANCELED ) {432 this.f.result = 0;433 goto SHORT_WRITE;434 }435 if( error == EAGAIN || error == EWOULDBLOCK) {436 mutex(serr) serr | "SPLICE OUT got eagain, WTF!";437 return error(this, -ECONNRESET);438 }439 mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";440 return error(this, -ECONNRESET);441 }442 443 // Did something crazy happen?444 if(this.f.result > this.len) {445 mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;446 return error(this.res, -ERANGE);447 }448 449 // Done?450 if(this.f.result == this.len) {451 return done(this.res);452 }453 454 SHORT_WRITE:455 stats.splcot++;456 457 // It must be a Short Write458 this.len -= this.f.result;459 reset(this.f);460 return retry(this.res);461 }244 // enum FSM_STATE { 245 // Initial, 246 // Retry, 247 // Error, 248 // Done, 249 // }; 250 251 // struct FSM_Result { 252 // FSM_STATE state; 253 // int error; 254 // }; 255 256 // static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; } 257 // static inline bool is_error(FSM_Result & this) { return Error == this.state; } 258 // static inline bool is_done(FSM_Result & this) { return Done == this.state; } 259 260 // static inline int error(FSM_Result & this, int error) { 261 // this.error = error; 262 // this.state = Error; 263 // return error; 264 // } 265 266 // static inline int done(FSM_Result & this) { 267 // this.state = Done; 268 // return 0; 269 // } 270 271 // static inline int retry(FSM_Result & this) { 272 // this.state = Retry; 273 // return 0; 274 // } 275 276 // static inline int need(FSM_Result & this) { 277 // switch(this.state) { 278 // case Initial: 279 // case Retry: 280 // return 1; 281 // case Error: 282 // if(this.error == 0) mutex(serr) serr | "State marked error but code is 0"; 283 // case Done: 284 // return 0; 285 // } 286 // } 287 288 // // Generator that handles sending the header 289 // generator header_g { 290 // io_future_t f; 291 // const char * next; 292 // int fd; size_t len; 293 // FSM_Result res; 294 // }; 295 296 // static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) { 297 // this.next = it; 298 // this.fd = fd; 299 // this.len = len; 300 // } 301 302 // static inline void fill(header_g & this, struct io_uring_sqe * sqe) { 303 // zero_sqe(sqe); 304 // sqe->opcode = IORING_OP_SEND; 305 // sqe->user_data = (uintptr_t)&this.f; 306 // sqe->flags = IOSQE_IO_LINK; 307 // sqe->fd = this.fd; 308 // sqe->addr = (uintptr_t)this.next; 309 // sqe->len = this.len; 310 // } 311 312 // static inline int error(header_g & this, int error) { 313 // int ret = close(this.fd); 314 // if( ret != 0 ) { 315 // mutex(serr) serr | "Failed to close fd" | errno; 316 // } 317 // return error(this.res, error); 318 // } 319 320 // static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) { 321 // wait(this.f); 322 323 // // Did something crazy happen? 324 // if(this.f.result > this.len) { 325 // mutex(serr) serr | "HEADER sent too much!"; 326 // return error(this, -ERANGE); 327 // } 328 329 // // Something failed? 330 // if(this.f.result < 0) { 331 // int error = -this.f.result; 332 // if( error == ECONNRESET ) return error(this, -ECONNRESET); 333 // if( error == EPIPE ) return error(this, -EPIPE); 334 // if( error == ECANCELED ) { 335 // mutex(serr) serr | "HEADER was cancelled, WTF!"; 336 // return error(this, -ECONNRESET); 337 // } 338 // if( error == EAGAIN || error == EWOULDBLOCK) { 339 // mutex(serr) serr | "HEADER got eagain, WTF!"; 340 // return error(this, -ECONNRESET); 341 // } 342 // } 343 344 // // Done? 345 // if(this.f.result == this.len) { 346 // return done(this.res); 347 // } 348 349 // stats.header++; 350 351 // // It must be a Short read 352 // this.len -= this.f.result; 353 // this.next += this.f.result; 354 // reset(this.f); 355 // return retry(this.res); 356 // } 357 358 // // Generator that handles splicing in a file 359 // struct splice_in_t { 360 // io_future_t f; 361 // int fd; int pipe; size_t len; off_t off; 362 // short zipf_idx; 363 // FSM_Result res; 364 // }; 365 366 // static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) { 367 // this.fd = fd; 368 // this.pipe = pipe; 369 // this.len = len; 370 // this.off = 0; 371 // this.zipf_idx = -1; 372 // STATS: for(i; zipf_cnts) { 373 // if(len <= zipf_sizes[i]) { 374 // this.zipf_idx = i; 375 // break STATS; 376 // } 377 // } 378 // if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file"; 379 // } 380 381 // static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) { 382 // zero_sqe(sqe); 383 // sqe->opcode = IORING_OP_SPLICE; 384 // sqe->user_data = (uintptr_t)&this.f; 385 // sqe->flags = 0; 386 // sqe->splice_fd_in = this.fd; 387 // sqe->splice_off_in = this.off; 388 // sqe->fd = this.pipe; 389 // sqe->off = (__u64)-1; 390 // sqe->len = this.len; 391 // sqe->splice_flags = SPLICE_F_MOVE; 392 // } 393 394 // static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) { 395 // wait(this.f); 396 397 // // Something failed? 398 // if(this.f.result < 0) { 399 // int error = -this.f.result; 400 // if( error == ECONNRESET ) return error(this.res, -ECONNRESET); 401 // if( error == EPIPE ) return error(this.res, -EPIPE); 402 // if( error == ECANCELED ) { 403 // mutex(serr) serr | "SPLICE IN was cancelled, WTF!"; 404 // return error(this.res, -ECONNRESET); 405 // } 406 // if( error == EAGAIN || error == EWOULDBLOCK) { 407 // mutex(serr) serr | "SPLICE IN got eagain, WTF!"; 408 // return error(this.res, -ECONNRESET); 409 // } 410 // mutex(serr) serr | "SPLICE IN got" | error | ", WTF!"; 411 // return error(this.res, -ECONNRESET); 412 // } 413 414 // // Did something crazy happen? 415 // if(this.f.result > this.len) { 416 // mutex(serr) serr | "SPLICE IN spliced too much!"; 417 // return error(this.res, -ERANGE); 418 // } 419 420 // // Done? 421 // if(this.f.result == this.len) { 422 // return done(this.res); 423 // } 424 425 // stats.splcin++; 426 // stats.avgrd[this.zipf_idx].calls++; 427 // stats.avgrd[this.zipf_idx].bytes += this.f.result; 428 429 // // It must be a Short read 430 // this.len -= this.f.result; 431 // this.off += this.f.result; 432 // reset(this.f); 433 // return retry(this.res); 434 // } 435 436 // generator splice_out_g { 437 // io_future_t f; 438 // int pipe; int fd; size_t len; 439 // FSM_Result res; 440 // }; 441 442 // static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) { 443 // this.pipe = pipe; 444 // this.fd = fd; 445 // this.len = len; 446 // } 447 448 // static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) { 449 // zero_sqe(sqe); 450 // sqe->opcode = IORING_OP_SPLICE; 451 // sqe->user_data = (uintptr_t)&this.f; 452 // sqe->flags = 0; 453 // sqe->splice_fd_in = this.pipe; 454 // sqe->splice_off_in = (__u64)-1; 455 // sqe->fd = this.fd; 456 // sqe->off = (__u64)-1; 457 // sqe->len = this.len; 458 // sqe->splice_flags = SPLICE_F_MOVE; 459 // } 460 461 // static inline int error(splice_out_g & this, int error) { 462 // int ret = close(this.fd); 463 // if( ret != 0 ) { 464 // mutex(serr) serr | "Failed to close fd" | errno; 465 // } 466 // return error(this.res, error); 467 // } 468 469 // static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) { 470 // wait(this.f); 471 472 // // Something failed? 473 // if(this.f.result < 0) { 474 // int error = -this.f.result; 475 // if( error == ECONNRESET ) return error(this, -ECONNRESET); 476 // if( error == EPIPE ) return error(this, -EPIPE); 477 // if( error == ECANCELED ) { 478 // this.f.result = 0; 479 // goto SHORT_WRITE; 480 // } 481 // if( error == EAGAIN || error == EWOULDBLOCK) { 482 // mutex(serr) serr | "SPLICE OUT got eagain, WTF!"; 483 // return error(this, -ECONNRESET); 484 // } 485 // mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!"; 486 // return error(this, -ECONNRESET); 487 // } 488 489 // // Did something crazy happen? 490 // if(this.f.result > this.len) { 491 // mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len; 492 // return error(this.res, -ERANGE); 493 // } 494 495 // // Done? 496 // if(this.f.result == this.len) { 497 // return done(this.res); 498 // } 499 500 // SHORT_WRITE: 501 // stats.splcot++; 502 503 // // It must be a Short Write 504 // this.len -= this.f.result; 505 // reset(this.f); 506 // return retry(this.res); 507 // } 462 508 463 509 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) { 464 510 stats.calls++; 465 511 #if defined(LINKED_IO) 466 char buffer[512]; 467 int len = fill_header(buffer, fsize); 468 header_g header = { fd, buffer, len }; 469 splice_in_t splice_in = { ans_fd, pipe[1], fsize }; 470 splice_out_g splice_out = { pipe[0], fd, fsize }; 471 472 RETRY_LOOP: for() { 473 stats.tries++; 474 int have = need(header.res) + need(splice_in.res) + 1; 475 int idx = 0; 476 struct io_uring_sqe * sqes[3]; 477 __u32 idxs[3]; 478 struct io_context$ * ctx = cfa_io_allocate(sqes, idxs, have); 479 480 if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } 481 if(need( header.res)) { fill(header , sqes[idx++]); } 482 fill(splice_out, sqes[idx]); 483 484 // Submit everything 485 asm volatile("": : :"memory"); 486 cfa_io_submit( ctx, idxs, have, false ); 487 488 // wait for the results 489 // Always wait for splice-in to complete as 490 // we may need to kill the connection if it fails 491 // If it already completed, this is a no-op 492 wait_and_process(splice_in, stats); 493 494 if(is_error(splice_in.res)) { 495 if(splice_in.res.error == -EPIPE) return -ECONNRESET; 496 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 497 close(fd); 498 } 499 500 // Process the other 2 501 wait_and_process(header, stats); 502 wait_and_process(splice_out, stats); 503 504 if(is_done(splice_out.res)) { 505 break RETRY_LOOP; 506 } 507 508 // We need to wait for the completion if 509 // - both completed 510 // - the header failed 511 // - 512 513 if( is_error(header.res) 514 || is_error(splice_in.res) 515 || is_error(splice_out.res)) { 516 return -ECONNRESET; 517 } 518 } 519 520 return len + fsize; 512 // char buffer[512]; 513 // int len = fill_header(buffer, fsize); 514 // header_g header = { fd, buffer, len }; 515 // splice_in_t splice_in = { ans_fd, pipe[1], fsize }; 516 // splice_out_g splice_out = { pipe[0], fd, fsize }; 517 518 // RETRY_LOOP: for() { 519 // stats.tries++; 520 // int have = need(header.res) + need(splice_in.res) + 1; 521 // int idx = 0; 522 // struct io_uring_sqe * sqes[3]; 523 // __u32 idxs[3]; 524 // struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have); 525 526 // if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } 527 // if(need( header.res)) { fill(header , sqes[idx++]); } 528 // fill(splice_out, sqes[idx]); 529 530 // // Submit everything 531 // asm volatile("": : :"memory"); 532 // cfa_io_submit( ctx, idxs, have, false ); 533 534 // // wait for the results 535 // // Always wait for splice-in to complete as 536 // // we may need to kill the connection if it fails 537 // // If it already completed, this is a no-op 538 // wait_and_process(splice_in, stats); 539 540 // if(is_error(splice_in.res)) { 541 // if(splice_in.res.error == -EPIPE) return -ECONNRESET; 542 // mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 543 // int ret = close(fd); 544 // if( ret != 0 ) abort( "close in 'answer sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 545 // } 546 547 // // Process the other 2 548 // wait_and_process(header, stats); 549 // wait_and_process(splice_out, stats); 550 551 // if(is_done(splice_out.res)) { 552 // break RETRY_LOOP; 553 // } 554 555 // // We need to wait for the completion if 556 // // - both completed 557 // // - the header failed 558 // // - 559 560 // if( is_error(header.res) 561 // || is_error(splice_in.res) 562 // || is_error(splice_out.res)) { 563 // return -ECONNRESET; 564 // } 565 // } 566 567 // return len + fsize; 521 568 #else 522 569 int ret = answer_header(fd, fsize); 523 if( ret < 0 ) { close(fd);return ret; }570 if( ret < 0 ) { return ret; } 524 571 return sendfile(pipe, fd, ans_fd, fsize, stats); 525 572 #endif … … 541 588 } 542 589 // int ret = read(fd, (void*)it, count); 543 if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; } 590 if(ret == 0 ) { 591 ret = close(fd); 592 if( ret != 0 ) abort( "close in 'http read good' error: (%d) %s\n", (int)errno, strerror(errno) ); 593 return [OK200, true, 0, 0]; 594 } 544 595 if(ret < 0 ) { 545 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; } 546 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; } 596 if( errno == ECONNRESET || errno == EPIPE ) { 597 ret = close(fd); 598 if( ret != 0 ) abort( "close in 'http read bad' error: (%d) %s\n", (int)errno, strerror(errno) ); 599 return [E408, true, 0, 0]; 600 } 547 601 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 548 602 }
Note: See TracChangeset
for help on using the changeset viewer.