Changeset 18f7858
- Timestamp: Mar 28, 2022, 4:00:32 PM
- Branches: ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children: 37a3aa23
- Parents: 2377ca2
- Location: libcfa/src/concurrency
- Files: 3 edited
libcfa/src/concurrency/io.cfa
--- r2377ca2
+++ r18f7858

@@ -94 +94 @@
 extern void __kernel_unpark( thread$ * thrd, unpark_hint );

-static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) {
+static void ioring_syscsll( struct $io_context & ctx, unsigned int min_comp, unsigned int flags ) {
+    __STATS__( true, io.calls.flush++; )
+    int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
+    if( ret < 0 ) {
+        switch((int)errno) {
+        case EAGAIN:
+        case EINTR:
+        case EBUSY:
+            // Update statistics
+            __STATS__( false, io.calls.errors.busy ++; )
+            return false;
+        default:
+            abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
+        }
+    }
+
+    __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
+    __STATS__( true, io.calls.submitted += ret; )
+    /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+    /* paranoid */ verify( ctx.sq.to_submit >= ret );
+
+    ctx.sq.to_submit -= ret;
+
+    /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+
+    // Release the consumed SQEs
+    __release_sqes( ctx );
+
+    /* paranoid */ verify( ! __preemption_enabled() );
+
+    __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
+}
+
+static bool try_acquire( $io_context * ctx ) __attribute__((nonnull(1))) {
     /* paranoid */ verify( ! __preemption_enabled() );
     /* paranoid */ verify( ready_schedule_islocked() );
-    /* paranoid */ verify( ctx );
-
-    const __u32 mask = *ctx->cq.mask;


@@ -115 +145 @@
     }

+    return true;
+}
+
+static bool __cfa_do_drain( $io_context * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
+    /* paranoid */ verify( ! __preemption_enabled() );
+    /* paranoid */ verify( ready_schedule_islocked() );
+    /* paranoid */ verify( ctx->cq.lock == true );
+
+    const __u32 mask = *ctx->cq.mask;
     unsigned long long ts_prev = ctx->cq.ts;

@@ -155 +194 @@
     bool local = false;
     bool remote = false;
+
+    ready_schedule_lock();

     cluster * const cltr = proc->cltr;
@@ -186 +227 @@
         const unsigned target = proc->io.target;
         /* paranoid */ verify( io.tscs[target].tv != MAX );
-        if(target < ctxs_count) {
+        HELP: if(target < ctxs_count) {
             const unsigned long long cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io);
             const unsigned long long age = moving_average(ctsc, io.tscs[target].tv, io.tscs[target].ma);
             // __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
-            if(age > cutoff) {
-                remote = __cfa_do_drain( io.data[target], cltr );
-                if(remote) __STATS__( false, io.calls.helped++; )
-            }
+            if(age <= cutoff) break HELP;
+
+            if(!try_acquire(io.data[target])) break HELP;
+
+            if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
+
+            remote = true;
+            __STATS__( false, io.calls.helped++; )
         }
         proc->io.target = MAX;
@@ -201 +246 @@

     // Drain the local queue
-    local = __cfa_do_drain( proc->io.ctx, cltr );
+    if(try_acquire( proc->io.ctx )) {
+        local = __cfa_do_drain( proc->io.ctx, cltr );
+    }

     /* paranoid */ verify( ready_schedule_islocked() );
     /* paranoid */ verify( ! __preemption_enabled() );
     /* paranoid */ verify( active_processor() == proc );
+
+    ready_schedule_unlock();
     return local || remote;
 }

-bool __cfa_io_flush( processor * proc, int min_comp ) {
+bool __cfa_io_flush( processor * proc ) {
     /* paranoid */ verify( ! __preemption_enabled() );
     /* paranoid */ verify( proc );
@@ -219 +268 @@
     __ioarbiter_flush( ctx );

-    if(ctx.sq.to_submit != 0 || min_comp > 0) {
-
-        __STATS__( true, io.calls.flush++; )
-        int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
-        if( ret < 0 ) {
-            switch((int)errno) {
-            case EAGAIN:
-            case EINTR:
-            case EBUSY:
-                // Update statistics
-                __STATS__( false, io.calls.errors.busy ++; )
-                return false;
-            default:
-                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
-            }
-        }
-
-        __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
-        __STATS__( true, io.calls.submitted += ret; )
-        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
-        /* paranoid */ verify( ctx.sq.to_submit >= ret );
-
-        ctx.sq.to_submit -= ret;
-
-        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
-
-        // Release the consumed SQEs
-        __release_sqes( ctx );
-
-        /* paranoid */ verify( ! __preemption_enabled() );
-
-        __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
-    }
-
-    ready_schedule_lock();
-    bool ret = __cfa_io_drain( proc );
-    ready_schedule_unlock();
-    return ret;
+    if(ctx.sq.to_submit != 0) {
+        ioring_syscsll(ctx, 0, 0);
+
+    }
+
+    return __cfa_io_drain( proc );
 }

@@ -389 +406 @@
         if(sq.to_submit > 30) {
             __tls_stats()->io.flush.full++;
-            __cfa_io_flush( ctx->proc, 0 );
+            __cfa_io_flush( ctx->proc );
         }
         if(!lazy) {
             __tls_stats()->io.flush.eager++;
-            __cfa_io_flush( ctx->proc, 0 );
+            __cfa_io_flush( ctx->proc );
         }
     }
@@ -656 +673 @@
         return true;
     }
+
+    void __cfa_io_idle( processor * proc ) {
+        iovec iov;
+        __atomic_acquire( &proc->io.ctx->cq.lock );
+
+        with( this->idle_wctx) {
+
+            // Do we already have a pending read
+            if(available(*ftr)) {
+                // There is no pending read, we need to add one
+                reset(*ftr);
+
+                iov.iov_base = rdbuf;
+                iov.iov_len = sizeof(eventfd_t);
+                __kernel_read(proc, *ftr, iov, evfd );
+            }
+
+            __ioarbiter_flush( *proc->io.ctx );
+            ioring_syscsll(ctx, 1, IORING_ENTER_GETEVENTS);
+
+            __cfa_do_drain( proc->io.ctx, proc->cltr );
+        }
 #endif
 #endif
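The core of the io.cfa change is that the raw io_uring_enter call and its error handling move out of __cfa_io_flush into the new ioring_syscsll helper, which the flush path calls with min_comp = 0 and no flags and the new idle path calls with min_comp = 1 and IORING_ENTER_GETEVENTS, while draining is now guarded by try_acquire on the completion-queue lock. For reference only, the following sketch shows the same raw-syscall pattern in plain C outside the CFA runtime; enter_ring and its return convention are illustrative and are not part of this changeset.

// Minimal sketch (not CFA runtime code): the raw io_uring_enter pattern that
// ioring_syscsll wraps -- submit `to_submit` SQEs on ring `fd` and, when
// `min_comp > 0`, block until at least that many completions are available.
// Returns the number of SQEs consumed, 0 on a transient error, -1 on a fatal one.
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/io_uring.h>

static int enter_ring(int fd, unsigned to_submit, unsigned min_comp) {
    unsigned flags = (min_comp > 0) ? IORING_ENTER_GETEVENTS : 0;
    int ret = syscall(__NR_io_uring_enter, fd, to_submit, min_comp, flags,
                      (sigset_t *)NULL, _NSIG / 8);
    if (ret < 0) {
        switch (errno) {
        case EAGAIN:   // ring temporarily out of resources
        case EINTR:    // interrupted by a signal
        case EBUSY:    // completion-queue backlog must be drained first
            return 0;  // transient: the caller may retry later
        default:
            fprintf(stderr, "io_uring_enter: %s\n", strerror(errno));
            return -1; // fatal
        }
    }
    return ret;        // number of SQEs the kernel consumed
}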
libcfa/src/concurrency/io/setup.cfa
--- r2377ca2
+++ r18f7858

@@ -32 +32 @@

     void __cfa_io_start( processor * proc ) {}
-    bool __cfa_io_flush( processor * proc, int ) { return false; }
+    bool __cfa_io_flush( processor * proc ) { return false; }
+    bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1)));
+    void __cfa_io_idle ( processor * ) __attribute__((nonnull (1)));
     void __cfa_io_stop ( processor * proc ) {}

@@ -215 +217 @@

         // completion queue
-        cq.lock = 0;
+        cq.lock = false;
         cq.id = MAX;
         cq.ts = rdtscl();
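In setup.cfa the stub prototypes track the new io.cfa interface, and the completion-queue lock is now initialised as a boolean (cq.lock = false), matching its use as a flag that try_acquire and __atomic_acquire contend on before a drain. Purely as an illustration of that kind of boolean test-and-set discipline, the fragment below uses the GCC __atomic builtins; the names cq_try_acquire/cq_acquire/cq_release are made up here and the CFA runtime's own lock helpers are not reproduced.

// Illustrative sketch only: a boolean test-and-set lock built on GCC __atomic
// builtins, approximating the discipline the cq.lock flag follows.
#include <stdbool.h>

static inline bool cq_try_acquire(volatile bool *lock) {
    // true if we obtained the lock, false if another processor holds it
    return !__atomic_exchange_n(lock, true, __ATOMIC_ACQUIRE);
}

static inline void cq_acquire(volatile bool *lock) {
    // spin until the exchange succeeds (a real runtime would back off or yield)
    while (__atomic_exchange_n(lock, true, __ATOMIC_ACQUIRE)) {
        while (__atomic_load_n(lock, __ATOMIC_RELAXED)) { /* spin */ }
    }
}

static inline void cq_release(volatile bool *lock) {
    __atomic_store_n(lock, false, __ATOMIC_RELEASE);
}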
libcfa/src/concurrency/kernel.cfa
--- r2377ca2
+++ r18f7858

@@ -132 +132 @@
 static void __wake_one(cluster * cltr);

-static void idle_sleep(processor * proc, io_future_t & future, iovec & iov);
+static void idle_sleep(processor * proc);
 static bool mark_idle (__cluster_proc_list & idles, processor & proc);
 static void mark_awake(__cluster_proc_list & idles, processor & proc);

 extern bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1)));
-extern bool __cfa_io_flush( processor *, int min_comp );
-static inline bool __maybe_io_drain( processor * );
+extern bool __cfa_io_flush( processor * ) __attribute__((nonnull (1)));
+extern void __cfa_io_idle( processor * ) __attribute__((nonnull (1)));

 #if defined(CFA_WITH_IO_URING_IDLE)
@@ -168 +168 @@
     // mark it as already fulfilled so we know if there is a pending request or not
     this->idle_wctx.ftr->self.ptr = 1p;
-    iovec idle_iovec = { this->idle_wctx.rdbuf, sizeof(eventfd_t) };

     __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this);
@@ -193 +192 @@
         for() {
             // Check if there is pending io
-            __maybe_io_drain( this );
+            __cfa_io_drain( this );

             // Try to get the next thread
@@ -199 +198 @@

             if( !readyThread ) {
+                // there is no point in holding submissions if we are idle
                 __IO_STATS__(true, io.flush.idle++; )
-                __cfa_io_flush( this, 0 );
+                __cfa_io_flush( this );
+
+                // drain again in case something showed up
+                __cfa_io_drain( this );

                 readyThread = __next_thread( this->cltr );
@@ -206 +209 @@

             if( !readyThread ) for(5) {
+                readyThread = __next_thread_slow( this->cltr );
+
+                if( readyThread ) break;
+
+                // It's unlikely we still I/O to submit, but the arbiter could
                 __IO_STATS__(true, io.flush.idle++; )
-
-                readyThread = __next_thread_slow( this->cltr );
-
-                if( readyThread ) break;
-
-                __cfa_io_flush( this, 0 );
+                __cfa_io_flush( this );
+
+                // drain again in case something showed up
+                __cfa_io_drain( this );
             }

@@ -235 +241 @@
             }

-            idle_sleep( this, *this->idle_wctx.ftr, idle_iovec );
+            idle_sleep( this );

             // We were woken up, remove self from idle
@@ -257 +263 @@
         if(__atomic_load_n(&this->io.pending, __ATOMIC_RELAXED) && !__atomic_load_n(&this->io.dirty, __ATOMIC_RELAXED)) {
             __IO_STATS__(true, io.flush.dirty++; )
-            __cfa_io_flush( this, 0 );
+            __cfa_io_flush( this );
         }
     }
@@ -683 +689 @@
 }

-static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
+static void idle_sleep(processor * this) {
     /* paranoid */ verify( this->idle_wctx.evfd != 1 );
     /* paranoid */ verify( this->idle_wctx.evfd != 2 );
@@ -735 +741 @@
     #endif
     #else
-        // Do we already have a pending read
-        if(available(future)) {
-            // There is no pending read, we need to add one
-            reset(future);
-
-            __kernel_read(this, future, iov, this->idle_wctx.evfd );
-        }
-
-        __cfa_io_flush( this, 1 );
+        __cfa_io_idle( this );
     #endif
 }
@@ -831 +829 @@
 #endif

-static inline bool __maybe_io_drain( processor * proc ) {
-    /* paranoid */ verify( proc );
-    bool ret = false;
-    #if defined(CFA_HAVE_LINUX_IO_URING_H)
-        __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);
-
-        // Check if we should drain the queue
-        $io_context * ctx = proc->io.ctx;
-        unsigned head = *ctx->cq.head;
-        unsigned tail = *ctx->cq.tail;
-        if(head == tail) return false;
-        ready_schedule_lock();
-        ret = __cfa_io_drain( proc );
-        ready_schedule_unlock();
-    #endif
-    return ret;
-}


 //-----------------------------------------------------------------------------
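On the kernel.cfa side, the scheduler's open-coded idle path (queue a read of the wake-up eventfd, then flush with min_comp = 1) is replaced by a call to the new __cfa_io_idle, so an idle processor parks inside io_uring_enter until either its eventfd is written or some other completion arrives, and __maybe_io_drain disappears in favour of calling __cfa_io_drain directly. The sketch below illustrates the same park-on-eventfd idea using liburing rather than the runtime's raw-syscall machinery; park_on_eventfd is a hypothetical name and the ring and eventfd are assumed to be set up elsewhere.

// Illustrative sketch (liburing, not the CFA runtime): park a thread by
// queueing a read of its wake-up eventfd and waiting for >= 1 completion,
// so any other pending I/O completion also wakes the sleeper.
#include <liburing.h>
#include <sys/eventfd.h>
#include <stdint.h>

static void park_on_eventfd(struct io_uring *ring, int evfd) {
    uint64_t wake_val;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_read(sqe, evfd, &wake_val, sizeof(wake_val), 0);
    io_uring_sqe_set_data(sqe, NULL); // tag the wake-up read if desired

    // Submit the read (plus any other pending SQEs) and block until at least
    // one CQE is posted: either the eventfd read or a regular I/O completion.
    io_uring_submit_and_wait(ring, 1);

    struct io_uring_cqe *cqe;
    unsigned head, seen = 0;
    io_uring_for_each_cqe(ring, head, cqe) {
        // drain whatever completed while we slept
        seen++;
    }
    io_uring_cq_advance(ring, seen);
}

// Another thread wakes the sleeper by writing its eventfd:
//     eventfd_write(evfd, 1);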