Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/kernel.cfa

    r4ccc150 rc9c1c1cb  
    1919// #define __CFA_DEBUG_PRINT_RUNTIME_CORE__
    2020
    21 #pragma GCC diagnostic push
    22 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
    23 
    2421//C Includes
    2522#include <errno.h>
     
    2825#include <signal.h>
    2926#include <unistd.h>
    30 
    3127extern "C" {
    3228        #include <sys/eventfd.h>
     
    3531
    3632//CFA Includes
    37 #include "kernel/private.hfa"
     33#include "kernel_private.hfa"
    3834#include "preemption.hfa"
    3935#include "strstream.hfa"
     
    4440#define __CFA_INVOKE_PRIVATE__
    4541#include "invoke.h"
    46 #pragma GCC diagnostic pop
    4742
    4843#if !defined(__CFA_NO_STATISTICS__)
     
    132127static void __wake_one(cluster * cltr);
    133128
    134 static void idle_sleep(processor * proc);
     129static void idle_sleep(processor * proc, io_future_t & future, iovec & iov);
    135130static bool mark_idle (__cluster_proc_list & idles, processor & proc);
    136131static void mark_awake(__cluster_proc_list & idles, processor & proc);
    137132
    138 extern bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1)));
    139 extern bool __cfa_io_flush( processor * ) __attribute__((nonnull (1)));
    140 extern void __cfa_io_idle( processor * ) __attribute__((nonnull (1)));
     133extern void __cfa_io_start( processor * );
     134extern bool __cfa_io_drain( processor * );
     135extern bool __cfa_io_flush( processor *, int min_comp );
     136extern void __cfa_io_stop ( processor * );
     137static inline bool __maybe_io_drain( processor * );
    141138
    142139#if defined(CFA_WITH_IO_URING_IDLE)
     
    162159        verify(this);
    163160
    164         /* paranoid */ verify( this->idle_wctx.ftr   != 0p );
    165         /* paranoid */ verify( this->idle_wctx.rdbuf != 0p );
    166 
    167         // used for idle sleep when io_uring is present
    168         // mark it as already fulfilled so we know if there is a pending request or not
    169         this->idle_wctx.ftr->self.ptr = 1p;
     161        io_future_t future; // used for idle sleep when io_uring is present
     162        future.self.ptr = 1p;  // mark it as already fulfilled so we know if there is a pending request or not
     163        eventfd_t idle_val;
     164        iovec idle_iovec = { &idle_val, sizeof(idle_val) };
     165
     166        __cfa_io_start( this );
    170167
    171168        __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this);
     
    192189                for() {
    193190                        // Check if there is pending io
    194                         __cfa_io_drain( this );
     191                        __maybe_io_drain( this );
    195192
    196193                        // Try to get the next thread
     
    198195
    199196                        if( !readyThread ) {
    200                                 // there is no point in holding submissions if we are idle
    201197                                __IO_STATS__(true, io.flush.idle++; )
    202                                 __cfa_io_flush( this );
    203 
    204                                 // drain again in case something showed up
    205                                 __cfa_io_drain( this );
     198                                __cfa_io_flush( this, 0 );
    206199
    207200                                readyThread = __next_thread( this->cltr );
     
    209202
    210203                        if( !readyThread ) for(5) {
     204                                __IO_STATS__(true, io.flush.idle++; )
     205
    211206                                readyThread = __next_thread_slow( this->cltr );
    212207
    213208                                if( readyThread ) break;
    214209
    215                                 // It's unlikely we still I/O to submit, but the arbiter could
    216                                 __IO_STATS__(true, io.flush.idle++; )
    217                                 __cfa_io_flush( this );
    218 
    219                                 // drain again in case something showed up
    220                                 __cfa_io_drain( this );
     210                                __cfa_io_flush( this, 0 );
    221211                        }
    222212
     
    241231                                }
    242232
    243                                 idle_sleep( this );
     233                                idle_sleep( this, future, idle_iovec );
    244234
    245235                                // We were woken up, remove self from idle
     
    261251                        if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    262252
    263                         if(__atomic_load_n(&this->io.pending, __ATOMIC_RELAXED) && !__atomic_load_n(&this->io.dirty, __ATOMIC_RELAXED)) {
     253                        if(this->io.pending && !this->io.dirty) {
    264254                                __IO_STATS__(true, io.flush.dirty++; )
    265                                 __cfa_io_flush( this );
     255                                __cfa_io_flush( this, 0 );
    266256                        }
    267257                }
     
    269259                __cfadbg_print_safe(runtime_core, "Kernel : core %p stopping\n", this);
    270260        }
     261
     262        for(int i = 0; !available(future); i++) {
     263                if(i > 1000) __cfaabi_dbg_write( "ERROR: kernel has bin spinning on a flush after exit loop.\n", 60);
     264                __cfa_io_flush( this, 1 );
     265        }
     266
     267        __cfa_io_stop( this );
    271268
    272269        post( this->terminated );
     
    637634
    638635        int fd = 1;
    639         if( __atomic_load_n(&fdp->sem, __ATOMIC_SEQ_CST) != 1 ) {
    640                 fd = __atomic_exchange_n(&fdp->sem, 1, __ATOMIC_RELAXED);
     636        if( __atomic_load_n(&fdp->fd, __ATOMIC_SEQ_CST) != 1 ) {
     637                fd = __atomic_exchange_n(&fdp->fd, 1, __ATOMIC_RELAXED);
    641638        }
    642639
    643640        switch(fd) {
    644                 __attribute__((unused)) int ret;
    645641        case 0:
    646642                // If the processor isn't ready to sleep then the exchange will already wake it up
     
    660656                // If the processor was ready to sleep, we need to wake it up with an actual write
    661657                val = 1;
    662                 ret = eventfd_write( fd, val );
    663                 /* paranoid */ verifyf( ret == 0, "Expected return to be 0, was %d\n", ret );
     658                eventfd_write( fd, val );
    664659
    665660                #if !defined(__CFA_NO_STATISTICS__)
     
    682677        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
    683678
    684         this->idle_wctx.sem = 1;
    685 
    686         this->idle_wctx.wake__time = rdtscl();
     679        this->idle_wctx.fd = 1;
    687680
    688681        eventfd_t val;
    689682        val = 1;
    690         __attribute__((unused)) int ret = eventfd_write( this->idle_wctx.evfd, val );
    691 
    692         /* paranoid */ verifyf( ret == 0, "Expected return to be 0, was %d\n", ret );
    693         /* paranoid */ verify( ! __preemption_enabled() );
    694 }
    695 
    696 static void idle_sleep(processor * this) {
    697         /* paranoid */ verify( this->idle_wctx.evfd != 1 );
    698         /* paranoid */ verify( this->idle_wctx.evfd != 2 );
    699 
     683        eventfd_write( this->idle_fd, val );
     684
     685        /* paranoid */ verify( ! __preemption_enabled() );
     686}
     687
     688static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
    700689        // Tell everyone we are ready to go do sleep
    701690        for() {
    702                 int expected = this->idle_wctx.sem;
     691                int expected = this->idle_wctx.fd;
    703692
    704693                // Someone already told us to wake-up! No time for a nap.
     
    706695
    707696                // Try to mark that we are going to sleep
    708                 if(__atomic_compare_exchange_n(&this->idle_wctx.sem, &expected, this->idle_wctx.evfd, false,  __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {
     697                if(__atomic_compare_exchange_n(&this->idle_wctx.fd, &expected, this->idle_fd, false,  __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {
    709698                        // Every one agreed, taking a nap
    710699                        break;
     
    724713                {
    725714                        eventfd_t val;
    726                         ssize_t ret = read( this->idle_wctx.evfd, &val, sizeof(val) );
     715                        ssize_t ret = read( this->idle_fd, &val, sizeof(val) );
    727716                        if(ret < 0) {
    728717                                switch((int)errno) {
     
    746735                #endif
    747736        #else
    748                 __cfa_io_idle( this );
     737                // Do we already have a pending read
     738                if(available(future)) {
     739                        // There is no pending read, we need to add one
     740                        reset(future);
     741
     742                        __kernel_read(this, future, iov, this->idle_fd );
     743                }
     744
     745                __cfa_io_flush( this, 1 );
    749746        #endif
    750747}
     
    753750        __STATS__(true, ready.sleep.halts++; )
    754751
    755         proc.idle_wctx.sem = 0;
     752        proc.idle_wctx.fd = 0;
    756753
    757754        /* paranoid */ verify( ! __preemption_enabled() );
     
    834831#endif
    835832
    836 
     833static inline bool __maybe_io_drain( processor * proc ) {
     834        bool ret = false;
     835        #if defined(CFA_HAVE_LINUX_IO_URING_H)
     836                __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);
     837
     838                // Check if we should drain the queue
     839                $io_context * ctx = proc->io.ctx;
     840                unsigned head = *ctx->cq.head;
     841                unsigned tail = *ctx->cq.tail;
     842                if(head == tail) return false;
     843                ready_schedule_lock();
     844                ret = __cfa_io_drain( proc );
     845                ready_schedule_unlock();
     846        #endif
     847        return ret;
     848}
    837849
    838850//-----------------------------------------------------------------------------
     
    891903        void print_stats_now( cluster & this, int flags ) {
    892904                crawl_cluster_stats( this );
    893                 __print_stats( this.stats, flags, "Cluster", this.name, (void*)&this );
     905                __print_stats( this.stats, this.print_stats, "Cluster", this.name, (void*)&this );
    894906        }
    895907#endif
Note: See TracChangeset for help on using the changeset viewer.