Changeset ece0e80 for libcfa/src/concurrency
- Timestamp:
- Jan 9, 2021, 4:27:57 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 561dd26
- Parents:
- 35fd2c4
- Location:
- libcfa/src/concurrency
- Files:
- 
      - 3 edited
 
 - 
          
  io.cfa (modified) (8 diffs)
- 
          
  io/call.cfa.in (modified) (1 diff)
- 
          
  io/setup.cfa (modified) (7 diffs)
 
Legend:
- Unmodified
- Added
- Removed
- 
      libcfa/src/concurrency/io.cfar35fd2c4 rece0e80 134 134 int ret = 0; 135 135 if( need_sys_to_submit || need_sys_to_complete ) { 136 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); 136 137 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 137 138 if( ret < 0 ) { … … 230 231 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring); 231 232 232 int reset = 0; 233 const int reset_cnt = 5; 234 int reset = reset_cnt; 233 235 // Then loop until we need to start 236 LOOP: 234 237 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 235 238 // Drain the io … … 239 242 [count, again] = __drain_io( *this.ring ); 240 243 241 if(!again) reset ++;244 if(!again) reset--; 242 245 243 246 // Update statistics … … 249 252 250 253 // If we got something, just yield and check again 251 if(reset < 5) {254 if(reset > 1) { 252 255 yield(); 253 } 254 // We didn't get anything baton pass to the slow poller 255 else { 256 continue LOOP; 257 } 258 259 // We alread failed to find events a few time. 260 if(reset == 1) { 261 // Rearm the context so it can block 262 // but don't block right away 263 // we need to retry one last time in case 264 // something completed *just now* 265 __ioctx_prepare_block( this, ev ); 266 continue LOOP; 267 } 268 256 269 __STATS__( false, 257 270 io.complete_q.blocks += 1; 258 271 ) 259 272 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self); 260 reset = 0;261 273 262 274 // block this thread 263 __ioctx_prepare_block( this, ev );264 275 wait( this.sem ); 265 } 276 277 // restore counter 278 reset = reset_cnt; 266 279 } 267 280 … … 319 332 ) 320 333 334 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); 321 335 322 336 // Success return the data … … 376 390 377 391 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 392 __cfadbg_print_safe( io, "Kernel I/O : submitting %u for %p\n", idx, active_thread() ); 393 378 394 __io_data & ring = *ctx->thrd.ring; 379 395 // Get now the data we definetely need … … 443 459 unlock(ring.submit_q.submit_lock); 444 460 #endif 445 if( ret < 0 ) return; 461 if( ret < 0 ) { 462 return; 463 } 446 464 447 465 // Release the consumed SQEs … … 454 472 io.submit_q.submit_avg.cnt += 1; 455 473 ) 474 475 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 456 476 } 457 477 else { 
- 
      libcfa/src/concurrency/io/call.cfa.inr35fd2c4 rece0e80 464 464 465 465 print(""" 466 //----------------------------------------------------------------------------- 467 bool cancel(io_cancellation & this) { 468 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL) 469 return false; 470 #else 471 io_future_t future; 472 473 io_context * context = __get_io_context(); 474 475 __u8 sflags = 0; 476 struct __io_data & ring = *context->thrd.ring; 477 478 __u32 idx; 479 struct io_uring_sqe * sqe; 480 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 481 482 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 483 sqe->opcode = IORING_OP_ASYNC_CANCEL; 484 sqe->flags = sflags; 485 sqe->addr = this.target; 486 487 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 488 __submit( context, idx ); 489 490 wait(future); 491 492 if( future.result == 0 ) return true; // Entry found 493 if( future.result == -EALREADY) return true; // Entry found but in progress 494 if( future.result == -ENOENT ) return false; // Entry not found 495 return false; 496 #endif 497 } 498 466 499 //----------------------------------------------------------------------------- 467 500 // Check if a function is has asynchronous 
- 
      libcfa/src/concurrency/io/setup.cfar35fd2c4 rece0e80 169 169 // Main loop 170 170 while( iopoll.run ) { 171 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n"); 172 171 173 // Wait for events 172 174 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 175 176 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds); 173 177 174 178 // Check if an error occured … … 181 185 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 182 186 /* paranoid */ verify( io_ctx ); 183 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);187 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %p\n", io_ctx); 184 188 #if !defined( __CFA_NO_STATISTICS__ ) 185 189 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; … … 233 237 $thread & thrd = this.thrd.self; 234 238 if( cluster_context ) { 239 // We are about to do weird things with the threads 240 // we don't need interrupts to complicate everything 241 disable_interrupts(); 242 243 // Get cluster info 235 244 cluster & cltr = *thrd.curr_cluster; 236 245 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); … … 239 248 // We need to adjust the clean-up based on where the thread is 240 249 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 250 // This is the tricky case 251 // The thread was preempted or ready to run and now it is on the ready queue 252 // but the cluster is shutting down, so there aren't any processors to run the ready queue 253 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 241 254 242 255 ready_schedule_lock(); 243 244 // This is the tricky case 245 // The thread was preempted and now it is on the ready queue 256 // The thread should on the list 257 /* paranoid */ verify( thrd.link.next != 0p ); 258 259 // Remove the thread from the ready queue of this cluster 246 260 // The thread should be the last on the list 247 /* paranoid */ verify( thrd.link.next != 0p );248 249 // Remove the thread from the ready queue of this cluster250 261 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 251 262 /* paranoid */ verify( removed ); … … 263 274 } 264 275 // !!! This is not an else if !!! 276 // Ok, now the thread is blocked (whether we cheated to get here or not) 265 277 if( thrd.state == Blocked ) { 266 267 278 // This is the "easy case" 268 279 // The thread is parked and can easily be moved to active cluster … … 274 285 } 275 286 else { 276 277 287 // The thread is in a weird state 278 288 // I don't know what to do here 279 289 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 280 290 } 291 292 // The weird thread kidnapping stuff is over, restore interrupts. 293 enable_interrupts( __cfaabi_dbg_ctx ); 281 294 } else { 282 295 post( this.thrd.sem ); … … 463 476 464 477 void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) { 478 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %p\n", &ctx); 465 479 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev); 466 480 if (ret < 0) { 
  Note:
 See   TracChangeset
 for help on using the changeset viewer.
  