Changeset 13d33a75 for libcfa/src/concurrency/io.cfa
- Timestamp: Aug 18, 2020, 4:31:19 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 8e9d567
- Parents: ef9988b, f2384c9a
- Files: 1 edited

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. To see the full set of changes relative to a particular parent, view the diff against that parent (ef9988b or f2384c9a).
Legend:
- Unmodified lines are shown with no prefix
- Added lines are prefixed with "+"
- Removed lines are prefixed with "-"
libcfa/src/concurrency/io.cfa
ref9988b → r13d33a75

#include "kernel/fwd.hfa"
#include "io/types.hfa"
+
+// returns true of acquired as leader or second leader
+static inline bool try_lock( __leaderlock_t & this ) {
+	const uintptr_t thrd = 1z | (uintptr_t)active_thread();
+	bool block;
+	disable_interrupts();
+	for() {
+		struct $thread * expected = this.value;
+		if( 1p != expected && 0p != expected ) {
+			/* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
+			enable_interrupts( __cfaabi_dbg_ctx );
+			return false;
+		}
+		struct $thread * desired;
+		if( 0p == expected ) {
+			// If the lock isn't locked acquire it, no need to block
+			desired = 1p;
+			block = false;
+		}
+		else {
+			// If the lock is already locked try becomming the next leader
+			desired = (struct $thread *)thrd;
+			block = true;
+		}
+		if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
+	}
+	if( block ) {
+		enable_interrupts( __cfaabi_dbg_ctx );
+		park( __cfaabi_dbg_ctx );
+		disable_interrupts();
+	}
+	return true;
+}
+
+static inline bool next( __leaderlock_t & this ) {
+	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+	struct $thread * nextt;
+	for() {
+		struct $thread * expected = this.value;
+		/* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
+
+		struct $thread * desired;
+		if( 1p == expected ) {
+			// No next leader, just unlock
+			desired = 0p;
+			nextt = 0p;
+		}
+		else {
+			// There is a next leader, remove but keep locked
+			desired = 1p;
+			nextt = (struct $thread *)(~1z & (uintptr_t)expected);
+		}
+		if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
+	}
+
+	if(nextt) {
+		unpark( nextt __cfaabi_dbg_ctx2 );
+		enable_interrupts( __cfaabi_dbg_ctx );
+		return true;
+	}
+	enable_interrupts( __cfaabi_dbg_ctx );
+	return false;
+}

//=============================================================================================
…
//=============================================================================================
static unsigned __collect_submitions( struct __io_data & ring );
-static uint32_t __release_consumed_submission( struct __io_data & ring );
+static __u32 __release_consumed_submission( struct __io_data & ring );

static inline void process(struct io_uring_cqe & cqe ) {
…
	data->result = cqe.res;
-	unpark( data->thrd __cfaabi_dbg_ctx2 );
+	post( data->sem );
}
…
	unsigned head = *ring.completion_q.head;
	unsigned tail = *ring.completion_q.tail;
-	const uint32_t mask = *ring.completion_q.mask;
+	const __u32 mask = *ring.completion_q.mask;

	// Nothing was new return 0
…
	}

-	uint32_t count = tail - head;
+	__u32 count = tail - head;
	/* paranoid */ verify( count != 0 );
	for(i; count) {
…
		__STATS__( true,
			io.complete_q.completed_avg.val += count;
-			io.complete_q.completed_avg.fast_cnt += 1;
+			io.complete_q.completed_avg.cnt += 1;
		)
		enable_interrupts( __cfaabi_dbg_ctx );
…
		// We didn't get anything baton pass to the slow poller
		else {
+			__STATS__( false,
+				io.complete_q.blocks += 1;
+			)
			__cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
			reset = 0;
…
//

-[* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
+[* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
	/* paranoid */ verify( data != 0 );
…
	__attribute((unused)) int len = 0;
	__attribute((unused)) int block = 0;
-	uint32_t cnt = *ring.submit_q.num;
-	uint32_t mask = *ring.submit_q.mask;
+	__u32 cnt = *ring.submit_q.num;
+	__u32 mask = *ring.submit_q.mask;

	disable_interrupts();
-	uint32_t off = __tls_rand();
+	__u32 off = __tls_rand();
	enable_interrupts( __cfaabi_dbg_ctx );
…
	// Look through the list starting at some offset
	for(i; cnt) {
-		uint64_t expected = 0;
-		uint32_t idx = (i + off) & mask;
+		__u64 expected = 0;
+		__u32 idx = (i + off) & mask;
		struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
-		volatile uint64_t * udata = (volatile uint64_t *)&sqe->user_data;
+		volatile __u64 * udata = &sqe->user_data;

		if( *udata == expected &&
…
}

-static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
+static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
	/* paranoid */ verify( idx <= mask );
	/* paranoid */ verify( idx != -1ul32 );
…
	__attribute((unused)) int len = 0;
	__attribute((unused)) int block = 0;
-	uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
+	__u32 ready_mask = ring.submit_q.ready_cnt - 1;

	disable_interrupts();
-	uint32_t off = __tls_rand();
+	__u32 off = __tls_rand();
	enable_interrupts( __cfaabi_dbg_ctx );

-	uint32_t picked;
+	__u32 picked;
	LOOKING: for() {
		for(i; ring.submit_q.ready_cnt) {
			picked = (i + off) & ready_mask;
-			uint32_t expected = -1ul32;
+			__u32 expected = -1ul32;
			if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
				break LOOKING;
…

		block++;
-		if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
-			__release_consumed_submission( ring );
-			unlock( ring.submit_q.lock );
-		}
-		else {
+
+		__u32 released = __release_consumed_submission( ring );
+		if( released == 0 ) {
			yield();
		}
…
}

-void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
+void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
	__io_data & ring = *ctx->thrd.ring;
	// Get now the data we definetely need
-	volatile uint32_t * const tail = ring.submit_q.tail;
-	const uint32_t mask = *ring.submit_q.mask;
+	volatile __u32 * const tail = ring.submit_q.tail;
+	const __u32 mask = *ring.submit_q.mask;

	// There are 2 submission schemes, check which one we are using
…
	}
	else if( ring.eager_submits ) {
-		uint32_t picked = __submit_to_ready_array( ring, idx, mask );
-
-		for() {
-			yield();
-
-			// If some one else collected our index, we are done
-			#warning ABA problem
-			if( ring.submit_q.ready[picked] != idx ) {
+		__u32 picked = __submit_to_ready_array( ring, idx, mask );
+
+		#if defined(LEADER_LOCK)
+			if( !try_lock(ring.submit_q.submit_lock) ) {
				__STATS__( false,
					io.submit_q.helped += 1;
…
				return;
			}
-
-			if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
+			/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+			__STATS__( true,
+				io.submit_q.leader += 1;
+			)
+		#else
+			for() {
+				yield();
+
+				if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
+					__STATS__( false,
+						io.submit_q.leader += 1;
+					)
+					break;
+				}
+
+				// If some one else collected our index, we are done
+				#warning ABA problem
+				if( ring.submit_q.ready[picked] != idx ) {
+					__STATS__( false,
+						io.submit_q.helped += 1;
+					)
+					return;
+				}
+
				__STATS__( false,
-					io.submit_q.leader += 1;
+					io.submit_q.busy += 1;
				)
-				break;
-			}
-
-			__STATS__( false,
-				io.submit_q.busy += 1;
-			)
-		}
+			}
+		#endif

		// We got the lock
+		// Collect the submissions
		unsigned to_submit = __collect_submitions( ring );
+
+		// Actually submit
		int ret = __io_uring_enter( ring, to_submit, false );
-		if( ret < 0 ) {
-			unlock(ring.submit_q.lock);
-			return;
-		}
-
-		/* paranoid */ verify( ret > 0 || to_submit == 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
+
+		#if defined(LEADER_LOCK)
+			/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+			next(ring.submit_q.submit_lock);
+		#else
+			unlock(ring.submit_q.submit_lock);
+		#endif
+		if( ret < 0 ) return;

		// Release the consumed SQEs
…

		// update statistics
-		__STATS__( true,
+		__STATS__( false,
			io.submit_q.submit_avg.rdy += to_submit;
			io.submit_q.submit_avg.csm += ret;
			io.submit_q.submit_avg.cnt += 1;
		)
-
-		unlock(ring.submit_q.lock);
	}
	else {
		// get mutual exclusion
-		lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
+		#if defined(LEADER_LOCK)
+			while(!try_lock(ring.submit_q.submit_lock));
+		#else
+			lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
+		#endif

		/* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
…
		__release_consumed_submission( ring );

-		unlock(ring.submit_q.lock);
+		#if defined(LEADER_LOCK)
+			next(ring.submit_q.submit_lock);
+		#else
+			unlock(ring.submit_q.submit_lock);
+		#endif

		__cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
…
}

+// #define PARTIAL_SUBMIT 32
static unsigned __collect_submitions( struct __io_data & ring ) {
	/* paranoid */ verify( ring.submit_q.ready != 0p );
…

	unsigned to_submit = 0;
-	uint32_t tail = *ring.submit_q.tail;
-	const uint32_t mask = *ring.submit_q.mask;
+	__u32 tail = *ring.submit_q.tail;
+	const __u32 mask = *ring.submit_q.mask;
+	#if defined(PARTIAL_SUBMIT)
+		#if defined(LEADER_LOCK)
+			#error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
+		#endif
+		const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
+		const __u32 offset = ring.submit_q.prev_ready;
+		ring.submit_q.prev_ready += cnt;
+	#else
+		const __u32 cnt = ring.submit_q.ready_cnt;
+		const __u32 offset = 0;
+	#endif

	// Go through the list of ready submissions
-	for( i; ring.submit_q.ready_cnt ) {
+	for( c; cnt ) {
+		__u32 i = (offset + c) % ring.submit_q.ready_cnt;
+
		// replace any submission with the sentinel, to consume it.
-		uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+		__u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);

		// If it was already the sentinel, then we are done
…
}

-static uint32_t __release_consumed_submission( struct __io_data & ring ) {
-	const uint32_t smask = *ring.submit_q.mask;
+static __u32 __release_consumed_submission( struct __io_data & ring ) {
+	const __u32 smask = *ring.submit_q.mask;

	if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
-	uint32_t chead = *ring.submit_q.head;
-	uint32_t phead = ring.submit_q.prev_head;
+	__u32 chead = *ring.submit_q.head;
+	__u32 phead = ring.submit_q.prev_head;
	ring.submit_q.prev_head = chead;
	unlock(ring.submit_q.release_lock);

-	uint32_t count = chead - phead;
+	__u32 count = chead - phead;
	for( i; count ) {
-		uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
+		__u32 idx = ring.submit_q.array[ (phead + i) & smask ];
		ring.submit_q.sqes[ idx ].user_data = 0;
	}
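The centrepiece of this merge is the new __leaderlock_t used to serialize eager submissions: the first thread to call try_lock() becomes the leader and submits on behalf of everyone, a second thread may register itself as the next leader and park, and any further thread simply returns and lets the leader do its work; next() then either unlocks or hands the still-held lock directly to the parked successor. The following self-contained C11 sketch approximates that hand-off protocol outside the CFA runtime (compile with cc -pthread). All names here (leader_lock_t, thread_ctx, worker) are invented for the illustration, and POSIX semaphores stand in for the CFA runtime's park()/unpark().

// Minimal sketch of the leader-lock hand-off pattern; not the CFA implementation.
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct thread_ctx {
	sem_t parked;   // stands in for park()/unpark() on a CFA $thread
	int   id;
} thread_ctx;

typedef struct {
	// 0 = unlocked, 1 = locked with no successor, (thread_ctx* | 1) = locked with a parked successor
	_Atomic(uintptr_t) value;
} leader_lock_t;

// Returns true if the caller holds the lock on return, either because it acquired it directly
// (the leader) or because it registered as the next leader, parked, and was handed the lock.
// Returns false if a next leader already exists: the caller is "helped" and does nothing itself.
static bool leader_try_lock( leader_lock_t * this, thread_ctx * me ) {
	const uintptr_t self = (uintptr_t)me | 1u;
	bool block;
	for(;;) {
		uintptr_t expected = atomic_load(&this->value);
		if( expected != 0u && expected != 1u ) return false;     // someone is already queued as next leader
		uintptr_t desired;
		if( expected == 0u ) { desired = 1u;   block = false; }  // lock is free: take it, no need to block
		else                 { desired = self; block = true;  }  // lock is held: become the next leader
		if( atomic_compare_exchange_weak(&this->value, &expected, desired) ) break;
	}
	if( block ) sem_wait(&me->parked);                           // park until the current leader hands off
	return true;
}

// Releases the lock; if a successor is registered, wake it while keeping the lock marked held,
// so ownership passes directly from leader to next leader (mirrors next() in the changeset).
static void leader_next( leader_lock_t * this ) {
	thread_ctx * succ;
	for(;;) {
		uintptr_t expected = atomic_load(&this->value);
		uintptr_t desired;
		if( expected == 1u ) { desired = 0u; succ = NULL; }      // no successor: plain unlock
		else { desired = 1u; succ = (thread_ctx *)(expected & ~(uintptr_t)1u); }  // hand off, stay locked
		if( atomic_compare_exchange_weak(&this->value, &expected, desired) ) break;
	}
	if( succ ) sem_post(&succ->parked);                          // stands in for unpark()
}

static leader_lock_t lock;  // zero-initialized: unlocked

static void * worker( void * arg ) {
	thread_ctx * me = arg;
	if( leader_try_lock(&lock, me) ) {
		// Critical section: in io.cfa this is where the leader collects the ready array
		// and calls io_uring_enter() on behalf of every helped thread.
		printf("thread %d acted as leader\n", me->id);
		leader_next(&lock);
	}
	else {
		// Helped: in io.cfa the thread's SQE index is already published in the ready array,
		// so the current leader will submit it; nothing more to do here.
		printf("thread %d was helped\n", me->id);
	}
	return NULL;
}

int main(void) {
	enum { N = 4 };
	pthread_t tid[N];
	thread_ctx ctx[N];
	for( int i = 0; i < N; i++ ) {
		sem_init(&ctx[i].parked, 0, 0);
		ctx[i].id = i;
		pthread_create(&tid[i], NULL, worker, &ctx[i]);
	}
	for( int i = 0; i < N; i++ ) pthread_join(tid[i], NULL);
	return 0;
}

The sketch only models the locking hand-off; in io.cfa the helped path is what makes the scheme pay off, since the leader's __collect_submitions() batches every index already published to submit_q.ready into a single io_uring_enter() call.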