Changeset 7770cc8 for benchmark/io/http/protocol.cfa
- Timestamp:
- Nov 24, 2021, 9:47:56 PM (4 years ago)
- Branches:
- ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children:
- 5235d49
- Parents:
- 94647b0b (diff), 3cc1111 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - File:
-
- 1 edited
-
benchmark/io/http/protocol.cfa (modified) (9 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/protocol.cfa
r94647b0b r7770cc8 11 11 #include <fstream.hfa> 12 12 #include <iofwd.hfa> 13 #include <io/types.hfa> 14 #include <mutex_stmt.hfa> 13 15 14 16 #include <assert.h> … … 26 28 #define PLAINTEXT_MEMCPY 27 29 #define PLAINTEXT_NOCOPY 30 #define LINKED_IO 28 31 29 32 struct https_msg_str { … … 53 56 } 54 57 55 static inline int answer( int fd, const char * it, int len ) {58 static inline int answer( int fd, const char * it, int len ) { 56 59 while(len > 0) { 57 60 // Call write 58 61 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 59 62 if( ret < 0 ) { 60 if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET;63 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; } 61 64 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN; 62 65 … … 77 80 } 78 81 79 int answer_header( int fd, size_t size ) { 80 char buffer[512]; 81 char * it = buffer; 82 static int fill_header(char * it, size_t size) { 82 83 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 83 84 it += http_msgs[OK200]->len; 84 85 int len = http_msgs[OK200]->len; 85 86 len += snprintf(it, 512 - len, "%d \n\n", size); 87 return len; 88 } 89 90 static int answer_header( int fd, size_t size ) { 91 char buffer[512]; 92 int len = fill_header(buffer, size); 86 93 return answer( fd, buffer, len ); 87 94 } … … 135 142 } 136 143 137 138 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 139 char * it = buffer; 140 size_t count = len - 1; 141 int rlen = 0; 142 READ: 143 for() { 144 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 145 // int ret = read(fd, (void*)it, count); 146 if(ret == 0 ) return [OK200, true, 0, 0]; 147 if(ret < 0 ) { 148 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 149 if( errno == ECONNRESET ) return [E408, true, 0, 0]; 150 if( errno == EPIPE ) return [E408, true, 0, 0]; 151 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 152 } 153 it[ret + 1] = '\0'; 154 rlen += ret; 155 156 if( strstr( it, "\r\n\r\n" ) ) break; 157 158 it += ret; 159 count -= ret; 160 161 if( count < 1 ) return [E414, false, 0, 0]; 162 } 163 164 if( options.log ) { 165 write(sout, buffer, rlen); 166 sout | nl; 167 } 168 169 it = buffer; 170 int ret = memcmp(it, "GET /", 5); 171 if( ret != 0 ) return [E400, false, 0, 0]; 172 it += 5; 173 174 char * end = strstr( it, " " ); 175 return [OK200, false, it, end - it]; 176 } 177 178 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 144 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 179 145 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 180 146 off_t offset = 0; … … 207 173 } 208 174 175 static void zero_sqe(struct io_uring_sqe * sqe) { 176 sqe->flags = 0; 177 sqe->ioprio = 0; 178 sqe->fd = 0; 179 sqe->off = 0; 180 sqe->addr = 0; 181 sqe->len = 0; 182 sqe->fsync_flags = 0; 183 sqe->__pad2[0] = 0; 184 sqe->__pad2[1] = 0; 185 sqe->__pad2[2] = 0; 186 sqe->fd = 0; 187 sqe->off = 0; 188 sqe->addr = 0; 189 sqe->len = 0; 190 } 191 192 enum FSM_STATE { 193 Initial, 194 Retry, 195 Error, 196 Done, 197 }; 198 199 struct FSM_Result { 200 FSM_STATE state; 201 int error; 202 }; 203 204 static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; } 205 static inline bool is_error(FSM_Result & this) { return Error == this.state; } 206 static inline bool is_done(FSM_Result & this) { return Done == this.state; } 207 208 static inline int error(FSM_Result & this, int error) { 209 this.error = error; 210 this.state = Error; 211 return error; 212 } 213 214 static inline int done(FSM_Result & this) { 215 this.state = Done; 216 return 0; 217 } 218 219 static inline int retry(FSM_Result & this) { 220 this.state = Retry; 221 return 0; 222 } 223 224 static inline int need(FSM_Result & this) { 225 switch(this.state) { 226 case Initial: 227 case Retry: 228 return 1; 229 case Error: 230 if(this.error == 0) mutex(serr) serr | "State marked error but code is 0"; 231 case Done: 232 return 0; 233 } 234 } 235 236 // Generator that handles sending the header 237 generator header_g { 238 io_future_t f; 239 const char * next; 240 int fd; size_t len; 241 FSM_Result res; 242 }; 243 244 static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) { 245 this.next = it; 246 this.fd = fd; 247 this.len = len; 248 } 249 250 static inline void fill(header_g & this, struct io_uring_sqe * sqe) { 251 zero_sqe(sqe); 252 sqe->opcode = IORING_OP_SEND; 253 sqe->user_data = (uintptr_t)&this.f; 254 sqe->flags = IOSQE_IO_LINK; 255 sqe->fd = this.fd; 256 sqe->addr = (uintptr_t)this.next; 257 sqe->len = this.len; 258 } 259 260 static inline int error(header_g & this, int error) { 261 int ret = close(this.fd); 262 if( ret != 0 ) { 263 mutex(serr) serr | "Failed to close fd" | errno; 264 } 265 return error(this.res, error); 266 } 267 268 static inline int wait_and_process(header_g & this) { 269 wait(this.f); 270 271 // Did something crazy happen? 272 if(this.f.result > this.len) { 273 mutex(serr) serr | "HEADER sent too much!"; 274 return error(this, -ERANGE); 275 } 276 277 // Something failed? 278 if(this.f.result < 0) { 279 int error = -this.f.result; 280 if( error == ECONNRESET ) return error(this, -ECONNRESET); 281 if( error == EPIPE ) return error(this, -EPIPE); 282 if( error == ECANCELED ) { 283 mutex(serr) serr | "HEADER was cancelled, WTF!"; 284 return error(this, -ECONNRESET); 285 } 286 if( error == EAGAIN || error == EWOULDBLOCK) { 287 mutex(serr) serr | "HEADER got eagain, WTF!"; 288 return error(this, -ECONNRESET); 289 } 290 } 291 292 // Done? 293 if(this.f.result == this.len) { 294 return done(this.res); 295 } 296 297 // It must be a Short read 298 this.len -= this.f.result; 299 this.next += this.f.result; 300 reset(this.f); 301 return retry(this.res); 302 } 303 304 // Generator that handles splicing in a file 305 struct splice_in_t { 306 io_future_t f; 307 int fd; int pipe; size_t len; off_t off; 308 FSM_Result res; 309 }; 310 311 static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) { 312 this.fd = fd; 313 this.pipe = pipe; 314 this.len = len; 315 this.off = 0; 316 } 317 318 static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) { 319 zero_sqe(sqe); 320 sqe->opcode = IORING_OP_SPLICE; 321 sqe->user_data = (uintptr_t)&this.f; 322 sqe->flags = 0; 323 sqe->splice_fd_in = this.fd; 324 sqe->splice_off_in = this.off; 325 sqe->fd = this.pipe; 326 sqe->off = (__u64)-1; 327 sqe->len = this.len; 328 sqe->splice_flags = SPLICE_F_MOVE; 329 } 330 331 static inline int wait_and_process(splice_in_t & this) { 332 wait(this.f); 333 334 // Something failed? 335 if(this.f.result < 0) { 336 int error = -this.f.result; 337 if( error == ECONNRESET ) return error(this.res, -ECONNRESET); 338 if( error == EPIPE ) return error(this.res, -EPIPE); 339 if( error == ECANCELED ) { 340 mutex(serr) serr | "SPLICE IN was cancelled, WTF!"; 341 return error(this.res, -ECONNRESET); 342 } 343 if( error == EAGAIN || error == EWOULDBLOCK) { 344 mutex(serr) serr | "SPLICE IN got eagain, WTF!"; 345 return error(this.res, -ECONNRESET); 346 } 347 } 348 349 // Did something crazy happen? 350 if(this.f.result > this.len) { 351 mutex(serr) serr | "SPLICE IN spliced too much!"; 352 return error(this.res, -ERANGE); 353 } 354 355 // Done? 356 if(this.f.result == this.len) { 357 return done(this.res); 358 } 359 360 // It must be a Short read 361 this.len -= this.f.result; 362 this.off += this.f.result; 363 reset(this.f); 364 return retry(this.res); 365 } 366 367 generator splice_out_g { 368 io_future_t f; 369 int pipe; int fd; size_t len; 370 FSM_Result res; 371 }; 372 373 static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) { 374 this.pipe = pipe; 375 this.fd = fd; 376 this.len = len; 377 } 378 379 static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) { 380 zero_sqe(sqe); 381 sqe->opcode = IORING_OP_SPLICE; 382 sqe->user_data = (uintptr_t)&this.f; 383 sqe->flags = 0; 384 sqe->splice_fd_in = this.pipe; 385 sqe->splice_off_in = (__u64)-1; 386 sqe->fd = this.fd; 387 sqe->off = (__u64)-1; 388 sqe->len = this.len; 389 sqe->splice_flags = SPLICE_F_MOVE; 390 } 391 392 static inline int error(splice_out_g & this, int error) { 393 int ret = close(this.fd); 394 if( ret != 0 ) { 395 mutex(serr) serr | "Failed to close fd" | errno; 396 } 397 return error(this.res, error); 398 } 399 400 static inline void wait_and_process(splice_out_g & this) { 401 wait(this.f); 402 403 // Something failed? 404 if(this.f.result < 0) { 405 int error = -this.f.result; 406 if( error == ECONNRESET ) return error(this, -ECONNRESET); 407 if( error == EPIPE ) return error(this, -EPIPE); 408 if( error == ECANCELED ) { 409 this.f.result = 0; 410 goto SHORT_WRITE; 411 } 412 if( error == EAGAIN || error == EWOULDBLOCK) { 413 mutex(serr) serr | "SPLICE OUT got eagain, WTF!"; 414 return error(this, -ECONNRESET); 415 } 416 } 417 418 // Did something crazy happen? 419 if(this.f.result > this.len) { 420 mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len; 421 return error(this.res, -ERANGE); 422 } 423 424 // Done? 425 if(this.f.result == this.len) { 426 return done(this.res); 427 } 428 429 SHORT_WRITE: 430 // It must be a Short Write 431 this.len -= this.f.result; 432 reset(this.f); 433 return retry(this.res); 434 } 435 436 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) { 437 #if defined(LINKED_IO) 438 char buffer[512]; 439 int len = fill_header(buffer, fsize); 440 header_g header = { fd, buffer, len }; 441 splice_in_t splice_in = { ans_fd, pipe[1], fsize }; 442 splice_out_g splice_out = { pipe[0], fd, fsize }; 443 444 RETRY_LOOP: for() { 445 int have = need(header.res) + need(splice_in.res) + 1; 446 int idx = 0; 447 struct io_uring_sqe * sqes[3]; 448 __u32 idxs[3]; 449 struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have); 450 451 if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } 452 if(need( header.res)) { fill(header , sqes[idx++]); } 453 fill(splice_out, sqes[idx]); 454 455 // Submit everything 456 asm volatile("": : :"memory"); 457 cfa_io_submit( ctx, idxs, have, false ); 458 459 // wait for the results 460 // Always wait for splice-in to complete as 461 // we may need to kill the connection if it fails 462 // If it already completed, this is a no-op 463 wait_and_process(splice_in); 464 465 if(is_error(splice_in.res)) { 466 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 467 close(fd); 468 } 469 470 // Process the other 2 471 wait_and_process(header); 472 wait_and_process(splice_out); 473 474 if(is_done(splice_out.res)) { 475 break RETRY_LOOP; 476 } 477 478 // We need to wait for the completion if 479 // - both completed 480 // - the header failed 481 // - 482 483 if( is_error(header.res) 484 || is_error(splice_in.res) 485 || is_error(splice_out.res)) { 486 return -ECONNRESET; 487 } 488 } 489 490 return len + fsize; 491 #else 492 int ret = answer_header(fd, fsize); 493 if( ret < 0 ) { close(fd); return ret; } 494 return sendfile(pipe, fd, ans_fd, fsize); 495 #endif 496 } 497 498 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 499 char * it = buffer; 500 size_t count = len - 1; 501 int rlen = 0; 502 READ: 503 for() { 504 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 505 // int ret = read(fd, (void*)it, count); 506 if(ret == 0 ) return [OK200, true, 0, 0]; 507 if(ret < 0 ) { 508 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 509 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; } 510 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; } 511 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 512 } 513 it[ret + 1] = '\0'; 514 rlen += ret; 515 516 if( strstr( it, "\r\n\r\n" ) ) break; 517 518 it += ret; 519 count -= ret; 520 521 if( count < 1 ) return [E414, false, 0, 0]; 522 } 523 524 if( options.log ) { 525 write(sout, buffer, rlen); 526 sout | nl; 527 } 528 529 it = buffer; 530 int ret = memcmp(it, "GET /", 5); 531 if( ret != 0 ) return [E400, false, 0, 0]; 532 it += 5; 533 534 char * end = strstr( it, " " ); 535 return [OK200, false, it, end - it]; 536 } 537 209 538 //============================================================================================= 210 539 … … 214 543 215 544 const char * original_http_msgs[] = { 216 "HTTP/1.1 200 OK\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",217 "HTTP/1.1 200 OK\ nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n\n",218 "HTTP/1.1 400 Bad Request\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",219 "HTTP/1.1 404 Not Found\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",220 "HTTP/1.1 405 Method Not Allowed\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",221 "HTTP/1.1 408 Request Timeout\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",222 "HTTP/1.1 413 Payload Too Large\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",223 "HTTP/1.1 414 URI Too Long\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",545 "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ", 546 "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n", 547 "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 548 "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 549 "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 550 "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 551 "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 552 "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 224 553 }; 225 554 … … 251 580 Time now = timeHiRes(); 252 581 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 253 sout | "Updated date to '" | buff | "'";582 // if( options.log ) sout | "Updated date to '" | buff | "'"; 254 583 255 584 for(i; KNOWN_CODES) { … … 264 593 this.idx = (this.idx + 1) % 2; 265 594 266 sout | "Date thread sleeping";595 // if( options.log ) sout | "Date thread sleeping"; 267 596 268 597 sleep(1`s);
Note:
See TracChangeset
for help on using the changeset viewer.