Changes in / [c680a4b:44aad8f]

Files:
- 1 added
- 5 edited
Legend:
- ' ' unmodified
- '+' added
- '-' removed
benchmark/io/readv.cfa
--- benchmark/io/readv.cfa (c680a4b)
+++ benchmark/io/readv.cfa (44aad8f)
@@ -18,5 +18,5 @@
 
 extern bool traceHeapOn();
-extern ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
+extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
 
 int fd;
@@ -34,5 +34,5 @@
 
 	while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
-		async_preadv2(fd, &iov, 1, 0, 0);
+		cfa_preadv2(fd, &iov, 1, 0, 0);
 		__atomic_fetch_add( &count, 1, __ATOMIC_SEQ_CST );
 	}
@@ -40,4 +40,8 @@
 
 int main(int argc, char * argv[]) {
+	#if !defined(__CFA_NO_STATISTICS__)
+		print_stats_at_exit( *active_cluster() );
+	#endif
+
 	double duration = 5.0;
 	unsigned long int nthreads = 2;
@@ -46,5 +50,4 @@
 	printf("Setting local\n");
 	setlocale(LC_NUMERIC, "");
-
 
 	arg_loop:
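The benchmark's elided middle (the gaps between hunks above) parses the thread and duration arguments and drives the reader threads; the changeset only shows the changed lines. For orientation, a hypothetical sketch of what such a driver typically does, not a quote of the real code:

	// hypothetical driver sketch; `run` and `count` are the benchmark's shared globals
	__atomic_store_n(&run, true, __ATOMIC_RELAXED);    // release the reader threads
	usleep((useconds_t)(duration * 1e6));              // let them run for `duration` seconds
	__atomic_store_n(&run, false, __ATOMIC_RELAXED);   // signal them to stop
	// setlocale(LC_NUMERIC, "") above enables %' thousands grouping in the report
	printf("Total reads: %'lu in %.2lf s (%'.0lf reads/s)\n",
	       count, duration, count / duration);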
examples/io/simple/server.cfa
--- examples/io/simple/server.cfa (c680a4b)
+++ examples/io/simple/server.cfa (44aad8f)
@@ -51,7 +51,7 @@
 
 //----------
-extern ssize_t async_recvmsg(int sockfd, struct msghdr *msg, int flags);
-extern int async_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-extern int async_close(int fd);
+extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags);
+extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+extern int cfa_close(int fd);
 
 //----------
@@ -88,5 +88,5 @@
 	struct sockaddr_in cli_addr;
 	__socklen_t clilen = sizeof(cli_addr);
-	int newsock = async_accept4(sock, (struct sockaddr *) &cli_addr, &clilen, 0);
+	int newsock = cfa_accept4(sock, (struct sockaddr *) &cli_addr, &clilen, 0);
 	if (newsock < 0) {
 		error( printer, "accept", -newsock);
@@ -97,5 +97,5 @@
 
 	while(1) {
-		int res = async_recvmsg(newsock, &msg, 0);
+		int res = cfa_recvmsg(newsock, &msg, 0);
 		if(res == 0) break;
 		if(res < 0) {
@@ -107,5 +107,5 @@
 	}
 
-	ret = async_close(newsock);
+	ret = cfa_close(newsock);
 	if(ret < 0) {
 		error( printer, "close new", -ret);
@@ -113,5 +113,5 @@
 	}
 
-	ret = async_close(sock);
+	ret = cfa_close(sock);
 	if(ret < 0) {
 		error( printer, "close old", -ret);
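One convention visible in this example: when an operation goes through io_uring, the cfa_* wrappers return the completion's result directly, so failures come back as a negative errno value (hence `error( printer, "accept", -newsock )` negates before reporting). The libc fallback path instead returns -1 and sets errno, so strictly this applies to the ring path. A minimal sketch of checking such a call without the example's own `error` helper:

	int newsock = cfa_accept4(sock, (struct sockaddr *)&cli_addr, &clilen, 0);
	if (newsock < 0) {
		// negate to recover the errno value before formatting it
		fprintf(stderr, "accept: %s\n", strerror(-newsock));
	}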
libcfa/src/concurrency/io.cfa
--- libcfa/src/concurrency/io.cfa (c680a4b)
+++ libcfa/src/concurrency/io.cfa (44aad8f)
@@ -1,2 +1,17 @@
+//
+// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
+//
+// The contents of this file are covered under the licence agreement in the
+// file "LICENCE" distributed with Cforall.
+//
+// io.cfa --
+//
+// Author           : Thierry Delisle
+// Created On       : Thu Apr 23 17:31:00 2020
+// Last Modified By :
+// Last Modified On :
+// Update Count     :
+//
+
 #include "kernel.hfa"
@@ -8,8 +23,4 @@
 	void __kernel_io_shutdown( cluster & this ) {
 		// Nothing to do without io_uring
-	}
-
-	bool is_async( void (*)() ) {
-		return false;
 	}
@@ -90,8 +101,10 @@
 		// Requires features
-		// // adjust the size according to the parameters
-		// if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
-		// 	cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
-		// }
+		#if defined(IORING_FEAT_SINGLE_MMAP)
+			// adjust the size according to the parameters
+			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
+				cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
+			}
+		#endif
 
 		// mmap the Submit Queue into existence
@@ -101,13 +114,19 @@
 		}
 
-		// mmap the Completion Queue into existence (may or may not be needed)
 		// Requires features
-		// if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
-		// 	cq->ring_ptr = sq->ring_ptr;
-		// }
-		// else {
+		#if defined(IORING_FEAT_SINGLE_MMAP)
+			// mmap the Completion Queue into existence (may or may not be needed)
+			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
+				cq->ring_ptr = sq->ring_ptr;
+			}
+			else
+		#endif
+		{
 			// We need multiple call to MMAP
 			cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
@@ -113,5 +129,5 @@
 			abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
 		}
-		// }
+		}
 
 		// mmap the submit queue entries
@@ -160,3 +176,11 @@
 		(this.io.submit){ min(*sq.num, *cq.num) };
 
+		// Initialize statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			this.io.submit_q.stats.submit_avg.val = 0;
+			this.io.submit_q.stats.submit_avg.cnt = 0;
+			this.io.completion_q.stats.completed_avg.val = 0;
+			this.io.completion_q.stats.completed_avg.cnt = 0;
+		#endif
+
 		// Create the poller thread
 		this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
@@ -174,4 +198,22 @@
 		pthread_join( this.io.poller, 0p );
 		free( this.io.stack );
+
+		// print statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			if(this.print_stats) {
+				__cfaabi_bits_print_safe( STDERR_FILENO,
+					"----- I/O uRing Stats -----\n"
+					"- total submit calls  : %llu\n"
+					"- avg submit          : %lf\n"
+					"- total wait calls    : %llu\n"
+					"- avg completion/wait : %lf\n",
+					this.io.submit_q.stats.submit_avg.cnt,
+					((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
+					this.io.completion_q.stats.completed_avg.cnt,
+					((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
+				);
+			}
+		#endif
 
 		// Shutdown the io rings
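The statistics added here are plain running totals: each submit_avg/completed_avg pair accumulates a sum (`val`) and a call count (`cnt`), and the averages are only computed at print time. One caveat: the print above divides by `cnt` with no zero check, so a ring that never saw a submission would report nan/inf rather than 0. A guarded variant (a sketch, not part of this changeset) could read:

	// hedged sketch: guard the printed averages against cnt == 0
	double avg_submit    = submit_avg.cnt    ? ((double)submit_avg.val)    / submit_avg.cnt    : 0.0;
	double avg_completed = completed_avg.cnt ? ((double)completed_avg.val) / completed_avg.cnt : 0.0;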
@@ -204,28 +245,54 @@
 	// Process a single completion message from the io_uring
 	// This is NOT thread-safe
-	static bool __io_process(struct io_ring & ring) {
+	int __drain_io( struct io_ring & ring, sigset_t & mask, int waitcnt ) {
+		int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
+		if( ret < 0 ) {
+			switch((int)errno) {
+			case EAGAIN:
+			case EINTR:
+				return -EAGAIN;
+			default:
+				abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
+			}
+		}
+
+		// Drain the queue
 		unsigned head = *ring.completion_q.head;
 		unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
 
-		if (head == tail) return false;
-
-		unsigned idx = head & (*ring.completion_q.mask);
-		struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
-
-		/* paranoid */ verify(&cqe);
-
-		struct io_user_data * data = (struct io_user_data *)cqe.user_data;
-		// __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
-
-		data->result = cqe.res;
-		__unpark( data->thrd __cfaabi_dbg_ctx2 );
+		// Nothing was new return 0
+		if (head == tail) {
+			#if !defined(__CFA_NO_STATISTICS__)
+				ring.completion_q.stats.completed_avg.cnt += 1;
+			#endif
+			return 0;
+		}
+
+		uint32_t count = tail - head;
+		for(i; count) {
+			unsigned idx = (head + i) & (*ring.completion_q.mask);
+			struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+
+			/* paranoid */ verify(&cqe);
+
+			struct io_user_data * data = (struct io_user_data *)cqe.user_data;
+			// __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
+
+			data->result = cqe.res;
+			__unpark( data->thrd __cfaabi_dbg_ctx2 );
+		}
 
 		// Allow new submissions to happen
-		V(ring.submit);
+		V(ring.submit, count);
 
 		// Mark to the kernel that the cqe has been seen
 		// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-		__atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELAXED );
-
-		return true;
+		__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
+
+		// Update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			ring.completion_q.stats.completed_avg.val += count;
+			ring.completion_q.stats.completed_avg.cnt += 1;
+		#endif
+
+		return count;
 	}
@@ -246,18 +314,5 @@
 		verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
 
-		LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
-			int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
-			if( ret < 0 ) {
-				switch((int)errno) {
-				case EAGAIN:
-				case EINTR:
-					continue LOOP;
-				default:
-					abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
-				}
-			}
-
-			// Drain the queue
-			while(__io_process(ring)) {}
+		while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
+			__drain_io( ring, mask, 1 );
 		}
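The rewrite above folds the kernel wait and the drain into one `__drain_io` call, and switches from consuming one CQE per pass to consuming the whole batch between `head` and `tail`: every entry is processed under a single head/tail snapshot, and the consumer index is published to the kernel once per batch instead of once per entry. The underlying single-consumer pattern, reduced to its essentials in a sketch (the names here are illustrative, not the library's API):

	// Sketch of a single-consumer batched ring drain; process() is a placeholder.
	unsigned head = *q_head;                                    // consumer-private index
	unsigned tail = __atomic_load_n(q_tail, __ATOMIC_ACQUIRE);  // acquire pairs with the producer's release
	unsigned count = tail - head;                               // wrap-safe with unsigned arithmetic
	for (unsigned i = 0; i < count; i += 1) {
		process( &entries[(head + i) & mask] );             // mask keeps the index inside the ring
	}
	__atomic_fetch_add(q_head, count, __ATOMIC_RELAXED);        // publish consumption once per batch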
@@ -293,68 +351,73 @@
 	static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
 		// Wait for a spot to be available
 		P(ring.submit);
 
 		// Allocate the sqe
 		uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
 
 		// Validate that we didn't overflow anything
 		// Check that nothing overflowed
 		/* paranoid */ verify( true );
 
 		// Check that it goes head -> tail -> alloc and never head -> alloc -> tail
 		/* paranoid */ verify( true );
 
 		// Return the sqe
 		return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
 	}
 
 	static inline void __submit( struct io_ring & ring, uint32_t idx ) {
 		// get mutual exclusion
 		lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
 
 		// Append to the list of ready entries
 		uint32_t * tail = ring.submit_q.tail;
 		const uint32_t mask = *ring.submit_q.mask;
 
 		ring.submit_q.array[ (*tail) & mask ] = idx & mask;
 		__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
 
 		// Submit however, many entries need to be submitted
 		int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
 		// __cfaabi_bits_print_safe( STDERR_FILENO, "Performed io_submit, returned %d\n", ret );
 		if( ret < 0 ) {
 			switch((int)errno) {
 			default:
 				abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
 			}
 		}
 
+		// update statistics
+		#if !defined(__CFA_NO_STATISTICS__)
+			ring.submit_q.stats.submit_avg.val += 1;
+			ring.submit_q.stats.submit_avg.cnt += 1;
+		#endif
+
 		unlock(ring.submit_q.lock);
 		// Make sure that idx was submitted
 		// Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
 	}
 
 	static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
 		this.opcode = opcode;
 		#if !defined(IOSQE_ASYNC)
 			this.flags = 0;
 		#else
 			this.flags = IOSQE_ASYNC;
 		#endif
 		this.ioprio = 0;
 		this.fd = fd;
 		this.off = 0;
 		this.addr = 0;
 		this.len = 0;
 		this.rw_flags = 0;
 		this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
 	}
 
 	static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
 		(this){ opcode, fd };
 		this.off = off;
 		this.addr = (uint64_t)addr;
 		this.len = len;
 	}
+#endif
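The `?{}` definitions above are Cforall constructors on the raw `struct io_uring_sqe`, so a freshly reserved entry can be filled with constructor-call syntax. Putting the three helpers together, the path that `__submit_prelude`/`__submit_wait` expand to looks roughly like this sketch (assembled from the pieces above, not a verbatim quote of the macros):

	// hedged sketch of one submission through the helpers above
	struct io_uring_sqe * sqe;
	uint32_t idx;
	[sqe, idx] = __submit_alloc( ring );                  // P()s the semaphore, reserving one of the
	                                                      // min(sq, cq) in-flight slots
	(*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };   // constructor fills the entry
	__submit( ring, idx );                                // appends to the ready array, io_uring_enter()s

The semaphore is the back-pressure mechanism: `P` in `__submit_alloc` blocks submitters once every slot is in flight, and the batched `V(ring.submit, count)` in `__drain_io` hands slots back as completions are drained.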
@@ -362,26 +425,51 @@
 //=============================================================================================
 // I/O Interface
 //=============================================================================================
-extern "C" {
-	#define __USE_GNU
-	#define _GNU_SOURCE
-	#include <fcntl.h>
-	#include <sys/uio.h>
-	#include <sys/socket.h>
-	#include <sys/stat.h>
-}
-
+#if defined(HAVE_LINUX_IO_URING_H)
 	#define __submit_prelude \
 		struct io_ring & ring = active_cluster()->io; \
…
 		park( __cfaabi_dbg_ctx ); \
 		return data.result;
+#endif
+
+// Some forward declarations
+extern "C" {
+	#include <sys/types.h>
+	struct iovec;
+	extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
+	extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
+
+	extern int fsync(int fd);
+	extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
+
+	struct msghdr;
+	struct sockaddr;
+	extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
+	extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
+	extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
+	extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
+	extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+	extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
+
+	extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
+	extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
+	extern int madvise(void *addr, size_t length, int advice);
+
+	extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
+	extern int close(int fd);
+
+	struct statx;
+	extern int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
+
+	extern ssize_t read (int fd, void *buf, size_t count);
+}
 
@@ -388,256 +476,252 @@
 //-----------------------------------------------------------------------------
 // Asynchronous operations
-ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
-	#if !defined(IORING_OP_READV)
+ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
 		return preadv2(fd, iov, iovcnt, offset, flags);
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
 
 		__submit_wait
 	#endif
 }
 
-ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
-	#if !defined(IORING_OP_WRITEV)
+ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
 		return pwritev2(fd, iov, iovcnt, offset, flags);
 	#else
		__submit_prelude
 
 		(*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
 
 		__submit_wait
 	#endif
 }
 
-int async_fsync(int fd) {
-	#if !defined(IORING_OP_FSYNC)
+int cfa_fsync(int fd) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
 		return fsync(fd);
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_FSYNC, fd };
 
 		__submit_wait
 	#endif
 }
 
-int async_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
-	#if !defined(IORING_OP_SYNC_FILE_RANGE)
+int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
 		return sync_file_range(fd, offset, nbytes, flags);
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
 		sqe->off = offset;
 		sqe->len = nbytes;
 		sqe->sync_range_flags = flags;
 
 		__submit_wait
 	#endif
 }
 
 
-ssize_t async_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
-	#if !defined(IORING_OP_SENDMSG)
+ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
 		return recv(sockfd, msg, flags);
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
 		sqe->msg_flags = flags;
 
 		__submit_wait
 	#endif
 }
 
-ssize_t async_recvmsg(int sockfd, struct msghdr *msg, int flags) {
-	#if !defined(IORING_OP_RECVMSG)
+ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
 		return recv(sockfd, msg, flags);
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
 		sqe->msg_flags = flags;
 
 		__submit_wait
 	#endif
 }
 
-ssize_t async_send(int sockfd, const void *buf, size_t len, int flags) {
-	#if !defined(IORING_OP_SEND)
+ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
 		return send( sockfd, buf, len, flags );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_SEND, sockfd };
 		sqe->addr = (uint64_t)buf;
 		sqe->len = len;
 		sqe->msg_flags = flags;
 
 		__submit_wait
 	#endif
 }
 
-ssize_t async_recv(int sockfd, void *buf, size_t len, int flags) {
-	#if !defined(IORING_OP_RECV)
+ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
 		return recv( sockfd, buf, len, flags );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_RECV, sockfd };
 		sqe->addr = (uint64_t)buf;
 		sqe->len = len;
 		sqe->msg_flags = flags;
 
 		__submit_wait
 	#endif
 }
 
-int async_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
-	#if !defined(IORING_OP_ACCEPT)
-		__SOCKADDR_ARG _addr;
-		_addr.__sockaddr__ = addr;
-		return accept4( sockfd, _addr, addrlen, flags );
+int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
+		return accept4( sockfd, addr, addrlen, flags );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_ACCEPT, sockfd };
 		sqe->addr = addr;
 		sqe->addr2 = addrlen;
 		sqe->accept_flags = flags;
 
 		__submit_wait
 	#endif
 }
 
-int async_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
-	#if !defined(IORING_OP_CONNECT)
-		__CONST_SOCKADDR_ARG _addr;
-		_addr.__sockaddr__ = addr;
-		return connect( sockfd, _addr, addrlen );
+int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
+		return connect( sockfd, addr, addrlen );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_CONNECT, sockfd };
 		sqe->addr = (uint64_t)addr;
 		sqe->off = addrlen;
 
 		__submit_wait
 	#endif
 }
 
-int async_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
-	#if !defined(IORING_OP_FALLOCATE)
+int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
 		return fallocate( fd, mode, offset, len );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_FALLOCATE, fd };
 		sqe->off = offset;
 		sqe->len = length;
 		sqe->mode = mode;
 
 		__submit_wait
 	#endif
 }
 
-int async_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
-	#if !defined(IORING_OP_FADVISE)
+int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
 		return posix_fadvise( fd, offset, len, advice );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_FADVISE, fd };
 		sqe->off = (uint64_t)offset;
 		sqe->len = length;
 		sqe->fadvise_advice = advice;
 
 		__submit_wait
 	#endif
 }
 
-int async_madvise(void *addr, size_t length, int advice) {
-	#if !defined(IORING_OP_MADVISE)
+int cfa_madvise(void *addr, size_t length, int advice) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
 		return madvise( addr, length, advice );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_MADVISE, 0 };
 		sqe->addr = (uint64_t)addr;
 		sqe->len = length;
 		sqe->fadvise_advice = advice;
 
 		__submit_wait
 	#endif
 }
 
-int async_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
-	#if !defined(IORING_OP_OPENAT)
+int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
 		return openat( dirfd, pathname, flags, mode );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_OPENAT, dirfd };
 		sqe->addr = (uint64_t)pathname;
 		sqe->open_flags = flags;
 		sqe->mode = mode;
 
 		__submit_wait
 	#endif
 }
 
-int async_close(int fd) {
-	#if !defined(IORING_OP_CLOSE)
+int cfa_close(int fd) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
 		return close( fd );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_CLOSE, fd };
 
 		__submit_wait
 	#endif
 }
 
-int async_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
-	#if !defined(IORING_OP_STATX)
+int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_STATX)
 		//return statx( dirfd, pathname, flags, mask, statxbuf );
 		return syscall( __NR_io_uring_setup, dirfd, pathname, flags, mask, statxbuf );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_STATX, dirfd };
 		sqe->addr = (uint64_t)pathname;
 		sqe->statx_flags = flags;
 		sqe->len = mask;
 		sqe->off = (uint64_t)statxbuf;
 
 		__submit_wait
 	#endif
 }
 
 
-ssize_t async_read(int fd, void *buf, size_t count) {
-	#if !defined(IORING_OP_READ)
+ssize_t cfa_read(int fd, void *buf, size_t count) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
 		return read( fd, buf, count );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_READ, fd, buf, count, 0 };
 
 		__submit_wait
 	#endif
 }
 
-ssize_t async_write(int fd, void *buf, size_t count) {
-	#if !defined(IORING_OP_WRITE)
+ssize_t cfa_write(int fd, void *buf, size_t count) {
+	#if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
 		return read( fd, buf, count );
 	#else
 		__submit_prelude
 
 		(*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
 
 		__submit_wait
 	#endif
 }
 
@@ -645,105 +729,104 @@
 //-----------------------------------------------------------------------------
…
 
 // Macro magic to reduce the size of the following switch case
-bool is_async( fptr_t func ) {
+#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
+#define IS_DEFINED_SECOND(first, second, ...) second
+#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
+#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
+
+bool has_user_level_blocking( fptr_t func ) {
+	#if defined(HAVE_LINUX_IO_URING_H)
 	if( /*func == (fptr_t)preadv2 || */
-		func == (fptr_t)async_preadv2 )
+		func == (fptr_t)cfa_preadv2 )
 		#define _CFA_IO_FEATURE_IORING_OP_READV ,
 		return IS_DEFINED(IORING_OP_READV);
 
 	if( /*func == (fptr_t)pwritev2 || */
-		func == (fptr_t)async_pwritev2 )
+		func == (fptr_t)cfa_pwritev2 )
 		#define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
 		return IS_DEFINED(IORING_OP_WRITEV);
 
 	if( /*func == (fptr_t)fsync || */
-		func == (fptr_t)async_fsync )
+		func == (fptr_t)cfa_fsync )
 		#define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
 		return IS_DEFINED(IORING_OP_FSYNC);
 
 	if( /*func == (fptr_t)ync_file_range || */
-		func == (fptr_t)async_sync_file_range )
+		func == (fptr_t)cfa_sync_file_range )
 		#define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
 		return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
 
 	if( /*func == (fptr_t)sendmsg || */
-		func == (fptr_t)async_sendmsg )
+		func == (fptr_t)cfa_sendmsg )
 		#define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
 		return IS_DEFINED(IORING_OP_SENDMSG);
 
 	if( /*func == (fptr_t)recvmsg || */
-		func == (fptr_t)async_recvmsg )
+		func == (fptr_t)cfa_recvmsg )
 		#define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
 		return IS_DEFINED(IORING_OP_RECVMSG);
 
 	if( /*func == (fptr_t)send || */
-		func == (fptr_t) async_send )
+		func == (fptr_t)cfa_send )
 		#define _CFA_IO_FEATURE_IORING_OP_SEND ,
 		return IS_DEFINED(IORING_OP_SEND);
 
 	if( /*func == (fptr_t)recv || */
-		func == (fptr_t) async_recv )
+		func == (fptr_t)cfa_recv )
 		#define _CFA_IO_FEATURE_IORING_OP_RECV ,
 		return IS_DEFINED(IORING_OP_RECV);
 
 	if( /*func == (fptr_t)accept4 || */
-		func == (fptr_t) async_accept4 )
+		func == (fptr_t)cfa_accept4 )
 		#define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
 		return IS_DEFINED(IORING_OP_ACCEPT);
 
 	if( /*func == (fptr_t)connect || */
-		func == (fptr_t) async_connect )
+		func == (fptr_t)cfa_connect )
 		#define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
 		return IS_DEFINED(IORING_OP_CONNECT);
 
 	if( /*func == (fptr_t)fallocate || */
-		func == (fptr_t) async_fallocate )
+		func == (fptr_t)cfa_fallocate )
 		#define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
 		return IS_DEFINED(IORING_OP_FALLOCATE);
 
-	if( /*func == (fptr_t) fadvise || */
-		func == (fptr_t) async_fadvise )
+	if( /*func == (fptr_t)posix_fadvise || */
+		func == (fptr_t)cfa_fadvise )
 		#define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
 		return IS_DEFINED(IORING_OP_FADVISE);
 
 	if( /*func == (fptr_t)madvise || */
-		func == (fptr_t) async_madvise )
+		func == (fptr_t)cfa_madvise )
 		#define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
 		return IS_DEFINED(IORING_OP_MADVISE);
 
 	if( /*func == (fptr_t)openat || */
-		func == (fptr_t) async_openat )
+		func == (fptr_t)cfa_openat )
 		#define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
 		return IS_DEFINED(IORING_OP_OPENAT);
 
 	if( /*func == (fptr_t)close || */
-		func == (fptr_t) async_close )
+		func == (fptr_t)cfa_close )
 		#define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
 		return IS_DEFINED(IORING_OP_CLOSE);
 
 	if( /*func == (fptr_t)statx || */
-		func == (fptr_t) async_statx )
+		func == (fptr_t)cfa_statx )
 		#define _CFA_IO_FEATURE_IORING_OP_STATX ,
 		return IS_DEFINED(IORING_OP_STATX);
 
 	if( /*func == (fptr_t)read || */
-		func == (fptr_t)async_read )
+		func == (fptr_t)cfa_read )
 		#define _CFA_IO_FEATURE_IORING_OP_READ ,
 		return IS_DEFINED(IORING_OP_READ);
 
 	if( /*func == (fptr_t)write || */
-		func == (fptr_t)async_write )
+		func == (fptr_t)cfa_write )
 		#define _CFA_IO_FEATURE_IORING_OP_WRITE ,
 		return IS_DEFINED(IORING_OP_WRITE);
-
-	return false;
-}
-
-#endif
+	#endif
+
+	return false;
+}
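The IS_DEFINED machinery is the classic preprocessor "comma trick" (the same family as the Linux kernel's IS_ENABLED): each probe macro `_CFA_IO_FEATURE_IORING_OP_X` expands to a bare comma. If `IORING_OP_X` is not a defined macro, the token paste in IS_DEFINED_TEST reproduces the probe's name, the comma shifts the argument list, and IS_DEFINED_SECOND selects `false`; if it is defined, its expansion is pasted into an inert unrelated token and IS_DEFINED_SECOND selects `true`. A self-contained sketch of the same trick, using a made-up FEATURE_FOO macro:

	#include <stdbool.h>
	#include <stdio.h>

	#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
	#define IS_DEFINED_SECOND(first, second, ...) second
	#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
	#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND, IS_DEFINED_TEST(macro) false, true)

	#define FEATURE_FOO 1                     // comment this line out to flip the answer
	#define _CFA_IO_FEATURE_FEATURE_FOO ,     // the probe: expands to a bare comma

	int main() {
		// Defined:   FEATURE_FOO -> 1 before substitution, the paste yields the inert
		//            token _CFA_IO_FEATURE_1, so IS_DEFINED_SECOND(_CFA_IO_FEATURE_1 false, true)
		//            picks "true".
		// Undefined: the paste yields _CFA_IO_FEATURE_FEATURE_FOO -> ",", the extra comma
		//            shifts the list, and IS_DEFINED_SECOND(, false, true) picks "false".
		printf("FEATURE_FOO defined: %s\n", IS_DEFINED(FEATURE_FOO) ? "yes" : "no");
	}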
libcfa/src/concurrency/kernel.cfa
--- libcfa/src/concurrency/kernel.cfa (c680a4b)
+++ libcfa/src/concurrency/kernel.cfa (44aad8f)
@@ -257,4 +257,8 @@
 	ready_queue{};
 	ready_queue_lock{};
+
+	#if !defined(__CFA_NO_STATISTICS__)
+		print_stats = false;
+	#endif
 
 	procs{ __get };
@@ -1004,4 +1008,22 @@
 }
 
+bool V(semaphore & this, unsigned diff) with( this ) {
+	$thread * thrd = 0p;
+	lock( lock __cfaabi_dbg_ctx2 );
+	int release = max(-count, (int)diff);
+	count += diff;
+	for(release) {
+		unpark( pop_head( waiting ) __cfaabi_dbg_ctx2 );
+	}
+
+	unlock( lock );
+
+	return thrd != 0p;
+}
+
 //-----------------------------------------------------------------------------
 // Global Queues
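The new V overload is what `__drain_io` in io.cfa relies on: after draining `count` completions it calls `V(ring.submit, count)`, crediting all freed submission slots while taking the semaphore's internal lock only once, where the old code paid one lock acquisition per completion. Semantically it behaves like `diff` successive V calls; for example, with `count == -3` (three blocked submitters) and `diff == 3`, all three are unparked and the counter returns to 0. A hedged usage sketch:

	semaphore slots = { 0 };        // no free slots initially
	// ... elsewhere, submitters block in P(slots) ...
	unsigned drained = 3;           // completions just consumed
	V( slots, drained );            // releases up to three waiters, one lock acquisition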
libcfa/src/concurrency/kernel.hfa
--- libcfa/src/concurrency/kernel.hfa (c680a4b)
+++ libcfa/src/concurrency/kernel.hfa (44aad8f)
@@ -40,4 +40,5 @@
 void  P (semaphore & this);
 bool  V (semaphore & this);
+bool  V (semaphore & this, unsigned count);
 
 
@@ -144,3 +145,14 @@
 	void * ring_ptr;
 	size_t ring_sz;
+
+	// Statistics
+	#if !defined(__CFA_NO_STATISTICS__)
+		struct {
+			struct {
+				unsigned long long int val;
+				unsigned long long int cnt;
+			} submit_avg;
+		} stats;
+	#endif
@@ -164,3 +175,14 @@
 	void * ring_ptr;
 	size_t ring_sz;
+
+	// Statistics
+	#if !defined(__CFA_NO_STATISTICS__)
+		struct {
+			struct {
+				unsigned long long int val;
+				unsigned long long int cnt;
+			} completed_avg;
+		} stats;
+	#endif
@@ -213,4 +234,9 @@
 		struct io_ring io;
 	#endif
+
+	#if !defined(__CFA_NO_STATISTICS__)
+		bool print_stats;
+	#endif
 };
 extern Duration default_preemption();
@@ -227,4 +252,10 @@
 static inline struct processor * active_processor() { return TL_GET( this_processor ); } // UNSAFE
 static inline struct cluster   * active_cluster  () { return TL_GET( this_processor )->cltr; }
+
+#if !defined(__CFA_NO_STATISTICS__)
+	static inline void print_stats_at_exit( cluster & this ) {
+		this.print_stats = true;
+	}
+#endif
 
 // Local Variables: //
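`print_stats_at_exit` only arms the per-cluster flag; the actual reporting happens in the io.cfa shutdown path above, which prints the uRing statistics to stderr when the cluster's I/O is torn down. Opting in therefore mirrors the readv benchmark:

	int main() {
		#if !defined(__CFA_NO_STATISTICS__)
			print_stats_at_exit( *active_cluster() );   // arm stats for the current cluster
		#endif
		// ... run the I/O workload; stats print automatically at cluster shutdown ...
	}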