Changeset 342af53 for libcfa/src/concurrency/io
- Timestamp:
- Jan 14, 2021, 12:23:14 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
- Children:
- 8e4aa05
- Parents:
- 4468a70 (diff), ec19b21 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- libcfa/src/concurrency/io
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io/call.cfa.in
r4468a70 r342af53 74 74 ; 75 75 76 extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );76 extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ); 77 77 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))); 78 78 … … 222 222 __u32 idx; 223 223 struct io_uring_sqe * sqe; 224 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 225 226 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 224 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 225 227 226 sqe->opcode = IORING_OP_{op}; 228 sqe->flags = sflags;{body} 227 sqe->flags = sflags; 228 sqe->ioprio = 0; 229 sqe->fd = 0; 230 sqe->off = 0; 231 sqe->addr = 0; 232 sqe->len = 0; 233 sqe->fsync_flags = 0; 234 sqe->__pad2[0] = 0; 235 sqe->__pad2[1] = 0; 236 sqe->__pad2[2] = 0;{body} 237 238 asm volatile("": : :"memory"); 229 239 230 240 verify( sqe->user_data == (__u64)(uintptr_t)&future ); … … 312 322 }), 313 323 # CFA_HAVE_IORING_OP_ACCEPT 314 Call('ACCEPT 4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {324 Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', { 315 325 'fd': 'sockfd', 316 'addr': ' addr',317 'addr2': ' addrlen',326 'addr': '(__u64)addr', 327 'addr2': '(__u64)addrlen', 318 328 'accept_flags': 'flags' 319 329 }), … … 464 474 465 475 print(""" 476 //----------------------------------------------------------------------------- 477 bool cancel(io_cancellation & this) { 478 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL) 479 return false; 480 #else 481 io_future_t future; 482 483 io_context * context = __get_io_context(); 484 485 __u8 sflags = 0; 486 struct __io_data & ring = *context->thrd.ring; 487 488 __u32 idx; 489 volatile struct io_uring_sqe * sqe; 490 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 491 492 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 493 sqe->opcode = IORING_OP_ASYNC_CANCEL; 494 sqe->flags = sflags; 495 sqe->addr = this.target; 496 497 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 498 __submit( context, idx ); 499 500 wait(future); 501 502 if( future.result == 0 ) return true; // Entry found 503 if( future.result == -EALREADY) return true; // Entry found but in progress 504 if( future.result == -ENOENT ) return false; // Entry not found 505 return false; 506 #endif 507 } 508 466 509 //----------------------------------------------------------------------------- 467 510 // Check if a function is has asynchronous -
libcfa/src/concurrency/io/setup.cfa
r4468a70 r342af53 52 52 #include <pthread.h> 53 53 #include <sys/epoll.h> 54 #include <sys/eventfd.h> 54 55 #include <sys/mman.h> 55 56 #include <sys/syscall.h> … … 169 170 // Main loop 170 171 while( iopoll.run ) { 172 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n"); 173 171 174 // Wait for events 172 175 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 176 177 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds); 173 178 174 179 // Check if an error occured … … 181 186 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 182 187 /* paranoid */ verify( io_ctx ); 183 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);188 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx); 184 189 #if !defined( __CFA_NO_STATISTICS__ ) 185 190 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; 186 191 #endif 192 193 eventfd_t v; 194 eventfd_read(io_ctx->ring->efd, &v); 195 187 196 post( io_ctx->sem ); 188 197 } … … 233 242 $thread & thrd = this.thrd.self; 234 243 if( cluster_context ) { 244 // We are about to do weird things with the threads 245 // we don't need interrupts to complicate everything 246 disable_interrupts(); 247 248 // Get cluster info 235 249 cluster & cltr = *thrd.curr_cluster; 236 250 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); … … 239 253 // We need to adjust the clean-up based on where the thread is 240 254 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 255 // This is the tricky case 256 // The thread was preempted or ready to run and now it is on the ready queue 257 // but the cluster is shutting down, so there aren't any processors to run the ready queue 258 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 241 259 242 260 ready_schedule_lock(); 243 244 // This is the tricky case 245 // The thread was preempted and now it is on the ready queue 261 // The thread should on the list 262 /* paranoid */ verify( thrd.link.next != 0p ); 263 264 // Remove the thread from the ready queue of this cluster 246 265 // The thread should be the last on the list 247 /* paranoid */ verify( thrd.link.next != 0p );248 249 // Remove the thread from the ready queue of this cluster250 266 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 251 267 /* paranoid */ verify( removed ); … … 263 279 } 264 280 // !!! This is not an else if !!! 281 // Ok, now the thread is blocked (whether we cheated to get here or not) 265 282 if( thrd.state == Blocked ) { 266 267 283 // This is the "easy case" 268 284 // The thread is parked and can easily be moved to active cluster … … 274 290 } 275 291 else { 276 277 292 // The thread is in a weird state 278 293 // I don't know what to do here 279 294 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 280 295 } 296 297 // The weird thread kidnapping stuff is over, restore interrupts. 298 enable_interrupts( __cfaabi_dbg_ctx ); 281 299 } else { 282 300 post( this.thrd.sem ); … … 365 383 } 366 384 385 // Step 3 : Initialize the data structure 367 386 // Get the pointers from the kernel to fill the structure 368 387 // submit queue … … 379 398 const __u32 num = *sq.num; 380 399 for( i; num ) { 381 sq.sqes[i].user_data = 0ul64;400 __sqe_clean( &sq.sqes[i] ); 382 401 } 383 402 } … … 409 428 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes); 410 429 430 // Step 4 : eventfd 431 int efd; 432 for() { 433 efd = eventfd(0, 0); 434 if (efd < 0) { 435 if (errno == EINTR) continue; 436 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno)); 437 } 438 break; 439 } 440 441 int ret; 442 for() { 443 ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1); 444 if (ret < 0) { 445 if (errno == EINTR) continue; 446 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno)); 447 } 448 break; 449 } 450 411 451 // some paranoid checks 412 452 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask ); … … 423 463 this.ring_flags = params.flags; 424 464 this.fd = fd; 465 this.efd = efd; 425 466 this.eager_submits = params_in.eager_submits; 426 467 this.poller_submits = params_in.poller_submits; … … 445 486 // close the file descriptor 446 487 close(this.fd); 488 close(this.efd); 447 489 448 490 free( this.submit_q.ready ); // Maybe null, doesn't matter … … 452 494 // I/O Context Sleep 453 495 //============================================================================================= 454 455 void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) { 456 ev.events = EPOLLIN | EPOLLONESHOT; 496 #define IOEVENTS EPOLLIN | EPOLLONESHOT 497 498 static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) { 499 struct epoll_event ev; 500 ev.events = IOEVENTS; 457 501 ev.data.u64 = (__u64)&ctx; 458 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);502 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev); 459 503 if (ret < 0) { 460 abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) ); 461 } 462 } 463 464 void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) { 465 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev); 466 if (ret < 0) { 467 abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) ); 468 } 504 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 505 } 506 } 507 508 void __ioctx_register($io_ctx_thread & ctx) { 509 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 510 } 511 512 void __ioctx_prepare_block($io_ctx_thread & ctx) { 513 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx); 514 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 469 515 } 470 516 -
libcfa/src/concurrency/io/types.hfa
r4468a70 r342af53 65 65 66 66 // A buffer of sqes (not the actual ring) 67 struct io_uring_sqe * sqes;67 volatile struct io_uring_sqe * sqes; 68 68 69 69 // The location and size of the mmaped area … … 85 85 86 86 // the kernel ring 87 struct io_uring_cqe * cqes;87 volatile struct io_uring_cqe * cqes; 88 88 89 89 // The location and size of the mmaped area … … 97 97 __u32 ring_flags; 98 98 int fd; 99 int efd; 99 100 bool eager_submits:1; 100 101 bool poller_submits:1; … … 130 131 #endif 131 132 132 struct epoll_event;133 133 struct $io_ctx_thread; 134 void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev); 135 void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev); 134 void __ioctx_register($io_ctx_thread & ctx); 135 void __ioctx_prepare_block($io_ctx_thread & ctx); 136 void __sqe_clean( volatile struct io_uring_sqe * sqe ); 136 137 #endif 137 138
Note:
See TracChangeset
for help on using the changeset viewer.