Changeset 3f39009
- Timestamp:
- Oct 29, 2021, 5:21:35 PM (3 years ago)
- Branches:
- ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
- Children:
- 42b7fa5f
- Parents:
- 4087baf
- Location:
- benchmark/io/http
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/protocol.cfa
r4087baf r3f39009 11 11 #include <fstream.hfa> 12 12 #include <iofwd.hfa> 13 #include <io/types.hfa> 14 #include <mutex_stmt.hfa> 13 15 14 16 #include <assert.h> … … 26 28 #define PLAINTEXT_MEMCPY 27 29 #define PLAINTEXT_NOCOPY 30 #define LINKED_IO 28 31 29 32 struct https_msg_str { … … 53 56 } 54 57 55 static inline int answer( int fd, const char * it, int len ) {58 static inline int answer( int fd, const char * it, int len ) { 56 59 while(len > 0) { 57 60 // Call write 58 61 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 59 62 if( ret < 0 ) { 60 if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET;63 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; } 61 64 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN; 62 65 … … 77 80 } 78 81 79 int answer_header( int fd, size_t size ) { 80 char buffer[512]; 81 char * it = buffer; 82 static int fill_header(char * it, size_t size) { 82 83 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 83 84 it += http_msgs[OK200]->len; 84 85 int len = http_msgs[OK200]->len; 85 86 len += snprintf(it, 512 - len, "%d \n\n", size); 87 return len; 88 } 89 90 static int answer_header( int fd, size_t size ) { 91 char buffer[512]; 92 int len = fill_header(buffer, size); 86 93 return answer( fd, buffer, len ); 87 94 } … … 135 142 } 136 143 137 138 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 139 char * it = buffer; 140 size_t count = len - 1; 141 int rlen = 0; 142 READ: 143 for() { 144 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 145 // int ret = read(fd, (void*)it, count); 146 if(ret == 0 ) return [OK200, true, 0, 0]; 147 if(ret < 0 ) { 148 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 149 if( errno == ECONNRESET ) return [E408, true, 0, 0]; 150 if( errno == EPIPE ) return [E408, true, 0, 0]; 151 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 152 } 153 it[ret + 1] = '\0'; 154 rlen += ret; 155 156 if( strstr( it, "\r\n\r\n" ) ) break; 157 158 it += ret; 159 count -= ret; 160 161 if( count < 1 ) return [E414, false, 0, 0]; 162 } 163 164 if( options.log ) { 165 write(sout, buffer, rlen); 166 sout | nl; 167 } 168 169 it = buffer; 170 int ret = memcmp(it, "GET /", 5); 171 if( ret != 0 ) return [E400, false, 0, 0]; 172 it += 5; 173 174 char * end = strstr( it, " " ); 175 return [OK200, false, it, end - it]; 176 } 177 178 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 144 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 179 145 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 180 146 off_t offset = 0; … … 207 173 } 208 174 175 static void zero_sqe(struct io_uring_sqe * sqe) { 176 sqe->flags = 0; 177 sqe->ioprio = 0; 178 sqe->fd = 0; 179 sqe->off = 0; 180 sqe->addr = 0; 181 sqe->len = 0; 182 sqe->fsync_flags = 0; 183 sqe->__pad2[0] = 0; 184 sqe->__pad2[1] = 0; 185 sqe->__pad2[2] = 0; 186 sqe->fd = 0; 187 sqe->off = 0; 188 sqe->addr = 0; 189 sqe->len = 0; 190 } 191 192 enum FSM_STATE { 193 Initial, 194 Retry, 195 Error, 196 Done, 197 }; 198 199 struct FSM_Result { 200 FSM_STATE state; 201 int error; 202 }; 203 204 static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; } 205 static inline bool is_error(FSM_Result & this) { return Error == this.state; } 206 static inline bool is_done(FSM_Result & this) { return Done == this.state; } 207 208 static inline int error(FSM_Result & this, int error) { 209 this.error = error; 210 this.state = Error; 211 return error; 212 } 213 214 static inline int done(FSM_Result & this) { 215 this.state = Done; 216 return 0; 217 } 218 219 static inline int retry(FSM_Result & this) { 220 this.state = Retry; 221 return 0; 222 } 223 224 static inline int need(FSM_Result & this) { 225 switch(this.state) { 226 case Initial: 227 case Retry: 228 return 1; 229 case Error: 230 if(this.error == 0) mutex(serr) serr | "State marked error but code is 0"; 231 case Done: 232 return 0; 233 } 234 } 235 236 // Generator that handles sending the header 237 generator header_g { 238 io_future_t f; 239 const char * next; 240 int fd; size_t len; 241 FSM_Result res; 242 }; 243 244 static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) { 245 this.next = it; 246 this.fd = fd; 247 this.len = len; 248 } 249 250 static inline void fill(header_g & this, struct io_uring_sqe * sqe) { 251 zero_sqe(sqe); 252 sqe->opcode = IORING_OP_SEND; 253 sqe->user_data = (uintptr_t)&this.f; 254 sqe->flags = IOSQE_IO_LINK; 255 sqe->fd = this.fd; 256 sqe->addr = (uintptr_t)this.next; 257 sqe->len = this.len; 258 } 259 260 static inline int error(header_g & this, int error) { 261 int ret = close(this.fd); 262 if( ret != 0 ) { 263 mutex(serr) serr | "Failed to close fd" | errno; 264 } 265 return error(this.res, error); 266 } 267 268 static inline int wait_and_process(header_g & this) { 269 wait(this.f); 270 271 // Did something crazy happen? 272 if(this.f.result > this.len) { 273 mutex(serr) serr | "HEADER sent too much!"; 274 return error(this, -ERANGE); 275 } 276 277 // Something failed? 278 if(this.f.result < 0) { 279 int error = -this.f.result; 280 if( error == ECONNRESET ) return error(this, -ECONNRESET); 281 if( error == EPIPE ) return error(this, -EPIPE); 282 if( error == ECANCELED ) { 283 mutex(serr) serr | "HEADER was cancelled, WTF!"; 284 return error(this, -ECONNRESET); 285 } 286 if( error == EAGAIN || error == EWOULDBLOCK) { 287 mutex(serr) serr | "HEADER got eagain, WTF!"; 288 return error(this, -ECONNRESET); 289 } 290 } 291 292 // Done? 293 if(this.f.result == this.len) { 294 return done(this.res); 295 } 296 297 // It must be a Short read 298 this.len -= this.f.result; 299 this.next += this.f.result; 300 reset(this.f); 301 return retry(this.res); 302 } 303 304 // Generator that handles splicing in a file 305 struct splice_in_t { 306 io_future_t f; 307 int fd; int pipe; size_t len; off_t off; 308 FSM_Result res; 309 }; 310 311 static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) { 312 this.fd = fd; 313 this.pipe = pipe; 314 this.len = len; 315 this.off = 0; 316 } 317 318 static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) { 319 zero_sqe(sqe); 320 sqe->opcode = IORING_OP_SPLICE; 321 sqe->user_data = (uintptr_t)&this.f; 322 sqe->flags = 0; 323 sqe->splice_fd_in = this.fd; 324 sqe->splice_off_in = this.off; 325 sqe->fd = this.pipe; 326 sqe->off = (__u64)-1; 327 sqe->len = this.len; 328 sqe->splice_flags = SPLICE_F_MOVE; 329 } 330 331 static inline int wait_and_process(splice_in_t & this) { 332 wait(this.f); 333 334 // Did something crazy happen? 335 if(this.f.result > this.len) { 336 mutex(serr) serr | "SPLICE IN spliced too much!"; 337 return error(this.res, -ERANGE); 338 } 339 340 // Something failed? 341 if(this.f.result < 0) { 342 int error = -this.f.result; 343 if( error == ECONNRESET ) return error(this.res, -ECONNRESET); 344 if( error == EPIPE ) return error(this.res, -EPIPE); 345 if( error == ECANCELED ) { 346 mutex(serr) serr | "SPLICE IN was cancelled, WTF!"; 347 return error(this.res, -ECONNRESET); 348 } 349 if( error == EAGAIN || error == EWOULDBLOCK) { 350 mutex(serr) serr | "SPLICE IN got eagain, WTF!"; 351 return error(this.res, -ECONNRESET); 352 } 353 } 354 355 // Done? 356 if(this.f.result == this.len) { 357 return done(this.res); 358 } 359 360 // It must be a Short read 361 this.len -= this.f.result; 362 this.off += this.f.result; 363 reset(this.f); 364 return retry(this.res); 365 } 366 367 generator splice_out_g { 368 io_future_t f; 369 int pipe; int fd; size_t len; 370 FSM_Result res; 371 }; 372 373 static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) { 374 this.pipe = pipe; 375 this.fd = fd; 376 this.len = len; 377 } 378 379 static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) { 380 zero_sqe(sqe); 381 sqe->opcode = IORING_OP_SPLICE; 382 sqe->user_data = (uintptr_t)&this.f; 383 sqe->flags = 0; 384 sqe->splice_fd_in = this.pipe; 385 sqe->splice_off_in = (__u64)-1; 386 sqe->fd = this.fd; 387 sqe->off = (__u64)-1; 388 sqe->len = this.len; 389 sqe->splice_flags = SPLICE_F_MOVE; 390 } 391 392 static inline int error(splice_out_g & this, int error) { 393 int ret = close(this.fd); 394 if( ret != 0 ) { 395 mutex(serr) serr | "Failed to close fd" | errno; 396 } 397 return error(this.res, error); 398 } 399 400 static inline void wait_and_process(splice_out_g & this) { 401 wait(this.f); 402 403 // Did something crazy happen? 404 if(this.f.result > this.len) { 405 mutex(serr) serr | "SPLICE OUT spliced too much!"; 406 return error(this.res, -ERANGE); 407 } 408 409 // Something failed? 410 if(this.f.result < 0) { 411 int error = -this.f.result; 412 if( error == ECONNRESET ) return error(this, -ECONNRESET); 413 if( error == EPIPE ) return error(this, -EPIPE); 414 if( error == ECANCELED ) { 415 this.f.result = 0; 416 goto SHORT_WRITE; 417 } 418 if( error == EAGAIN || error == EWOULDBLOCK) { 419 mutex(serr) serr | "SPLICE OUT got eagain, WTF!"; 420 return error(this, -ECONNRESET); 421 } 422 } 423 424 // Done? 425 if(this.f.result == this.len) { 426 return done(this.res); 427 } 428 429 SHORT_WRITE: 430 // It must be a Short Write 431 this.len -= this.f.result; 432 reset(this.f); 433 return retry(this.res); 434 } 435 436 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) { 437 #if defined(LINKED_IO) 438 char buffer[512]; 439 int len = fill_header(buffer, fsize); 440 header_g header = { fd, buffer, len }; 441 splice_in_t splice_in = { ans_fd, pipe[1], fsize }; 442 splice_out_g splice_out = { pipe[0], fd, fsize }; 443 444 RETRY_LOOP: for() { 445 int have = need(header.res) + need(splice_in.res) + 1; 446 int idx = 0; 447 struct io_uring_sqe * sqes[3]; 448 __u32 idxs[3]; 449 struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have); 450 451 if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } 452 if(need( header.res)) { fill(header , sqes[idx++]); } 453 fill(splice_out, sqes[idx]); 454 455 // Submit everything 456 asm volatile("": : :"memory"); 457 cfa_io_submit( ctx, idxs, have, false ); 458 459 // wait for the results 460 // Always wait for splice-in to complete as 461 // we may need to kill the connection if it fails 462 // If it already completed, this is a no-op 463 wait_and_process(splice_in); 464 465 if(is_error(splice_in.res)) { 466 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 467 close(fd); 468 } 469 470 // Process the other 2 471 wait_and_process(header); 472 wait_and_process(splice_out); 473 474 if(is_done(splice_out.res)) { 475 break RETRY_LOOP; 476 } 477 478 // We need to wait for the completion if 479 // - both completed 480 // - the header failed 481 // - 482 483 if( is_error(header.res) 484 || is_error(splice_in.res) 485 || is_error(splice_out.res)) { 486 return -ECONNRESET; 487 } 488 } 489 490 return len + fsize; 491 #else 492 int ret = answer_header(fd, fsize); 493 if( ret < 0 ) return ret; 494 return sendfile(pipe, fd, ans_fd, fsize); 495 #endif 496 } 497 498 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 499 char * it = buffer; 500 size_t count = len - 1; 501 int rlen = 0; 502 READ: 503 for() { 504 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 505 // int ret = read(fd, (void*)it, count); 506 if(ret == 0 ) return [OK200, true, 0, 0]; 507 if(ret < 0 ) { 508 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 509 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; } 510 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; } 511 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 512 } 513 it[ret + 1] = '\0'; 514 rlen += ret; 515 516 if( strstr( it, "\r\n\r\n" ) ) break; 517 518 it += ret; 519 count -= ret; 520 521 if( count < 1 ) return [E414, false, 0, 0]; 522 } 523 524 if( options.log ) { 525 write(sout, buffer, rlen); 526 sout | nl; 527 } 528 529 it = buffer; 530 int ret = memcmp(it, "GET /", 5); 531 if( ret != 0 ) return [E400, false, 0, 0]; 532 it += 5; 533 534 char * end = strstr( it, " " ); 535 return [OK200, false, it, end - it]; 536 } 537 209 538 //============================================================================================= 210 539 … … 251 580 Time now = timeHiRes(); 252 581 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 253 if( options.log ) sout | "Updated date to '" | buff | "'";582 // if( options.log ) sout | "Updated date to '" | buff | "'"; 254 583 255 584 for(i; KNOWN_CODES) { … … 264 593 this.idx = (this.idx + 1) % 2; 265 594 266 if( options.log ) sout | "Date thread sleeping";595 // if( options.log ) sout | "Date thread sleeping"; 267 596 268 597 sleep(1`s); -
benchmark/io/http/protocol.hfa
r4087baf r3f39009 16 16 17 17 int answer_error( int fd, HttpCode code ); 18 int answer_header( int fd, size_t size );19 18 int answer_plaintext( int fd ); 20 19 int answer_empty( int fd ); 20 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count ); 21 21 22 22 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); 23 24 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ); -
benchmark/io/http/worker.cfa
r4087baf r3f39009 122 122 } 123 123 124 // Send the header125 int ret = answer_header(fd, count);126 if( ret == -ECONNRESET ) break REQUEST;127 128 124 // Send the desired file 129 ret =sendfile( this.pipe, fd, ans_fd, count);125 int ret = answer_sendfile( this.pipe, fd, ans_fd, count); 130 126 if( ret == -ECONNRESET ) break REQUEST; 131 127 … … 134 130 135 131 if( options.log ) sout | "=== Connection closed ==="; 136 close(fd);137 132 continue CONNECTION; 138 133 }
Note: See TracChangeset
for help on using the changeset viewer.