Changes in / [cb0bcf1:4a40fca7]
- Location:
- libcfa/src/concurrency
- Files:
-
- 3 edited
-
io.cfa (modified) (5 diffs)
-
io/types.hfa (modified) (3 diffs)
-
pthread.cfa (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
rcb0bcf1 r4a40fca7 594 594 lock( queue.lock __cfaabi_dbg_ctx2 ); 595 595 { 596 was_empty = queue.queue`isEmpty;596 was_empty = empty(queue.queue); 597 597 598 598 // Add our request to the list 599 insert_last( queue.queue, item );599 add( queue.queue, item ); 600 600 601 601 // Mark as pending … … 632 632 // notify the arbiter that new allocations are available 633 633 static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) { 634 /* paranoid */ verify( ! this.pending.queue`isEmpty);634 /* paranoid */ verify( !empty(this.pending.queue) ); 635 635 /* paranoid */ verify( __preemption_enabled() ); 636 636 … … 642 642 // as long as there are pending allocations try to satisfy them 643 643 // for simplicity do it in FIFO order 644 while( ! this.pending.queue`isEmpty) {644 while( !empty(this.pending.queue) ) { 645 645 // get first pending allocs 646 646 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 647 __pending_alloc & pa = (__pending_alloc&) (this.pending.queue`first);647 __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue ); 648 648 649 649 // check if we have enough to satisfy the request … … 651 651 652 652 // if there are enough allocations it means we can drop the request 653 try_pop_front( this.pending.queue );653 drop( this.pending.queue ); 654 654 655 655 /* paranoid */__attribute__((unused)) bool ret = … … 727 727 // pop each operation one at a time. 728 728 // There is no wait morphing because of the io sq ring 729 while( ! ctx.ext_sq.queue`isEmpty) {729 while( !empty(ctx.ext_sq.queue) ) { 730 730 // drop the element from the queue 731 __external_io & ei = (__external_io&) try_pop_front( ctx.ext_sq.queue );731 __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue ); 732 732 733 733 // submit it -
libcfa/src/concurrency/io/types.hfa
rcb0bcf1 r4a40fca7 24 24 25 25 #include "bits/locks.hfa" 26 #include "bits/queue.hfa" 26 27 #include "iofwd.hfa" 27 28 #include "kernel/fwd.hfa" 28 29 29 30 #if defined(CFA_HAVE_LINUX_IO_URING_H) 31 #include "bits/sequence.hfa" 30 32 #include "monitor.hfa" 31 33 … … 118 120 struct __outstanding_io { 119 121 // intrusive link fields 120 inline dlink(__outstanding_io);122 inline Colable; 121 123 122 124 // primitive on which to block until the io is processed 123 125 oneshot waitctx; 124 126 }; 125 P9_EMBEDDED( __outstanding_io, dlink(__outstanding_io) ) 127 static inline __outstanding_io *& Next( __outstanding_io * n ) { return (__outstanding_io *)Next( (Colable *)n ); } 126 128 127 129 // queue of operations that are outstanding … … 132 134 133 135 // the actual queue 134 dlist(__outstanding_io) queue;136 Queue(__outstanding_io) queue; 135 137 136 138 // volatile used to avoid the need for taking the lock if it's empty -
libcfa/src/concurrency/pthread.cfa
rcb0bcf1 r4a40fca7 20 20 #include <errno.h> 21 21 #include "locks.hfa" 22 #include "bits/stack.hfa" 23 #include "bits/sequence.hfa" 24 22 25 23 26 #define check_nonnull(x) asm("": "+rm"(x)); if( x == 0p ) return EINVAL; … … 31 34 32 35 struct pthread_values{ 33 inline dlink(pthread_values);36 inline Seqable; 34 37 void * value; 35 38 bool in_use; 36 39 }; 37 P9_EMBEDDED( pthread_values, dlink(pthread_values) ) 40 41 static inline { 42 pthread_values *& Back( pthread_values * n ) { 43 return (pthread_values *)Back( (Seqable *)n ); 44 } 45 46 pthread_values *& Next( pthread_values * n ) { 47 return (pthread_values *)Next( (Colable *)n ); 48 } 49 } 38 50 39 51 struct pthread_keys { 40 52 bool in_use; 41 53 void (* destructor)( void * ); 42 dlist( pthread_values) threads;54 Sequence(pthread_values) threads; 43 55 }; 44 56 … … 66 78 67 79 struct Pthread_kernel_threads{ 68 inline dlink(Pthread_kernel_threads);80 inline Colable; 69 81 processor p; 70 82 }; 71 P9_EMBEDDED( Pthread_kernel_threads, dlink(Pthread_kernel_threads) ) 72 73 static dlist(Pthread_kernel_threads) cfa_pthreads_kernel_threads; 83 84 Pthread_kernel_threads *& Next( Pthread_kernel_threads * n ) { 85 return (Pthread_kernel_threads *)Next( (Colable *)n ); 86 } 87 88 static Stack(Pthread_kernel_threads) cfa_pthreads_kernel_threads; 74 89 static bool cfa_pthreads_kernel_threads_zero = false; // set to zero ? 75 90 static int cfa_pthreads_no_kernel_threads = 1; // number of kernel threads … … 216 231 key = &cfa_pthread_keys[i]; 217 232 value->in_use = false; 218 remove(*value); 219 233 remove(key->threads, *value); 220 234 // if a key value has a non-NULL destructor pointer, and the thread has a non-NULL value associated with that key, 221 235 // the value of the key is set to NULL, and then the function pointed to is called with the previously associated value as its sole argument. … … 537 551 538 552 // Remove key from all threads with a value. 539 540 // Sequence(pthread_values)& head = cfa_pthread_keys[key].threads; 541 // for ( SeqIter(pthread_values) iter = { head }; iter | p; ) { 542 // remove(head, p); 543 // p.in_use = false; 544 // } 545 pthread_values * p = &try_pop_front( cfa_pthread_keys[key].threads ); 546 for ( ; p; ) { 547 p->in_use = false; 548 p = &try_pop_front( cfa_pthread_keys[key].threads ); 549 } 553 pthread_values& p; 554 Sequence(pthread_values)& head = cfa_pthread_keys[key].threads; 555 for ( SeqIter(pthread_values) iter = { head }; iter | p; ) { 556 remove(head, p); 557 p.in_use = false; 558 } 550 559 unlock(key_lock); 551 560 return 0; … … 576 585 if ( ! entry.in_use ) { 577 586 entry.in_use = true; 578 insert_last(cfa_pthread_keys[key].threads, entry);587 add(cfa_pthread_keys[key].threads, entry); 579 588 } // if 580 589 entry.value = (void *)value; … … 603 612 //######################### Parallelism ######################### 604 613 void pthread_delete_kernel_threads_() __THROW { // see uMain::~uMain 605 Pthread_kernel_threads * p = &try_pop_front(cfa_pthreads_kernel_threads); 606 for ( ; p; ) { 607 delete(p); 608 p = &try_pop_front(cfa_pthreads_kernel_threads); 614 Pthread_kernel_threads& p; 615 for ( StackIter(Pthread_kernel_threads) iter = {cfa_pthreads_kernel_threads}; iter | p; ) { 616 delete(&p); 609 617 } // for 610 618 } // pthread_delete_kernel_threads_ … … 623 631 lock( concurrency_lock ); 624 632 for ( ; new_level > cfa_pthreads_no_kernel_threads; cfa_pthreads_no_kernel_threads += 1 ) { // add processors ? 625 insert_last(cfa_pthreads_kernel_threads, *new() );633 push(cfa_pthreads_kernel_threads, *new() ); 626 634 } // for 627 635 for ( ; new_level < cfa_pthreads_no_kernel_threads; cfa_pthreads_no_kernel_threads -= 1 ) { // remove processors ? 628 delete(& try_pop_front(cfa_pthreads_kernel_threads));636 delete(&pop(cfa_pthreads_kernel_threads)); 629 637 } // for 630 638 unlock( concurrency_lock );
Note:
See TracChangeset
for help on using the changeset viewer.