Ignore:
Timestamp:
Jan 25, 2022, 4:54:35 PM (4 years ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
Children:
6b2d444, a488783, f681823
Parents:
f57f6ea0 (diff), 4fcbf26 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
11 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/clib/cfathread.cfa

    rf57f6ea0 r97fed44  
    2222#include "thread.hfa"
    2323#include "time.hfa"
     24#include "stdlib.hfa"
    2425
    2526#include "cfathread.h"
     
    195196                                eevent.data.u64 = (uint64_t)active_thread();
    196197
    197                                 int id = thread_rand() % poller_cnt;
     198                                int id = prng() % poller_cnt;
    198199                                if(0 != epoll_ctl(poller_fds[id], EPOLL_CTL_ADD, fd, &eevent))
    199200                                {
  • libcfa/src/concurrency/io.cfa

    rf57f6ea0 r97fed44  
    144144                __ioarbiter_flush( ctx );
    145145
    146                 __STATS__( true, io.calls.flush++; )
    147                 int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
    148                 if( ret < 0 ) {
    149                         switch((int)errno) {
    150                         case EAGAIN:
    151                         case EINTR:
    152                         case EBUSY:
    153                                 // Update statistics
    154                                 __STATS__( false, io.calls.errors.busy ++; )
    155                                 return false;
    156                         default:
    157                                 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     146                if(ctx.sq.to_submit != 0 || min_comp > 0) {
     147
     148                        __STATS__( true, io.calls.flush++; )
     149                        int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, min_comp > 0 ? IORING_ENTER_GETEVENTS : 0, (sigset_t *)0p, _NSIG / 8);
     150                        if( ret < 0 ) {
     151                                switch((int)errno) {
     152                                case EAGAIN:
     153                                case EINTR:
     154                                case EBUSY:
     155                                        // Update statistics
     156                                        __STATS__( false, io.calls.errors.busy ++; )
     157                                        return false;
     158                                default:
     159                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     160                                }
    158161                        }
    159                 }
    160 
    161                 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
    162                 __STATS__( true, io.calls.submitted += ret; )
    163                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    164                 /* paranoid */ verify( ctx.sq.to_submit >= ret );
    165 
    166                 ctx.sq.to_submit -= ret;
    167 
    168                 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
    169 
    170                 // Release the consumed SQEs
    171                 __release_sqes( ctx );
    172 
    173                 /* paranoid */ verify( ! __preemption_enabled() );
    174 
    175                 ctx.proc->io.pending = false;
     162
     163                        __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     164                        __STATS__( true, io.calls.submitted += ret; )
     165                        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     166                        /* paranoid */ verify( ctx.sq.to_submit >= ret );
     167
     168                        ctx.sq.to_submit -= ret;
     169
     170                        /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     171
     172                        // Release the consumed SQEs
     173                        __release_sqes( ctx );
     174
     175                        /* paranoid */ verify( ! __preemption_enabled() );
     176
     177                        ctx.proc->io.pending = false;
     178                }
     179
    176180                ready_schedule_lock();
    177181                bool ret = __cfa_io_drain( proc );
  • libcfa/src/concurrency/kernel.cfa

    rf57f6ea0 r97fed44  
    205205                                // Don't block if we are done
    206206                                if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
    207 
    208                                 #if !defined(__CFA_NO_STATISTICS__)
    209                                         __tls_stats()->ready.sleep.halts++;
    210                                 #endif
    211207
    212208                                // Push self to idle stack
     
    732728// Wake a thread from the front if there are any
    733729static void __wake_one(cluster * this) {
     730        eventfd_t val;
     731
    734732        /* paranoid */ verify( ! __preemption_enabled() );
    735733        /* paranoid */ verify( ready_schedule_islocked() );
    736734
    737735        // Check if there is a sleeping processor
    738         // int fd = __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST);
    739         int fd = 0;
    740         if( __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST) != 0 ) {
    741                 fd = __atomic_exchange_n(&this->procs.fd, 0, __ATOMIC_RELAXED);
    742         }
    743 
    744         // If no one is sleeping, we are done
    745         if( fd == 0 ) return;
    746 
    747         // We found a processor, wake it up
    748         eventfd_t val;
    749         val = 1;
    750         eventfd_write( fd, val );
    751 
    752         #if !defined(__CFA_NO_STATISTICS__)
    753                 if( kernelTLS().this_stats ) {
    754                         __tls_stats()->ready.sleep.wakes++;
    755                 }
    756                 else {
    757                         __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED);
    758                 }
    759         #endif
     736        struct __fd_waitctx * fdp = __atomic_load_n(&this->procs.fdw, __ATOMIC_SEQ_CST);
     737
     738        // If no one is sleeping: we are done
     739        if( fdp == 0p ) return;
     740
     741        int fd = 1;
     742        if( __atomic_load_n(&fdp->fd, __ATOMIC_SEQ_CST) != 1 ) {
     743                fd = __atomic_exchange_n(&fdp->fd, 1, __ATOMIC_RELAXED);
     744        }
     745
     746        switch(fd) {
     747        case 0:
     748                // If the processor isn't ready to sleep then the exchange will already wake it up
     749                #if !defined(__CFA_NO_STATISTICS__)
     750                        if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.early++;
     751                        } else { __atomic_fetch_add(&this->stats->ready.sleep.early, 1, __ATOMIC_RELAXED); }
     752                #endif
     753                break;
     754        case 1:
     755                // If someone else already said they will wake them: we are done
     756                #if !defined(__CFA_NO_STATISTICS__)
     757                        if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.seen++;
     758                        } else { __atomic_fetch_add(&this->stats->ready.sleep.seen, 1, __ATOMIC_RELAXED); }
     759                #endif
     760                break;
     761        default:
     762                // If the processor was ready to sleep, we need to wake it up with an actual write
     763                val = 1;
     764                eventfd_write( fd, val );
     765
     766                #if !defined(__CFA_NO_STATISTICS__)
     767                        if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.wakes++;
     768                        } else { __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED); }
     769                #endif
     770                break;
     771        }
    760772
    761773        /* paranoid */ verify( ready_schedule_islocked() );
     
    770782
    771783        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
     784
     785        this->idle_wctx.fd = 1;
    772786
    773787        eventfd_t val;
     
    779793
    780794static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
     795        // Tell everyone we are ready to go do sleep
     796        for() {
     797                int expected = this->idle_wctx.fd;
     798
     799                // Someone already told us to wake-up! No time for a nap.
     800                if(expected == 1) { return; }
     801
     802                // Try to mark that we are going to sleep
     803                if(__atomic_compare_exchange_n(&this->idle_wctx.fd, &expected, this->idle_fd, false,  __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {
     804                        // Every one agreed, taking a nap
     805                        break;
     806                }
     807        }
     808
     809
    781810        #if !defined(CFA_WITH_IO_URING_IDLE)
    782811                #if !defined(__CFA_NO_STATISTICS__)
     
    825854
    826855static bool mark_idle(__cluster_proc_list & this, processor & proc) {
     856        #if !defined(__CFA_NO_STATISTICS__)
     857                __tls_stats()->ready.sleep.halts++;
     858        #endif
     859
     860        proc.idle_wctx.fd = 0;
     861
    827862        /* paranoid */ verify( ! __preemption_enabled() );
    828863        if(!try_lock( this )) return false;
     
    832867                insert_first(this.idles, proc);
    833868
    834                 __atomic_store_n(&this.fd, proc.idle_fd, __ATOMIC_SEQ_CST);
     869                __atomic_store_n(&this.fdw, &proc.idle_wctx, __ATOMIC_SEQ_CST);
    835870        unlock( this );
    836871        /* paranoid */ verify( ! __preemption_enabled() );
     
    848883
    849884                {
    850                         int fd = 0;
    851                         if(!this.idles`isEmpty) fd = this.idles`first.idle_fd;
    852                         __atomic_store_n(&this.fd, fd, __ATOMIC_SEQ_CST);
     885                        struct __fd_waitctx * wctx = 0;
     886                        if(!this.idles`isEmpty) wctx = &this.idles`first.idle_wctx;
     887                        __atomic_store_n(&this.fdw, wctx, __ATOMIC_SEQ_CST);
    853888                }
    854889
  • libcfa/src/concurrency/kernel.hfa

    rf57f6ea0 r97fed44  
    5555};
    5656
     57
     58struct __fd_waitctx {
     59        volatile int fd;
     60};
     61
    5762// Wrapper around kernel threads
    5863struct __attribute__((aligned(128))) processor {
     
    6772                unsigned target;
    6873                unsigned last;
    69                 unsigned cnt;
    70                 unsigned long long int cutoff;
     74                signed   cpu;
    7175        } rdq;
    7276
     
    102106        int idle_fd;
    103107
     108        // Idle waitctx
     109        struct __fd_waitctx idle_wctx;
     110
    104111        // Termination synchronisation (user semaphore)
    105112        oneshot terminated;
     
    152159        volatile unsigned long long tv;
    153160        volatile unsigned long long ma;
     161};
     162
     163struct __attribute__((aligned(16))) __cache_id_t {
     164        volatile unsigned id;
    154165};
    155166
     
    164175static inline void ^?{}(__timestamp_t & this) {}
    165176
     177struct __attribute__((aligned(128))) __ready_queue_caches_t;
     178void  ?{}(__ready_queue_caches_t & this);
     179void ^?{}(__ready_queue_caches_t & this);
     180
    166181//TODO adjust cache size to ARCHITECTURE
    167 // Structure holding the relaxed ready queue
     182// Structure holding the ready queue
    168183struct __ready_queue_t {
    169184        // Data tracking the actual lanes
     
    178193                __timestamp_t * volatile tscs;
    179194
     195                __cache_id_t * volatile caches;
     196
    180197                // Array of stats
    181198                __help_cnts_t * volatile help;
     
    198215
    199216        // FD to use to wake a processor
    200         volatile int fd;
     217        struct __fd_waitctx * volatile fdw;
    201218
    202219        // Total number of processors
  • libcfa/src/concurrency/kernel/fwd.hfa

    rf57f6ea0 r97fed44  
    7979                        return
    8080                        #if defined(__SIZEOF_INT128__)
    81                                 __lehmer64( kernelTLS().rand_seed );
     81                                lehmer64( kernelTLS().rand_seed );
    8282                        #else
    83                                 __xorshift64( kernelTLS().rand_seed );
     83                                xorshift_13_7_17( kernelTLS().rand_seed );
    8484                        #endif
    8585                }
    8686
    87                 #define M  (1_l64u << 48_l64u)
    88                 #define A  (25214903917_l64u)
    89                 #define AI (18446708753438544741_l64u)
    90                 #define C  (11_l64u)
    91                 #define D  (16_l64u)
    92 
    9387                static inline unsigned __tls_rand_fwd() {
    94                         kernelTLS().ready_rng.fwd_seed = (A * kernelTLS().ready_rng.fwd_seed + C) & (M - 1);
    95                         return kernelTLS().ready_rng.fwd_seed >> D;
     88                        return LCGBI_fwd( kernelTLS().ready_rng.fwd_seed );
    9689                }
    9790
    9891                static inline unsigned __tls_rand_bck() {
    99                         unsigned int r = kernelTLS().ready_rng.bck_seed >> D;
    100                         kernelTLS().ready_rng.bck_seed = AI * (kernelTLS().ready_rng.bck_seed - C) & (M - 1);
    101                         return r;
    102                 }
    103 
    104                 #undef M
    105                 #undef A
    106                 #undef AI
    107                 #undef C
    108                 #undef D
     92                        return LCGBI_bck( kernelTLS().ready_rng.bck_seed );
     93                }
    10994
    11095                static inline void __tls_rand_advance_bck(void) {
     
    140125                        }
    141126                }
    142 
    143                 extern uint64_t thread_rand();
    144127
    145128                // Semaphore which only supports a single thread
  • libcfa/src/concurrency/kernel/startup.cfa

    rf57f6ea0 r97fed44  
    3434#include "kernel_private.hfa"
    3535#include "startup.hfa"          // STARTUP_PRIORITY_XXX
     36#include "limits.hfa"
    3637#include "math.hfa"
    3738
     
    177178
    178179
    179 
    180180//=============================================================================================
    181181// Kernel Setup logic
     
    515515        this.rdq.its = 0;
    516516        this.rdq.itr = 0;
    517         this.rdq.id  = -1u;
    518         this.rdq.target = -1u;
    519         this.rdq.last = -1u;
    520         this.rdq.cutoff = 0ull;
     517        this.rdq.id  = MAX;
     518        this.rdq.target = MAX;
     519        this.rdq.last = MAX;
     520        this.rdq.cpu = 0;
     521        // this.rdq.cutoff = 0ull;
    521522        do_terminate = false;
    522523        preemption_alarm = 0p;
     
    536537        }
    537538
     539        this.idle_wctx.fd = 0;
     540
     541        // I'm assuming these two are reserved for standard input and output
     542        // so I'm using them as sentinels with idle_wctx.
     543        /* paranoid */ verify( this.idle_fd != 0 );
     544        /* paranoid */ verify( this.idle_fd != 1 );
     545
    538546        #if !defined(__CFA_NO_STATISTICS__)
    539547                print_stats = 0;
     
    589597// Cluster
    590598static void ?{}(__cluster_proc_list & this) {
    591         this.fd    = 0;
     599        this.fdw   = 0p;
    592600        this.idle  = 0;
    593601        this.total = 0;
     
    686694        uint_fast32_t last_size;
    687695        [this->unique_id, last_size] = ready_mutate_register();
     696
     697                this->rdq.cpu = __kernel_getcpu();
    688698
    689699                this->cltr->procs.total += 1u;
  • libcfa/src/concurrency/locks.hfa

    rf57f6ea0 r97fed44  
    2929#include "time_t.hfa"
    3030#include "time.hfa"
    31 
    32 //-----------------------------------------------------------------------------
    33 // Semaphores
    34 
    35 // '0-nary' semaphore
    36 // Similar to a counting semaphore except the value of one is never reached
    37 // as a consequence, a V() that would bring the value to 1 *spins* until
    38 // a P consumes it
    39 struct Semaphore0nary {
    40         __spinlock_t lock; // needed to protect
    41         mpsc_queue(thread$) queue;
    42 };
    43 
    44 static inline bool P(Semaphore0nary & this, thread$ * thrd) {
    45         /* paranoid */ verify(!thrd`next);
    46         /* paranoid */ verify(!(&(*thrd)`next));
    47 
    48         push(this.queue, thrd);
    49         return true;
    50 }
    51 
    52 static inline bool P(Semaphore0nary & this) {
    53     thread$ * thrd = active_thread();
    54     P(this, thrd);
    55     park();
    56     return true;
    57 }
    58 
    59 static inline thread$ * V(Semaphore0nary & this, bool doUnpark = true) {
    60         thread$ * next;
    61         lock(this.lock __cfaabi_dbg_ctx2);
    62                 for (;;) {
    63                         next = pop(this.queue);
    64                         if (next) break;
    65                         Pause();
    66                 }
    67         unlock(this.lock);
    68 
    69         if (doUnpark) unpark(next);
    70         return next;
    71 }
    72 
    73 // Wrapper used on top of any sempahore to avoid potential locking
    74 struct BinaryBenaphore {
    75         volatile ssize_t counter;
    76 };
    77 
    78 static inline {
    79         void ?{}(BinaryBenaphore & this) { this.counter = 0; }
    80         void ?{}(BinaryBenaphore & this, zero_t) { this.counter = 0; }
    81         void ?{}(BinaryBenaphore & this, one_t ) { this.counter = 1; }
    82 
    83         // returns true if no blocking needed
    84         bool P(BinaryBenaphore & this) {
    85                 return __atomic_fetch_sub(&this.counter, 1, __ATOMIC_SEQ_CST) > 0;
    86         }
    87 
    88         bool tryP(BinaryBenaphore & this) {
    89                 ssize_t c = this.counter;
    90                 /* paranoid */ verify( c > MIN );
    91                 return (c >= 1) && __atomic_compare_exchange_n(&this.counter, &c, c-1, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
    92         }
    93 
    94         // returns true if notify needed
    95         bool V(BinaryBenaphore & this) {
    96                 ssize_t c = 0;
    97                 for () {
    98                         /* paranoid */ verify( this.counter < MAX );
    99                         if (__atomic_compare_exchange_n(&this.counter, &c, c+1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    100                                 if (c == 0) return true;
    101                                 /* paranoid */ verify(c < 0);
    102                                 return false;
    103                         } else {
    104                                 if (c == 1) return true;
    105                                 /* paranoid */ verify(c < 1);
    106                                 Pause();
    107                         }
    108                 }
    109         }
    110 }
    111 
    112 // Binary Semaphore based on the BinaryBenaphore on top of the 0-nary Semaphore
    113 struct ThreadBenaphore {
    114         BinaryBenaphore ben;
    115         Semaphore0nary  sem;
    116 };
    117 
    118 static inline void ?{}(ThreadBenaphore & this) {}
    119 static inline void ?{}(ThreadBenaphore & this, zero_t) { (this.ben){ 0 }; }
    120 static inline void ?{}(ThreadBenaphore & this, one_t ) { (this.ben){ 1 }; }
    121 
    122 static inline bool P(ThreadBenaphore & this)              { return P(this.ben) ? false : P(this.sem); }
    123 static inline bool tryP(ThreadBenaphore & this)           { return tryP(this.ben); }
    124 static inline bool P(ThreadBenaphore & this, bool wait)   { return wait ? P(this) : tryP(this); }
    125 
    126 static inline thread$ * V(ThreadBenaphore & this, bool doUnpark = true) {
    127         if (V(this.ben)) return 0p;
    128         return V(this.sem, doUnpark);
    129 }
    13031
    13132//-----------------------------------------------------------------------------
     
    17172static inline void   on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    17273static inline void   on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
    173 
    174 struct fast_lock {
    175         thread$ * volatile owner;
    176         ThreadBenaphore sem;
    177 };
    178 
    179 static inline void ?{}(fast_lock & this) __attribute__((deprecated("use linear_backoff_then_block_lock instead")));
    180 static inline void ?{}(fast_lock & this) { this.owner = 0p; }
    181 
    182 static inline bool $try_lock(fast_lock & this, thread$ * thrd) {
    183     thread$ * exp = 0p;
    184     return __atomic_compare_exchange_n(&this.owner, &exp, thrd, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
    185 }
    186 
    187 static inline void lock( fast_lock & this ) __attribute__((deprecated("use linear_backoff_then_block_lock instead"), artificial));
    188 static inline void lock( fast_lock & this ) {
    189         thread$ * thrd = active_thread();
    190         /* paranoid */verify(thrd != this.owner);
    191 
    192         for (;;) {
    193                 if ($try_lock(this, thrd)) return;
    194                 P(this.sem);
    195         }
    196 }
    197 
    198 static inline bool try_lock( fast_lock & this ) __attribute__((deprecated("use linear_backoff_then_block_lock instead"), artificial));
    199 static inline bool try_lock ( fast_lock & this ) {
    200         thread$ * thrd = active_thread();
    201         /* paranoid */ verify(thrd != this.owner);
    202         return $try_lock(this, thrd);
    203 }
    204 
    205 static inline thread$ * unlock( fast_lock & this ) __attribute__((deprecated("use linear_backoff_then_block_lock instead"), artificial));
    206 static inline thread$ * unlock( fast_lock & this ) {
    207         /* paranoid */ verify(active_thread() == this.owner);
    208 
    209         // open 'owner' before unlocking anyone
    210         // so new and unlocked threads don't park incorrectly.
    211         // This may require additional fencing on ARM.
    212         this.owner = 0p;
    213 
    214         return V(this.sem);
    215 }
    216 
    217 static inline size_t on_wait( fast_lock & this ) { unlock(this); return 0; }
    218 static inline void on_wakeup( fast_lock & this, size_t ) { lock(this); }
    219 static inline void on_notify( fast_lock &, struct thread$ * t ) { unpark(t); }
    22074
    22175struct mcs_node {
  • libcfa/src/concurrency/ready_queue.cfa

    rf57f6ea0 r97fed44  
    2020
    2121
    22 #define USE_RELAXED_FIFO
     22// #define USE_RELAXED_FIFO
    2323// #define USE_WORK_STEALING
    2424// #define USE_CPU_WORK_STEALING
     25#define USE_AWARE_STEALING
    2526
    2627#include "bits/defs.hfa"
     
    2930
    3031#include "stdlib.hfa"
     32#include "limits.hfa"
    3133#include "math.hfa"
    3234
     
    5456#endif
    5557
    56 #if   defined(USE_CPU_WORK_STEALING)
     58#if   defined(USE_AWARE_STEALING)
     59        #define READYQ_SHARD_FACTOR 2
     60        #define SEQUENTIAL_SHARD 2
     61#elif defined(USE_CPU_WORK_STEALING)
    5762        #define READYQ_SHARD_FACTOR 2
    5863#elif defined(USE_RELAXED_FIFO)
     
    138143        __kernel_rseq_register();
    139144
    140         __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    141145        bool * handle = (bool *)&kernelTLS().sched_lock;
    142146
     
    174178        }
    175179
    176         __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
    177 
    178180        // Return new spot.
    179181        /* paranoid */ verify(n < ready);
     
    190192
    191193        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
    192 
    193         __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
    194194
    195195        __kernel_rseq_unregister();
     
    201201uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
    202202        /* paranoid */ verify( ! __preemption_enabled() );
    203         /* paranoid */ verify( ! kernelTLS().sched_lock );
    204203
    205204        // Step 1 : lock global lock
     
    207206        //   to simply lock their own lock and enter.
    208207        __atomic_acquire( &write_lock );
     208
     209        // Make sure we won't deadlock ourself
     210        // Checking before acquiring the writer lock isn't safe
     211        // because someone else could have locked us.
     212        /* paranoid */ verify( ! kernelTLS().sched_lock );
    209213
    210214        // Step 2 : lock per-proc lock
     
    244248
    245249//=======================================================================
     250// caches handling
     251
     252struct __attribute__((aligned(128))) __ready_queue_caches_t {
     253        // Count States:
     254        // - 0  : No one is looking after this cache
     255        // - 1  : No one is looking after this cache, BUT it's not empty
     256        // - 2+ : At least one processor is looking after this cache
     257        volatile unsigned count;
     258};
     259
     260void  ?{}(__ready_queue_caches_t & this) { this.count = 0; }
     261void ^?{}(__ready_queue_caches_t & this) {}
     262
     263static inline void depart(__ready_queue_caches_t & cache) {
     264        /* paranoid */ verify( cache.count > 1);
     265        __atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);
     266        /* paranoid */ verify( cache.count != 0);
     267        /* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.
     268}
     269
     270static inline void arrive(__ready_queue_caches_t & cache) {
     271        // for() {
     272        //      unsigned expected = cache.count;
     273        //      unsigned desired  = 0 == expected ? 2 : expected + 1;
     274        // }
     275}
     276
     277//=======================================================================
    246278// Cforall Ready Queue used for scheduling
    247279//=======================================================================
    248 unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
    249         const unsigned long long tw = 16;
    250         const unsigned long long nw = 4;
    251         const unsigned long long ow = tw - nw;
    252         return ((nw * nval) + (ow * oval)) / tw;
     280unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
     281        /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
     282        /* paranoid */ verifyf( instsc  < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
     283        /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );
     284
     285        const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
     286        const unsigned long long total_weight = 16;
     287        const unsigned long long new_weight   = 4;
     288        const unsigned long long old_weight = total_weight - new_weight;
     289        const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
     290        return ret;
    253291}
    254292
     
    271309                }
    272310        #else
    273                 lanes.data  = 0p;
    274                 lanes.tscs  = 0p;
    275                 lanes.help  = 0p;
    276                 lanes.count = 0;
     311                lanes.data   = 0p;
     312                lanes.tscs   = 0p;
     313                lanes.caches = 0p;
     314                lanes.help   = 0p;
     315                lanes.count  = 0;
    277316        #endif
    278317}
     
    285324        free(lanes.data);
    286325        free(lanes.tscs);
     326        free(lanes.caches);
    287327        free(lanes.help);
    288328}
    289329
    290330//-----------------------------------------------------------------------
     331#if defined(USE_AWARE_STEALING)
     332        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     333                processor * const proc = kernelTLS().this_processor;
     334                const bool external = (!proc) || (cltr != proc->cltr);
     335                const bool remote   = hint == UNPARK_REMOTE;
     336
     337                unsigned i;
     338                if( external || remote ) {
     339                        // Figure out where thread was last time and make sure it's valid
     340                        /* paranoid */ verify(thrd->preferred >= 0);
     341                        if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
     342                                /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
     343                                unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
     344                                do {
     345                                        unsigned r = __tls_rand();
     346                                        i = start + (r % READYQ_SHARD_FACTOR);
     347                                        /* paranoid */ verify( i < lanes.count );
     348                                        // If we can't lock it retry
     349                                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     350                        } else {
     351                                do {
     352                                        i = __tls_rand() % lanes.count;
     353                                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     354                        }
     355                } else {
     356                        do {
     357                                unsigned r = proc->rdq.its++;
     358                                i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     359                                /* paranoid */ verify( i < lanes.count );
     360                                // If we can't lock it retry
     361                        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     362                }
     363
     364                // Actually push it
     365                push(lanes.data[i], thrd);
     366
     367                // Unlock and return
     368                __atomic_unlock( &lanes.data[i].lock );
     369
     370                #if !defined(__CFA_NO_STATISTICS__)
     371                        if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     372                        else __tls_stats()->ready.push.local.success++;
     373                #endif
     374        }
     375
     376        static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
     377                unsigned start = proc->rdq.id;
     378                unsigned long long max = 0;
     379                for(i; READYQ_SHARD_FACTOR) {
     380                        unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
     381                        if(ptsc != -1ull) {
     382                                /* paranoid */ verify( start + i < rdq.lanes.count );
     383                                unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
     384                                if(tsc > max) max = tsc;
     385                        }
     386                }
     387                return (max + 2 * max) / 2;
     388        }
     389
     390        __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     391                /* paranoid */ verify( lanes.count > 0 );
     392                /* paranoid */ verify( kernelTLS().this_processor );
     393                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     394
     395                processor * const proc = kernelTLS().this_processor;
     396                unsigned this = proc->rdq.id;
     397                /* paranoid */ verify( this < lanes.count );
     398                __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
     399
     400                // Figure out the current cpu and make sure it is valid
     401                const int cpu = __kernel_getcpu();
     402                /* paranoid */ verify(cpu >= 0);
     403                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
     404                unsigned this_cache = cpu_info.llc_map[cpu].cache;
     405
     406                // Super important: don't write the same value over and over again
     407                // We want to maximise our chances that his particular values stays in cache
     408                if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)
     409                        __atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);
     410
     411                const unsigned long long ctsc = rdtscl();
     412
     413                if(proc->rdq.target == MAX) {
     414                        uint64_t chaos = __tls_rand();
     415                        unsigned ext = chaos & 0xff;
     416                        unsigned other  = (chaos >> 8) % (lanes.count);
     417
     418                        if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
     419                                proc->rdq.target = other;
     420                        }
     421                }
     422                else {
     423                        const unsigned target = proc->rdq.target;
     424                        __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
     425                        /* paranoid */ verify( lanes.tscs[target].tv != MAX );
     426                        if(target < lanes.count) {
     427                                const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
     428                                const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
     429                                __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
     430                                if(age > cutoff) {
     431                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     432                                        if(t) return t;
     433                                }
     434                        }
     435                        proc->rdq.target = MAX;
     436                }
     437
     438                for(READYQ_SHARD_FACTOR) {
     439                        unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     440                        if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     441                }
     442
     443                // All lanes where empty return 0p
     444                return 0p;
     445
     446        }
     447        __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     448                unsigned i = __tls_rand() % lanes.count;
     449                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     450        }
     451        __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
     452                return search(cltr);
     453        }
     454#endif
    291455#if defined(USE_CPU_WORK_STEALING)
    292456        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
     
    350514                /* paranoid */ verify( kernelTLS().this_processor );
    351515
     516                processor * const proc = kernelTLS().this_processor;
    352517                const int cpu = __kernel_getcpu();
    353518                /* paranoid */ verify(cpu >= 0);
     
    360525                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
    361526
    362                 processor * const proc = kernelTLS().this_processor;
    363527                const int start = map.self * READYQ_SHARD_FACTOR;
    364528                const unsigned long long ctsc = rdtscl();
    365529
    366530                // Did we already have a help target
    367                 if(proc->rdq.target == -1u) {
     531                if(proc->rdq.target == MAX) {
    368532                        unsigned long long max = 0;
    369533                        for(i; READYQ_SHARD_FACTOR) {
    370                                 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     534                                unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    371535                                if(tsc > max) max = tsc;
    372536                        }
    373                         proc->rdq.cutoff = (max + 2 * max) / 2;
     537                        // proc->rdq.cutoff = (max + 2 * max) / 2;
    374538                        /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
    375539                        /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
     
    384548                        }
    385549
    386                         /* paranoid */ verify(proc->rdq.target != -1u);
     550                        /* paranoid */ verify(proc->rdq.target != MAX);
    387551                }
    388552                else {
    389553                        unsigned long long max = 0;
    390554                        for(i; READYQ_SHARD_FACTOR) {
    391                                 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
     555                                unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
    392556                                if(tsc > max) max = tsc;
    393557                        }
     
    395559                        {
    396560                                unsigned target = proc->rdq.target;
    397                                 proc->rdq.target = -1u;
     561                                proc->rdq.target = MAX;
    398562                                lanes.help[target / READYQ_SHARD_FACTOR].tri++;
    399                                 if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
     563                                if(moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
    400564                                        thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    401565                                        proc->rdq.last = target;
    402566                                        if(t) return t;
    403                                         else proc->rdq.target = -1u;
    404567                                }
    405                                 else proc->rdq.target = -1u;
     568                                proc->rdq.target = MAX;
    406569                        }
    407570
    408571                        unsigned last = proc->rdq.last;
    409                         if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
     572                        if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) {
    410573                                thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
    411574                                if(t) return t;
    412575                        }
    413576                        else {
    414                                 proc->rdq.last = -1u;
     577                                proc->rdq.last = MAX;
    415578                        }
    416579                }
     
    428591                processor * const proc = kernelTLS().this_processor;
    429592                unsigned last = proc->rdq.last;
    430                 if(last != -1u) {
     593                if(last != MAX) {
    431594                        struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
    432595                        if(t) return t;
    433                         proc->rdq.last = -1u;
     596                        proc->rdq.last = MAX;
    434597                }
    435598
     
    560723                #else
    561724                        unsigned preferred = thrd->preferred;
    562                         const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
     725                        const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr;
    563726                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
    564727
     
    612775                processor * proc = kernelTLS().this_processor;
    613776
    614                 if(proc->rdq.target == -1u) {
     777                if(proc->rdq.target == MAX) {
    615778                        unsigned long long min = ts(lanes.data[proc->rdq.id]);
    616779                        for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
     
    623786                else {
    624787                        unsigned target = proc->rdq.target;
    625                         proc->rdq.target = -1u;
     788                        proc->rdq.target = MAX;
    626789                        const unsigned long long bias = 0; //2_500_000_000;
    627790                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     
    658821// try to pop from a lane given by index w
    659822static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     823        /* paranoid */ verify( w < lanes.count );
    660824        __STATS( stats.attempt++; )
    661825
     
    681845        // Actually pop the list
    682846        struct thread$ * thrd;
    683         #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     847        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
    684848                unsigned long long tsc_before = ts(lane);
    685849        #endif
     
    697861        __STATS( stats.success++; )
    698862
    699         #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
    700                 unsigned long long now = rdtscl();
    701                 lanes.tscs[w].tv = tsv;
    702                 lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);
     863        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
     864                if (tsv != MAX) {
     865                        unsigned long long now = rdtscl();
     866                        unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
     867                        __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
     868                        __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
     869                }
    703870        #endif
    704871
    705         #if defined(USE_CPU_WORK_STEALING)
     872        #if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)
    706873                thrd->preferred = w / READYQ_SHARD_FACTOR;
    707874        #else
     
    802969                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
    803970                it->rdq.id = value;
    804                 it->rdq.target = -1u;
     971                it->rdq.target = MAX;
    805972                value += READYQ_SHARD_FACTOR;
    806973                it = &(*it)`next;
     
    815982
    816983static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
    817         #if defined(USE_WORK_STEALING)
     984        #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING)
    818985                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    819986                for(i; lanes.count) {
    820                         unsigned long long tsc1 = ts(lanes.data[i]);
    821                         unsigned long long tsc2 = rdtscl();
    822                         lanes.tscs[i].tv = min(tsc1, tsc2);
     987                        lanes.tscs[i].tv = rdtscl();
     988                        lanes.tscs[i].ma = 0;
    823989                }
    824990        #endif
     
    8661032                        // Update original
    8671033                        lanes.count = ncount;
     1034
     1035                        lanes.caches = alloc( target, lanes.caches`realloc );
    8681036                }
    8691037
     
    9421110                                fix(lanes.data[idx]);
    9431111                        }
     1112
     1113                        lanes.caches = alloc( target, lanes.caches`realloc );
    9441114                }
    9451115
    9461116                fix_times(cltr);
     1117
    9471118
    9481119                reassign_cltr_id(cltr);
  • libcfa/src/concurrency/stats.cfa

    rf57f6ea0 r97fed44  
    3131                stats->ready.sleep.halts   = 0;
    3232                stats->ready.sleep.cancels = 0;
     33                stats->ready.sleep.early   = 0;
    3334                stats->ready.sleep.wakes   = 0;
     35                stats->ready.sleep.seen    = 0;
    3436                stats->ready.sleep.exits   = 0;
    3537
     
    9193                tally_one( &cltr->ready.sleep.halts       , &proc->ready.sleep.halts        );
    9294                tally_one( &cltr->ready.sleep.cancels     , &proc->ready.sleep.cancels      );
     95                tally_one( &cltr->ready.sleep.early       , &proc->ready.sleep.early        );
    9396                tally_one( &cltr->ready.sleep.wakes       , &proc->ready.sleep.wakes        );
     97                tally_one( &cltr->ready.sleep.seen        , &proc->ready.sleep.wakes        );
    9498                tally_one( &cltr->ready.sleep.exits       , &proc->ready.sleep.exits        );
    9599
     
    153157                             | " (" | eng3(ready.pop.search.attempt) | " try)";
    154158
    155                         sstr | "- Idle Slp : " | eng3(ready.sleep.halts) | "halt," | eng3(ready.sleep.cancels) | "cancel," | eng3(ready.sleep.wakes) | "wake," | eng3(ready.sleep.exits) | "exit";
     159                        sstr | "- Idle Slp : " | eng3(ready.sleep.halts) | "halt," | eng3(ready.sleep.cancels) | "cancel,"
     160                             | eng3(ready.sleep.wakes + ready.sleep.early) | '(' | eng3(ready.sleep.early) | ',' | eng3(ready.sleep.seen) | ')' | " wake(early, seen),"
     161                             | eng3(ready.sleep.exits) | "exit";
    156162                        sstr | nl;
    157163                }
  • libcfa/src/concurrency/stats.hfa

    rf57f6ea0 r97fed44  
    6969                        volatile uint64_t halts;
    7070                        volatile uint64_t cancels;
     71                        volatile uint64_t early;
    7172                        volatile uint64_t wakes;
     73                        volatile uint64_t seen;
    7274                        volatile uint64_t exits;
    7375                } sleep;
  • libcfa/src/concurrency/thread.cfa

    rf57f6ea0 r97fed44  
    1010// Created On       : Tue Jan 17 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Wed Jan 12 18:46:48 2022
    13 // Update Count     : 36
     12// Last Modified On : Thu Jan 13 20:11:55 2022
     13// Update Count     : 42
    1414//
    1515
     
    2424#define __CFA_INVOKE_PRIVATE__
    2525#include "invoke.h"
    26 
    27 uint64_t thread_rand();
    2826
    2927extern uint32_t __global_random_seed;
     
    174172}
    175173
    176 uint64_t thread_rand() {
    177         disable_interrupts();
    178         uint64_t ret = __tls_rand();
    179         enable_interrupts();
    180         return ret;
    181 }
    182  
     174//-----------------------------------------------------------------------------
    183175#define GENERATOR LCG
    184 
    185 static inline uint32_t MarsagliaXor( uint32_t & state ) {
    186         uint32_t ret = state;
    187         state ^= state << 6;
    188         state ^= state >> 21;
    189         state ^= state << 7;
    190         return ret;
    191 } // MarsagliaXor
    192 
    193 static inline uint32_t LCG( uint32_t & state ) {                // linear congruential generator
    194         uint32_t ret = state;
    195         state = 36969 * (state & 65535) + (state >> 16);        // 36969 is NOT prime! No not change it!
    196         return ret;
    197 } // LCG
    198176
    199177void set_seed( uint32_t seed ) {
Note: See TracChangeset for help on using the changeset viewer.