Changeset 3263e2a4
- Timestamp:
- Feb 15, 2022, 4:28:29 PM (3 years ago)
- Branches:
- ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children:
- 3a40df6, 5614a191
- Parents:
- 6dc17a3d
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/sendfile/producer.c
r6dc17a3d r3263e2a4 10 10 11 11 #include <errno.h> 12 #include <locale.h> 12 13 #include <time.h> 13 14 #include <unistd.h> … … 24 25 #include <netdb.h> 25 26 27 #include <liburing.h> 26 28 27 29 enum { … … 34 36 SENDFILE_ERROR, 35 37 SPLICEIN_ERROR, 36 SPLICEOUT_ERROR 38 SPLICEOUT_ERROR, 39 URINGWAIT_ERROR 37 40 }; 38 41 … … 43 46 44 47 int pipefd[2]; 48 struct io_uring ring; 45 49 46 50 struct stats { … … 56 60 static void my_sendfile(int out, int in, size_t size, struct stats *); 57 61 static void my_splice (int out, int in, size_t size, struct stats *); 62 static void my_iouring (int out, int in, size_t size, struct stats *); 63 static void my_ringlink(int out, int in, size_t size, struct stats *); 58 64 typedef void (*sender_t)(int out, int in, size_t size, struct stats *); 59 65 … … 61 67 62 68 int main(int argc, char * argv[]) { 69 setlocale(LC_ALL, ""); 63 70 const char * file_path; 64 71 struct addrinfo * addr; … … 167 174 DONE: 168 175 176 io_uring_queue_init(16, &ring, 0); 177 169 178 { 170 179 char addr_str[INET_ADDRSTRLEN]; … … 211 220 printf("--- sendfile ---\n"); 212 221 run(my_sendfile, addr, file_fd, file_size); 222 printf("--- io_uring ---\n"); 223 run(my_iouring, addr, file_fd, file_size); 224 printf("--- io_uring + link ---\n"); 225 run(my_ringlink, addr, file_fd, file_size); 213 226 214 227 close(pipefd[0]); … … 258 271 printf("Sent %'zu bytes in %'zu files, %f seconds\n", st.bytes, st.calls, secs); 259 272 printf(" - %'3.3f bytes per second\n", (((double)st.bytes) / secs)); 273 printf(" - %'f seconds per file\n", secs / st.calls); 260 274 printf(" - %'3.3f bytes per calls\n", (((double)st.bytes) / st.calls)); 261 275 if(st.shorts.r.cnt ){ … … 323 337 st->bytes += writes; 324 338 } 339 340 static ssize_t naive_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) { 341 struct io_uring_sqe * sqe = io_uring_get_sqe(&ring); 342 343 io_uring_prep_splice(sqe, fd_in, NULL != off_in ? *off_in: -1, fd_out, NULL != off_out ? *off_out: -1, len, flags); 344 345 io_uring_submit(&ring); 346 347 struct io_uring_cqe * cqe = NULL; 348 /* wait for the sqe to complete */ 349 int ret = io_uring_wait_cqe_nr(&ring, &cqe, 1); 350 351 /* read and process cqe event */ 352 switch(ret) { 353 case 0: 354 { 355 ssize_t val = cqe->res; 356 if( cqe->res < 0 ) { 357 printf("Completion Error : %s\n", strerror( -cqe->res )); 358 return EXIT_FAILURE; 359 } 360 io_uring_cqe_seen(&ring, cqe); 361 return val; 362 } 363 default: 364 fprintf( stderr, "io_uring_wait error: (%d) %s\n\n", (int)-ret, strerror(-ret) ); 365 exit( URINGWAIT_ERROR ); 366 } 367 } 368 369 static void my_iouring (int out, int in, size_t size, struct stats * st) { 370 unsigned flags = 0; //SPLICE_F_MOVE; // | SPLICE_F_MORE; 371 off_t offset = 0; 372 size_t writes = 0; 373 for(;;) { 374 ssize_t reti = 0; 375 reti = naive_splice(in, &offset, pipefd[1], NULL, size, flags); 376 if( reti < 0 ) { 377 fprintf( stderr, "splice in error: (%d) %s\n\n", (int)errno, strerror(errno) ); 378 exit( SPLICEIN_ERROR ); 379 } 380 381 size -= reti; 382 size_t in_pipe = reti; 383 for(;;) { 384 ssize_t reto = 0; 385 reto = naive_splice(pipefd[0], NULL, out, NULL, in_pipe, flags); 386 if( reto < 0 ) { 387 fprintf( stderr, "splice out error: (%d) %s\n\n", (int)errno, strerror(errno) ); 388 exit( SPLICEOUT_ERROR ); 389 } 390 in_pipe -= reto; 391 writes += reto; 392 if(0 == in_pipe) break; 393 st->shorts.w.cnt++; 394 st->shorts.w.bytes += reto; 395 } 396 if(0 == size) break; 397 st->shorts.r.cnt++; 398 st->shorts.r.bytes += reti; 399 } 400 st->calls++; 401 st->bytes += writes; 402 } 403 404 static void my_ringlink(int out, int in, size_t size, struct stats * st) { 405 enum { SPLICE_IN, SPLICE_OUT }; 406 407 size_t in_pipe = size; 408 off_t offset = 0; 409 bool has_in = false; 410 bool has_out = false; 411 while(true) { 412 if(!has_in && size > 0) { 413 struct io_uring_sqe * sqe = io_uring_get_sqe(&ring); 414 io_uring_prep_splice(sqe, in, offset, pipefd[1], -1, size, 0); 415 sqe->user_data = SPLICE_IN; 416 has_in = true; 417 } 418 if(!has_out) { 419 struct io_uring_sqe * sqe = io_uring_get_sqe(&ring); 420 io_uring_prep_splice(sqe, pipefd[0], -1, out, -1, in_pipe, 0); 421 sqe->user_data = SPLICE_OUT; 422 if(has_in) sqe->flags = IOSQE_IO_LINK; 423 has_out = true; 424 } 425 426 int ret = io_uring_submit_and_wait(&ring, 1); 427 if(ret < 0) { 428 fprintf( stderr, "io_uring_submit error: (%d) %s\n\n", (int)-ret, strerror(-ret) ); 429 exit( URINGWAIT_ERROR ); 430 } 431 432 /* poll the cq and count how much polling we did */ 433 while(true) { 434 struct io_uring_cqe * cqe = NULL; 435 /* wait for the sqe to complete */ 436 int ret = io_uring_wait_cqe_nr(&ring, &cqe, 0); 437 438 /* read and process cqe event */ 439 switch(ret) { 440 case 0: 441 if( cqe->res < 0 ) { 442 printf("Completion Error : %s\n", strerror( -cqe->res )); 443 exit( URINGWAIT_ERROR ); 444 } 445 446 ssize_t write = cqe->res; 447 int which = cqe->user_data; 448 io_uring_cqe_seen(&ring, cqe); 449 switch( which ) { 450 case SPLICE_IN: 451 has_in = false; 452 size -= write; 453 offset += write; 454 if(0 == size) break; 455 st->shorts.r.cnt++; 456 st->shorts.r.bytes += write; 457 break; 458 case SPLICE_OUT: 459 has_out = false; 460 in_pipe -= write; 461 st->bytes += write; 462 if(0 == in_pipe) break; 463 st->shorts.w.cnt++; 464 st->shorts.w.bytes += write; 465 break; 466 default: 467 printf("Completion Error : unknown user data\n"); 468 exit( URINGWAIT_ERROR ); 469 } 470 continue; 471 case -EAGAIN: 472 goto OUTER; 473 default: 474 fprintf( stderr, "io_uring_get_cqe error: (%d) %s\n\n", (int)-ret, strerror(-ret) ); 475 exit( URINGWAIT_ERROR ); 476 } 477 } 478 OUTER: 479 if(0 == in_pipe) break; 480 } 481 st->calls++; 482 }
Note: See TracChangeset
for help on using the changeset viewer.