Changeset 2e9b59b for libcfa/src/concurrency/io.cfa
- Timestamp: Apr 19, 2022, 3:00:04 PM (3 years ago)
- Branches: ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children: 5b84a321
- Parents: ba897d21 (diff), bb7c77d (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- File: 1 edited
Legend: unmodified lines have no prefix; added lines are prefixed with +; removed lines are prefixed with -.
libcfa/src/concurrency/io.cfa
--- libcfa/src/concurrency/io.cfa	(rba897d21)
+++ libcfa/src/concurrency/io.cfa	(r2e9b59b)

@@ old 41 / new 41 @@
 #include "kernel.hfa"
 #include "kernel/fwd.hfa"
-#include "kernel_private.hfa"
+#include "kernel/private.hfa"
+#include "kernel/cluster.hfa"
 #include "io/types.hfa"

@@ old 93 / new 94 @@
 extern void __kernel_unpark( thread$ * thrd, unpark_hint );

-bool __cfa_io_drain( processor * proc ) {
-    /* paranoid */ verify( ! __preemption_enabled() );
-    /* paranoid */ verify( ready_schedule_islocked() );
-    /* paranoid */ verify( proc );
-    /* paranoid */ verify( proc->io.ctx );
-
-    // Drain the queue
-    $io_context * ctx = proc->io.ctx;
-    unsigned head = *ctx->cq.head;
-    unsigned tail = *ctx->cq.tail;
-    const __u32 mask = *ctx->cq.mask;
-
-    __u32 count = tail - head;
-    __STATS__( false, io.calls.drain++; io.calls.completed += count; )
-
-    if(count == 0) return false;
-
-    for(i; count) {
-        unsigned idx = (head + i) & mask;
-        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
-
-        /* paranoid */ verify(&cqe);
-
-        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
-        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
-
-        __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
-    }
-
-    __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
-
-    // Mark to the kernel that the cqe has been seen
-    // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-    __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
-
-    /* paranoid */ verify( ready_schedule_islocked() );
-    /* paranoid */ verify( ! __preemption_enabled() );
-
-    return true;
-}
-
-bool __cfa_io_flush( processor * proc, int min_comp ) {
-    /* paranoid */ verify( ! __preemption_enabled() );
-    /* paranoid */ verify( proc );
-    /* paranoid */ verify( proc->io.ctx );
-
-    __attribute__((unused)) cluster * cltr = proc->cltr;
-    $io_context & ctx = *proc->io.ctx;
-
-    __ioarbiter_flush( ctx );
-
-    if(ctx.sq.to_submit != 0 || min_comp > 0) {
-
-        __STATS__( true, io.calls.flush++; )
-        int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
+static void ioring_syscsll( struct $io_context & ctx, unsigned int min_comp, unsigned int flags ) {
+    __STATS__( true, io.calls.flush++; )
+    int ret;
+    for() {
+        ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
         if( ret < 0 ) {
             switch((int)errno) {
+            case EINTR:
+                continue;
             case EAGAIN:
-            case EINTR:
             case EBUSY:
                 // Update statistics

@@ old 160 / new 112 @@
             }
         }
-
-    __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
-    __STATS__( true, io.calls.submitted += ret; )
-    /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
-    /* paranoid */ verify( ctx.sq.to_submit >= ret );
-
-    ctx.sq.to_submit -= ret;
-
-    /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
-
-    // Release the consumed SQEs
-    __release_sqes( ctx );
-
+        break;
+    }
+
+    __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
+    __STATS__( true, io.calls.submitted += ret; )
+    /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+    /* paranoid */ verify( ctx.sq.to_submit >= ret );
+
+    ctx.sq.to_submit -= ret;
+
+    /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+
+    // Release the consumed SQEs
+    __release_sqes( ctx );
+
+    /* paranoid */ verify( ! __preemption_enabled() );
+
+    __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
+}
+
+static bool try_acquire( $io_context * ctx ) __attribute__((nonnull(1))) {
+    /* paranoid */ verify( ! __preemption_enabled() );
+    /* paranoid */ verify( ready_schedule_islocked() );
+
+
+    {
+        const __u32 head = *ctx->cq.head;
+        const __u32 tail = *ctx->cq.tail;
+
+        if(head == tail) return false;
+    }
+
+    // Drain the queue
+    if(!__atomic_try_acquire(&ctx->cq.lock)) {
+        __STATS__( false, io.calls.locked++; )
+        return false;
+    }
+
+    return true;
+}
+
+static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
+    /* paranoid */ verify( ! __preemption_enabled() );
+    /* paranoid */ verify( ready_schedule_islocked() );
+    /* paranoid */ verify( ctx->cq.lock == true );
+
+    const __u32 mask = *ctx->cq.mask;
+    unsigned long long ts_prev = ctx->cq.ts;
+
+    // re-read the head and tail in case it already changed.
+    const __u32 head = *ctx->cq.head;
+    const __u32 tail = *ctx->cq.tail;
+    const __u32 count = tail - head;
+    __STATS__( false, io.calls.drain++; io.calls.completed += count; )
+
+    for(i; count) {
+        unsigned idx = (head + i) & mask;
+        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
+
+        /* paranoid */ verify(&cqe);
+
+        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
+        // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
+
+        __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
+    }
+
+    unsigned long long ts_next = ctx->cq.ts = rdtscl();
+
+    // Mark to the kernel that the cqe has been seen
+    // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
+    __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
+    ctx->proc->idle_wctx.drain_time = ts_next;
+
+    __cfadbg_print_safe(io, "Kernel I/O : %u completed age %llu\n", count, ts_next);
+    /* paranoid */ verify( ready_schedule_islocked() );
+    /* paranoid */ verify( ! __preemption_enabled() );
+
+    __atomic_unlock(&ctx->cq.lock);
+
+    touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next );
+
+    return true;
+}
+
+bool __cfa_io_drain( processor * proc ) {
+    bool local = false;
+    bool remote = false;
+
+    ready_schedule_lock();
+
+    cluster * const cltr = proc->cltr;
+    $io_context * const ctx = proc->io.ctx;
+    /* paranoid */ verify( cltr );
+    /* paranoid */ verify( ctx );
+
+    with(cltr->sched) {
+        const size_t ctxs_count = io.count;
+
+        /* paranoid */ verify( ready_schedule_islocked() );
         /* paranoid */ verify( ! __preemption_enabled() );
-
-        ctx.proc->io.pending = false;
-    }
-
-    ready_schedule_lock();
-    bool ret = __cfa_io_drain( proc );
+        /* paranoid */ verify( active_processor() == proc );
+        /* paranoid */ verify( __shard_factor.io > 0 );
+        /* paranoid */ verify( ctxs_count > 0 );
+        /* paranoid */ verify( ctx->cq.id < ctxs_count );
+
+        const unsigned this_cache = cache_id(cltr, ctx->cq.id / __shard_factor.io);
+        const unsigned long long ctsc = rdtscl();
+
+        if(proc->io.target == MAX) {
+            uint64_t chaos = __tls_rand();
+            unsigned ext = chaos & 0xff;
+            unsigned other = (chaos >> 8) % (ctxs_count);
+
+            if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
+                proc->io.target = other;
+            }
+        }
+        else {
+            const unsigned target = proc->io.target;
+            /* paranoid */ verify( io.tscs[target].tv != MAX );
+            HELP: if(target < ctxs_count) {
+                const unsigned long long cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io);
+                const unsigned long long age = moving_average(ctsc, io.tscs[target].tv, io.tscs[target].ma);
+                __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
+                if(age <= cutoff) break HELP;
+
+                if(!try_acquire(io.data[target])) break HELP;
+
+                if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
+
+                remote = true;
+                __STATS__( false, io.calls.helped++; )
+            }
+            proc->io.target = MAX;
+        }
+    }
+
+
+    // Drain the local queue
+    if(try_acquire( proc->io.ctx )) {
+        local = __cfa_do_drain( proc->io.ctx, cltr );
+    }
+
+    /* paranoid */ verify( ready_schedule_islocked() );
+    /* paranoid */ verify( ! __preemption_enabled() );
+    /* paranoid */ verify( active_processor() == proc );
+
     ready_schedule_unlock();
