Changes in libcfa/src/concurrency/io.cfa [93526ef:eafec07]
File:
- libcfa/src/concurrency/io.cfa (1 edited)
Legend:
- Unmodified (no prefix)
- Added (+)
- Removed (-)
libcfa/src/concurrency/io.cfa
r93526ef → reafec07

#include "kernel/fwd.hfa"
#include "io/types.hfa"
-
-// returns true of acquired as leader or second leader
-static inline bool try_lock( __leaderlock_t & this ) {
-    const uintptr_t thrd = 1z | (uintptr_t)active_thread();
-    bool block;
-    disable_interrupts();
-    for() {
-        struct $thread * expected = this.value;
-        if( 1p != expected && 0p != expected ) {
-            /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
-            enable_interrupts( __cfaabi_dbg_ctx );
-            return false;
-        }
-        struct $thread * desired;
-        if( 0p == expected ) {
-            // If the lock isn't locked acquire it, no need to block
-            desired = 1p;
-            block = false;
-        }
-        else {
-            // If the lock is already locked try becomming the next leader
-            desired = (struct $thread *)thrd;
-            block = true;
-        }
-        if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
-    }
-    if( block ) {
-        enable_interrupts( __cfaabi_dbg_ctx );
-        park( __cfaabi_dbg_ctx );
-        disable_interrupts();
-    }
-    return true;
-}
-
-static inline bool next( __leaderlock_t & this ) {
-    /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
-    struct $thread * nextt;
-    for() {
-        struct $thread * expected = this.value;
-        /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
-
-        struct $thread * desired;
-        if( 1p == expected ) {
-            // No next leader, just unlock
-            desired = 0p;
-            nextt   = 0p;
-        }
-        else {
-            // There is a next leader, remove but keep locked
-            desired = 1p;
-            nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
-        }
-        if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
-    }
-
-    if(nextt) {
-        unpark( nextt __cfaabi_dbg_ctx2 );
-        enable_interrupts( __cfaabi_dbg_ctx );
-        return true;
-    }
-    enable_interrupts( __cfaabi_dbg_ctx );
-    return false;
-}

//=============================================================================================
…
//=============================================================================================
static unsigned __collect_submitions( struct __io_data & ring );
-static __u32 __release_consumed_submission( struct __io_data & ring );
+static uint32_t __release_consumed_submission( struct __io_data & ring );

static inline void process(struct io_uring_cqe & cqe ) {
…

    data->result = cqe.res;
-    post( data->sem );
+    unpark( data->thrd __cfaabi_dbg_ctx2 );
}

…
    unsigned head = *ring.completion_q.head;
    unsigned tail = *ring.completion_q.tail;
-    const __u32 mask = *ring.completion_q.mask;
+    const uint32_t mask = *ring.completion_q.mask;

    // Nothing was new return 0
…
    }

-    __u32 count = tail - head;
+    uint32_t count = tail - head;
    /* paranoid */ verify( count != 0 );
    for(i; count) {
…
        __STATS__( true,
            io.complete_q.completed_avg.val += count;
-            io.complete_q.completed_avg.cnt += 1;
+            io.complete_q.completed_avg.fast_cnt += 1;
        )
        enable_interrupts( __cfaabi_dbg_ctx );
…
    // We didn't get anything baton pass to the slow poller
    else {
-        __STATS__( false,
-            io.complete_q.blocks += 1;
-        )
        __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
        reset = 0;
…
//

-[* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
+[* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
    /* paranoid */ verify( data != 0 );
…
    __attribute((unused)) int len   = 0;
    __attribute((unused)) int block = 0;
-    __u32 cnt = *ring.submit_q.num;
-    __u32 mask = *ring.submit_q.mask;
+    uint32_t cnt = *ring.submit_q.num;
+    uint32_t mask = *ring.submit_q.mask;

    disable_interrupts();
-        __u32 off = __tls_rand();
+        uint32_t off = __tls_rand();
    enable_interrupts( __cfaabi_dbg_ctx );

…
    // Look through the list starting at some offset
    for(i; cnt) {
-        __u64 expected = 0;
-        __u32 idx = (i + off) & mask;
+        uint64_t expected = 0;
+        uint32_t idx = (i + off) & mask;
        struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
-        volatile __u64 * udata = &sqe->user_data;
+        volatile uint64_t * udata = (volatile uint64_t *)&sqe->user_data;

        if( *udata == expected &&
…
}

-static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
+static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
    /* paranoid */ verify( idx <= mask   );
    /* paranoid */ verify( idx != -1ul32 );
…
    __attribute((unused)) int len   = 0;
    __attribute((unused)) int block = 0;
-    __u32 ready_mask = ring.submit_q.ready_cnt - 1;
+    uint32_t ready_mask = ring.submit_q.ready_cnt - 1;

    disable_interrupts();
-        __u32 off = __tls_rand();
+        uint32_t off = __tls_rand();
    enable_interrupts( __cfaabi_dbg_ctx );

-    __u32 picked;
+    uint32_t picked;
    LOOKING: for() {
        for(i; ring.submit_q.ready_cnt) {
            picked = (i + off) & ready_mask;
-            __u32 expected = -1ul32;
+            uint32_t expected = -1ul32;
            if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
                break LOOKING;
…
        }

        block++;
-
-        __u32 released = __release_consumed_submission( ring );
-        if( released == 0 ) {
+        if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
+            __release_consumed_submission( ring );
+            unlock( ring.submit_q.lock );
+        }
+        else {
            yield();
        }
…
}

-void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
+void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
    __io_data & ring = *ctx->thrd.ring;
    // Get now the data we definetely need
-    volatile __u32 * const tail = ring.submit_q.tail;
-    const __u32 mask = *ring.submit_q.mask;
+    volatile uint32_t * const tail = ring.submit_q.tail;
+    const uint32_t mask = *ring.submit_q.mask;

    // There are 2 submission schemes, check which one we are using
…
    }
    else if( ring.eager_submits ) {
-        __u32 picked = __submit_to_ready_array( ring, idx, mask );
-
-        #if defined(LEADER_LOCK)
-            if( !try_lock(ring.submit_q.submit_lock) ) {
+        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
+
+        for() {
+            yield();
+
+            // If some one else collected our index, we are done
+            #warning ABA problem
+            if( ring.submit_q.ready[picked] != idx ) {
                __STATS__( false,
                    io.submit_q.helped += 1;
…
                return;
            }
-            /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
-            __STATS__( true,
-                io.submit_q.leader += 1;
+
+            if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
+                __STATS__( false,
+                    io.submit_q.leader += 1;
+                )
+                break;
+            }
+
+            __STATS__( false,
+                io.submit_q.busy += 1;
            )
-        #else
-            for() {
-                yield();
-
-                if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
-                    __STATS__( false,
-                        io.submit_q.leader += 1;
-                    )
-                    break;
-                }
-
-                // If some one else collected our index, we are done
-                #warning ABA problem
-                if( ring.submit_q.ready[picked] != idx ) {
-                    __STATS__( false,
-                        io.submit_q.helped += 1;
-                    )
-                    return;
-                }
-
-                __STATS__( false,
-                    io.submit_q.busy += 1;
-                )
-            }
-        #endif
+        }

        // We got the lock
-        // Collect the submissions
        unsigned to_submit = __collect_submitions( ring );
-
-        // Actually submit
        int ret = __io_uring_enter( ring, to_submit, false );
-
-        #if defined(LEADER_LOCK)
-            /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
-            next(ring.submit_q.submit_lock);
-        #else
-            unlock(ring.submit_q.submit_lock);
-        #endif
-        if( ret < 0 ) return;
+        if( ret < 0 ) {
+            unlock(ring.submit_q.lock);
+            return;
+        }
+
+        /* paranoid */ verify( ret > 0 || to_submit == 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );

        // Release the consumed SQEs
…

        // update statistics
-        __STATS__( false,
+        __STATS__( true,
            io.submit_q.submit_avg.rdy += to_submit;
            io.submit_q.submit_avg.csm += ret;
            io.submit_q.submit_avg.cnt += 1;
        )
+
+        unlock(ring.submit_q.lock);
    }
    else {
        // get mutual exclusion
-        #if defined(LEADER_LOCK)
-            while(!try_lock(ring.submit_q.submit_lock));
-        #else
-            lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
-        #endif
+        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);

        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
…
        __release_consumed_submission( ring );

-        #if defined(LEADER_LOCK)
-            next(ring.submit_q.submit_lock);
-        #else
-            unlock(ring.submit_q.submit_lock);
-        #endif
+        unlock(ring.submit_q.lock);

        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
…
}

-// #define PARTIAL_SUBMIT 32
static unsigned __collect_submitions( struct __io_data & ring ) {
    /* paranoid */ verify( ring.submit_q.ready != 0p );
…

    unsigned to_submit = 0;
-    __u32 tail = *ring.submit_q.tail;
-    const __u32 mask = *ring.submit_q.mask;
-    #if defined(PARTIAL_SUBMIT)
-        #if defined(LEADER_LOCK)
-            #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
-        #endif
-        const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
-        const __u32 offset = ring.submit_q.prev_ready;
-        ring.submit_q.prev_ready += cnt;
-    #else
-        const __u32 cnt = ring.submit_q.ready_cnt;
-        const __u32 offset = 0;
-    #endif
+    uint32_t tail = *ring.submit_q.tail;
+    const uint32_t mask = *ring.submit_q.mask;

    // Go through the list of ready submissions
-    for( c; cnt ) {
-        __u32 i = (offset + c) % ring.submit_q.ready_cnt;
-
+    for( i; ring.submit_q.ready_cnt ) {
        // replace any submission with the sentinel, to consume it.
-        __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+        uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);

        // If it was already the sentinel, then we are done
…
}

-static __u32 __release_consumed_submission( struct __io_data & ring ) {
-    const __u32 smask = *ring.submit_q.mask;
+static uint32_t __release_consumed_submission( struct __io_data & ring ) {
+    const uint32_t smask = *ring.submit_q.mask;

    if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
-        __u32 chead = *ring.submit_q.head;
-        __u32 phead = ring.submit_q.prev_head;
+        uint32_t chead = *ring.submit_q.head;
+        uint32_t phead = ring.submit_q.prev_head;
        ring.submit_q.prev_head = chead;
    unlock(ring.submit_q.release_lock);

-    __u32 count = chead - phead;
+    uint32_t count = chead - phead;
    for( i; count ) {
-        __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
+        uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
        ring.submit_q.sqes[ idx ].user_data = 0;
    }
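
For context on the block removed above: __leaderlock_t packs the whole lock state into one word by tagging the low bit of an (aligned) thread pointer, so the word is either 0p (unlocked), 1p (locked, no successor), or a tagged pointer to the thread queued as the next leader. Below is a minimal sketch of that hand-off protocol in plain C11 rather than Cforall; thread_t, park() and unpark() are stand-ins for the runtime's thread type and blocking primitives, and the interrupt toggling and paranoid checks of the real code are omitted.

/* Sketch of the removed __leaderlock_t scheme (plain C11, not the CFA runtime).
 * Relies on thread objects being at least 2-byte aligned so the low bit is
 * free to use as the "locked" tag. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct thread thread_t;          /* opaque user-level thread (stand-in) */
extern thread_t * active_thread(void);   /* assumed runtime accessors           */
extern void park(void);
extern void unpark(thread_t *);

typedef struct { _Atomic(uintptr_t) value; } leaderlock_t;

#define UNLOCKED ((uintptr_t)0)          /* 0p in the CFA code                  */
#define LOCKED   ((uintptr_t)1)          /* 1p: locked, no next leader queued   */

/* Returns true once the caller is the leader: either immediately, or after
 * parking as the queued successor until the current leader hands off. */
static bool leader_try_lock(leaderlock_t * this) {
    const uintptr_t self = LOCKED | (uintptr_t)active_thread(); /* tag the low bit */
    for (;;) {
        uintptr_t expected = atomic_load(&this->value);
        if (expected != UNLOCKED && expected != LOCKED)
            return false;                              /* a successor is already queued */
        bool block = (expected == LOCKED);             /* lock held: queue as next leader */
        uintptr_t desired = block ? self : LOCKED;
        if (atomic_compare_exchange_weak(&this->value, &expected, desired)) {
            if (block) park();                         /* woken by leader_next() as leader */
            return true;
        }
    }
}

/* Leader hands off: wake the queued successor if any, otherwise unlock. */
static bool leader_next(leaderlock_t * this) {
    for (;;) {
        uintptr_t expected = atomic_load(&this->value);
        uintptr_t next = expected & ~LOCKED;           /* strip the tag bit              */
        uintptr_t desired = next ? LOCKED : UNLOCKED;  /* stay locked for the successor  */
        if (atomic_compare_exchange_weak(&this->value, &expected, desired)) {
            if (next) { unpark((thread_t *)next); return true; }
            return false;
        }
    }
}

Note that leader_next() leaves the word LOCKED when a successor exists, so the woken thread resumes already holding the lock; that is why the path using this lock called next() on exit instead of a plain unlock().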
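
The eager-submission scheme restored on the added side follows a publish-then-help pattern: a thread first parks its SQE index in the shared ready array, then repeatedly yields; if a leader has already collected the index it simply returns (counted as helped), otherwise it tries to take the submission lock and, as leader, drains the whole array with one io_uring_enter call. The following is a minimal sketch of that pattern in plain C11; ready, READY_CNT, submit_lock, publish() and submit_all() are illustrative stand-ins, not the runtime's actual names.

/* Sketch of the publish-then-help eager-submission pattern (plain C11). */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <sched.h>

#define READY_CNT 32u                       /* power of two, like ready_cnt      */
#define EMPTY     UINT32_MAX                /* sentinel, -1ul32 in the source    */

static _Atomic(uint32_t) ready[READY_CNT];  /* shared array of published indices */
static atomic_flag submit_lock = ATOMIC_FLAG_INIT;

extern void submit_all(void);               /* drain ready[] and call io_uring_enter */

static void ready_init(void) {
    for (uint32_t i = 0; i < READY_CNT; i++)
        atomic_store(&ready[i], EMPTY);     /* all slots start empty */
}

/* Publish idx into some empty slot of the ready array, spinning if full. */
static uint32_t publish(uint32_t idx, uint32_t off) {
    for (;;) {
        for (uint32_t i = 0; i < READY_CNT; i++) {
            uint32_t slot = (i + off) & (READY_CNT - 1);
            uint32_t expected = EMPTY;
            if (atomic_compare_exchange_strong(&ready[slot], &expected, idx))
                return slot;                /* our index is now visible to leaders */
        }
        sched_yield();                      /* array full: let a leader drain it   */
    }
}

/* Either get helped by the current leader or become the leader ourselves. */
static void eager_submit(uint32_t idx, uint32_t off) {
    uint32_t slot = publish(idx, off);
    for (;;) {
        sched_yield();
        /* Someone else already collected our index: nothing left to do. */
        if (atomic_load(&ready[slot]) != idx)
            return;
        if (!atomic_flag_test_and_set(&submit_lock))
            break;                          /* we won the lock: we are the leader */
    }
    submit_all();                           /* batch-submit everything published  */
    atomic_flag_clear(&submit_lock);
}

As the #warning in the diff notes, the ready[picked] != idx check can be fooled if the slot is drained and later refilled with the same index value; the sketch inherits that ABA caveat.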