Changeset 8e4aa05 for libcfa/src/concurrency/io.cfa
- Timestamp: Mar 4, 2021, 7:40:25 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 77d601f
- Parents: 342af53, a5040fe
- Files: 1 edited

Note: this is a merge changeset; the changes displayed below correspond to the merge itself, not to the full set of changes relative to either parent.
libcfa/src/concurrency/io.cfa
r342af53 r8e4aa05 32 32 extern "C" { 33 33 #include <sys/syscall.h> 34 #include <sys/eventfd.h> 34 35 35 36 #include <linux/io_uring.h> … … 41 42 #include "io/types.hfa" 42 43 43 static const char * opcodes[] = {44 __attribute__((unused)) static const char * opcodes[] = { 44 45 "OP_NOP", 45 46 "OP_READV", … … 79 80 }; 80 81 81 // returns true of acquired as leader or second leader 82 static inline bool try_lock( __leaderlock_t & this ) { 83 const uintptr_t thrd = 1z | (uintptr_t)active_thread(); 84 bool block; 85 disable_interrupts(); 86 for() { 87 struct $thread * expected = this.value; 88 if( 1p != expected && 0p != expected ) { 89 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader 90 enable_interrupts( __cfaabi_dbg_ctx ); 91 return false; 92 } 93 struct $thread * desired; 94 if( 0p == expected ) { 95 // If the lock isn't locked acquire it, no need to block 96 desired = 1p; 97 block = false; 98 } 99 else { 100 // If the lock is already locked try becomming the next leader 101 desired = (struct $thread *)thrd; 102 block = true; 103 } 104 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; 105 } 106 if( block ) { 107 enable_interrupts( __cfaabi_dbg_ctx ); 108 park(); 109 disable_interrupts(); 110 } 111 return true; 112 } 113 114 static inline bool next( __leaderlock_t & this ) { 82 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want ); 83 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy ); 84 static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * ); 85 static inline void __ioarbiter_notify( $io_context & ctx ); 86 //============================================================================================= 87 // I/O Polling 88 //============================================================================================= 89 static inline unsigned __flush( struct $io_context & ); 90 static inline __u32 __release_sqes( struct $io_context & ); 91 92 void __cfa_io_drain( processor * proc ) { 115 93 /* paranoid */ verify( ! 
__preemption_enabled() ); 116 struct $thread * nextt; 117 for() { 118 struct $thread * expected = this.value; 119 /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked 120 121 struct $thread * desired; 122 if( 1p == expected ) { 123 // No next leader, just unlock 124 desired = 0p; 125 nextt = 0p; 126 } 127 else { 128 // There is a next leader, remove but keep locked 129 desired = 1p; 130 nextt = (struct $thread *)(~1z & (uintptr_t)expected); 131 } 132 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; 133 } 134 135 if(nextt) { 136 unpark( nextt ); 137 enable_interrupts( __cfaabi_dbg_ctx ); 138 return true; 139 } 140 enable_interrupts( __cfaabi_dbg_ctx ); 141 return false; 142 } 143 144 //============================================================================================= 145 // I/O Syscall 146 //============================================================================================= 147 static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { 148 bool need_sys_to_submit = false; 149 bool need_sys_to_complete = false; 150 unsigned flags = 0; 151 152 TO_SUBMIT: 153 if( to_submit > 0 ) { 154 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 155 need_sys_to_submit = true; 156 break TO_SUBMIT; 157 } 158 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 159 need_sys_to_submit = true; 160 flags |= IORING_ENTER_SQ_WAKEUP; 161 } 162 } 163 164 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 165 flags |= IORING_ENTER_GETEVENTS; 166 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 167 need_sys_to_complete = true; 168 } 169 } 170 171 int ret = 0; 172 if( need_sys_to_submit || need_sys_to_complete ) { 173 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); 174 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 175 if( ret < 0 ) { 176 switch((int)errno) { 177 case EAGAIN: 178 case EINTR: 179 ret = -1; 180 break; 181 default: 182 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 183 } 184 } 185 } 186 187 // Memory barrier 188 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 189 return ret; 190 } 191 192 //============================================================================================= 193 // I/O Polling 194 //============================================================================================= 195 static unsigned __collect_submitions( struct __io_data & ring ); 196 static __u32 __release_consumed_submission( struct __io_data & ring ); 197 static inline void __clean( volatile struct io_uring_sqe * sqe ); 198 199 // Process a single completion message from the io_uring 200 // This is NOT thread-safe 201 static inline void process( volatile struct io_uring_cqe & cqe ) { 202 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 203 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 204 205 fulfil( *future, cqe.res ); 206 } 207 208 static [int, bool] __drain_io( & struct __io_data ring ) { 209 /* paranoid */ verify( ! 
__preemption_enabled() ); 210 211 unsigned to_submit = 0; 212 if( ring.poller_submits ) { 213 // If the poller thread also submits, then we need to aggregate the submissions which are ready 214 to_submit = __collect_submitions( ring ); 215 } 216 217 int ret = __io_uring_enter(ring, to_submit, true); 218 if( ret < 0 ) { 219 return [0, true]; 220 } 221 222 // update statistics 223 if (to_submit > 0) { 224 __STATS__( true, 225 if( to_submit > 0 ) { 226 io.submit_q.submit_avg.rdy += to_submit; 227 io.submit_q.submit_avg.csm += ret; 228 io.submit_q.submit_avg.cnt += 1; 229 } 230 ) 231 } 232 233 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 234 235 // Release the consumed SQEs 236 __release_consumed_submission( ring ); 94 /* paranoid */ verify( proc ); 95 /* paranoid */ verify( proc->io.ctx ); 237 96 238 97 // Drain the queue 239 unsigned head = *ring.completion_q.head; 240 unsigned tail = *ring.completion_q.tail; 241 const __u32 mask = *ring.completion_q.mask; 242 243 // Nothing was new return 0 244 if (head == tail) { 245 return [0, to_submit > 0]; 246 } 98 $io_context * ctx = proc->io.ctx; 99 unsigned head = *ctx->cq.head; 100 unsigned tail = *ctx->cq.tail; 101 const __u32 mask = *ctx->cq.mask; 247 102 248 103 __u32 count = tail - head; 249 /* paranoid */ verify( count != 0 ); 104 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 105 250 106 for(i; count) { 251 107 unsigned idx = (head + i) & mask; 252 volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];108 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx]; 253 109 254 110 /* paranoid */ verify(&cqe); 255 111 256 process( cqe ); 257 } 112 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 113 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 114 115 fulfil( *future, cqe.res ); 116 } 117 118 __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count); 258 119 259 120 // Mark to the kernel that the cqe has been seen 260 121 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 261 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST ); 262 263 return [count, count > 0 || to_submit > 0]; 264 } 265 266 void main( $io_ctx_thread & this ) { 267 __ioctx_register( this ); 268 269 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this); 270 271 const int reset_cnt = 5; 272 int reset = reset_cnt; 273 // Then loop until we need to start 274 LOOP: 275 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 276 // Drain the io 277 int count; 278 bool again; 279 disable_interrupts(); 280 [count, again] = __drain_io( *this.ring ); 281 282 if(!again) reset--; 283 122 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 123 124 /* paranoid */ verify( ! __preemption_enabled() ); 125 126 return; 127 } 128 129 void __cfa_io_flush( processor * proc ) { 130 /* paranoid */ verify( ! 
__preemption_enabled() ); 131 /* paranoid */ verify( proc ); 132 /* paranoid */ verify( proc->io.ctx ); 133 134 $io_context & ctx = *proc->io.ctx; 135 136 if(!ctx.ext_sq.empty) { 137 __ioarbiter_flush( *ctx.arbiter, &ctx ); 138 } 139 140 __STATS__( true, io.calls.flush++; ) 141 int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8); 142 if( ret < 0 ) { 143 switch((int)errno) { 144 case EAGAIN: 145 case EINTR: 146 case EBUSY: 284 147 // Update statistics 285 __STATS__( true, 286 io.complete_q.completed_avg.val += count; 287 io.complete_q.completed_avg.cnt += 1; 288 ) 289 enable_interrupts( __cfaabi_dbg_ctx ); 290 291 // If we got something, just yield and check again 292 if(reset > 1) { 293 yield(); 294 continue LOOP; 148 __STATS__( false, io.calls.errors.busy ++; ) 149 return; 150 default: 151 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 295 152 } 296 297 // We alread failed to find completed entries a few time. 298 if(reset == 1) { 299 // Rearm the context so it can block 300 // but don't block right away 301 // we need to retry one last time in case 302 // something completed *just now* 303 __ioctx_prepare_block( this ); 304 continue LOOP; 305 } 306 307 __STATS__( false, 308 io.complete_q.blocks += 1; 309 ) 310 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this); 311 312 // block this thread 313 wait( this.sem ); 314 315 // restore counter 316 reset = reset_cnt; 317 } 318 319 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this); 153 } 154 155 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd); 156 __STATS__( true, io.calls.submitted += ret; ) 157 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 158 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 159 160 ctx.sq.to_submit -= ret; 161 162 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 163 164 // Release the consumed SQEs 165 __release_sqes( ctx ); 166 167 /* paranoid */ verify( ! __preemption_enabled() ); 168 169 ctx.proc->io.pending = false; 320 170 } 321 171 … … 339 189 // head and tail must be fully filled and shouldn't ever be touched again. 
340 190 // 191 //============================================================================================= 192 // Allocation 193 // for user's convenience fill the sqes from the indexes 194 static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx) { 195 struct io_uring_sqe * sqes = ctx->sq.sqes; 196 for(i; want) { 197 __cfadbg_print_safe(io, "Kernel I/O : filling loop\n"); 198 out_sqes[i] = &sqes[idxs[i]]; 199 } 200 } 201 202 // Try to directly allocate from the a given context 203 // Not thread-safe 204 static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) { 205 __sub_ring_t & sq = ctx->sq; 206 const __u32 mask = *sq.mask; 207 __u32 fhead = sq.free_ring.head; // get the current head of the queue 208 __u32 ftail = sq.free_ring.tail; // get the current tail of the queue 209 210 // If we don't have enough sqes, fail 211 if((ftail - fhead) < want) { return false; } 212 213 // copy all the indexes we want from the available list 214 for(i; want) { 215 __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n"); 216 idxs[i] = sq.free_ring.array[(fhead + i) & mask]; 217 } 218 219 // Advance the head to mark the indexes as consumed 220 __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE); 221 222 // return success 223 return true; 224 } 341 225 342 226 // Allocate an submit queue entry. … … 345 229 // for convenience, return both the index and the pointer to the sqe 346 230 // sqe == &sqes[idx] 347 [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 348 /* paranoid */ verify( data != 0 ); 349 350 // Prepare the data we need 351 __attribute((unused)) int len = 0; 352 __attribute((unused)) int block = 0; 353 __u32 cnt = *ring.submit_q.num; 354 __u32 mask = *ring.submit_q.mask; 355 356 __u32 off = thread_rand(); 357 358 // Loop around looking for an available spot 359 for() { 360 // Look through the list starting at some offset 361 for(i; cnt) { 362 __u64 expected = 3; 363 __u32 idx = (i + off) & mask; // Get an index from a random 364 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 365 volatile __u64 * udata = &sqe->user_data; 366 367 // Allocate the entry by CASing the user_data field from 0 to the future address 368 if( *udata == expected && 369 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) 370 { 371 // update statistics 372 __STATS__( false, 373 io.submit_q.alloc_avg.val += len; 374 io.submit_q.alloc_avg.block += block; 375 io.submit_q.alloc_avg.cnt += 1; 376 ) 377 378 // debug log 379 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); 380 381 // Success return the data 382 return [sqe, idx]; 383 } 384 verify(expected != data); 385 386 // This one was used 387 len ++; 388 } 389 390 block++; 391 392 abort( "Kernel I/O : all submit queue entries used, yielding\n" ); 393 394 yield(); 395 } 396 } 397 398 static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) { 399 /* paranoid */ verify( idx <= mask ); 400 /* paranoid */ verify( idx != -1ul32 ); 401 402 // We need to find a spot in the ready array 403 __attribute((unused)) int len = 0; 404 __attribute((unused)) int block = 0; 405 __u32 ready_mask = ring.submit_q.ready_cnt - 1; 406 407 __u32 off = thread_rand(); 408 409 __u32 picked; 410 LOOKING: for() { 411 for(i; ring.submit_q.ready_cnt) { 412 picked = (i + off) & ready_mask; 413 __u32 
expected = -1ul32; 414 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 415 break LOOKING; 416 } 417 verify(expected != idx); 418 419 len ++; 420 } 421 422 block++; 423 424 __u32 released = __release_consumed_submission( ring ); 425 if( released == 0 ) { 426 yield(); 427 } 428 } 429 430 // update statistics 431 __STATS__( false, 432 io.submit_q.look_avg.val += len; 433 io.submit_q.look_avg.block += block; 434 io.submit_q.look_avg.cnt += 1; 435 ) 436 437 return picked; 438 } 439 440 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 441 __io_data & ring = *ctx->thrd.ring; 442 231 struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) { 232 __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want); 233 234 disable_interrupts(); 235 processor * proc = __cfaabi_tls.this_processor; 236 $io_context * ctx = proc->io.ctx; 237 /* paranoid */ verify( __cfaabi_tls.this_processor ); 238 /* paranoid */ verify( ctx ); 239 240 __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n"); 241 242 // We can proceed to the fast path 243 if( __alloc(ctx, idxs, want) ) { 244 // Allocation was successful 245 __STATS__( true, io.alloc.fast += 1; ) 246 enable_interrupts( __cfaabi_dbg_ctx ); 247 248 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd); 249 250 __fill( sqes, want, idxs, ctx ); 251 return ctx; 252 } 253 // The fast path failed, fallback 254 __STATS__( true, io.alloc.fail += 1; ) 255 256 // Fast path failed, fallback on arbitration 257 __STATS__( true, io.alloc.slow += 1; ) 258 enable_interrupts( __cfaabi_dbg_ctx ); 259 260 $io_arbiter * ioarb = proc->cltr->io.arbiter; 261 /* paranoid */ verify( ioarb ); 262 263 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n"); 264 265 struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want); 266 267 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd); 268 269 __fill( sqes, want, idxs,ret ); 270 return ret; 271 } 272 273 274 //============================================================================================= 275 // submission 276 static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) { 277 // We can proceed to the fast path 278 // Get the right objects 279 __sub_ring_t & sq = ctx->sq; 280 const __u32 mask = *sq.mask; 281 __u32 tail = *sq.kring.tail; 282 283 // Add the sqes to the array 284 for( i; have ) { 285 __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n"); 286 sq.kring.array[ (tail + i) & mask ] = idxs[i]; 287 } 288 289 // Make the sqes visible to the submitter 290 __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE); 291 sq.to_submit++; 292 293 ctx->proc->io.pending = true; 294 ctx->proc->io.dirty = true; 295 if(sq.to_submit > 30 || !lazy) { 296 __cfa_io_flush( ctx->proc ); 297 } 298 } 299 300 void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) { 301 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? 
"lazy" : "eager"); 302 303 disable_interrupts(); 304 processor * proc = __cfaabi_tls.this_processor; 305 $io_context * ctx = proc->io.ctx; 306 /* paranoid */ verify( __cfaabi_tls.this_processor ); 307 /* paranoid */ verify( ctx ); 308 309 // Can we proceed to the fast path 310 if( ctx == inctx ) // We have the right instance? 443 311 { 444 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 445 __cfadbg_print_safe( io, 446 "Kernel I/O : submitting %u (%p) for %p\n" 447 " data: %p\n" 448 " opcode: %s\n" 449 " fd: %d\n" 450 " flags: %d\n" 451 " prio: %d\n" 452 " off: %p\n" 453 " addr: %p\n" 454 " len: %d\n" 455 " other flags: %d\n" 456 " splice fd: %d\n" 457 " pad[0]: %llu\n" 458 " pad[1]: %llu\n" 459 " pad[2]: %llu\n", 460 idx, sqe, 461 active_thread(), 462 (void*)sqe->user_data, 463 opcodes[sqe->opcode], 464 sqe->fd, 465 sqe->flags, 466 sqe->ioprio, 467 sqe->off, 468 sqe->addr, 469 sqe->len, 470 sqe->accept_flags, 471 sqe->splice_fd_in, 472 sqe->__pad2[0], 473 sqe->__pad2[1], 474 sqe->__pad2[2] 475 ); 476 } 477 478 479 // Get now the data we definetely need 480 volatile __u32 * const tail = ring.submit_q.tail; 481 const __u32 mask = *ring.submit_q.mask; 482 483 // There are 2 submission schemes, check which one we are using 484 if( ring.poller_submits ) { 485 // If the poller thread submits, then we just need to add this to the ready array 486 __submit_to_ready_array( ring, idx, mask ); 487 488 post( ctx->thrd.sem ); 489 490 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 491 } 492 else if( ring.eager_submits ) { 493 __u32 picked = __submit_to_ready_array( ring, idx, mask ); 494 495 #if defined(LEADER_LOCK) 496 if( !try_lock(ring.submit_q.submit_lock) ) { 497 __STATS__( false, 498 io.submit_q.helped += 1; 499 ) 500 return; 501 } 502 /* paranoid */ verify( ! __preemption_enabled() ); 503 __STATS__( true, 504 io.submit_q.leader += 1; 505 ) 506 #else 507 for() { 508 yield(); 509 510 if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) { 511 __STATS__( false, 512 io.submit_q.leader += 1; 513 ) 514 break; 515 } 516 517 // If some one else collected our index, we are done 518 #warning ABA problem 519 if( ring.submit_q.ready[picked] != idx ) { 520 __STATS__( false, 521 io.submit_q.helped += 1; 522 ) 523 return; 524 } 525 526 __STATS__( false, 527 io.submit_q.busy += 1; 528 ) 529 } 530 #endif 531 532 // We got the lock 533 // Collect the submissions 534 unsigned to_submit = __collect_submitions( ring ); 535 536 // Actually submit 537 int ret = __io_uring_enter( ring, to_submit, false ); 538 539 #if defined(LEADER_LOCK) 540 /* paranoid */ verify( ! 
__preemption_enabled() ); 541 next(ring.submit_q.submit_lock); 542 #else 543 unlock(ring.submit_q.submit_lock); 544 #endif 545 if( ret < 0 ) { 546 return; 547 } 548 549 // Release the consumed SQEs 550 __release_consumed_submission( ring ); 551 552 // update statistics 553 __STATS__( false, 554 io.submit_q.submit_avg.rdy += to_submit; 555 io.submit_q.submit_avg.csm += ret; 556 io.submit_q.submit_avg.cnt += 1; 557 ) 558 559 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 560 } 561 else 562 { 563 // get mutual exclusion 564 #if defined(LEADER_LOCK) 565 while(!try_lock(ring.submit_q.submit_lock)); 566 #else 567 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2); 568 #endif 569 570 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64, 571 /* paranoid */ "index %u already reclaimed\n" 572 /* paranoid */ "head %u, prev %u, tail %u\n" 573 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 574 /* paranoid */ idx, 575 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 576 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 577 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 578 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 579 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 580 /* paranoid */ ); 581 582 // Append to the list of ready entries 583 584 /* paranoid */ verify( idx <= mask ); 585 ring.submit_q.array[ (*tail) & mask ] = idx; 586 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 587 588 // Submit however, many entries need to be submitted 589 int ret = __io_uring_enter( ring, 1, false ); 590 if( ret < 0 ) { 591 switch((int)errno) { 592 default: 593 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 594 } 595 } 596 597 /* paranoid */ verify(ret == 1); 598 599 // update statistics 600 __STATS__( false, 601 io.submit_q.submit_avg.csm += 1; 602 io.submit_q.submit_avg.cnt += 1; 603 ) 604 605 { 606 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head; 607 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ]; 608 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx]; 609 610 __cfadbg_print_safe( io, 611 "Kernel I/O : last submitted is %u (%p)\n" 612 " data: %p\n" 613 " opcode: %s\n" 614 " fd: %d\n" 615 " flags: %d\n" 616 " prio: %d\n" 617 " off: %p\n" 618 " addr: %p\n" 619 " len: %d\n" 620 " other flags: %d\n" 621 " splice fd: %d\n" 622 " pad[0]: %llu\n" 623 " pad[1]: %llu\n" 624 " pad[2]: %llu\n", 625 last_idx, sqe, 626 (void*)sqe->user_data, 627 opcodes[sqe->opcode], 628 sqe->fd, 629 sqe->flags, 630 sqe->ioprio, 631 sqe->off, 632 sqe->addr, 633 sqe->len, 634 sqe->accept_flags, 635 sqe->splice_fd_in, 636 sqe->__pad2[0], 637 sqe->__pad2[1], 638 sqe->__pad2[2] 639 ); 640 } 641 642 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 643 // Release the consumed SQEs 644 __release_consumed_submission( ring ); 645 // ring.submit_q.sqes[idx].user_data = 3ul64; 646 647 #if defined(LEADER_LOCK) 648 next(ring.submit_q.submit_lock); 649 #else 650 unlock(ring.submit_q.submit_lock); 651 #endif 652 653 __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() ); 654 } 655 } 656 657 // #define PARTIAL_SUBMIT 32 658 659 // go through the list of submissions in the ready array and moved them into 660 // the ring's submit queue 661 static 
unsigned __collect_submitions( struct __io_data & ring ) { 662 /* paranoid */ verify( ring.submit_q.ready != 0p ); 663 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 ); 664 665 unsigned to_submit = 0; 666 __u32 tail = *ring.submit_q.tail; 667 const __u32 mask = *ring.submit_q.mask; 668 #if defined(PARTIAL_SUBMIT) 669 #if defined(LEADER_LOCK) 670 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist 671 #endif 672 const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt; 673 const __u32 offset = ring.submit_q.prev_ready; 674 ring.submit_q.prev_ready += cnt; 675 #else 676 const __u32 cnt = ring.submit_q.ready_cnt; 677 const __u32 offset = 0; 678 #endif 679 680 // Go through the list of ready submissions 681 for( c; cnt ) { 682 __u32 i = (offset + c) % ring.submit_q.ready_cnt; 683 684 // replace any submission with the sentinel, to consume it. 685 __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 686 687 // If it was already the sentinel, then we are done 688 if( idx == -1ul32 ) continue; 689 690 // If we got a real submission, append it to the list 691 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 692 to_submit++; 693 } 694 695 // Increment the tail based on how many we are ready to submit 696 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 697 698 return to_submit; 699 } 700 312 __submit(ctx, idxs, have, lazy); 313 314 // Mark the instance as no longer in-use, re-enable interrupts and return 315 __STATS__( true, io.submit.fast += 1; ) 316 enable_interrupts( __cfaabi_dbg_ctx ); 317 318 __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n"); 319 return; 320 } 321 322 // Fast path failed, fallback on arbitration 323 __STATS__( true, io.submit.slow += 1; ) 324 enable_interrupts( __cfaabi_dbg_ctx ); 325 326 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n"); 327 328 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy); 329 } 330 331 //============================================================================================= 332 // Flushing 701 333 // Go through the ring's submit queue and release everything that has already been consumed 702 334 // by io_uring 703 static __u32 __release_consumed_submission( struct __io_data & ring ) { 704 const __u32 smask = *ring.submit_q.mask; 705 706 // We need to get the lock to copy the old head and new head 707 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 335 // This cannot be done by multiple threads 336 static __u32 __release_sqes( struct $io_context & ctx ) { 337 const __u32 mask = *ctx.sq.mask; 338 708 339 __attribute__((unused)) 709 __u32 ctail = * ring.submit_q.tail;// get the current tail of the queue710 __u32 chead = * ring.submit_q.head;// get the current head of the queue711 __u32 phead = ring.submit_q.prev_head;// get the head the last time we were here712 ring.submit_q.prev_head = chead; // note up to were we processed 713 unlock(ring.submit_q.release_lock);340 __u32 ctail = *ctx.sq.kring.tail; // get the current tail of the queue 341 __u32 chead = *ctx.sq.kring.head; // get the current head of the queue 342 __u32 phead = ctx.sq.kring.released; // get the head the last time we were here 343 344 __u32 ftail = ctx.sq.free_ring.tail; // get the current tail of the queue 714 345 715 346 // the 3 fields are organized like this diagram … … 730 361 __u32 count = chead - phead; 731 362 363 if(count == 0) { 364 return 0; 365 } 366 732 367 // We acquired an 
previous-head/current-head range 733 368 // go through the range and release the sqes 734 369 for( i; count ) { 735 __u32 idx = ring.submit_q.array[ (phead + i) & smask ]; 736 737 /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data ); 738 __clean( &ring.submit_q.sqes[ idx ] ); 739 } 370 __cfadbg_print_safe(io, "Kernel I/O : release loop\n"); 371 __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ]; 372 ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx; 373 } 374 375 ctx.sq.kring.released = chead; // note up to were we processed 376 __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST); 377 378 __ioarbiter_notify(ctx); 379 740 380 return count; 741 381 } 742 382 743 void __sqe_clean( volatile struct io_uring_sqe * sqe ) { 744 __clean( sqe ); 745 } 746 747 static inline void __clean( volatile struct io_uring_sqe * sqe ) { 748 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors 749 __cfaabi_dbg_debug_do( 750 memset(sqe, 0xde, sizeof(*sqe)); 751 sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1; 752 ); 753 754 // Mark the entry as unused 755 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST); 383 //============================================================================================= 384 // I/O Arbiter 385 //============================================================================================= 386 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) { 387 __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n"); 388 389 __STATS__( false, io.alloc.block += 1; ) 390 391 // No one has any resources left, wait for something to finish 392 // Mark as pending 393 __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST ); 394 395 // Wait for our turn to submit 396 wait( this.pending.blocked, want ); 397 398 __attribute((unused)) bool ret = 399 __alloc( this.pending.ctx, idxs, want); 400 /* paranoid */ verify( ret ); 401 402 return this.pending.ctx; 403 404 } 405 406 static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) { 407 /* paranoid */ verify( !is_empty(this.pending.blocked) ); 408 this.pending.ctx = ctx; 409 410 while( !is_empty(this.pending.blocked) ) { 411 __cfadbg_print_safe(io, "Kernel I/O : notifying\n"); 412 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 413 __u32 want = front( this.pending.blocked ); 414 415 if( have > want ) return; 416 417 signal_block( this.pending.blocked ); 418 } 419 420 this.pending.flag = false; 421 } 422 423 static void __ioarbiter_notify( $io_context & ctx ) { 424 if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) { 425 __ioarbiter_notify( *ctx.arbiter, &ctx ); 426 } 427 } 428 429 // Simply append to the pending 430 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) { 431 __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd); 432 433 /* paranoid */ verify( &this == ctx->arbiter ); 434 435 // Mark as pending 436 __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST ); 437 438 __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have); 439 440 // Wait for our turn to submit 441 wait( ctx->ext_sq.blocked ); 442 443 // Submit our indexes 444 __submit(ctx, idxs, have, lazy); 445 446 __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have); 447 } 448 449 static void __ioarbiter_flush( $io_arbiter & mutex 
this, $io_context * ctx ) { 450 /* paranoid */ verify( &this == ctx->arbiter ); 451 452 __STATS__( false, io.flush.external += 1; ) 453 454 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 455 456 condition & blcked = ctx->ext_sq.blocked; 457 /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) ); 458 while(!is_empty( blcked )) { 459 signal_block( blcked ); 460 } 461 462 ctx->ext_sq.empty = true; 756 463 } 757 464 #endif
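
The sketches that follow illustrate the main pieces of the new I/O path in plain C11. They were written for this page and are not the Cforall runtime API; every struct, field, and helper name in them is an illustrative stand-in. First, the drain step: __cfa_io_drain consumes completions directly on the owning processor instead of in a dedicated poller thread. A minimal sketch of that single-consumer ring drain, assuming the producer (the kernel) advances the tail and the consumer owns the head:

#include <stdatomic.h>
#include <stdint.h>

struct cqe { uint64_t user_data; int32_t res; };

struct cq_ring {
	_Atomic uint32_t * head;   /* consumer index, advanced by us             */
	_Atomic uint32_t * tail;   /* producer index, advanced by the kernel     */
	const uint32_t   * mask;   /* ring size - 1, ring size is a power of two */
	struct cqe       * cqes;   /* completion entries, indexed by (i & *mask) */
};

typedef void (*fulfil_fn)(uint64_t user_data, int32_t res);

/* Drain every completion currently visible; returns how many were consumed. */
static unsigned drain(struct cq_ring * cq, fulfil_fn fulfil) {
	uint32_t head = atomic_load_explicit(cq->head, memory_order_relaxed);
	uint32_t tail = atomic_load_explicit(cq->tail, memory_order_acquire);
	const uint32_t mask = *cq->mask;

	uint32_t count = tail - head;              /* wrap-around safe on unsigned */
	for (uint32_t i = 0; i < count; i++) {
		struct cqe * e = &cq->cqes[(head + i) & mask];
		fulfil(e->user_data, e->res);          /* complete the waiting future  */
	}

	/* Publish the new head only after every entry has been read, so the
	   producer never overwrites an unread completion. */
	atomic_store_explicit(cq->head, head + count, memory_order_release);
	return count;
}

The release store on the head is the property the changeset's SEQ_CST store also provides: the kernel must not observe the advanced head before every cqe in the range has been read.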
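__cfa_io_flush hands pending submissions to the kernel with a raw io_uring_enter(2) call and treats EAGAIN, EINTR and EBUSY as transient conditions rather than fatal errors. Below is a hedged sketch of that call, assuming a Linux system whose headers define __NR_io_uring_enter and a ring file descriptor already created with io_uring_setup(2); the helper name flush_ring and the simplified error handling are not from the changeset:

#define _GNU_SOURCE
#include <errno.h>
#include <signal.h>        /* _NSIG               */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/syscall.h>   /* __NR_io_uring_enter */
#include <unistd.h>        /* syscall()           */

/* Returns the number of sqes consumed by the kernel, or -1 on a transient error. */
static int flush_ring(int ring_fd, unsigned to_submit) {
	int ret = syscall(__NR_io_uring_enter, ring_fd, to_submit,
	                  0 /* min_complete */, 0 /* flags */,
	                  (sigset_t *)NULL, _NSIG / 8);
	if (ret < 0) {
		switch (errno) {
		case EAGAIN:   /* kernel is temporarily out of resources          */
		case EINTR:    /* interrupted by a signal                         */
		case EBUSY:    /* completions are pending, drain before retrying  */
			return -1;
		default:
			fprintf(stderr, "io_uring_enter: %s\n", strerror(errno));
			abort();
		}
	}
	return ret;
}

The caller is expected to subtract the return value from its to_submit counter, as the changeset does, since the kernel may consume fewer entries than were offered.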
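On the allocation side, cfa_io_allocate first tries __alloc, which takes sqe indexes from a per-context free ring that only the owning processor consumes; the release path is the only producer. A sketch of that check, copy, publish sequence, with illustrative names:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

struct free_ring {
	_Atomic uint32_t head;    /* consumed by the allocator (single consumer) */
	_Atomic uint32_t tail;    /* filled by the release/recycle path          */
	uint32_t         mask;    /* capacity - 1                                */
	uint32_t       * array;   /* recycled sqe indexes                        */
};

/* Try to take want sqe indexes; returns false if not enough are free. */
static bool try_alloc(struct free_ring * fr, uint32_t idxs[], uint32_t want) {
	uint32_t head = atomic_load_explicit(&fr->head, memory_order_relaxed);
	uint32_t tail = atomic_load_explicit(&fr->tail, memory_order_acquire);

	if (tail - head < want) return false;       /* not enough free entries */

	for (uint32_t i = 0; i < want; i++)
		idxs[i] = fr->array[(head + i) & fr->mask];

	/* Advance the head only after the indexes have been copied out. */
	atomic_store_explicit(&fr->head, head + want, memory_order_release);
	return true;
}

If this fast path fails, the changeset falls back on the arbiter, which blocks the caller until another context releases enough entries.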
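__submit publishes the filled indexes into the kernel-visible submission array, then defers the actual system call until roughly 30 entries are pending, unless the caller asked for an eager submit. A sketch under the same assumptions as above; flush_fn stands in for __cfa_io_flush:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

struct sq_ring {
	_Atomic uint32_t * ktail;      /* kernel-visible tail of the submission ring     */
	const uint32_t   * mask;
	uint32_t         * array;      /* kernel-visible index array                     */
	uint32_t           to_submit;  /* published entries not yet passed to the kernel */
};

static void submit(struct sq_ring * sq, const uint32_t idxs[], uint32_t have,
                   bool lazy, void (*flush_fn)(void)) {
	uint32_t tail = atomic_load_explicit(sq->ktail, memory_order_relaxed);
	const uint32_t mask = *sq->mask;

	for (uint32_t i = 0; i < have; i++)
		sq->array[(tail + i) & mask] = idxs[i];

	/* The release store makes the filled sqes visible before the new tail. */
	atomic_store_explicit(sq->ktail, tail + have, memory_order_release);
	sq->to_submit += have;

	/* Lazy submissions batch until roughly 30 entries are pending. */
	if (sq->to_submit > 30 || !lazy) flush_fn();
}

Batching like this is why cfa_io_submit takes a lazy flag: latency-tolerant operations amortize the syscall, while eager ones pay for it immediately.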
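Finally, __release_sqes recycles indexes once the kernel's submission-queue head has moved past them, using a released marker to remember how far the previous call got so that only the newly consumed range is processed. A sketch of that bookkeeping with illustrative types:

#include <stdatomic.h>
#include <stdint.h>

struct sq_state {
	_Atomic uint32_t * khead;      /* submission-ring head, advanced by the kernel */
	const uint32_t   * mask;
	uint32_t         * karray;     /* kernel-visible index array                   */
	uint32_t           released;   /* how far the previous call recycled           */

	uint32_t         * free_array; /* free-ring storage (same capacity)            */
	_Atomic uint32_t   free_tail;  /* producer side of the free ring               */
};

/* Returns how many sqe indexes were recycled into the free ring. */
static uint32_t release_sqes(struct sq_state * sq) {
	uint32_t chead = atomic_load_explicit(sq->khead, memory_order_acquire);
	uint32_t count = chead - sq->released;
	if (count == 0) return 0;

	uint32_t ftail = atomic_load_explicit(&sq->free_tail, memory_order_relaxed);
	const uint32_t mask = *sq->mask;
	for (uint32_t i = 0; i < count; i++) {
		uint32_t idx = sq->karray[(sq->released + i) & mask];
		sq->free_array[(ftail + i) & mask] = idx;
	}

	sq->released = chead;
	/* Publish the recycled indexes to the allocation fast path. */
	atomic_store_explicit(&sq->free_tail, ftail + count, memory_order_release);
	return count;
}

Publishing free_tail with a release store is what lets the allocation fast path pick these indexes up again without extra locking; the changeset then calls __ioarbiter_notify so blocked allocators can be woken.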