-    return ret;
+    return local || remote;
+}
+
+bool __cfa_io_flush( processor * proc ) {
+    /* paranoid */ verify( ! __preemption_enabled() );
+    /* paranoid */ verify( proc );
+    /* paranoid */ verify( proc->io.ctx );
+
+    $io_context & ctx = *proc->io.ctx;
+
+    __ioarbiter_flush( ctx );
+
+    if(ctx.sq.to_submit != 0) {
+        ioring_syscsll(ctx, 0, 0);
+
+    }
+
+    return __cfa_io_drain( proc );
 }

@@ old 209 / new 306 @@
     struct io_uring_sqe * sqes = ctx->sq.sqes;
     for(i; want) {
-        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
+        // __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
         out_sqes[i] = &sqes[idxs[i]];
     }

@@ old 227 / new 324 @@
     // copy all the indexes we want from the available list
     for(i; want) {
-        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
+        // __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
     }

@@ old 244 / new 341 @@
 // sqe == &sqes[idx]
 struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
-    __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
+    // __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);

     disable_interrupts();

@@ old 252 / new 349 @@
     /* paranoid */ verify( ctx );

-    __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
+    // __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");

     // We can proceed to the fast path

@@ old 260 / new 357 @@
         enable_interrupts();

-        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
+        // __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);

         __fill( sqes, want, idxs, ctx );

