Changeset 97fed44 for libcfa/src/concurrency
- Timestamp: Jan 25, 2022, 4:54:35 PM (4 years ago)
- Branches: ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
- Children: 6b2d444, a488783, f681823
- Parents: f57f6ea0 (diff), 4fcbf26 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa/src/concurrency
- Files: 11 edited
- libcfa/src/concurrency/clib/cfathread.cfa
  - Added #include "stdlib.hfa" alongside the existing thread.hfa / time.hfa / cfathread.h includes.
  - In the epoll registration path, the poller instance is now chosen with
        int id = prng() % poller_cnt;
    instead of the old
        int id = thread_rand() % poller_cnt;
    matching the removal of thread_rand() elsewhere in this changeset.
- libcfa/src/concurrency/io.cfa
  - In the I/O flush path, the io_uring_enter syscall and all of the bookkeeping that
    follows it (the EAGAIN/EINTR/EBUSY handling, the flush/submitted statistics, the
    ctx.sq.to_submit accounting, __release_sqes( ctx ), and clearing
    ctx.proc->io.pending) are now wrapped in
        if(ctx.sq.to_submit != 0 || min_comp > 0) { ... }
    so the kernel is not entered at all when there is nothing to submit and no
    completions to wait for. The function still finishes by taking the ready-schedule
    lock and calling __cfa_io_drain( proc ). A sketch of the guard follows this entry.
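As a rough illustration of the new guard, here is a minimal standalone C sketch of a flush that skips io_uring_enter when there is nothing to do. The function name and parameters (flush_ring, ring_fd, to_submit) are invented for the example; only the raw syscall and its error handling follow the diff.

    #define _GNU_SOURCE
    #include <errno.h>
    #include <signal.h>
    #include <stdlib.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <linux/io_uring.h>

    // Returns entries consumed by the kernel, 0 if the syscall was skipped,
    // -1 on a transient error the caller should simply retry later.
    static int flush_ring(int ring_fd, unsigned to_submit, unsigned min_comp) {
        if (to_submit == 0 && min_comp == 0) return 0;        // nothing to do: skip the syscall

        int ret = syscall(__NR_io_uring_enter, ring_fd, to_submit, min_comp,
                          min_comp > 0 ? IORING_ENTER_GETEVENTS : 0,
                          (sigset_t *)0, _NSIG / 8);
        if (ret < 0) {
            switch (errno) {
            case EAGAIN: case EINTR: case EBUSY:
                return -1;                                    // transient: retry later
            default:
                abort();                                      // unexpected io_uring error
            }
        }
        return ret;                                           // submission entries consumed
    }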
- libcfa/src/concurrency/kernel.cfa
  - The ready.sleep.halts statistic is no longer bumped in the processor main loop just
    before blocking; it is counted in mark_idle() instead.
  - __wake_one( cluster * ) now works on a struct __fd_waitctx * published by the
    cluster (procs.fdw) rather than on a raw eventfd. If no processor is sleeping
    (fdw == 0p) it returns immediately; otherwise it atomically exchanges the waitctx's
    fd field with 1 and acts on the previous value:
      - 0: the processor had not finished going to sleep, so the exchange alone wakes
        it (counted as ready.sleep.early);
      - 1: another thread already promised to wake it (counted as ready.sleep.seen);
      - anything else: the value is the sleeper's eventfd, so a real
        eventfd_write( fd, 1 ) is performed (counted as ready.sleep.wakes).
  - __wake_proc() now sets this->idle_wctx.fd = 1 before writing to the eventfd.
  - idle_sleep() gains a preamble loop: it reads idle_wctx.fd, returns immediately if
    the value is already 1 (a wake-up is pending, no time for a nap), and otherwise
    tries to compare-and-swap it to the processor's idle_fd before actually blocking.
  - mark_idle() now counts ready.sleep.halts, resets proc.idle_wctx.fd to 0, and
    publishes &proc.idle_wctx through the cluster's fdw field; the matching un-idle
    path publishes the waitctx of the next idle processor, or 0p if there is none,
    instead of a raw file descriptor. The handshake is sketched after this entry.
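The handshake can be illustrated with a small standalone sketch in plain C11 plus Linux eventfd. The names (fd_waitctx, sleeper, waker) are stand-ins rather than the runtime's actual API, but the three states of the fd field follow the diff: 0 means "preparing to sleep", 1 means "a wake-up is pending", and any other value is the eventfd the sleeper is blocked on.

    #include <stdatomic.h>
    #include <stdint.h>
    #include <sys/eventfd.h>
    #include <unistd.h>

    struct fd_waitctx { _Atomic int fd; };

    // Called by an idle processor that owns eventfd 'evfd' (never 0 or 1, hence the sentinels).
    static void sleeper(struct fd_waitctx *wctx, int evfd) {
        int expected = atomic_load(&wctx->fd);
        if (expected == 1) return;                            // wake already pending: skip the nap
        // Publish the eventfd; this fails only if a waker raced us and stored 1.
        if (!atomic_compare_exchange_strong(&wctx->fd, &expected, evfd)) return;
        uint64_t buf;
        ssize_t r = read(evfd, &buf, sizeof buf);             // block until a waker writes
        (void)r;
    }

    // Called by a scheduler that has work and found this waitctx published by the cluster.
    static void waker(struct fd_waitctx *wctx) {
        int prev = atomic_exchange(&wctx->fd, 1);
        if (prev == 0) return;                                // sleeper not committed yet: it will see the 1
        if (prev == 1) return;                                // someone else already promised the wake-up
        eventfd_write(prev, 1);                               // prev is the eventfd: do a real wake-up
    }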
- libcfa/src/concurrency/kernel.hfa
  - New struct __fd_waitctx { volatile int fd; }, and a struct __fd_waitctx idle_wctx
    member added to struct processor next to the existing idle_fd.
  - In the processor's rdq sub-structure, the cnt and cutoff fields are replaced by a
    single signed cpu field.
  - New struct __attribute__((aligned(16))) __cache_id_t { volatile unsigned id; },
    plus forward declarations of __ready_queue_caches_t and its constructor and
    destructor.
  - __ready_queue_t gains a __cache_id_t * volatile caches array next to the existing
    per-lane timestamps and help counters; the comment "relaxed ready queue" becomes
    simply "ready queue".
  - In the cluster's processor list, the volatile int fd used to wake a processor is
    replaced by struct __fd_waitctx * volatile fdw.
- libcfa/src/concurrency/kernel/fwd.hfa
  - __tls_rand() now calls lehmer64() / xorshift_13_7_17() instead of the old
    __lehmer64() / __xorshift64() names (a sketch of the latter follows this entry).
  - The hand-rolled 48-bit LCG constants (M, A, AI, C, D) and their #define/#undef
    block are removed; __tls_rand_fwd() and __tls_rand_bck() now delegate to
    LCGBI_fwd() and LCGBI_bck() on the per-processor forward and backward seeds.
  - The extern uint64_t thread_rand(); declaration is removed.
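Judging by its name, xorshift_13_7_17 is the classic Marsaglia 64-bit xorshift with the (13, 7, 17) shift triple. The following plain C sketch shows that generator; it is an assumption about the library function, not its verified definition.

    #include <stdint.h>

    // Marsaglia 64-bit xorshift with shifts (13, 7, 17); state must be seeded nonzero.
    static inline uint64_t xorshift_13_7_17(uint64_t *state) {
        uint64_t x = *state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        return *state = x;
    }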
- libcfa/src/concurrency/kernel/startup.cfa
  - Added #include "limits.hfa" and removed a stray blank line.
  - Processor initialisation: rdq.id, rdq.target and rdq.last are initialised to MAX
    instead of -1u, rdq.cutoff is commented out, and the new rdq.cpu field is
    initialised to 0.
  - idle_wctx.fd is initialised to 0, with paranoid checks that the idle eventfd is
    neither 0 nor 1, since (as the new comment notes) those values are assumed to be
    reserved for standard input and output and are used as sentinels by idle_wctx.
  - The cluster's processor-list constructor now initialises fdw = 0p instead of
    fd = 0.
  - When a processor registers with the ready queue, rdq.cpu is filled in from
    __kernel_getcpu().
- libcfa/src/concurrency/locks.hfa
  - The experimental semaphore and lock implementations are removed: Semaphore0nary (a
    "0-nary" semaphore whose V() spins until a P() consumes it rather than ever letting
    the count reach one), BinaryBenaphore (an atomic counter placed in front of a
    semaphore to avoid locking on the uncontended path), ThreadBenaphore (the
    combination of the two), and the already deprecated fast_lock built on top of them,
    together with their P/V, lock/try_lock/unlock and on_wait/on_wakeup/on_notify
    helpers. The benaphore idea is sketched after this entry.
  - The remaining lock types (owner_lock, mcs_node, and so on) are untouched.
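For context, the removed BinaryBenaphore is built on the classic benaphore idea: an atomic counter gives a lock-free fast path and a real semaphore is only touched under contention. This is the textbook version in C11 plus POSIX semaphores, not the removed Cforall code (whose V() also has to interact with Semaphore0nary's spin-until-consumed behaviour).

    #include <stdatomic.h>
    #include <semaphore.h>

    struct benaphore {
        atomic_long count;   // how many threads hold or are waiting for the lock
        sem_t       sem;     // only touched on the contended path
    };

    static void benaphore_init(struct benaphore *b) {
        atomic_init(&b->count, 0);
        sem_init(&b->sem, 0, 0);
    }

    static void benaphore_lock(struct benaphore *b) {
        if (atomic_fetch_add(&b->count, 1) > 0)   // lock already held
            sem_wait(&b->sem);                    // slow path: block on the semaphore
    }

    static void benaphore_unlock(struct benaphore *b) {
        if (atomic_fetch_sub(&b->count, 1) > 1)   // at least one thread is waiting
            sem_post(&b->sem);                    // hand the lock over
    }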
- libcfa/src/concurrency/ready_queue.cfa
  - Scheduler selection: #define USE_RELAXED_FIFO is commented out and a new
    USE_AWARE_STEALING mode is defined (READYQ_SHARD_FACTOR 2, SEQUENTIAL_SHARD 2);
    #include "limits.hfa" is added and a few debug prints in the reader/writer-lock
    register/unregister paths are dropped.
  - ready_mutate_lock(): the check that the caller does not already hold its own
    per-processor lock is moved to after the global write lock is acquired, with a new
    comment explaining that checking earlier is unsafe because another writer could
    have locked this processor in the meantime.
  - New __ready_queue_caches_t: an aligned, per-cache counter of how many processors
    are currently looking after that cache (0 = nobody, 1 = nobody but not empty,
    2+ = at least one processor), with constructor/destructor and depart()/arrive()
    helpers (arrive() is still stubbed out). The lanes structure now allocates,
    reallocates on grow/shrink, and frees a caches array next to the existing tscs and
    help arrays.
  - moving_average() now takes the current timestamp, the insertion timestamp and the
    previous average, sanity-checks their magnitudes, and computes a 4/16-weighted
    exponential moving average of (currtsc - instsc); it is sketched after this entry
    together with the new calc_cutoff().
  - New USE_AWARE_STEALING implementations of push(), pop_fast(), pop_slow() and
    pop_search():
      - push() prefers the thread's previous shard (thrd->preferred) for external or
        UNPARK_REMOTE pushes and the processor's own shards otherwise;
      - pop_fast() records which last-level cache the processor's shards belong to
        (cpu_info.llc_map, written only when the value changes), picks a random help
        target biased towards queues on the same cache, and only helps that target when
        its age exceeds the cutoff computed by calc_cutoff() over the local shards,
        before falling back to popping from its own shards.
  - The USE_CPU_WORK_STEALING pop path switches to the new moving_average() signature
    and, like the other variants' push/pop paths, uses MAX instead of -1u as the
    "no target" sentinel.
  - try_pop() now asserts that the lane index is in range, updates the per-lane
    timestamp and moving average with relaxed atomics (and only when the popped
    timestamp is valid), and records thrd->preferred for the aware-stealing mode.
  - fix_times() now reinitialises the per-lane timestamps to rdtscl() and the moving
    averages to 0 for the work-stealing modes, and reassign_cltr_id() resets
    rdq.target to MAX.
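The core of the new helping heuristic is the moving_average()/calc_cutoff() pair. Below is a simplified standalone C sketch; the array parameters stand in for the runtime's per-lane structures, while the 4/16 weighting, the empty-lane check, and the (max + 2*max)/2 cutoff follow the diff.

    #include <stdint.h>

    // 4/16-weighted exponential moving average of how long a thread sat in a lane.
    static uint64_t moving_average(uint64_t currtsc, uint64_t instsc, uint64_t old_avg) {
        const uint64_t new_val = currtsc > instsc ? currtsc - instsc : 0;
        const uint64_t total_weight = 16, new_weight = 4;
        return (new_weight * new_val + (total_weight - new_weight) * old_avg) / total_weight;
    }

    // Cutoff over this processor's own shards: 1.5x the largest local age.
    // ins_ts[i] / avgs[i] stand in for the per-lane insert timestamps and moving averages.
    static uint64_t calc_cutoff(uint64_t ctsc, const uint64_t *ins_ts, const uint64_t *avgs,
                                unsigned start, unsigned shard_factor) {
        uint64_t max = 0;
        for (unsigned i = 0; i < shard_factor; i++) {
            if (ins_ts[start + i] == UINT64_MAX) continue;    // lane is empty: no timestamp
            uint64_t age = moving_average(ctsc, ins_ts[start + i], avgs[start + i]);
            if (age > max) max = age;
        }
        return (max + 2 * max) / 2;                           // only help lanes older than this
    }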
- libcfa/src/concurrency/stats.cfa
  - Two new idle-sleep counters, early and seen, are zeroed at initialisation and
    tallied into the cluster statistics alongside halts, cancels, wakes and exits.
  - The "Idle Slp" print line now reports wakes + early as the wake count, followed by
    the early and seen counts in parentheses: "... wake(early, seen), ... exit".
- libcfa/src/concurrency/stats.hfa
  - The ready.sleep statistics gain two new volatile uint64_t fields, early and seen,
    next to the existing halts, cancels, wakes and exits counters.
- libcfa/src/concurrency/thread.cfa
  - Header comment bumped: Last Modified On becomes Thu Jan 13 20:11:55 2022 and
    Update Count becomes 42.
  - The thread_rand() forward declaration and its definition (disable interrupts, call
    __tls_rand(), re-enable interrupts) are removed.
  - The file-local MarsagliaXor() and LCG() generators are removed (both are sketched
    after this entry); the #define GENERATOR LCG line and set_seed() remain.
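For reference, here are the two removed per-thread generators rewritten as plain C; the pointer-parameter style is for the example, while the recurrences are taken from the removed code. Despite its name, LCG() is Marsaglia's multiply-with-carry step rather than a textbook linear congruential generator, which is why its constant must not be changed casually.

    #include <stdint.h>

    // 32-bit xorshift with shifts (6, 21, 7); returns the pre-update state like the original.
    static inline uint32_t MarsagliaXor(uint32_t *state) {
        uint32_t ret = *state;
        *state ^= *state << 6;
        *state ^= *state >> 21;
        *state ^= *state << 7;
        return ret;
    }

    // Marsaglia multiply-with-carry step; 36969 is deliberate (and not prime), do not change it.
    static inline uint32_t LCG(uint32_t *state) {
        uint32_t ret = *state;
        *state = 36969 * (*state & 65535) + (*state >> 16);
        return ret;
    }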