- File:
-
- 1 edited
-
libcfa/src/concurrency/kernel.cfa (modified) (21 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/kernel.cfa
r4ccc150 rc9c1c1cb 19 19 // #define __CFA_DEBUG_PRINT_RUNTIME_CORE__ 20 20 21 #pragma GCC diagnostic push22 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"23 24 21 //C Includes 25 22 #include <errno.h> … … 28 25 #include <signal.h> 29 26 #include <unistd.h> 30 31 27 extern "C" { 32 28 #include <sys/eventfd.h> … … 35 31 36 32 //CFA Includes 37 #include "kernel /private.hfa"33 #include "kernel_private.hfa" 38 34 #include "preemption.hfa" 39 35 #include "strstream.hfa" … … 44 40 #define __CFA_INVOKE_PRIVATE__ 45 41 #include "invoke.h" 46 #pragma GCC diagnostic pop47 42 48 43 #if !defined(__CFA_NO_STATISTICS__) … … 132 127 static void __wake_one(cluster * cltr); 133 128 134 static void idle_sleep(processor * proc );129 static void idle_sleep(processor * proc, io_future_t & future, iovec & iov); 135 130 static bool mark_idle (__cluster_proc_list & idles, processor & proc); 136 131 static void mark_awake(__cluster_proc_list & idles, processor & proc); 137 132 138 extern bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1))); 139 extern bool __cfa_io_flush( processor * ) __attribute__((nonnull (1))); 140 extern void __cfa_io_idle( processor * ) __attribute__((nonnull (1))); 133 extern void __cfa_io_start( processor * ); 134 extern bool __cfa_io_drain( processor * ); 135 extern bool __cfa_io_flush( processor *, int min_comp ); 136 extern void __cfa_io_stop ( processor * ); 137 static inline bool __maybe_io_drain( processor * ); 141 138 142 139 #if defined(CFA_WITH_IO_URING_IDLE) … … 162 159 verify(this); 163 160 164 /* paranoid */ verify( this->idle_wctx.ftr != 0p );165 /* paranoid */ verify( this->idle_wctx.rdbuf != 0p );166 167 // used for idle sleep when io_uring is present168 // mark it as already fulfilled so we know if there is a pending request or not 169 this->idle_wctx.ftr->self.ptr = 1p;161 io_future_t future; // used for idle sleep when io_uring is present 162 future.self.ptr = 1p; // mark it as already fulfilled so we know if there is a pending request or not 163 eventfd_t idle_val; 164 iovec idle_iovec = { &idle_val, sizeof(idle_val) }; 165 166 __cfa_io_start( this ); 170 167 171 168 __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this); … … 192 189 for() { 193 190 // Check if there is pending io 194 __ cfa_io_drain( this );191 __maybe_io_drain( this ); 195 192 196 193 // Try to get the next thread … … 198 195 199 196 if( !readyThread ) { 200 // there is no point in holding submissions if we are idle201 197 __IO_STATS__(true, io.flush.idle++; ) 202 __cfa_io_flush( this ); 203 204 // drain again in case something showed up 205 __cfa_io_drain( this ); 198 __cfa_io_flush( this, 0 ); 206 199 207 200 readyThread = __next_thread( this->cltr ); … … 209 202 210 203 if( !readyThread ) for(5) { 204 __IO_STATS__(true, io.flush.idle++; ) 205 211 206 readyThread = __next_thread_slow( this->cltr ); 212 207 213 208 if( readyThread ) break; 214 209 215 // It's unlikely we still I/O to submit, but the arbiter could 216 __IO_STATS__(true, io.flush.idle++; ) 217 __cfa_io_flush( this ); 218 219 // drain again in case something showed up 220 __cfa_io_drain( this ); 210 __cfa_io_flush( this, 0 ); 221 211 } 222 212 … … 241 231 } 242 232 243 idle_sleep( this );233 idle_sleep( this, future, idle_iovec ); 244 234 245 235 // We were woken up, remove self from idle … … 261 251 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP; 262 252 263 if( __atomic_load_n(&this->io.pending, __ATOMIC_RELAXED) && !__atomic_load_n(&this->io.dirty, __ATOMIC_RELAXED)) {253 if(this->io.pending && !this->io.dirty) { 264 254 __IO_STATS__(true, io.flush.dirty++; ) 265 __cfa_io_flush( this );255 __cfa_io_flush( this, 0 ); 266 256 } 267 257 } … … 269 259 __cfadbg_print_safe(runtime_core, "Kernel : core %p stopping\n", this); 270 260 } 261 262 for(int i = 0; !available(future); i++) { 263 if(i > 1000) __cfaabi_dbg_write( "ERROR: kernel has bin spinning on a flush after exit loop.\n", 60); 264 __cfa_io_flush( this, 1 ); 265 } 266 267 __cfa_io_stop( this ); 271 268 272 269 post( this->terminated ); … … 637 634 638 635 int fd = 1; 639 if( __atomic_load_n(&fdp-> sem, __ATOMIC_SEQ_CST) != 1 ) {640 fd = __atomic_exchange_n(&fdp-> sem, 1, __ATOMIC_RELAXED);636 if( __atomic_load_n(&fdp->fd, __ATOMIC_SEQ_CST) != 1 ) { 637 fd = __atomic_exchange_n(&fdp->fd, 1, __ATOMIC_RELAXED); 641 638 } 642 639 643 640 switch(fd) { 644 __attribute__((unused)) int ret;645 641 case 0: 646 642 // If the processor isn't ready to sleep then the exchange will already wake it up … … 660 656 // If the processor was ready to sleep, we need to wake it up with an actual write 661 657 val = 1; 662 ret = eventfd_write( fd, val ); 663 /* paranoid */ verifyf( ret == 0, "Expected return to be 0, was %d\n", ret ); 658 eventfd_write( fd, val ); 664 659 665 660 #if !defined(__CFA_NO_STATISTICS__) … … 682 677 __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this); 683 678 684 this->idle_wctx.sem = 1; 685 686 this->idle_wctx.wake__time = rdtscl(); 679 this->idle_wctx.fd = 1; 687 680 688 681 eventfd_t val; 689 682 val = 1; 690 __attribute__((unused)) int ret = eventfd_write( this->idle_wctx.evfd, val ); 691 692 /* paranoid */ verifyf( ret == 0, "Expected return to be 0, was %d\n", ret ); 693 /* paranoid */ verify( ! __preemption_enabled() ); 694 } 695 696 static void idle_sleep(processor * this) { 697 /* paranoid */ verify( this->idle_wctx.evfd != 1 ); 698 /* paranoid */ verify( this->idle_wctx.evfd != 2 ); 699 683 eventfd_write( this->idle_fd, val ); 684 685 /* paranoid */ verify( ! __preemption_enabled() ); 686 } 687 688 static void idle_sleep(processor * this, io_future_t & future, iovec & iov) { 700 689 // Tell everyone we are ready to go do sleep 701 690 for() { 702 int expected = this->idle_wctx. sem;691 int expected = this->idle_wctx.fd; 703 692 704 693 // Someone already told us to wake-up! No time for a nap. … … 706 695 707 696 // Try to mark that we are going to sleep 708 if(__atomic_compare_exchange_n(&this->idle_wctx. sem, &expected, this->idle_wctx.evfd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {697 if(__atomic_compare_exchange_n(&this->idle_wctx.fd, &expected, this->idle_fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) { 709 698 // Every one agreed, taking a nap 710 699 break; … … 724 713 { 725 714 eventfd_t val; 726 ssize_t ret = read( this->idle_ wctx.evfd, &val, sizeof(val) );715 ssize_t ret = read( this->idle_fd, &val, sizeof(val) ); 727 716 if(ret < 0) { 728 717 switch((int)errno) { … … 746 735 #endif 747 736 #else 748 __cfa_io_idle( this ); 737 // Do we already have a pending read 738 if(available(future)) { 739 // There is no pending read, we need to add one 740 reset(future); 741 742 __kernel_read(this, future, iov, this->idle_fd ); 743 } 744 745 __cfa_io_flush( this, 1 ); 749 746 #endif 750 747 } … … 753 750 __STATS__(true, ready.sleep.halts++; ) 754 751 755 proc.idle_wctx. sem= 0;752 proc.idle_wctx.fd = 0; 756 753 757 754 /* paranoid */ verify( ! __preemption_enabled() ); … … 834 831 #endif 835 832 836 833 static inline bool __maybe_io_drain( processor * proc ) { 834 bool ret = false; 835 #if defined(CFA_HAVE_LINUX_IO_URING_H) 836 __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd); 837 838 // Check if we should drain the queue 839 $io_context * ctx = proc->io.ctx; 840 unsigned head = *ctx->cq.head; 841 unsigned tail = *ctx->cq.tail; 842 if(head == tail) return false; 843 ready_schedule_lock(); 844 ret = __cfa_io_drain( proc ); 845 ready_schedule_unlock(); 846 #endif 847 return ret; 848 } 837 849 838 850 //----------------------------------------------------------------------------- … … 891 903 void print_stats_now( cluster & this, int flags ) { 892 904 crawl_cluster_stats( this ); 893 __print_stats( this.stats, flags, "Cluster", this.name, (void*)&this );905 __print_stats( this.stats, this.print_stats, "Cluster", this.name, (void*)&this ); 894 906 } 895 907 #endif
Note:
See TracChangeset
for help on using the changeset viewer.