@@ old 275 / new 372 @@
     /* paranoid */ verify( ioarb );

-    __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
+    // __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");

     struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);

-    __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
+    // __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);

     __fill( sqes, want, idxs,ret );

@@ old 296 / new 393 @@
     // Add the sqes to the array
     for( i; have ) {
-        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
+        // __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
         sq.kring.array[ (tail + i) & mask ] = idxs[i];
     }

@@ old 304 / new 401 @@
     sq.to_submit += have;

-    ctx->proc->io.pending = true;
-    ctx->proc->io.dirty = true;
+    __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
+    __atomic_store_n(&ctx->proc->io.dirty , true, __ATOMIC_RELAXED);
 }

@@ old 314 / new 411 @@
     if(sq.to_submit > 30) {
         __tls_stats()->io.flush.full++;
-        __cfa_io_flush( ctx->proc , 0);
+        __cfa_io_flush( ctx->proc );
     }
     if(!lazy) {
         __tls_stats()->io.flush.eager++;
-        __cfa_io_flush( ctx->proc , 0);
+        __cfa_io_flush( ctx->proc );
     }
 }

 void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
-    __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
+    // __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");

     disable_interrupts();

@@ old 340 / new 437 @@
     enable_interrupts();

-    __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
+    // __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     return;
 }

@@ old 348 / new 445 @@
     enable_interrupts();

-    __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
+    // __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");

     __ioarbiter_submit(inctx, idxs, have, lazy);

@@ old 392 / new 489 @@
     // go through the range and release the sqes
     for( i; count ) {
-        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
+        // __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
         __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
         ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;

@@ old 432 / new 529 @@

 static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) {
-    __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
+    // __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");

     __STATS__( false, io.alloc.block += 1; )

@@ old 499 / new 596 @@
     bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);

-    ctx->proc->io.pending = true;
+    __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);

     if( we ) {

@@ old 544 / new 641 @@

     // We can proceed to the fast path
-    if( !__alloc(ctx, &idx, 1) ) return false;
+    if( !__alloc(ctx, &idx, 1) ) {
+        /* paranoid */ verify( false ); // for now check if this happens, next time just abort the sleep.
+        return false;
+    }

     // Allocation was successful

@@ old 574 / new 674 @@

     /* paranoid */ verify( sqe->user_data == (uintptr_t)&future );
-    __submit( ctx, &idx, 1, true );
+    __submit_only( ctx, &idx, 1 );

     /* paranoid */ verify( proc == __cfaabi_tls.this_processor );

@@ old 581 / new 681 @@
     return true;
 }
+
+void __cfa_io_idle( processor * proc ) {
+    iovec iov;
+    __atomic_acquire( &proc->io.ctx->cq.lock );
+
+    __attribute__((used)) volatile bool was_reset = false;
+
+    with( proc->idle_wctx) {
+
+        // Do we already have a pending read
+        if(available(*ftr)) {
+            // There is no pending read, we need to add one
+            reset(*ftr);
+
+            iov.iov_base = rdbuf;
+            iov.iov_len = sizeof(eventfd_t);
+            __kernel_read(proc, *ftr, iov, evfd );
+            ftr->result = 0xDEADDEAD;
+            *((eventfd_t *)rdbuf) = 0xDEADDEADDEADDEAD;
+            was_reset = true;
+        }
+    }
+
+    if( !__atomic_load_n( &proc->do_terminate, __ATOMIC_SEQ_CST ) ) {
+        __ioarbiter_flush( *proc->io.ctx );
+        proc->idle_wctx.sleep_time = rdtscl();
+        ioring_syscsll( *proc->io.ctx, 1, IORING_ENTER_GETEVENTS);
+    }
+
+    ready_schedule_lock();
+    __cfa_do_drain( proc->io.ctx, proc->cltr );
+    ready_schedule_unlock();
+
+    asm volatile ("" :: "m" (was_reset));
+}
 #endif
 #endif
Note: See TracChangeset for help on using the changeset viewer.