Changes in libcfa/src/concurrency/io.cfa [2fab24e3:e9c0b4c]
- File: 1 edited
Legend:
- ' ' Unmodified
- '+' Added
- '-' Removed
- '@@ -N +M @@' start of a hunk at line N of r2fab24e3 and line M of re9c0b4c
libcfa/src/concurrency/io.cfa
--- libcfa/src/concurrency/io.cfa (r2fab24e3)
+++ libcfa/src/concurrency/io.cfa (re9c0b4c)
@@ -32 +32 @@
 extern "C" {
     #include <sys/syscall.h>
+    #include <sys/eventfd.h>

     #include <linux/io_uring.h>
@@ -39 +40 @@
 #include "kernel.hfa"
 #include "kernel/fwd.hfa"
+#include "kernel_private.hfa"
 #include "io/types.hfa"
@@ -79 +81 @@
 };

-    // returns true of acquired as leader or second leader
-    static inline bool try_lock( __leaderlock_t & this ) {
-        const uintptr_t thrd = 1z | (uintptr_t)active_thread();
-        bool block;
-        disable_interrupts();
-        for() {
-            struct $thread * expected = this.value;
-            if( 1p != expected && 0p != expected ) {
-                /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
-                enable_interrupts( __cfaabi_dbg_ctx );
-                return false;
-            }
-            struct $thread * desired;
-            if( 0p == expected ) {
-                // If the lock isn't locked acquire it, no need to block
-                desired = 1p;
-                block = false;
-            }
-            else {
-                // If the lock is already locked try becomming the next leader
-                desired = (struct $thread *)thrd;
-                block = true;
-            }
-            if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
-        }
-        if( block ) {
-            enable_interrupts( __cfaabi_dbg_ctx );
-            park();
-            disable_interrupts();
-        }
-        return true;
-    }
-
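Aside: the removed try_lock packs the whole lock state into one pointer-sized word: 0 means unlocked, 1 means locked with no waiter, and a thread pointer with its low bit set means locked with that thread queued as the next leader. A minimal sketch of the same encoding in plain C11 (leader_lock_t, try_lock_leader and the void * thread handle are illustrative names, not part of the changeset):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    typedef struct { _Atomic(uintptr_t) value; } leader_lock_t;   // 0 = unlocked, 1 = locked, ptr|1 = locked + waiter

    // Returns true if the caller became leader (lock was free) or queued itself as the next leader.
    static bool try_lock_leader(leader_lock_t *ll, void *self_thread) {
        uintptr_t tagged = (uintptr_t)self_thread | 1u;            // tag the pointer so it is never 0 or 1
        for (;;) {
            uintptr_t expected = atomic_load(&ll->value);
            if (expected != 0 && expected != 1) return false;      // someone is already queued as next leader
            uintptr_t desired = (expected == 0) ? 1u : tagged;     // free -> take it; locked -> queue ourself
            if (atomic_compare_exchange_weak(&ll->value, &expected, desired)) {
                // In the real code the caller parks here when it queued itself (desired == tagged).
                return true;
            }
        }
    }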
-    static inline bool next( __leaderlock_t & this ) {
+    static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want );
+    static void __ioarbiter_submit( $io_context * , __u32 idxs[], __u32 have, bool lazy );
+    static void __ioarbiter_flush ( $io_context & );
+    static inline void __ioarbiter_notify( $io_context & ctx );
+//=============================================================================================
+// I/O Polling
+//=============================================================================================
+    static inline unsigned __flush( struct $io_context & );
+    static inline __u32 __release_sqes( struct $io_context & );
+    extern void __kernel_unpark( $thread * thrd );
+
+    bool __cfa_io_drain( processor * proc ) {
         /* paranoid */ verify( ! __preemption_enabled() );
-        struct $thread * nextt;
-        for() {
-            struct $thread * expected = this.value;
-            /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
-
-            struct $thread * desired;
-            if( 1p == expected ) {
-                // No next leader, just unlock
-                desired = 0p;
-                nextt = 0p;
-            }
-            else {
-                // There is a next leader, remove but keep locked
-                desired = 1p;
-                nextt = (struct $thread *)(~1z & (uintptr_t)expected);
-            }
-            if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
-        }
-
-        if(nextt) {
-            unpark( nextt );
-            enable_interrupts( __cfaabi_dbg_ctx );
-            return true;
-        }
-        enable_interrupts( __cfaabi_dbg_ctx );
-        return false;
-    }
-
-//=============================================================================================
-// I/O Syscall
-//=============================================================================================
-    static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
-        bool need_sys_to_submit = false;
-        bool need_sys_to_complete = false;
-        unsigned flags = 0;
-
-        TO_SUBMIT:
-        if( to_submit > 0 ) {
-            if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
-                need_sys_to_submit = true;
-                break TO_SUBMIT;
-            }
-            if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
-                need_sys_to_submit = true;
-                flags |= IORING_ENTER_SQ_WAKEUP;
-            }
-        }
-
-        if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
-            flags |= IORING_ENTER_GETEVENTS;
-            if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
-                need_sys_to_complete = true;
-            }
-        }
-
-        int ret = 0;
-        if( need_sys_to_submit || need_sys_to_complete ) {
-            __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
-            ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
-            __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret);
-
-            if( ret < 0 ) {
-                switch((int)errno) {
-                case EAGAIN:
-                case EINTR:
-                case EBUSY:
-                    ret = -1;
-                    break;
-                default:
-                    abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
-                }
-            }
-        }
-
-        // Memory barrier
-        __atomic_thread_fence( __ATOMIC_SEQ_CST );
-        return ret;
-    }
-
-//=============================================================================================
-// I/O Polling
-//=============================================================================================
-    static unsigned __collect_submitions( struct __io_data & ring );
-    static __u32 __release_consumed_submission( struct __io_data & ring );
-    static inline void __clean( volatile struct io_uring_sqe * sqe );
-
-    // Process a single completion message from the io_uring
-    // This is NOT thread-safe
-    static inline void process( volatile struct io_uring_cqe & cqe ) {
-        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
-        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
-
-        fulfil( *future, cqe.res );
-    }
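Aside: the removed process() above, like the new drain path below, leans on io_uring carrying user_data through the kernel untouched: the submitter stores a future's address in sqe->user_data and the completion side casts cqe->user_data back and fulfils it with cqe->res. A small sketch of that round trip in plain C (struct io_future, attach_future and process_cqe are illustrative stand-ins for the CFA io_future_t machinery):

    #include <stdint.h>
    #include <linux/io_uring.h>

    struct io_future { int result; int fulfilled; };      // illustrative stand-in for CFA's io_future_t

    static void fulfil_future(struct io_future *f, int res) {  // stand-in: the real version also wakes the waiter
        f->result = res;
        f->fulfilled = 1;
    }

    // At submission time: stash the future's address in the sqe.
    static void attach_future(struct io_uring_sqe *sqe, struct io_future *f) {
        sqe->user_data = (uint64_t)(uintptr_t)f;
    }

    // At completion time: recover the future and complete it with the syscall result.
    static void process_cqe(const struct io_uring_cqe *cqe) {
        struct io_future *f = (struct io_future *)(uintptr_t)cqe->user_data;
        fulfil_future(f, cqe->res);
    }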
-    static [int, bool] __drain_io( & struct __io_data ring ) {
-        /* paranoid */ verify( ! __preemption_enabled() );
-
-        unsigned to_submit = 0;
-        if( ring.poller_submits ) {
-            // If the poller thread also submits, then we need to aggregate the submissions which are ready
-            to_submit = __collect_submitions( ring );
-        }
-
-        int ret = __io_uring_enter(ring, to_submit, true);
-        if( ret < 0 ) {
-            return [0, true];
-        }
-
-        // update statistics
-        if (to_submit > 0) {
-            __STATS__( true,
-                if( to_submit > 0 ) {
-                    io.submit_q.submit_avg.rdy += to_submit;
-                    io.submit_q.submit_avg.csm += ret;
-                    io.submit_q.submit_avg.cnt += 1;
-                }
-            )
-        }
-
-        __atomic_thread_fence( __ATOMIC_SEQ_CST );
-
-        // Release the consumed SQEs
-        __release_consumed_submission( ring );
+        /* paranoid */ verify( ready_schedule_islocked() );
+        /* paranoid */ verify( proc );
+        /* paranoid */ verify( proc->io.ctx );

         // Drain the queue
-        unsigned head = *ring.completion_q.head;
-        unsigned tail = *ring.completion_q.tail;
-        const __u32 mask = *ring.completion_q.mask;
-
-        // Nothing was new return 0
-        if (head == tail) {
-            return [0, to_submit > 0];
-        }
+        $io_context * ctx = proc->io.ctx;
+        unsigned head = *ctx->cq.head;
+        unsigned tail = *ctx->cq.tail;
+        const __u32 mask = *ctx->cq.mask;

         __u32 count = tail - head;
-        /* paranoid */ verify( count != 0 );
+        __STATS__( false, io.calls.drain++; io.calls.completed += count; )
+
+        if(count == 0) return false;

         for(i; count) {
             unsigned idx = (head + i) & mask;
-            volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+            volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];

             /* paranoid */ verify(&cqe);

-            process( cqe );
-        }
+            struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
+            __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
+
+            __kernel_unpark( fulfil( *future, cqe.res, false ) );
+        }
+
+        __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);

         // Mark to the kernel that the cqe has been seen
         // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-        __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
-
-        return [count, count > 0 || to_submit > 0];
-    }
+        __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
+
+        /* paranoid */ verify( ready_schedule_islocked() );
+        /* paranoid */ verify( ! __preemption_enabled() );
+
+        return true;
+    }
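Aside: the new __cfa_io_drain consumes the completion ring the way io_uring expects: read the kernel-written tail, walk the entries between the cached head and that tail with the power-of-two mask, and only then publish the new head so the kernel may reuse those slots. A minimal consumer sketch in plain C11 (cq_ring is an illustrative view of the mmap'd ring; the real code uses __atomic_store_n with SEQ_CST):

    #include <stdatomic.h>
    #include <stdint.h>
    #include <linux/io_uring.h>

    struct cq_ring {                       // illustrative view of the mmap'd completion ring
        _Atomic(uint32_t) *head;           // written by the application
        _Atomic(uint32_t) *tail;           // written by the kernel
        uint32_t *mask;                    // ring_entries - 1
        struct io_uring_cqe *cqes;
    };

    // Drain every available completion; returns how many were consumed.
    static unsigned drain(struct cq_ring *cq, void (*handle)(struct io_uring_cqe *)) {
        uint32_t head = atomic_load_explicit(cq->head, memory_order_relaxed);
        uint32_t tail = atomic_load_explicit(cq->tail, memory_order_acquire);  // acquire: see the CQEs the kernel wrote
        uint32_t count = tail - head;
        for (uint32_t i = 0; i < count; i++) {
            handle(&cq->cqes[(head + i) & *cq->mask]);
        }
        // Publish the new head only after the CQEs have been read, as the comment in the diff says.
        atomic_store_explicit(cq->head, head + count, memory_order_release);
        return count;
    }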
-    void main( $io_ctx_thread & this ) {
-        __ioctx_register( this );
-
-        __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
-
-        const int reset_cnt = 5;
-        int reset = reset_cnt;
-        // Then loop until we need to start
-        LOOP:
-        while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
-            // Drain the io
-            int count;
-            bool again;
-            disable_interrupts();
-                [count, again] = __drain_io( *this.ring );
-
-                if(!again) reset--;
-
-                // Update statistics
-                __STATS__( true,
-                    io.complete_q.completed_avg.val += count;
-                    io.complete_q.completed_avg.cnt += 1;
-                )
-            enable_interrupts( __cfaabi_dbg_ctx );
-
-            // If we got something, just yield and check again
-            if(reset > 1) {
-                yield();
-                continue LOOP;
-            }
-
-            // We alread failed to find completed entries a few time.
-            if(reset == 1) {
-                // Rearm the context so it can block
-                // but don't block right away
-                // we need to retry one last time in case
-                // something completed *just now*
-                __ioctx_prepare_block( this );
-                continue LOOP;
-            }
-
-            __STATS__( false,
-                io.complete_q.blocks += 1;
-            )
-            __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
-
-            // block this thread
-            wait( this.sem );
-
-            // restore counter
-            reset = reset_cnt;
-        }
-
-        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
-
-        __ioctx_unregister( this );
-    }
+    void __cfa_io_flush( processor * proc ) {
+        /* paranoid */ verify( ! __preemption_enabled() );
+        /* paranoid */ verify( proc );
+        /* paranoid */ verify( proc->io.ctx );
+
+        $io_context & ctx = *proc->io.ctx;
+
+        __ioarbiter_flush( ctx );
+
+        __STATS__( true, io.calls.flush++; )
+        int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8);
+        if( ret < 0 ) {
+            switch((int)errno) {
+            case EAGAIN:
+            case EINTR:
+            case EBUSY:
+                // Update statistics
+                __STATS__( false, io.calls.errors.busy ++; )
+                return;
+            default:
+                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
+            }
+        }
+
+        __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
+        __STATS__( true, io.calls.submitted += ret; )
+        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+        /* paranoid */ verify( ctx.sq.to_submit >= ret );
+
+        ctx.sq.to_submit -= ret;
+
+        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
+
+        // Release the consumed SQEs
+        __release_sqes( ctx );
+
+        /* paranoid */ verify( ! __preemption_enabled() );
+
+        ctx.proc->io.pending = false;
+    }
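Aside: the new __cfa_io_flush submits with a raw io_uring_enter(2): pass the number of pending SQEs, no wait flags, and treat EAGAIN/EINTR/EBUSY as transient. A sketch of that call shape in plain C, assuming a kernel and libc that define __NR_io_uring_enter (flush_once is an illustrative name; the real code also updates statistics and decrements sq.to_submit):

    #include <errno.h>
    #include <signal.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    // Returns the number of SQEs the kernel consumed, or 0 if the call should simply be retried later.
    static int flush_once(int ring_fd, unsigned to_submit) {
        int ret = syscall(__NR_io_uring_enter, ring_fd, to_submit, 0, 0, (sigset_t *)NULL, _NSIG / 8);
        if (ret < 0) {
            switch (errno) {
            case EAGAIN: case EINTR: case EBUSY:
                return 0;                      // transient: leave the SQEs queued and retry on the next flush
            default:
                fprintf(stderr, "io_uring_enter: %s\n", strerror(errno));
                abort();
            }
        }
        return ret;                            // may be less than to_submit; the caller keeps the remainder queued
    }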
@@ -344 +193 @@
 // head and tail must be fully filled and shouldn't ever be touched again.
 //
+//=============================================================================================
+// Allocation
+    // for user's convenience fill the sqes from the indexes
+    static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx) {
+        struct io_uring_sqe * sqes = ctx->sq.sqes;
+        for(i; want) {
+            __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
+            out_sqes[i] = &sqes[idxs[i]];
+        }
+    }
+
+    // Try to directly allocate from the a given context
+    // Not thread-safe
+    static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {
+        __sub_ring_t & sq = ctx->sq;
+        const __u32 mask  = *sq.mask;
+        __u32 fhead = sq.free_ring.head;    // get the current head of the queue
+        __u32 ftail = sq.free_ring.tail;    // get the current tail of the queue
+
+        // If we don't have enough sqes, fail
+        if((ftail - fhead) < want) { return false; }
+
+        // copy all the indexes we want from the available list
+        for(i; want) {
+            __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
+            idxs[i] = sq.free_ring.array[(fhead + i) & mask];
+        }
+
+        // Advance the head to mark the indexes as consumed
+        __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);
+
+        // return success
+        return true;
+    }
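Aside: __alloc treats sq.free_ring as a ring of free SQE indexes with a single consumer: if tail - head covers the request, it copies the indexes out and the release store of the new head is what actually reserves them. A minimal sketch of that reservation in plain C11 (free_ring_t and alloc_indexes are illustrative names):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    typedef struct {
        _Atomic(uint32_t) head;    // consumed up to here (advanced by the allocating processor)
        _Atomic(uint32_t) tail;    // filled up to here (advanced when SQEs are released)
        uint32_t mask;             // capacity - 1, capacity is a power of two
        uint32_t *array;           // indexes of free SQEs
    } free_ring_t;

    // Try to reserve `want` free indexes; single consumer, so head needs no synchronization to read.
    static bool alloc_indexes(free_ring_t *fr, uint32_t *idxs, uint32_t want) {
        uint32_t head = atomic_load_explicit(&fr->head, memory_order_relaxed);
        uint32_t tail = atomic_load_explicit(&fr->tail, memory_order_acquire);
        if (tail - head < want) return false;                    // not enough free entries: take the slow path
        for (uint32_t i = 0; i < want; i++)
            idxs[i] = fr->array[(head + i) & fr->mask];
        atomic_store_explicit(&fr->head, head + want, memory_order_release);  // publish the consumption
        return true;
    }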
 // Allocate an submit queue entry.
…
 // for convenience, return both the index and the pointer to the sqe
 // sqe == &sqes[idx]
-    [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
-        /* paranoid */ verify( data != 0 );
-
-        // Prepare the data we need
-        __attribute((unused)) int len   = 0;
-        __attribute((unused)) int block = 0;
-        __u32 cnt = *ring.submit_q.num;
-        __u32 mask = *ring.submit_q.mask;
-
-        __u32 off = thread_rand();
-
-        // Loop around looking for an available spot
-        for() {
-            // Look through the list starting at some offset
-            for(i; cnt) {
-                __u64 expected = 3;
-                __u32 idx = (i + off) & mask; // Get an index from a random
-                volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
-                volatile __u64 * udata = &sqe->user_data;
-
-                // Allocate the entry by CASing the user_data field from 0 to the future address
-                if( *udata == expected &&
-                    __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
-                {
-                    // update statistics
-                    __STATS__( false,
-                        io.submit_q.alloc_avg.val   += len;
-                        io.submit_q.alloc_avg.block += block;
-                        io.submit_q.alloc_avg.cnt   += 1;
-                    )
-
-                    // debug log
-                    __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
-
-                    // Success return the data
-                    return [sqe, idx];
-                }
-                verify(expected != data);
-
-                // This one was used
-                len ++;
-            }
-
-            block++;
-
-            yield();
-        }
-    }
-
-    static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
-        /* paranoid */ verify( idx <= mask   );
-        /* paranoid */ verify( idx != -1ul32 );
-
-        // We need to find a spot in the ready array
-        __attribute((unused)) int len   = 0;
-        __attribute((unused)) int block = 0;
-        __u32 ready_mask = ring.submit_q.ready_cnt - 1;
-
-        __u32 off = thread_rand();
-
-        __u32 picked;
-        LOOKING: for() {
-            for(i; ring.submit_q.ready_cnt) {
-                picked = (i + off) & ready_mask;
-                __u32 expected = -1ul32;
-                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
-                    break LOOKING;
-                }
-                verify(expected != idx);
-
-                len ++;
-            }
-
-            block++;
-
-            __u32 released = __release_consumed_submission( ring );
-            if( released == 0 ) {
-                yield();
-            }
-        }
-
-        // update statistics
-        __STATS__( false,
-            io.submit_q.look_avg.val   += len;
-            io.submit_q.look_avg.block += block;
-            io.submit_q.look_avg.cnt   += 1;
-        )
-
-        return picked;
-    }
+    struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
+        __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
+
+        disable_interrupts();
+        processor * proc = __cfaabi_tls.this_processor;
+        $io_context * ctx = proc->io.ctx;
+        /* paranoid */ verify( __cfaabi_tls.this_processor );
+        /* paranoid */ verify( ctx );
+
+        __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
+
+        // We can proceed to the fast path
+        if( __alloc(ctx, idxs, want) ) {
+            // Allocation was successful
+            __STATS__( true, io.alloc.fast += 1; )
+            enable_interrupts();
+
+            __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
+
+            __fill( sqes, want, idxs, ctx );
+            return ctx;
+        }
+        // The fast path failed, fallback
+        __STATS__( true, io.alloc.fail += 1; )
+
+        // Fast path failed, fallback on arbitration
+        __STATS__( true, io.alloc.slow += 1; )
+        enable_interrupts();
+
+        $io_arbiter * ioarb = proc->cltr->io.arbiter;
+        /* paranoid */ verify( ioarb );
+
+        __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
+
+        struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);
+
+        __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
+
+        __fill( sqes, want, idxs,ret );
+        return ret;
+    }
+
+//=============================================================================================
+// submission
+    static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
+        // We can proceed to the fast path
+        // Get the right objects
+        __sub_ring_t & sq = ctx->sq;
+        const __u32 mask  = *sq.mask;
+        __u32 tail = *sq.kring.tail;
+
+        // Add the sqes to the array
+        for( i; have ) {
+            __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
+            sq.kring.array[ (tail + i) & mask ] = idxs[i];
+        }
+
+        // Make the sqes visible to the submitter
+        __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
+        sq.to_submit++;
+
+        ctx->proc->io.pending = true;
+        ctx->proc->io.dirty   = true;
+        if(sq.to_submit > 30 || !lazy) {
+            __cfa_io_flush( ctx->proc );
+        }
+    }
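Aside: the new __submit helper is the producer half of the same ring discipline: write the indexes into the kernel-visible array at tail, then publish them all with one release store of the new tail, and only flush when enough work has piled up or the caller asked for an eager submit. A sketch of the publish step in plain C11 (kring_t and publish are illustrative names):

    #include <stdatomic.h>
    #include <stdint.h>

    typedef struct {
        _Atomic(uint32_t) *tail;   // kernel reads entries up to this index
        uint32_t *mask;            // ring_entries - 1
        uint32_t *array;           // SQE indexes handed to the kernel
    } kring_t;

    // Append `have` SQE indexes and make them visible to the kernel in one release store.
    static void publish(kring_t *sq, const uint32_t *idxs, uint32_t have) {
        uint32_t tail = atomic_load_explicit(sq->tail, memory_order_relaxed);  // single producer per ring
        for (uint32_t i = 0; i < have; i++)
            sq->array[(tail + i) & *sq->mask] = idxs[i];
        // The release store orders the array writes before the new tail becomes visible.
        atomic_store_explicit(sq->tail, tail + have, memory_order_release);
    }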
+    void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
+        __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
+
+        disable_interrupts();
+        processor * proc = __cfaabi_tls.this_processor;
+        $io_context * ctx = proc->io.ctx;
+        /* paranoid */ verify( __cfaabi_tls.this_processor );
+        /* paranoid */ verify( ctx );
+
+        // Can we proceed to the fast path
+        if( ctx == inctx )    // We have the right instance?
+        {
+            __submit(ctx, idxs, have, lazy);
+
+            // Mark the instance as no longer in-use, re-enable interrupts and return
+            __STATS__( true, io.submit.fast += 1; )
+            enable_interrupts();
+
+            __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
+            return;
+        }
+
+        // Fast path failed, fallback on arbitration
+        __STATS__( true, io.submit.slow += 1; )
+        enable_interrupts();
+
+        __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
+
+        __ioarbiter_submit(inctx, idxs, have, lazy);
+    }
-    void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
-        __io_data & ring = *ctx->thrd.ring;
-
-        {
-            __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
-            __cfadbg_print_safe( io,
-                "Kernel I/O : submitting %u (%p) for %p\n"
-                "    data: %p\n"
-                "    opcode: %s\n"
-                "    fd: %d\n"
-                "    flags: %d\n"
-                "    prio: %d\n"
-                "    off: %p\n"
-                "    addr: %p\n"
-                "    len: %d\n"
-                "    other flags: %d\n"
-                "    splice fd: %d\n"
-                "    pad[0]: %llu\n"
-                "    pad[1]: %llu\n"
-                "    pad[2]: %llu\n",
-                idx, sqe,
-                active_thread(),
-                (void*)sqe->user_data,
-                opcodes[sqe->opcode],
-                sqe->fd,
-                sqe->flags,
-                sqe->ioprio,
-                (void*)sqe->off,
-                (void*)sqe->addr,
-                sqe->len,
-                sqe->accept_flags,
-                sqe->splice_fd_in,
-                sqe->__pad2[0],
-                sqe->__pad2[1],
-                sqe->__pad2[2]
-            );
-        }
-
-        // Get now the data we definetely need
-        volatile __u32 * const tail = ring.submit_q.tail;
-        const __u32 mask  = *ring.submit_q.mask;
-
-        // There are 2 submission schemes, check which one we are using
-        if( ring.poller_submits ) {
-            // If the poller thread submits, then we just need to add this to the ready array
-            __submit_to_ready_array( ring, idx, mask );
-
-            post( ctx->thrd.sem );
-
-            __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
-        }
-        else if( ring.eager_submits ) {
-            __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask );
-
-            #if defined(LEADER_LOCK)
-                if( !try_lock(ring.submit_q.submit_lock) ) {
-                    __STATS__( false,
-                        io.submit_q.helped += 1;
-                    )
-                    return;
-                }
-                /* paranoid */ verify( ! __preemption_enabled() );
-                __STATS__( true,
-                    io.submit_q.leader += 1;
-                )
-            #else
-                for() {
-                    yield();
-
-                    if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
-                        __STATS__( false,
-                            io.submit_q.leader += 1;
-                        )
-                        break;
-                    }
-
-                    // If some one else collected our index, we are done
-                    #warning ABA problem
-                    if( ring.submit_q.ready[picked] != idx ) {
-                        __STATS__( false,
-                            io.submit_q.helped += 1;
-                        )
-                        return;
-                    }
-
-                    __STATS__( false,
-                        io.submit_q.busy += 1;
-                    )
-                }
-            #endif
-
-            // We got the lock
-            // Collect the submissions
-            unsigned to_submit = __collect_submitions( ring );
-
-            // Actually submit
-            int ret = __io_uring_enter( ring, to_submit, false );
-            #if defined(LEADER_LOCK)
-                /* paranoid */ verify( ! __preemption_enabled() );
-                next(ring.submit_q.submit_lock);
-            #else
-                unlock(ring.submit_q.submit_lock);
-            #endif
-            if( ret < 0 ) {
-                return;
-            }
-
-            // Release the consumed SQEs
-            __release_consumed_submission( ring );
-
-            // update statistics
-            __STATS__( false,
-                io.submit_q.submit_avg.rdy += to_submit;
-                io.submit_q.submit_avg.csm += ret;
-                io.submit_q.submit_avg.cnt += 1;
-            )
-
-            __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
-        }
-        else
-        {
-            // get mutual exclusion
-            #if defined(LEADER_LOCK)
-                while(!try_lock(ring.submit_q.submit_lock));
-            #else
-                lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
-            #endif
-
-            /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
-            /* paranoid */          "index %u already reclaimed\n"
-            /* paranoid */          "head %u, prev %u, tail %u\n"
-            /* paranoid */          "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
-            /* paranoid */          idx,
-            /* paranoid */          *ring.submit_q.head, ring.submit_q.prev_head, *tail
-            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
-            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
-            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
-            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
-            /* paranoid */ );
-
-            // Append to the list of ready entries
-
-            /* paranoid */ verify( idx <= mask );
-            ring.submit_q.array[ (*tail) & mask ] = idx;
-            __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
-
-            // Submit however, many entries need to be submitted
-            int ret = __io_uring_enter( ring, 1, false );
-            if( ret < 0 ) {
-                switch((int)errno) {
-                default:
-                    abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
-                }
-            }
-
-            /* paranoid */ verify(ret == 1);
-
-            // update statistics
-            __STATS__( false,
-                io.submit_q.submit_avg.csm += 1;
-                io.submit_q.submit_avg.cnt += 1;
-            )
-
-            {
-                __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
-                __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
-                __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
-
-                __cfadbg_print_safe( io,
-                    "Kernel I/O : last submitted is %u (%p)\n"
-                    "    data: %p\n"
-                    "    opcode: %s\n"
-                    "    fd: %d\n"
-                    "    flags: %d\n"
-                    "    prio: %d\n"
-                    "    off: %p\n"
-                    "    addr: %p\n"
-                    "    len: %d\n"
-                    "    other flags: %d\n"
-                    "    splice fd: %d\n"
-                    "    pad[0]: %llu\n"
-                    "    pad[1]: %llu\n"
-                    "    pad[2]: %llu\n",
-                    last_idx, sqe,
-                    (void*)sqe->user_data,
-                    opcodes[sqe->opcode],
-                    sqe->fd,
-                    sqe->flags,
-                    sqe->ioprio,
-                    (void*)sqe->off,
-                    (void*)sqe->addr,
-                    sqe->len,
-                    sqe->accept_flags,
-                    sqe->splice_fd_in,
-                    sqe->__pad2[0],
-                    sqe->__pad2[1],
-                    sqe->__pad2[2]
-                );
-            }
-
-            __atomic_thread_fence( __ATOMIC_SEQ_CST );
-            // Release the consumed SQEs
-            __release_consumed_submission( ring );
-            // ring.submit_q.sqes[idx].user_data = 3ul64;
-
-            #if defined(LEADER_LOCK)
-                next(ring.submit_q.submit_lock);
-            #else
-                unlock(ring.submit_q.submit_lock);
-            #endif
-
-            __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
-        }
-    }
-
-    // #define PARTIAL_SUBMIT 32
-
-    // go through the list of submissions in the ready array and moved them into
-    // the ring's submit queue
-    static unsigned __collect_submitions( struct __io_data & ring ) {
-        /* paranoid */ verify( ring.submit_q.ready != 0p );
-        /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
-
-        unsigned to_submit = 0;
-        __u32 tail = *ring.submit_q.tail;
-        const __u32 mask = *ring.submit_q.mask;
-        #if defined(PARTIAL_SUBMIT)
-            #if defined(LEADER_LOCK)
-                #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
-            #endif
-            const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
-            const __u32 offset = ring.submit_q.prev_ready;
-            ring.submit_q.prev_ready += cnt;
-        #else
-            const __u32 cnt = ring.submit_q.ready_cnt;
-            const __u32 offset = 0;
-        #endif
-
-        // Go through the list of ready submissions
-        for( c; cnt ) {
-            __u32 i = (offset + c) % ring.submit_q.ready_cnt;
-
-            // replace any submission with the sentinel, to consume it.
-            __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
-
-            // If it was already the sentinel, then we are done
-            if( idx == -1ul32 ) continue;
-
-            // If we got a real submission, append it to the list
-            ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
-            to_submit++;
-        }
-
-        // Increment the tail based on how many we are ready to submit
-        __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
-
-        return to_submit;
-    }
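Aside: the removed __collect_submitions claims entries from the ready array with an unconditional atomic exchange against a -1 sentinel: whatever real index the exchange returns now belongs to exactly one collector, and getting the sentinel back means the slot was already empty. A minimal sketch of that claim step in plain C11 (collect_ready and EMPTY_SLOT are illustrative names):

    #include <stdatomic.h>
    #include <stdint.h>

    #define EMPTY_SLOT UINT32_MAX   // the sentinel the old code spelled -1ul32

    // Claim every pending index out of ready[0..cnt); returns how many were collected into out.
    static unsigned collect_ready(_Atomic(uint32_t) *ready, uint32_t cnt, uint32_t *out) {
        unsigned n = 0;
        for (uint32_t i = 0; i < cnt; i++) {
            // Exchange with the sentinel: at most one collector can see any given real index.
            uint32_t idx = atomic_exchange_explicit(&ready[i], EMPTY_SLOT, memory_order_relaxed);
            if (idx == EMPTY_SLOT) continue;    // slot was already empty
            out[n++] = idx;
        }
        return n;
    }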
+//=============================================================================================
+// Flushing
 // Go through the ring's submit queue and release everything that has already been consumed
 // by io_uring
-    static __u32 __release_consumed_submission( struct __io_data & ring ) {
-        const __u32 smask = *ring.submit_q.mask;
-
-        // We need to get the lock to copy the old head and new head
-        if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
+// This cannot be done by multiple threads
+    static __u32 __release_sqes( struct $io_context & ctx ) {
+        const __u32 mask = *ctx.sq.mask;
+
         __attribute__((unused))
-        __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
-        __u32 chead = *ring.submit_q.head;        // get the current head of the queue
-        __u32 phead = ring.submit_q.prev_head;    // get the head the last time we were here
-        ring.submit_q.prev_head = chead;          // note up to were we processed
-        unlock(ring.submit_q.release_lock);
+        __u32 ctail = *ctx.sq.kring.tail;    // get the current tail of the queue
+        __u32 chead = *ctx.sq.kring.head;    // get the current head of the queue
+        __u32 phead = ctx.sq.kring.released; // get the head the last time we were here
+
+        __u32 ftail = ctx.sq.free_ring.tail; // get the current tail of the queue

         // the 3 fields are organized like this diagram
…
         __u32 count = chead - phead;
+
+        if(count == 0) {
+            return 0;
+        }

         // We acquired an previous-head/current-head range
         // go through the range and release the sqes
         for( i; count ) {
-            __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
-
-            /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
-            __clean( &ring.submit_q.sqes[ idx ] );
-        }
+            __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
+            __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
+            ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
+        }
+
+        ctx.sq.kring.released = chead;        // note up to were we processed
+        __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
+
+        __ioarbiter_notify(ctx);
+
         return count;
     }

-    void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
-        __clean( sqe );
-    }
-
-    static inline void __clean( volatile struct io_uring_sqe * sqe ) {
-        // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
-        __cfaabi_dbg_debug_do(
-            memset(sqe, 0xde, sizeof(*sqe));
-            sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
-        );
-
-        // Mark the entry as unused
-        __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
-    }
+//=============================================================================================
+// I/O Arbiter
+//=============================================================================================
+    static inline void block(__outstanding_io_queue & queue, __outstanding_io & item) {
+        // Lock the list, it's not thread safe
+        lock( queue.lock __cfaabi_dbg_ctx2 );
+        {
+            // Add our request to the list
+            add( queue.queue, item );
+
+            // Mark as pending
+            __atomic_store_n( &queue.empty, false, __ATOMIC_SEQ_CST );
+        }
+        unlock( queue.lock );
+
+        wait( item.sem );
+    }
+
+    static inline bool empty(__outstanding_io_queue & queue ) {
+        return __atomic_load_n( &queue.empty, __ATOMIC_SEQ_CST);
+    }
+
+    static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) {
+        __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
+
+        __STATS__( false, io.alloc.block += 1; )
+
+        // No one has any resources left, wait for something to finish
+        // We need to add ourself to a list of pending allocs and wait for an answer
+        __pending_alloc pa;
+        pa.idxs = idxs;
+        pa.want = want;
+
+        block(this.pending, (__outstanding_io&)pa);
+
+        return pa.ctx;
+    }
+
+    static void __ioarbiter_notify( $io_arbiter & this, $io_context * ctx ) {
+        /* paranoid */ verify( !empty(this.pending.queue) );
+
+        lock( this.pending.lock __cfaabi_dbg_ctx2 );
+        {
+            while( !empty(this.pending.queue) ) {
+                __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
+                __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
+                __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );
+
+                if( have > pa.want ) goto DONE;
+                drop( this.pending.queue );
+
+                /* paranoid */ __attribute__((unused)) bool ret =
+                __alloc(ctx, pa.idxs, pa.want);
+                /* paranoid */ verify( ret );
+
+                pa.ctx = ctx;
+
+                post( pa.sem );
+            }
+
+            this.pending.empty = true;
+            DONE:;
+        }
+        unlock( this.pending.lock );
+    }
+
+    static void __ioarbiter_notify( $io_context & ctx ) {
+        if(!empty( ctx.arbiter->pending )) {
+            __ioarbiter_notify( *ctx.arbiter, &ctx );
+        }
+    }
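Aside: block()/empty() implement the arbiter hand-off: a requester appends itself to a locked list, clears the empty flag that fast paths poll without the lock, and sleeps on a per-request semaphore until whoever drains the list posts it. A rough equivalent in plain C with POSIX primitives (the request/out_queue types are illustrative; the CFA version parks a user-level thread rather than blocking a kernel thread):

    #include <pthread.h>
    #include <semaphore.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    struct request {             // illustrative stand-in for __outstanding_io
        struct request *next;
        sem_t sem;
    };

    struct out_queue {           // illustrative stand-in for __outstanding_io_queue
        pthread_mutex_t lock;
        struct request *head;
        _Atomic bool empty;      // checked without the lock on the fast path
    };

    static void block_on(struct out_queue *q, struct request *r) {
        sem_init(&r->sem, 0, 0);
        pthread_mutex_lock(&q->lock);
        r->next = q->head;                        // enqueue (ordering is irrelevant for the sketch)
        q->head = r;
        atomic_store(&q->empty, false);           // mark as pending before releasing the lock
        pthread_mutex_unlock(&q->lock);
        sem_wait(&r->sem);                        // sleep until the arbiter serves this request
    }

    static void serve_all(struct out_queue *q, void (*serve)(struct request *)) {
        pthread_mutex_lock(&q->lock);
        for (struct request *r = q->head; r; ) {
            struct request *next = r->next;
            serve(r);                             // e.g. hand out SQEs or submit on the requester's behalf
            sem_post(&r->sem);                    // wake the requester
            r = next;
        }
        q->head = NULL;
        atomic_store(&q->empty, true);
        pthread_mutex_unlock(&q->lock);
    }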
+    // Simply append to the pending
+    static void __ioarbiter_submit( $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
+        __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
+
+        __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
+
+        __external_io ei;
+        ei.idxs = idxs;
+        ei.have = have;
+        ei.lazy = lazy;
+
+        block(ctx->ext_sq, (__outstanding_io&)ei);
+
+        __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
+    }
+
+    static void __ioarbiter_flush( $io_context & ctx ) {
+        if(!empty( ctx.ext_sq )) {
+            __STATS__( false, io.flush.external += 1; )
+
+            __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
+
+            lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
+            {
+                while( !empty(ctx.ext_sq.queue) ) {
+                    __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
+
+                    __submit(&ctx, ei.idxs, ei.have, ei.lazy);
+
+                    post( ei.sem );
+                }
+
+                ctx.ext_sq.empty = true;
+            }
+            unlock(ctx.ext_sq.lock );
+        }
+    }
 #endif
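Aside: taken together, the new interface is allocate, fill, submit: cfa_io_allocate returns SQE pointers (fast path from the local ring, slow path through the arbiter), the caller fills them like ordinary io_uring SQEs, and cfa_io_submit publishes the indexes either lazily or eagerly. A hypothetical caller sketched in plain C, with the context type renamed io_ctx because the `$` in CFA's $io_context is not portable C, and with IORING_OP_READ and the wait on the future assumed rather than taken from the changeset:

    #include <stdbool.h>
    #include <stdint.h>
    #include <linux/io_uring.h>

    // Stand-ins for the changeset's interface; io_ctx replaces $io_context for the sketch.
    struct io_ctx;
    extern struct io_ctx * cfa_io_allocate(struct io_uring_sqe * sqes[], uint32_t idxs[], uint32_t want);
    extern void cfa_io_submit(struct io_ctx * ctx, uint32_t idxs[], uint32_t have, bool lazy);

    // Hypothetical read request: allocate one SQE, fill it, submit lazily.
    // Waiting on the future stored in user_data is elided; completion goes through the drain path above.
    static void start_read(int fd, void * buf, unsigned len, uint64_t off, uint64_t future_addr) {
        struct io_uring_sqe * sqe;
        uint32_t idx;
        struct io_ctx * ctx = cfa_io_allocate(&sqe, &idx, 1);  // may block in the arbiter under pressure

        sqe->opcode    = IORING_OP_READ;                       // assumes a kernel new enough to have OP_READ
        sqe->fd        = fd;
        sqe->addr      = (uint64_t)(uintptr_t)buf;
        sqe->len       = len;
        sqe->off       = off;
        sqe->user_data = future_addr;                          // recovered by the drain loop on completion

        cfa_io_submit(ctx, &idx, 1, true);                     // lazy: let the next flush batch it
    }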