Changes in libcfa/src/concurrency/io.cfa [11054eb:dddb3dd0]
- File:
-
- 1 edited
-
libcfa/src/concurrency/io.cfa (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
r11054eb rdddb3dd0 80 80 }; 81 81 82 static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want );83 static void __ioarbiter_submit( $io_ context * , __u32 idxs[], __u32 have, bool lazy );84 static void __ioarbiter_flush ( $io_ context &);82 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want ); 83 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy ); 84 static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * ); 85 85 static inline void __ioarbiter_notify( $io_context & ctx ); 86 86 //============================================================================================= … … 134 134 $io_context & ctx = *proc->io.ctx; 135 135 136 __ioarbiter_flush( ctx ); 136 if(!ctx.ext_sq.empty) { 137 __ioarbiter_flush( *ctx.arbiter, &ctx ); 138 } 137 139 138 140 __STATS__( true, io.calls.flush++; ) … … 261 263 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n"); 262 264 263 struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);265 struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want); 264 266 265 267 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd); … … 324 326 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n"); 325 327 326 __ioarbiter_submit( inctx, idxs, have, lazy);328 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy); 327 329 } 328 330 … … 382 384 // I/O Arbiter 383 385 //============================================================================================= 384 static inline void block(__outstanding_io_queue & queue, __outstanding_io & item) { 385 // Lock the list, it's not thread safe 386 lock( queue.lock __cfaabi_dbg_ctx2 ); 387 { 388 // Add our request to the list 389 add( queue.queue, item ); 390 391 // Mark as pending 392 __atomic_store_n( &queue.empty, false, __ATOMIC_SEQ_CST ); 393 } 394 unlock( queue.lock ); 395 396 wait( item.sem ); 397 } 398 399 static inline bool empty(__outstanding_io_queue & queue ) { 400 return __atomic_load_n( &queue.empty, __ATOMIC_SEQ_CST); 401 } 402 403 static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) { 386 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) { 404 387 __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n"); 405 388 … … 407 390 408 391 // No one has any resources left, wait for something to finish 409 // We need to add ourself to a list of pending allocs and wait for an answer 410 __pending_alloc pa; 411 pa.idxs = idxs; 412 pa.want = want; 413 414 block(this.pending, (__outstanding_io&)pa); 415 416 return pa.ctx; 417 418 } 419 420 static void __ioarbiter_notify( $io_arbiter & this, $io_context * ctx ) { 421 /* paranoid */ verify( !empty(this.pending.queue) ); 422 423 lock( this.pending.lock __cfaabi_dbg_ctx2 ); 424 { 425 while( !empty(this.pending.queue) ) { 426 __cfadbg_print_safe(io, "Kernel I/O : notifying\n"); 427 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 428 __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue ); 429 430 if( have > pa.want ) goto DONE; 431 drop( this.pending.queue ); 432 433 /* paranoid */__attribute__((unused)) bool ret = 434 435 __alloc(ctx, pa.idxs, pa.want); 436 437 /* paranoid */ verify( ret ); 438 439 pa.ctx = ctx; 440 441 post( pa.sem ); 442 } 443 444 this.pending.empty = true; 445 DONE:; 446 } 447 unlock( this.pending.lock ); 392 // Mark as pending 393 __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST ); 394 395 // Wait for our turn to submit 396 wait( this.pending.blocked, want ); 397 398 __attribute((unused)) bool ret = 399 __alloc( this.pending.ctx, idxs, want); 400 /* paranoid */ verify( ret ); 401 402 return this.pending.ctx; 403 404 } 405 406 static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) { 407 /* paranoid */ verify( !is_empty(this.pending.blocked) ); 408 this.pending.ctx = ctx; 409 410 while( !is_empty(this.pending.blocked) ) { 411 __cfadbg_print_safe(io, "Kernel I/O : notifying\n"); 412 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 413 __u32 want = front( this.pending.blocked ); 414 415 if( have > want ) return; 416 417 signal_block( this.pending.blocked ); 418 } 419 420 this.pending.flag = false; 448 421 } 449 422 450 423 static void __ioarbiter_notify( $io_context & ctx ) { 451 if( !empty( ctx.arbiter->pending)) {424 if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) { 452 425 __ioarbiter_notify( *ctx.arbiter, &ctx ); 453 426 } … … 455 428 456 429 // Simply append to the pending 457 static void __ioarbiter_submit( $io_ context * ctx, __u32 idxs[], __u32 have, bool lazy ) {430 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) { 458 431 __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd); 459 432 433 /* paranoid */ verify( &this == ctx->arbiter ); 434 435 // Mark as pending 436 __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST ); 437 460 438 __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have); 461 439 462 __external_io ei; 463 ei.idxs = idxs; 464 ei.have = have; 465 ei.lazy = lazy; 466 467 block(ctx->ext_sq, (__outstanding_io&)ei); 440 // Wait for our turn to submit 441 wait( ctx->ext_sq.blocked ); 442 443 // Submit our indexes 444 __submit(ctx, idxs, have, lazy); 468 445 469 446 __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have); 470 447 } 471 448 472 static void __ioarbiter_flush( $io_context & ctx ) { 473 if(!empty( ctx.ext_sq )) { 474 __STATS__( false, io.flush.external += 1; ) 475 476 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 477 478 lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 ); 479 { 480 while( !empty(ctx.ext_sq.queue) ) { 481 __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue ); 482 483 __submit(&ctx, ei.idxs, ei.have, ei.lazy); 484 485 post( ei.sem ); 486 } 487 488 ctx.ext_sq.empty = true; 489 } 490 unlock(ctx.ext_sq.lock ); 491 } 449 static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) { 450 /* paranoid */ verify( &this == ctx->arbiter ); 451 452 __STATS__( false, io.flush.external += 1; ) 453 454 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 455 456 condition & blcked = ctx->ext_sq.blocked; 457 /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) ); 458 while(!is_empty( blcked )) { 459 signal_block( blcked ); 460 } 461 462 ctx->ext_sq.empty = true; 492 463 } 493 464 #endif
Note:
See TracChangeset
for help on using the changeset viewer.