Changeset 2d8f7b0

- Timestamp: Apr 14, 2020, 11:53:25 AM (5 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 34d0a28
- Parents: 7df014f
- Location: libcfa/src/concurrency
- Files: 1 added, 7 edited
Legend: unchanged context lines are shown without a prefix; added lines are prefixed with "+"; removed lines are prefixed with "-".
libcfa/src/concurrency/alarm.cfa  (r7df014f → r2d8f7b0)

@@ -176 +176 @@
 }
 
+//=============================================================================================
+// Utilities
+//=============================================================================================
+
+void sleep( Duration duration ) {
+	alarm_node_t node = { active_thread(), __kernel_get_time() + duration, 0`s };
+
+	register_self( &node );
+	park( __cfaabi_dbg_ctx );
+
+	/* paranoid */ verify( !node.set );
+	/* paranoid */ verify( node.next == 0p );
+}
+
 // Local Variables: //
 // mode: c //
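For context, a minimal usage sketch of the new blocking sleep. This is illustrative only (not part of the changeset) and assumes the usual CFA thread and time headers plus the backquoted duration literals used in the code above:

	#include <thread.hfa>
	#include <time.hfa>

	thread napper {};
	void main( napper & this ) {
		// parks the calling user-level thread; the alarm handler unparks it once the duration expires
		sleep( 1`s );
	}

	int main() {
		napper n;   // in CFA a thread starts running at construction and is joined at destruction
	}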
libcfa/src/concurrency/io.cfa  (r7df014f → r2d8f7b0)

@@ -45 +45 @@
 	memset(&params, 0, sizeof(params));
 
-	int fd = syscall(__NR_io_uring_setup, entries_per_cluster(), &params );
+	uint32_t nentries = entries_per_cluster();
+
+	int fd = syscall(__NR_io_uring_setup, nentries, &params );
 	if(fd < 0) {
 		abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
@@ -52 +54 @@
 	// Step 2 : mmap result
 	memset(&this.io, 0, sizeof(struct io_ring));
-	struct io_uring_sq * sq = &this.io.submit_q;
-	struct io_uring_cq * cq = &this.io.completion_q;
+	struct io_uring_sq & sq = this.io.submit_q;
+	struct io_uring_cq & cq = this.io.completion_q;
 
 	// calculate the right ring size
-	sq->ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
-	cq->ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
+	sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
+	cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
 
 	// Requires features
@@ -66 +68 @@
 
 	// mmap the Submit Queue into existence
-	sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
-	if (sq->ring_ptr == (void*)MAP_FAILED) {
+	sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
+	if (sq.ring_ptr == (void*)MAP_FAILED) {
 		abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
 	}
@@ -78 +80 @@
 	// else {
 		// We need multiple call to MMAP
-		cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
-		if (cq->ring_ptr == (void*)MAP_FAILED) {
-			munmap(sq->ring_ptr, sq->ring_sz);
+		cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
+		if (cq.ring_ptr == (void*)MAP_FAILED) {
+			munmap(sq.ring_ptr, sq.ring_sz);
 			abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
 		}
@@ -87 +89 @@
 	// mmap the submit queue entries
 	size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
-	sq->sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
-	if (sq->sqes == (struct io_uring_sqe *)MAP_FAILED) {
-		munmap(sq->ring_ptr, sq->ring_sz);
-		if (cq->ring_ptr != sq->ring_ptr) munmap(cq->ring_ptr, cq->ring_sz);
+	sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
+	if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
+		munmap(sq.ring_ptr, sq.ring_sz);
+		if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
 		abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
 	}
@@ -96 +98 @@
 	// Get the pointers from the kernel to fill the structure
 	// submit queue
-	sq->head    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.head;
-	sq->tail    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.tail;
-	sq->mask    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_mask;
-	sq->entries = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_entries;
-	sq->flags   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.flags;
-	sq->dropped = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.dropped;
-	sq->array   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.array;
+	sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
+	sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
+	sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
+	sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
+	sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
+	sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
+	sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
+	sq.alloc   = *sq.tail;
 
 	// completion queue
-	cq->head     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.head;
-	cq->tail     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.tail;
-	cq->mask     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_mask;
-	cq->entries  = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_entries;
-	cq->overflow = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.overflow;
-	cq->cqes     = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.cqes;
+	cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
+	cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
+	cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
+	cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
+	cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
+	cq.cqes     = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
+
+	// some paranoid checks
+	/* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
+	/* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
+	/* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
+	/* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
+
+	/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
+	/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
+	/* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
+	/* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
 
 	// Update the global ring info
@@ -116 +130 @@
 	this.io.fd   = fd;
 	this.io.done = false;
+	(this.io.submit){ min(*sq.num, *cq.num) };
 
 	// Create the poller thread
@@ -137 +152 @@
 
 	// unmap the submit queue entries
-	munmap(sq.sqes, *sq.entries * sizeof(struct io_uring_sqe));
+	munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
 
 	// unmap the Submit Queue ring
@@ -173 +188 @@
 
 	struct io_user_data * data = (struct io_user_data *)cqe.user_data;
+	__cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
+
 	data->result = cqe.res;
-	unpark( data->thrd __cfaabi_dbg_ctx2 );
+	__unpark( data->thrd __cfaabi_dbg_ctx2 );
+
+	// Allow new submissions to happen
+	V(ring.submit);
 
 	// Mark to the kernel that the cqe has been seen
 	// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-	__atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELEASE );
+	__atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELAXED );
 
 	return true;
@@ -200 +220 @@
 	LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
 		int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
+		__cfaabi_bits_print_safe( STDERR_FILENO, "Performed io_wait, returned %d errmsg %s\n", ret, strerror(errno) );
 		if( ret < 0 ) {
 			switch((int)errno) {
@@ -221 +242 @@
 //=============================================================================================
 
-[* struct io_uring_sqe, uint32_t] io_submit_prelude( struct io_ring & ring ) {
-	return [0p, 0];
+// Submition steps :
+// 1 - We need to make sure we don't overflow any of the buffer, P(ring.submit) to make sure
+//     entries are available. The semaphore make sure that there is no more operations in
+//     progress then the number of entries in the buffer. This probably limits concurrency
+//     more than necessary since submitted but not completed operations don't need any
+//     entries in user space. However, I don't know what happens if we overflow the buffers
+//     because too many requests completed at once. This is a safe approach in all cases.
+//     Furthermore, with hundreds of entries, this may be okay.
+//
+// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
+//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
+//     offer any assurance that an entry is not being filled by multiple flags. Therefore, we
+//     need to write an allocator that allows allocating concurrently.
+//
+// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
+//
+// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
+//     needs to arrive to two concensus at the same time:
+//     A - The order in which entries are listed in the array: no two threads must pick the
+//         same index for their entries
+//     B - When can the tail be update for the kernel. EVERY entries in the array between
+//         head and tail must be fully filled and shouldn't ever be touched again.
+//
+
+static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
+	// Wait for a spot to be available
+	P(ring.submit);
+
+	// Allocate the sqe
+	uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
+
+	// Validate that we didn't overflow anything
+	// Check that nothing overflowed
+	/* paranoid */ verify( true );
+
+	// Check that it goes head -> tail -> alloc and never head -> alloc -> tail
+	/* paranoid */ verify( true );
+
+	// Return the sqe
+	return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
+}
+
+static inline void __submit( struct io_ring & ring, uint32_t idx ) {
+	// get mutual exclusion
+	lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
+
+	// Append to the list of ready entries
+	uint32_t * tail = ring.submit_q.tail;
+	const uint32_t mask = *ring.submit_q.mask;
+
+	ring.submit_q.array[ (*tail) & mask ] = idx & mask;
+	__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
+
+	// Submit however, many entries need to be submitted
+	int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
+	__cfaabi_bits_print_safe( STDERR_FILENO, "Performed io_submit, returned %d\n", ret );
+	if( ret < 0 ) {
+		switch((int)errno) {
+		default:
+			abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
+		}
+	}
+
+	unlock(ring.submit_q.lock);
+	// Make sure that idx was submitted
+	// Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
+}
+
+static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
+	this.opcode = opcode;
+	#if !defined(IOSQE_ASYNC)
+		this.flags = 0;
+	#else
+		this.flags = IOSQE_ASYNC;
+	#endif
+	this.ioprio = 0;
+	this.fd = fd;
+	this.off = 0;
+	this.addr = 0;
+	this.len = 0;
+	this.rw_flags = 0;
+	this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
+}
+
+static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
+	(this){ opcode, fd };
+	this.off = off;
+	this.addr = (uint64_t)addr;
+	this.len = len;
 }
 
@@ -228 +336 @@
 // I/O Interface
 //=============================================================================================
-
-/*
 extern "C" {
 	#define __USE_GNU
+	#define _GNU_SOURCE
+	#include <fcntl.h>
 	#include <sys/uio.h>
-}
-
+	#include <sys/socket.h>
+	#include <sys/stat.h>
+}
+
+#define __submit_prelude \
+	struct io_ring & ring = active_cluster()->io; \
+	struct io_uring_sqe * sqe; \
+	uint32_t idx; \
+	[sqe, idx] = __submit_alloc( ring );
+
+#define __submit_wait \
+	io_user_data data = { 0, active_thread() }; \
+	__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd ); \
+	sqe->user_data = (uint64_t)&data; \
+	__submit( ring, idx ); \
+	park( __cfaabi_dbg_ctx ); \
+	return data.result;
+
+//-----------------------------------------------------------------------------
+// Asynchronous operations
 ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
 	#if !defined(IORING_OP_READV)
 		return preadv2(fd, iov, iovcnt, offset, flags);
 	#else
-		sqe->opcode = IORING_OP_READV;
-		sqe->flags = 0;
-		sqe->ioprio = 0;
-		sqe->fd = fd;
-		sqe->off = offset;
-		sqe->addr = (unsigned long) iov;
-		sqe->len = iovcnt;
-		sqe->rw_flags = 0;
-		sqe->user_data = 0;
-		sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
+		__submit_prelude
+
+		(*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
+
+		__submit_wait
 	#endif
 }
@@ -256 +377 @@
 		return pwritev2(fd, iov, iovcnt, offset, flags);
 	#else
-		#warning not implemented
-	#endif
-}
-
-bool is_async( void (*func)() ) {
-	switch((uintptr_t)func) {
-		case (uintptr_t)preadv2:
-		case (uintptr_t)async_preadv2:
-			#if defined(IORING_OP_READV)
-				return true;
-			#else
-				return false;
-			#endif
-		default:
-			return false;
-	}
-}
-*/
+		__submit_prelude
+
+		(*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
+
+		__submit_wait
+	#endif
+}
+
+int async_fsync(int fd) {
+	#if !defined(IORING_OP_FSYNC)
+		return fsync(fd);
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_FSYNC, fd };
+
+		__submit_wait
+	#endif
+}
+
+int async_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
+	#if !defined(IORING_OP_SYNC_FILE_RANGE)
+		return sync_file_range(fd, offset, nbytes, flags);
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
+		sqe->off = offset;
+		sqe->len = nbytes;
+		sqe->sync_range_flags = flags;
+
+		__submit_wait
+	#endif
+}
+
+
+ssize_t async_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
+	#if !defined(IORING_OP_SENDMSG)
+		return recv(sockfd, msg, flags);
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
+		sqe->msg_flags = flags;
+
+		__submit_wait
+	#endif
+}
+
+ssize_t async_recvmsg(int sockfd, struct msghdr *msg, int flags) {
+	#if !defined(IORING_OP_RECVMSG)
+		return recv(sockfd, msg, flags);
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
+		sqe->msg_flags = flags;
+
+		__submit_wait
+	#endif
+}
+
+ssize_t async_send(int sockfd, const void *buf, size_t len, int flags) {
+	#if !defined(IORING_OP_SEND)
+		return send( sockfd, buf, len, flags );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_SEND, sockfd };
+		sqe->addr = (uint64_t)buf;
+		sqe->len = len;
+		sqe->msg_flags = flags;
+
+		__submit_wait
+	#endif
+}
+
+ssize_t async_recv(int sockfd, void *buf, size_t len, int flags) {
+	#if !defined(IORING_OP_RECV)
+		return recv( sockfd, buf, len, flags );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_RECV, sockfd };
+		sqe->addr = (uint64_t)buf;
+		sqe->len = len;
+		sqe->msg_flags = flags;
+
+		__submit_wait
+	#endif
+}
+
+int async_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
+	#if !defined(IORING_OP_ACCEPT)
+		__SOCKADDR_ARG _addr;
+		_addr.__sockaddr__ = addr;
+		return accept4( sockfd, _addr, addrlen, flags );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_ACCEPT, sockfd };
+		sqe->addr = addr;
+		sqe->addr2 = addrlen;
+		sqe->accept_flags = flags;
+
+		__submit_wait
+	#endif
+}
+
+int async_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
+	#if !defined(IORING_OP_CONNECT)
+		__CONST_SOCKADDR_ARG _addr;
+		_addr.__sockaddr__ = addr;
+		return connect( sockfd, _addr, addrlen );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_CONNECT, sockfd };
+		sqe->addr = (uint64_t)addr;
+		sqe->off = addrlen;
+
+		__submit_wait
+	#endif
+}
+
+int async_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
+	#if !defined(IORING_OP_FALLOCATE)
+		return fallocate( fd, mode, offset, len );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_FALLOCATE, fd };
+		sqe->off = offset;
+		sqe->len = length;
+		sqe->mode = mode;
+
+		__submit_wait
+	#endif
+}
+
+int async_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
+	#if !defined(IORING_OP_FADVISE)
+		return posix_fadvise( fd, offset, len, advice );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_FADVISE, fd };
+		sqe->off = (uint64_t)offset;
+		sqe->len = length;
+		sqe->fadvise_advice = advice;
+
+		__submit_wait
+	#endif
+}
+
+int async_madvise(void *addr, size_t length, int advice) {
+	#if !defined(IORING_OP_MADVISE)
+		return madvise( addr, length, advice );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_MADVISE, 0 };
+		sqe->addr = (uint64_t)addr;
+		sqe->len = length;
+		sqe->fadvise_advice = advice;
+
+		__submit_wait
+	#endif
+}
+
+int async_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
+	#if !defined(IORING_OP_OPENAT)
+		return openat( dirfd, pathname, flags, mode );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_OPENAT, dirfd };
+		sqe->addr = (uint64_t)pathname;
+		sqe->open_flags = flags;
+		sqe->mode = mode;
+
+		__submit_wait
+	#endif
+}
+
+int async_close(int fd) {
+	#if !defined(IORING_OP_CLOSE)
+		return close( fd );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_CLOSE, fd };
+
+		__submit_wait
+	#endif
+}
+
+int async_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
+	#if !defined(IORING_OP_STATX)
+		return statx( dirfd, pathname, flags, mask, statxbuf );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_STATX, dirfd };
+		sqe->addr = (uint64_t)pathname;
+		sqe->statx_flags = flags;
+		sqe->len = mask;
+		sqe->off = (uint64_t)statxbuf;
+
+		__submit_wait
+	#endif
+}
+
+
+ssize_t async_read(int fd, void *buf, size_t count) {
+	#if !defined(IORING_OP_READ)
+		return read( fd, buf, count );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_READ, fd, buf, count, 0 };
+
+		__submit_wait
+	#endif
+}
+
+ssize_t async_write(int fd, void *buf, size_t count) {
+	#if !defined(IORING_OP_WRITE)
+		return read( fd, buf, count );
+	#else
+		__submit_prelude
+
+		(*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
+
+		__submit_wait
+	#endif
+}
+
+//-----------------------------------------------------------------------------
+// Check if a function is asynchronous
+
+// Macro magic to reduce the size of the following switch case
+#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
+#define IS_DEFINED_SECOND(first, second, ...) second
+#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
+#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
+
+bool is_async( fptr_t func ) {
+
+	if( /*func == (fptr_t)preadv2 || */
+		func == (fptr_t)async_preadv2 )
+		#define _CFA_IO_FEATURE_IORING_OP_READV ,
+		return IS_DEFINED(IORING_OP_READV);
+
+	if( /*func == (fptr_t)pwritev2 || */
+		func == (fptr_t)async_pwritev2 )
+		#define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
+		return IS_DEFINED(IORING_OP_WRITEV);
+
+	if( /*func == (fptr_t)fsync || */
+		func == (fptr_t)async_fsync )
+		#define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
+		return IS_DEFINED(IORING_OP_FSYNC);
+
+	if( /*func == (fptr_t)ync_file_range || */
+		func == (fptr_t)async_sync_file_range )
+		#define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
+		return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
+
+	if( /*func == (fptr_t)sendmsg || */
+		func == (fptr_t)async_sendmsg )
+		#define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
+		return IS_DEFINED(IORING_OP_SENDMSG);
+
+	if( /*func == (fptr_t)recvmsg || */
+		func == (fptr_t)async_recvmsg )
+		#define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
+		return IS_DEFINED(IORING_OP_RECVMSG);
+
+	if( /*func == (fptr_t)send || */
+		func == (fptr_t)async_send )
+		#define _CFA_IO_FEATURE_IORING_OP_SEND ,
+		return IS_DEFINED(IORING_OP_SEND);
+
+	if( /*func == (fptr_t)recv || */
+		func == (fptr_t)async_recv )
+		#define _CFA_IO_FEATURE_IORING_OP_RECV ,
+		return IS_DEFINED(IORING_OP_RECV);
+
+	if( /*func == (fptr_t)accept4 || */
+		func == (fptr_t)async_accept4 )
+		#define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
+		return IS_DEFINED(IORING_OP_ACCEPT);
+
+	if( /*func == (fptr_t)connect || */
+		func == (fptr_t)async_connect )
+		#define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
+		return IS_DEFINED(IORING_OP_CONNECT);
+
+	if( /*func == (fptr_t)fallocate || */
+		func == (fptr_t)async_fallocate )
+		#define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
+		return IS_DEFINED(IORING_OP_FALLOCATE);
+
+	if( /*func == (fptr_t)fadvise || */
+		func == (fptr_t)async_fadvise )
+		#define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
+		return IS_DEFINED(IORING_OP_FADVISE);
+
+	if( /*func == (fptr_t)madvise || */
+		func == (fptr_t)async_madvise )
+		#define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
+		return IS_DEFINED(IORING_OP_MADVISE);
+
+	if( /*func == (fptr_t)openat || */
+		func == (fptr_t)async_openat )
+		#define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
+		return IS_DEFINED(IORING_OP_OPENAT);
+
+	if( /*func == (fptr_t)close || */
+		func == (fptr_t)async_close )
+		#define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
+		return IS_DEFINED(IORING_OP_CLOSE);
+
+	if( /*func == (fptr_t)statx || */
+		func == (fptr_t)async_statx )
+		#define _CFA_IO_FEATURE_IORING_OP_STATX ,
+		return IS_DEFINED(IORING_OP_STATX);
+
+	if( /*func == (fptr_t)read || */
+		func == (fptr_t)async_read )
+		#define _CFA_IO_FEATURE_IORING_OP_READ ,
+		return IS_DEFINED(IORING_OP_READ);
+
+	if( /*func == (fptr_t)write || */
+		func == (fptr_t)async_write )
+		#define _CFA_IO_FEATURE_IORING_OP_WRITE ,
+		return IS_DEFINED(IORING_OP_WRITE);
+
+	return false;
+}
 
 #endif
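A caller-side sketch of how the async_* wrappers above are intended to be used: the calling thread allocates and fills an SQE, parks inside __submit_wait, and is later unparked by the poller thread with the CQE result. This is illustrative only (not part of the changeset) and assumes the cluster's io_ring was initialised as in the constructor earlier in this file:

	thread reader { int fd; };
	void main( reader & this ) {
		char buf[256];
		// blocks only this user-level thread; the underlying processor keeps running other threads
		ssize_t r = async_read( this.fd, buf, sizeof(buf) );
		if ( r < 0 ) {
			// as with io_uring itself, the value returned is the raw cqe.res, so failures come back as a negative value
		}
	}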
libcfa/src/concurrency/kernel.cfa  (r7df014f → r2d8f7b0)

@@ -615 +615 @@
 }
 
-void unpark( $thread * thrd __cfaabi_dbg_ctx_param2 ) {
-	if( !thrd ) return;
-
-	disable_interrupts();
+// KERNEL ONLY unpark with out disabling interrupts
+void __unpark( $thread * thrd __cfaabi_dbg_ctx_param2 ) {
 	static_assert(sizeof(thrd->state) == sizeof(int));
@@ -647 +645 @@
 			abort();
 	}
+}
+
+void unpark( $thread * thrd __cfaabi_dbg_ctx_param2 ) {
+	if( !thrd ) return;
+
+	disable_interrupts();
+	__unpark( thrd __cfaabi_dbg_ctx_fwd2 );
 	enable_interrupts( __cfaabi_dbg_ctx );
 }
libcfa/src/concurrency/kernel.hfa  (r7df014f → r2d8f7b0)

@@ -115 +115 @@
 #if defined(HAVE_LINUX_IO_URING_H)
 	struct io_uring_sq {
-		uint32_t * head;
-		uint32_t * tail;
-		uint32_t * mask;
-		uint32_t * entries;
-		uint32_t * flags;
-		uint32_t * dropped;
-		uint32_t * array;
-		struct io_uring_sqe * sqes;
-
-		uint32_t sqe_head;
-		uint32_t sqe_tail;
-
-		size_t ring_sz;
-		void * ring_ptr;
-	};
-
-	struct io_uring_cq {
+		// Head and tail of the ring (associated with array)
 		volatile uint32_t * head;
 		volatile uint32_t * tail;
-		uint32_t * mask;
-		struct io_uring_cqe * entries;
+
+		// The actual kernel ring which uses head/tail
+		// indexes into the sqes arrays
+		uint32_t * array;
+
+		// number of entries and mask to go with it
+		const uint32_t * num;
+		const uint32_t * mask;
+
+		// Submission flags (Not sure what for)
+		uint32_t * flags;
+
+		// number of sqes not submitted (whatever that means)
+		uint32_t * dropped;
+
+		// Like head/tail but not seen by the kernel
+		volatile uint32_t alloc;
+
+		__spinlock_t lock;
+
+		// A buffer of sqes (not the actual ring)
+		struct io_uring_sqe * sqes;
+
+		// The location and size of the mmaped area
+		void * ring_ptr;
+		size_t ring_sz;
+	};
+
+	struct io_uring_cq {
+		// Head and tail of the ring
+		volatile uint32_t * head;
+		volatile uint32_t * tail;
+
+		// number of entries and mask to go with it
+		const uint32_t * mask;
+		const uint32_t * num;
+
+		// number of cqes not submitted (whatever that means)
 		uint32_t * overflow;
+
+		// the kernel ring
 		struct io_uring_cqe * cqes;
 
+		// The location and size of the mmaped area
+		void * ring_ptr;
 		size_t ring_sz;
-		void * ring_ptr;
 	};
@@ -151 +174 @@
 		void * stack;
 		volatile bool done;
+		semaphore submit;
 	};
 #endif
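To make the field comments above concrete, the following is the generic io_uring convention the head/tail/mask/num members follow: the kernel publishes completions at *tail, user space consumes from *head, and mask (num - 1) wraps indexes into the cqes array. This is a generic, batched illustration only; the changeset's __drain_io in io.cfa advances the head one CQE at a time:

	// drain every completion currently visible in the completion ring
	uint32_t head = *cq.head;
	uint32_t tail = __atomic_load_n( cq.tail, __ATOMIC_ACQUIRE );  // acquire pairs with the kernel's publish
	while( head != tail ) {
		struct io_uring_cqe & cqe = cq.cqes[ head & (*cq.mask) ];  // mask wraps the index into the ring
		// ... use cqe.user_data and cqe.res ...
		head++;
	}
	__atomic_store_n( cq.head, head, __ATOMIC_RELEASE );           // hand the consumed slots back to the kernel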
libcfa/src/concurrency/kernel_private.hfa  (r7df014f → r2d8f7b0)

@@ -70 +70 @@
 )
 
+// KERNEL ONLY unpark with out disabling interrupts
+void __unpark( $thread * thrd __cfaabi_dbg_ctx_param2 );
+
 //-----------------------------------------------------------------------------
 // I/O
libcfa/src/concurrency/preemption.cfa  (r7df014f → r2d8f7b0)

@@ -268 +268 @@
 // reserved for future use
 static void timeout( $thread * this ) {
-	//TODO : implement waking threads
+	__unpark( this __cfaabi_dbg_ctx2 );
 }
 
libcfa/src/concurrency/thread.hfa  (r7df014f → r2d8f7b0)

@@ -117 +117 @@
 }
 
+//----------
+// sleep: force thread to block and be rescheduled after Duration duration
+void sleep( Duration duration );
+
 // Local Variables: //
 // mode: c //