Changeset 18f7858 for libcfa/src/concurrency/io.cfa
- Timestamp:
- Mar 28, 2022, 4:00:32 PM (2 years ago)
- Branches:
- ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children:
- 37a3aa23
- Parents:
- 2377ca2
- File:
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
r2377ca2 r18f7858 94 94 extern void __kernel_unpark( thread$ * thrd, unpark_hint ); 95 95 96 static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) { 96 static void ioring_syscsll( struct $io_context & ctx, unsigned int min_comp, unsigned int flags ) { 97 __STATS__( true, io.calls.flush++; ) 98 int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8); 99 if( ret < 0 ) { 100 switch((int)errno) { 101 case EAGAIN: 102 case EINTR: 103 case EBUSY: 104 // Update statistics 105 __STATS__( false, io.calls.errors.busy ++; ) 106 return false; 107 default: 108 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 109 } 110 } 111 112 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd); 113 __STATS__( true, io.calls.submitted += ret; ) 114 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 115 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 116 117 ctx.sq.to_submit -= ret; 118 119 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 120 121 // Release the consumed SQEs 122 __release_sqes( ctx ); 123 124 /* paranoid */ verify( ! __preemption_enabled() ); 125 126 __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED); 127 } 128 129 static bool try_acquire( $io_context * ctx ) __attribute__((nonnull(1))) { 97 130 /* paranoid */ verify( ! __preemption_enabled() ); 98 131 /* paranoid */ verify( ready_schedule_islocked() ); 99 /* paranoid */ verify( ctx );100 101 const __u32 mask = *ctx->cq.mask;102 132 103 133 … … 115 145 } 116 146 147 return true; 148 } 149 150 static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) { 151 /* paranoid */ verify( ! 
__preemption_enabled() ); 152 /* paranoid */ verify( ready_schedule_islocked() ); 153 /* paranoid */ verify( ctx->cq.lock == true ); 154 155 const __u32 mask = *ctx->cq.mask; 117 156 unsigned long long ts_prev = ctx->cq.ts; 118 157 … … 155 194 bool local = false; 156 195 bool remote = false; 196 197 ready_schedule_lock(); 157 198 158 199 cluster * const cltr = proc->cltr; … … 186 227 const unsigned target = proc->io.target; 187 228 /* paranoid */ verify( io.tscs[target].tv != MAX ); 188 if(target < ctxs_count) {229 HELP: if(target < ctxs_count) { 189 230 const unsigned long long cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io); 190 231 const unsigned long long age = moving_average(ctsc, io.tscs[target].tv, io.tscs[target].ma); 191 232 // __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no"); 192 if(age > cutoff) { 193 remote = __cfa_do_drain( io.data[target], cltr ); 194 if(remote) __STATS__( false, io.calls.helped++; ) 195 } 233 if(age <= cutoff) break HELP; 234 235 if(!try_acquire(io.data[target])) break HELP; 236 237 if(!__cfa_do_drain( io.data[target], cltr )) break HELP; 238 239 remote = true; 240 __STATS__( false, io.calls.helped++; ) 196 241 } 197 242 proc->io.target = MAX; … … 201 246 202 247 // Drain the local queue 203 local = __cfa_do_drain( proc->io.ctx, cltr ); 248 if(try_acquire( proc->io.ctx )) { 249 local = __cfa_do_drain( proc->io.ctx, cltr ); 250 } 204 251 205 252 /* paranoid */ verify( ready_schedule_islocked() ); 206 253 /* paranoid */ verify( ! __preemption_enabled() ); 207 254 /* paranoid */ verify( active_processor() == proc ); 255 256 ready_schedule_unlock(); 208 257 return local || remote; 209 258 } 210 259 211 bool __cfa_io_flush( processor * proc , int min_comp) {260 bool __cfa_io_flush( processor * proc ) { 212 261 /* paranoid */ verify( ! 
__preemption_enabled() ); 213 262 /* paranoid */ verify( proc ); … … 219 268 __ioarbiter_flush( ctx ); 220 269 221 if(ctx.sq.to_submit != 0 || min_comp > 0) { 222 223 __STATS__( true, io.calls.flush++; ) 224 int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8); 225 if( ret < 0 ) { 226 switch((int)errno) { 227 case EAGAIN: 228 case EINTR: 229 case EBUSY: 230 // Update statistics 231 __STATS__( false, io.calls.errors.busy ++; ) 232 return false; 233 default: 234 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 235 } 236 } 237 238 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd); 239 __STATS__( true, io.calls.submitted += ret; ) 240 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 241 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 242 243 ctx.sq.to_submit -= ret; 244 245 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 246 247 // Release the consumed SQEs 248 __release_sqes( ctx ); 249 250 /* paranoid */ verify( ! 
__preemption_enabled() ); 251 252 __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED); 253 } 254 255 ready_schedule_lock(); 256 bool ret = __cfa_io_drain( proc ); 257 ready_schedule_unlock(); 258 return ret; 270 if(ctx.sq.to_submit != 0) { 271 ioring_syscsll(ctx, 0, 0); 272 273 } 274 275 return __cfa_io_drain( proc ); 259 276 } 260 277 … … 389 406 if(sq.to_submit > 30) { 390 407 __tls_stats()->io.flush.full++; 391 __cfa_io_flush( ctx->proc , 0);408 __cfa_io_flush( ctx->proc ); 392 409 } 393 410 if(!lazy) { 394 411 __tls_stats()->io.flush.eager++; 395 __cfa_io_flush( ctx->proc , 0);412 __cfa_io_flush( ctx->proc ); 396 413 } 397 414 } … … 656 673 return true; 657 674 } 675 676 void __cfa_io_idle( processor * proc ) { 677 iovec iov; 678 __atomic_acquire( &proc->io.ctx->cq.lock ); 679 680 with( this->idle_wctx) { 681 682 // Do we already have a pending read 683 if(available(*ftr)) { 684 // There is no pending read, we need to add one 685 reset(*ftr); 686 687 iov.iov_base = rdbuf; 688 iov.iov_len = sizeof(eventfd_t); 689 __kernel_read(proc, *ftr, iov, evfd ); 690 } 691 692 __ioarbiter_flush( *proc->io.ctx ); 693 ioring_syscsll(ctx, 1, IORING_ENTER_GETEVENTS); 694 695 __cfa_do_drain( proc->io.ctx, proc->cltr ); 696 } 658 697 #endif 659 698 #endif
Note: See TracChangeset
for help on using the changeset viewer.