Changes in benchmark/io/http/protocol.cfa [32d1383:8bee858]
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/protocol.cfa
r32d1383 r8bee858 18 18 extern "C" { 19 19 int snprintf ( char * s, size_t n, const char * format, ... ); 20 ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);21 20 // #include <linux/io_uring.h> 22 21 } … … 31 30 #define PLAINTEXT_NOCOPY 32 31 // #define LINKED_IO 33 #define TRUE_SENDFILE34 32 35 33 static inline __s32 wait_res( io_future_t & this ) { … … 73 71 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 74 72 if( ret < 0 ) { 75 if( errno == ECONNRESET || errno == EPIPE || errno == EBADF ) { 76 ret = close(fd); 77 if( ret != 0 ) abort( "close in 'answer' error: (%d) %s\n", (int)errno, strerror(errno) ); 78 return -ECONNRESET; 79 } 73 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; } 80 74 81 75 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); … … 89 83 } 90 84 91 //int answer_error( int fd, HttpCode code ) {92 ///* paranoid */ assert( code < KNOWN_CODES && code != OK200 );93 //int idx = (int)code;94 //return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );95 //}85 int answer_error( int fd, HttpCode code ) { 86 /* paranoid */ assert( code < KNOWN_CODES && code != OK200 ); 87 int idx = (int)code; 88 return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len ); 89 } 96 90 97 91 static int fill_header(char * it, size_t size) { … … 109 103 } 110 104 111 //#if defined(PLAINTEXT_NOCOPY)112 //int answer_plaintext( int fd ) {113 //return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator114 //}115 //#elif defined(PLAINTEXT_MEMCPY)116 //#define TEXTSIZE 15117 //int answer_plaintext( int fd ) {118 //char text[] = "Hello, World!\n\n";119 //char ts[] = xstr(TEXTSIZE) " \n\n";120 //_Static_assert(sizeof(text) - 1 == TEXTSIZE);121 //char buffer[512 + TEXTSIZE];122 //char * it = buffer;123 //memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);124 //it += http_msgs[OK200]->len;125 //int len = http_msgs[OK200]->len;126 //memcpy(it, ts, sizeof(ts) - 1);127 //it += sizeof(ts) - 1;128 //len += sizeof(ts) - 1;129 //memcpy(it, text, TEXTSIZE);130 //return answer(fd, buffer, len + TEXTSIZE);131 //}132 //#elif defined(PLAINTEXT_1WRITE)133 //int answer_plaintext( int fd ) {134 //char text[] = "Hello, World!\n\n";135 //char buffer[512 + sizeof(text)];136 //char * it = buffer;137 //memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);138 //it += http_msgs[OK200]->len;139 //int len = http_msgs[OK200]->len;140 //int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));141 //it += r;142 //len += r;143 //memcpy(it, text, sizeof(text));144 //return answer(fd, buffer, len + sizeof(text));145 //}146 //#else147 //int answer_plaintext( int fd ) {148 //char text[] = "Hello, World!\n\n";149 //int ret = answer_header(fd, sizeof(text));150 //if( ret < 0 ) return ret;151 //return answer(fd, text, sizeof(text));152 //}153 //#endif154 155 //int answer_empty( int fd ) {156 //return answer_header(fd, 0);157 //}105 #if defined(PLAINTEXT_NOCOPY) 106 int answer_plaintext( int fd ) { 107 return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator 108 } 109 #elif defined(PLAINTEXT_MEMCPY) 110 #define TEXTSIZE 15 111 int answer_plaintext( int fd ) { 112 char text[] = "Hello, World!\n\n"; 113 char ts[] = xstr(TEXTSIZE) " \n\n"; 114 _Static_assert(sizeof(text) - 1 == TEXTSIZE); 115 char buffer[512 + TEXTSIZE]; 116 char * it = buffer; 117 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 118 it += http_msgs[OK200]->len; 119 int len = http_msgs[OK200]->len; 120 memcpy(it, ts, sizeof(ts) - 1); 121 it += sizeof(ts) - 1; 122 len += sizeof(ts) - 1; 123 memcpy(it, text, TEXTSIZE); 124 return answer(fd, buffer, len + TEXTSIZE); 125 } 126 #elif defined(PLAINTEXT_1WRITE) 127 int answer_plaintext( int fd ) { 128 char text[] = "Hello, World!\n\n"; 129 char buffer[512 + sizeof(text)]; 130 char * it = buffer; 131 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 132 it += http_msgs[OK200]->len; 133 int len = http_msgs[OK200]->len; 134 int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text)); 135 it += r; 136 len += r; 137 memcpy(it, text, sizeof(text)); 138 return answer(fd, buffer, len + sizeof(text)); 139 } 140 #else 141 int answer_plaintext( int fd ) { 142 char text[] = "Hello, World!\n\n"; 143 int ret = answer_header(fd, sizeof(text)); 144 if( ret < 0 ) return ret; 145 return answer(fd, text, sizeof(text)); 146 } 147 #endif 148 149 int answer_empty( int fd ) { 150 return answer_header(fd, 0); 151 } 158 152 159 153 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) { … … 167 161 if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file"; 168 162 169 #if defined(TRUE_SENDFILE) 170 off_t offset = 0; 171 ssize_t ret; 172 int flags = fcntl(fd, F_GETFL); 173 if(flags < 0) abort("getfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 174 ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK); 175 if(ret < 0) abort("setfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 176 177 while(count) { 178 ret = sendfile(fd, ans_fd, &offset, count); 163 164 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 165 off_t offset = 0; 166 ssize_t ret; 167 SPLICE1: while(count > 0) { 168 stats.tries++; 169 // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 170 ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 171 if( ret <= 0 ) { 172 if( errno == ECONNRESET ) return -ECONNRESET; 173 if( errno == EPIPE ) return -EPIPE; 174 abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 175 } 176 count -= ret; 177 stats.splcin++; 178 if(count > 0) stats.avgrd[zipf_idx].calls++; 179 stats.avgrd[zipf_idx].bytes += ret; 180 181 size_t in_pipe = ret; 182 SPLICE2: while(in_pipe > 0) { 183 // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 184 ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 179 185 if( ret <= 0 ) { 180 if( errno == EAGAIN || errno == EWOULDBLOCK ) { 181 stats.eagain++; 182 yield(); 183 continue; 184 } 185 if( errno == ECONNRESET || errno == EPIPE ) { 186 ret = close(fd); 187 if( ret != 0 ) abort( "close in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 188 return -ECONNRESET; 189 } 190 abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 186 if( errno == ECONNRESET ) return -ECONNRESET; 187 if( errno == EPIPE ) return -EPIPE; 188 abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 191 189 } 192 count -= ret; 193 stats.splcin++; 194 if(count > 0) stats.avgrd[zipf_idx].calls++; 195 stats.avgrd[zipf_idx].bytes += ret; 196 } 197 198 ret = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); 199 if(ret < 0) abort("resetfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 200 #else 201 #error not implemented 202 // unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 203 // off_t offset = 0; 204 // ssize_t ret; 205 // SPLICE1: while(count > 0) { 206 // stats.tries++; 207 // // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 208 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 209 // if( ret <= 0 ) { 210 // if( errno == ECONNRESET || errno == EPIPE ) { 211 // ret = close(fd); 212 // if( ret != 0 ) abort( "close in 'sendfile splice in' error: (%d) %s\n", (int)errno, strerror(errno) ); 213 // return -ECONNRESET; 214 // } 215 // abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 216 // } 217 // count -= ret; 218 // stats.splcin++; 219 // if(count > 0) stats.avgrd[zipf_idx].calls++; 220 // stats.avgrd[zipf_idx].bytes += ret; 221 222 // size_t in_pipe = ret; 223 // SPLICE2: while(in_pipe > 0) { 224 // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 225 // // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 226 // if( ret <= 0 ) { 227 // if( errno == ECONNRESET || errno == EPIPE ) { 228 // ret = close(fd); 229 // if( ret != 0 ) abort( "close in 'sendfile splice out' error: (%d) %s\n", (int)errno, strerror(errno) ); 230 // return -ECONNRESET; 231 // } 232 // abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 233 // } 234 // stats.splcot++; 235 // in_pipe -= ret; 236 // } 237 238 // } 239 #endif 240 190 stats.splcot++; 191 in_pipe -= ret; 192 } 193 194 } 241 195 return count; 242 196 } 243 197 244 //enum FSM_STATE {245 //Initial,246 //Retry,247 //Error,248 //Done,249 //};250 251 //struct FSM_Result {252 //FSM_STATE state;253 //int error;254 //};255 256 //static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; }257 //static inline bool is_error(FSM_Result & this) { return Error == this.state; }258 //static inline bool is_done(FSM_Result & this) { return Done == this.state; }259 260 //static inline int error(FSM_Result & this, int error) {261 //this.error = error;262 //this.state = Error;263 //return error;264 //}265 266 //static inline int done(FSM_Result & this) {267 //this.state = Done;268 //return 0;269 //}270 271 //static inline int retry(FSM_Result & this) {272 //this.state = Retry;273 //return 0;274 //}275 276 //static inline int need(FSM_Result & this) {277 //switch(this.state) {278 //case Initial:279 //case Retry:280 //return 1;281 //case Error:282 //if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";283 //case Done:284 //return 0;285 //}286 //}287 288 // //Generator that handles sending the header289 //generator header_g {290 //io_future_t f;291 //const char * next;292 //int fd; size_t len;293 //FSM_Result res;294 //};295 296 //static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {297 //this.next = it;298 //this.fd = fd;299 //this.len = len;300 //}301 302 //static inline void fill(header_g & this, struct io_uring_sqe * sqe) {303 //zero_sqe(sqe);304 //sqe->opcode = IORING_OP_SEND;305 //sqe->user_data = (uintptr_t)&this.f;306 //sqe->flags = IOSQE_IO_LINK;307 //sqe->fd = this.fd;308 //sqe->addr = (uintptr_t)this.next;309 //sqe->len = this.len;310 //}311 312 //static inline int error(header_g & this, int error) {313 //int ret = close(this.fd);314 //if( ret != 0 ) {315 //mutex(serr) serr | "Failed to close fd" | errno;316 //}317 //return error(this.res, error);318 //}319 320 //static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) {321 //wait(this.f);322 323 //// Did something crazy happen?324 //if(this.f.result > this.len) {325 //mutex(serr) serr | "HEADER sent too much!";326 //return error(this, -ERANGE);327 //}328 329 //// Something failed?330 //if(this.f.result < 0) {331 //int error = -this.f.result;332 //if( error == ECONNRESET ) return error(this, -ECONNRESET);333 //if( error == EPIPE ) return error(this, -EPIPE);334 //if( error == ECANCELED ) {335 //mutex(serr) serr | "HEADER was cancelled, WTF!";336 //return error(this, -ECONNRESET);337 //}338 //if( error == EAGAIN || error == EWOULDBLOCK) {339 //mutex(serr) serr | "HEADER got eagain, WTF!";340 //return error(this, -ECONNRESET);341 //}342 //}343 344 //// Done?345 //if(this.f.result == this.len) {346 //return done(this.res);347 //}348 349 //stats.header++;350 351 //// It must be a Short read352 //this.len -= this.f.result;353 //this.next += this.f.result;354 //reset(this.f);355 //return retry(this.res);356 //}357 358 // //Generator that handles splicing in a file359 //struct splice_in_t {360 //io_future_t f;361 //int fd; int pipe; size_t len; off_t off;362 //short zipf_idx;363 //FSM_Result res;364 //};365 366 //static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {367 //this.fd = fd;368 //this.pipe = pipe;369 //this.len = len;370 //this.off = 0;371 //this.zipf_idx = -1;372 //STATS: for(i; zipf_cnts) {373 //if(len <= zipf_sizes[i]) {374 //this.zipf_idx = i;375 //break STATS;376 //}377 //}378 //if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file";379 //}380 381 //static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {382 //zero_sqe(sqe);383 //sqe->opcode = IORING_OP_SPLICE;384 //sqe->user_data = (uintptr_t)&this.f;385 //sqe->flags = 0;386 //sqe->splice_fd_in = this.fd;387 //sqe->splice_off_in = this.off;388 //sqe->fd = this.pipe;389 //sqe->off = (__u64)-1;390 //sqe->len = this.len;391 //sqe->splice_flags = SPLICE_F_MOVE;392 //}393 394 //static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) {395 //wait(this.f);396 397 //// Something failed?398 //if(this.f.result < 0) {399 //int error = -this.f.result;400 //if( error == ECONNRESET ) return error(this.res, -ECONNRESET);401 //if( error == EPIPE ) return error(this.res, -EPIPE);402 //if( error == ECANCELED ) {403 //mutex(serr) serr | "SPLICE IN was cancelled, WTF!";404 //return error(this.res, -ECONNRESET);405 //}406 //if( error == EAGAIN || error == EWOULDBLOCK) {407 //mutex(serr) serr | "SPLICE IN got eagain, WTF!";408 //return error(this.res, -ECONNRESET);409 //}410 //mutex(serr) serr | "SPLICE IN got" | error | ", WTF!";411 //return error(this.res, -ECONNRESET);412 //}413 414 //// Did something crazy happen?415 //if(this.f.result > this.len) {416 //mutex(serr) serr | "SPLICE IN spliced too much!";417 //return error(this.res, -ERANGE);418 //}419 420 //// Done?421 //if(this.f.result == this.len) {422 //return done(this.res);423 //}424 425 //stats.splcin++;426 //stats.avgrd[this.zipf_idx].calls++;427 //stats.avgrd[this.zipf_idx].bytes += this.f.result;428 429 //// It must be a Short read430 //this.len -= this.f.result;431 //this.off += this.f.result;432 //reset(this.f);433 //return retry(this.res);434 //}435 436 //generator splice_out_g {437 //io_future_t f;438 //int pipe; int fd; size_t len;439 //FSM_Result res;440 //};441 442 //static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {443 //this.pipe = pipe;444 //this.fd = fd;445 //this.len = len;446 //}447 448 //static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {449 //zero_sqe(sqe);450 //sqe->opcode = IORING_OP_SPLICE;451 //sqe->user_data = (uintptr_t)&this.f;452 //sqe->flags = 0;453 //sqe->splice_fd_in = this.pipe;454 //sqe->splice_off_in = (__u64)-1;455 //sqe->fd = this.fd;456 //sqe->off = (__u64)-1;457 //sqe->len = this.len;458 //sqe->splice_flags = SPLICE_F_MOVE;459 //}460 461 //static inline int error(splice_out_g & this, int error) {462 //int ret = close(this.fd);463 //if( ret != 0 ) {464 //mutex(serr) serr | "Failed to close fd" | errno;465 //}466 //return error(this.res, error);467 //}468 469 //static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) {470 //wait(this.f);471 472 //// Something failed?473 //if(this.f.result < 0) {474 //int error = -this.f.result;475 //if( error == ECONNRESET ) return error(this, -ECONNRESET);476 //if( error == EPIPE ) return error(this, -EPIPE);477 //if( error == ECANCELED ) {478 //this.f.result = 0;479 //goto SHORT_WRITE;480 //}481 //if( error == EAGAIN || error == EWOULDBLOCK) {482 //mutex(serr) serr | "SPLICE OUT got eagain, WTF!";483 //return error(this, -ECONNRESET);484 //}485 //mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!";486 //return error(this, -ECONNRESET);487 //}488 489 //// Did something crazy happen?490 //if(this.f.result > this.len) {491 //mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len;492 //return error(this.res, -ERANGE);493 //}494 495 //// Done?496 //if(this.f.result == this.len) {497 //return done(this.res);498 //}499 500 //SHORT_WRITE:501 //stats.splcot++;502 503 //// It must be a Short Write504 //this.len -= this.f.result;505 //reset(this.f);506 //return retry(this.res);507 //}198 enum FSM_STATE { 199 Initial, 200 Retry, 201 Error, 202 Done, 203 }; 204 205 struct FSM_Result { 206 FSM_STATE state; 207 int error; 208 }; 209 210 static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; } 211 static inline bool is_error(FSM_Result & this) { return Error == this.state; } 212 static inline bool is_done(FSM_Result & this) { return Done == this.state; } 213 214 static inline int error(FSM_Result & this, int error) { 215 this.error = error; 216 this.state = Error; 217 return error; 218 } 219 220 static inline int done(FSM_Result & this) { 221 this.state = Done; 222 return 0; 223 } 224 225 static inline int retry(FSM_Result & this) { 226 this.state = Retry; 227 return 0; 228 } 229 230 static inline int need(FSM_Result & this) { 231 switch(this.state) { 232 case Initial: 233 case Retry: 234 return 1; 235 case Error: 236 if(this.error == 0) mutex(serr) serr | "State marked error but code is 0"; 237 case Done: 238 return 0; 239 } 240 } 241 242 // Generator that handles sending the header 243 generator header_g { 244 io_future_t f; 245 const char * next; 246 int fd; size_t len; 247 FSM_Result res; 248 }; 249 250 static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) { 251 this.next = it; 252 this.fd = fd; 253 this.len = len; 254 } 255 256 static inline void fill(header_g & this, struct io_uring_sqe * sqe) { 257 zero_sqe(sqe); 258 sqe->opcode = IORING_OP_SEND; 259 sqe->user_data = (uintptr_t)&this.f; 260 sqe->flags = IOSQE_IO_LINK; 261 sqe->fd = this.fd; 262 sqe->addr = (uintptr_t)this.next; 263 sqe->len = this.len; 264 } 265 266 static inline int error(header_g & this, int error) { 267 int ret = close(this.fd); 268 if( ret != 0 ) { 269 mutex(serr) serr | "Failed to close fd" | errno; 270 } 271 return error(this.res, error); 272 } 273 274 static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) { 275 wait(this.f); 276 277 // Did something crazy happen? 278 if(this.f.result > this.len) { 279 mutex(serr) serr | "HEADER sent too much!"; 280 return error(this, -ERANGE); 281 } 282 283 // Something failed? 284 if(this.f.result < 0) { 285 int error = -this.f.result; 286 if( error == ECONNRESET ) return error(this, -ECONNRESET); 287 if( error == EPIPE ) return error(this, -EPIPE); 288 if( error == ECANCELED ) { 289 mutex(serr) serr | "HEADER was cancelled, WTF!"; 290 return error(this, -ECONNRESET); 291 } 292 if( error == EAGAIN || error == EWOULDBLOCK) { 293 mutex(serr) serr | "HEADER got eagain, WTF!"; 294 return error(this, -ECONNRESET); 295 } 296 } 297 298 // Done? 299 if(this.f.result == this.len) { 300 return done(this.res); 301 } 302 303 stats.header++; 304 305 // It must be a Short read 306 this.len -= this.f.result; 307 this.next += this.f.result; 308 reset(this.f); 309 return retry(this.res); 310 } 311 312 // Generator that handles splicing in a file 313 struct splice_in_t { 314 io_future_t f; 315 int fd; int pipe; size_t len; off_t off; 316 short zipf_idx; 317 FSM_Result res; 318 }; 319 320 static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) { 321 this.fd = fd; 322 this.pipe = pipe; 323 this.len = len; 324 this.off = 0; 325 this.zipf_idx = -1; 326 STATS: for(i; zipf_cnts) { 327 if(len <= zipf_sizes[i]) { 328 this.zipf_idx = i; 329 break STATS; 330 } 331 } 332 if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file"; 333 } 334 335 static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) { 336 zero_sqe(sqe); 337 sqe->opcode = IORING_OP_SPLICE; 338 sqe->user_data = (uintptr_t)&this.f; 339 sqe->flags = 0; 340 sqe->splice_fd_in = this.fd; 341 sqe->splice_off_in = this.off; 342 sqe->fd = this.pipe; 343 sqe->off = (__u64)-1; 344 sqe->len = this.len; 345 sqe->splice_flags = SPLICE_F_MOVE; 346 } 347 348 static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) { 349 wait(this.f); 350 351 // Something failed? 352 if(this.f.result < 0) { 353 int error = -this.f.result; 354 if( error == ECONNRESET ) return error(this.res, -ECONNRESET); 355 if( error == EPIPE ) return error(this.res, -EPIPE); 356 if( error == ECANCELED ) { 357 mutex(serr) serr | "SPLICE IN was cancelled, WTF!"; 358 return error(this.res, -ECONNRESET); 359 } 360 if( error == EAGAIN || error == EWOULDBLOCK) { 361 mutex(serr) serr | "SPLICE IN got eagain, WTF!"; 362 return error(this.res, -ECONNRESET); 363 } 364 mutex(serr) serr | "SPLICE IN got" | error | ", WTF!"; 365 return error(this.res, -ECONNRESET); 366 } 367 368 // Did something crazy happen? 369 if(this.f.result > this.len) { 370 mutex(serr) serr | "SPLICE IN spliced too much!"; 371 return error(this.res, -ERANGE); 372 } 373 374 // Done? 375 if(this.f.result == this.len) { 376 return done(this.res); 377 } 378 379 stats.splcin++; 380 stats.avgrd[this.zipf_idx].calls++; 381 stats.avgrd[this.zipf_idx].bytes += this.f.result; 382 383 // It must be a Short read 384 this.len -= this.f.result; 385 this.off += this.f.result; 386 reset(this.f); 387 return retry(this.res); 388 } 389 390 generator splice_out_g { 391 io_future_t f; 392 int pipe; int fd; size_t len; 393 FSM_Result res; 394 }; 395 396 static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) { 397 this.pipe = pipe; 398 this.fd = fd; 399 this.len = len; 400 } 401 402 static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) { 403 zero_sqe(sqe); 404 sqe->opcode = IORING_OP_SPLICE; 405 sqe->user_data = (uintptr_t)&this.f; 406 sqe->flags = 0; 407 sqe->splice_fd_in = this.pipe; 408 sqe->splice_off_in = (__u64)-1; 409 sqe->fd = this.fd; 410 sqe->off = (__u64)-1; 411 sqe->len = this.len; 412 sqe->splice_flags = SPLICE_F_MOVE; 413 } 414 415 static inline int error(splice_out_g & this, int error) { 416 int ret = close(this.fd); 417 if( ret != 0 ) { 418 mutex(serr) serr | "Failed to close fd" | errno; 419 } 420 return error(this.res, error); 421 } 422 423 static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) { 424 wait(this.f); 425 426 // Something failed? 427 if(this.f.result < 0) { 428 int error = -this.f.result; 429 if( error == ECONNRESET ) return error(this, -ECONNRESET); 430 if( error == EPIPE ) return error(this, -EPIPE); 431 if( error == ECANCELED ) { 432 this.f.result = 0; 433 goto SHORT_WRITE; 434 } 435 if( error == EAGAIN || error == EWOULDBLOCK) { 436 mutex(serr) serr | "SPLICE OUT got eagain, WTF!"; 437 return error(this, -ECONNRESET); 438 } 439 mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!"; 440 return error(this, -ECONNRESET); 441 } 442 443 // Did something crazy happen? 444 if(this.f.result > this.len) { 445 mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len; 446 return error(this.res, -ERANGE); 447 } 448 449 // Done? 450 if(this.f.result == this.len) { 451 return done(this.res); 452 } 453 454 SHORT_WRITE: 455 stats.splcot++; 456 457 // It must be a Short Write 458 this.len -= this.f.result; 459 reset(this.f); 460 return retry(this.res); 461 } 508 462 509 463 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) { 510 464 stats.calls++; 511 465 #if defined(LINKED_IO) 512 // char buffer[512]; 513 // int len = fill_header(buffer, fsize); 514 // header_g header = { fd, buffer, len }; 515 // splice_in_t splice_in = { ans_fd, pipe[1], fsize }; 516 // splice_out_g splice_out = { pipe[0], fd, fsize }; 517 518 // RETRY_LOOP: for() { 519 // stats.tries++; 520 // int have = need(header.res) + need(splice_in.res) + 1; 521 // int idx = 0; 522 // struct io_uring_sqe * sqes[3]; 523 // __u32 idxs[3]; 524 // struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have); 525 526 // if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } 527 // if(need( header.res)) { fill(header , sqes[idx++]); } 528 // fill(splice_out, sqes[idx]); 529 530 // // Submit everything 531 // asm volatile("": : :"memory"); 532 // cfa_io_submit( ctx, idxs, have, false ); 533 534 // // wait for the results 535 // // Always wait for splice-in to complete as 536 // // we may need to kill the connection if it fails 537 // // If it already completed, this is a no-op 538 // wait_and_process(splice_in, stats); 539 540 // if(is_error(splice_in.res)) { 541 // if(splice_in.res.error == -EPIPE) return -ECONNRESET; 542 // mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 543 // int ret = close(fd); 544 // if( ret != 0 ) abort( "close in 'answer sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); 545 // } 546 547 // // Process the other 2 548 // wait_and_process(header, stats); 549 // wait_and_process(splice_out, stats); 550 551 // if(is_done(splice_out.res)) { 552 // break RETRY_LOOP; 553 // } 554 555 // // We need to wait for the completion if 556 // // - both completed 557 // // - the header failed 558 // // - 559 560 // if( is_error(header.res) 561 // || is_error(splice_in.res) 562 // || is_error(splice_out.res)) { 563 // return -ECONNRESET; 564 // } 565 // } 566 567 // return len + fsize; 466 char buffer[512]; 467 int len = fill_header(buffer, fsize); 468 header_g header = { fd, buffer, len }; 469 splice_in_t splice_in = { ans_fd, pipe[1], fsize }; 470 splice_out_g splice_out = { pipe[0], fd, fsize }; 471 472 RETRY_LOOP: for() { 473 stats.tries++; 474 int have = need(header.res) + need(splice_in.res) + 1; 475 int idx = 0; 476 struct io_uring_sqe * sqes[3]; 477 __u32 idxs[3]; 478 struct io_context$ * ctx = cfa_io_allocate(sqes, idxs, have); 479 480 if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } 481 if(need( header.res)) { fill(header , sqes[idx++]); } 482 fill(splice_out, sqes[idx]); 483 484 // Submit everything 485 asm volatile("": : :"memory"); 486 cfa_io_submit( ctx, idxs, have, false ); 487 488 // wait for the results 489 // Always wait for splice-in to complete as 490 // we may need to kill the connection if it fails 491 // If it already completed, this is a no-op 492 wait_and_process(splice_in, stats); 493 494 if(is_error(splice_in.res)) { 495 if(splice_in.res.error == -EPIPE) return -ECONNRESET; 496 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 497 close(fd); 498 } 499 500 // Process the other 2 501 wait_and_process(header, stats); 502 wait_and_process(splice_out, stats); 503 504 if(is_done(splice_out.res)) { 505 break RETRY_LOOP; 506 } 507 508 // We need to wait for the completion if 509 // - both completed 510 // - the header failed 511 // - 512 513 if( is_error(header.res) 514 || is_error(splice_in.res) 515 || is_error(splice_out.res)) { 516 return -ECONNRESET; 517 } 518 } 519 520 return len + fsize; 568 521 #else 569 522 int ret = answer_header(fd, fsize); 570 if( ret < 0 ) { return ret; }523 if( ret < 0 ) { close(fd); return ret; } 571 524 return sendfile(pipe, fd, ans_fd, fsize, stats); 572 525 #endif … … 588 541 } 589 542 // int ret = read(fd, (void*)it, count); 590 if(ret == 0 ) { 591 ret = close(fd); 592 if( ret != 0 ) abort( "close in 'http read good' error: (%d) %s\n", (int)errno, strerror(errno) ); 593 return [OK200, true, 0, 0]; 594 } 543 if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; } 595 544 if(ret < 0 ) { 596 if( errno == ECONNRESET || errno == EPIPE ) { 597 ret = close(fd); 598 if( ret != 0 ) abort( "close in 'http read bad' error: (%d) %s\n", (int)errno, strerror(errno) ); 599 return [E408, true, 0, 0]; 600 } 545 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; } 546 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; } 601 547 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 602 548 }
Note: See TracChangeset
for help on using the changeset viewer.