Changes in / [c7f2d9b:623d1c8]
- Location:
- libcfa/src
- Files:
-
- 1 deleted
- 8 edited
-
Makefile.am (modified) (1 diff)
-
concurrency/io/setup.cfa (modified) (2 diffs)
-
concurrency/kernel.cfa (modified) (3 diffs)
-
concurrency/kernel.hfa (modified) (3 diffs)
-
concurrency/kernel/cluster.cfa (deleted)
-
concurrency/kernel/startup.cfa (modified) (3 diffs)
-
concurrency/kernel_private.hfa (modified) (1 diff)
-
concurrency/ready_queue.cfa (modified) (8 diffs)
-
concurrency/ready_subqueue.hfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/Makefile.am
rc7f2d9b r623d1c8 134 134 concurrency/io/call.cfa \ 135 135 concurrency/iofwd.hfa \ 136 concurrency/kernel/cluster.cfa \137 136 concurrency/kernel_private.hfa \ 138 137 concurrency/kernel/startup.cfa \ -
libcfa/src/concurrency/io/setup.cfa
rc7f2d9b r623d1c8 39 39 40 40 #else 41 #pragma GCC diagnostic push42 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"43 41 #include <errno.h> 44 42 #include <stdint.h> … … 61 59 #include "kernel_private.hfa" 62 60 #include "thread.hfa" 63 #pragma GCC diagnostic pop64 61 65 62 void ?{}(io_context_params & this) { -
libcfa/src/concurrency/kernel.cfa
rc7f2d9b r623d1c8 19 19 // #define __CFA_DEBUG_PRINT_RUNTIME_CORE__ 20 20 21 #pragma GCC diagnostic push22 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"23 24 21 //C Includes 25 22 #include <errno.h> … … 28 25 #include <signal.h> 29 26 #include <unistd.h> 30 31 27 extern "C" { 32 28 #include <sys/eventfd.h> … … 44 40 #define __CFA_INVOKE_PRIVATE__ 45 41 #include "invoke.h" 46 #pragma GCC diagnostic pop47 42 48 43 #if !defined(__CFA_NO_STATISTICS__) -
libcfa/src/concurrency/kernel.hfa
rc7f2d9b r623d1c8 155 155 void ^?{}(__intrusive_lane_t & this); 156 156 157 // Aligned timestamps which are used by the re ady queue and io subsystem157 // Aligned timestamps which are used by the relaxed ready queue 158 158 struct __attribute__((aligned(128))) __timestamp_t { 159 159 volatile unsigned long long tv; … … 161 161 }; 162 162 163 struct __attribute__((aligned(16))) __cache_id_t { 164 volatile unsigned id; 165 }; 166 167 // Aligned timestamps which are used by the relaxed ready queue 168 struct __attribute__((aligned(128))) __help_cnts_t { 169 volatile unsigned long long src; 170 volatile unsigned long long dst; 171 volatile unsigned long long tri; 172 }; 173 163 174 static inline void ?{}(__timestamp_t & this) { this.tv = 0; this.ma = 0; } 164 175 static inline void ^?{}(__timestamp_t &) {} 165 176 166 167 struct __attribute__((aligned(16))) __cache_id_t { 168 volatile unsigned id; 169 }; 170 171 // //TODO adjust cache size to ARCHITECTURE 172 // // Structure holding the ready queue 173 // struct __ready_queue_t { 174 // // Data tracking the actual lanes 175 // // On a seperate cacheline from the used struct since 176 // // used can change on each push/pop but this data 177 // // only changes on shrink/grow 178 // struct { 179 // // Arary of lanes 180 // __intrusive_lane_t * volatile data; 181 182 // __cache_id_t * volatile caches; 183 184 // // Number of lanes (empty or not) 185 // volatile size_t count; 186 // } lanes; 187 // }; 188 189 // void ?{}(__ready_queue_t & this); 190 // void ^?{}(__ready_queue_t & this); 177 struct __attribute__((aligned(128))) __ready_queue_caches_t; 178 void ?{}(__ready_queue_caches_t & this); 179 void ^?{}(__ready_queue_caches_t & this); 180 181 //TODO adjust cache size to ARCHITECTURE 182 // Structure holding the ready queue 183 struct __ready_queue_t { 184 // Data tracking the actual lanes 185 // On a seperate cacheline from the used struct since 186 // used can change on each push/pop but this data 187 // only changes 
on shrink/grow 188 struct { 189 // Arary of lanes 190 __intrusive_lane_t * volatile data; 191 192 // Array of times 193 __timestamp_t * volatile tscs; 194 195 __cache_id_t * volatile caches; 196 197 // Array of stats 198 __help_cnts_t * volatile help; 199 200 // Number of lanes (empty or not) 201 volatile size_t count; 202 } lanes; 203 }; 204 205 void ?{}(__ready_queue_t & this); 206 void ^?{}(__ready_queue_t & this); 207 #if !defined(__CFA_NO_STATISTICS__) 208 unsigned cnt(const __ready_queue_t & this, unsigned idx); 209 #endif 191 210 192 211 // Idle Sleep … … 214 233 // Cluster 215 234 struct __attribute__((aligned(128))) cluster { 216 struct { 217 struct { 218 // Arary of subqueues 219 __intrusive_lane_t * volatile data; 220 221 // Time since subqueues were processed 222 __timestamp_t * volatile tscs; 223 224 // Number of subqueue / timestamps 225 size_t count; 226 } readyQ; 227 228 struct { 229 // Number of I/O subqueues 230 volatile size_t count; 231 232 // Time since subqueues were processed 233 __timestamp_t * volatile tscs; 234 } io; 235 236 // Cache each kernel thread belongs to 237 __cache_id_t * volatile caches; 238 } sched; 239 240 // // Ready queue for threads 241 // __ready_queue_t ready_queue; 235 // Ready queue for threads 236 __ready_queue_t ready_queue; 242 237 243 238 // Name of the cluster -
libcfa/src/concurrency/kernel/startup.cfa
rc7f2d9b r623d1c8 515 515 this.rdq.its = 0; 516 516 this.rdq.itr = 0; 517 this.rdq.id = 0;517 this.rdq.id = MAX; 518 518 this.rdq.target = MAX; 519 519 this.rdq.last = MAX; … … 605 605 this.name = name; 606 606 this.preemption_rate = preemption_rate; 607 this.sched.readyQ.data = 0p; 608 this.sched.readyQ.tscs = 0p; 609 this.sched.readyQ.count = 0; 610 this.sched.io.tscs = 0p; 611 this.sched.caches = 0p; 607 ready_queue{}; 612 608 613 609 #if !defined(__CFA_NO_STATISTICS__) … … 648 644 // Unlock the RWlock 649 645 ready_mutate_unlock( last_size ); 650 651 ready_queue_close( &this );652 /* paranoid */ verify( this.sched.readyQ.data == 0p );653 /* paranoid */ verify( this.sched.readyQ.tscs == 0p );654 /* paranoid */ verify( this.sched.readyQ.count == 0 );655 /* paranoid */ verify( this.sched.io.tscs == 0p );656 /* paranoid */ verify( this.sched.caches == 0p );657 658 646 enable_interrupts( false ); // Don't poll, could be in main cluster 659 660 647 661 648 #if !defined(__CFA_NO_STATISTICS__) -
libcfa/src/concurrency/kernel_private.hfa
rc7f2d9b r623d1c8 365 365 void ready_queue_shrink(struct cluster * cltr); 366 366 367 //-----------------------------------------------------------------------368 // Decrease the width of the ready queue (number of lanes) by 4369 void ready_queue_close(struct cluster * cltr);370 371 //-----------------------------------------------------------------------372 // Calc moving average based on existing average, before and current time.373 static inline unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {374 /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );375 /* paranoid */ verifyf( instsc < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );376 /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );377 378 const unsigned long long new_val = currtsc > instsc ? 
currtsc - instsc : 0;379 const unsigned long long total_weight = 16;380 const unsigned long long new_weight = 4;381 const unsigned long long old_weight = total_weight - new_weight;382 const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;383 return ret;384 }385 386 //-----------------------------------------------------------------------387 // Calc age a timestamp should be before needing help.388 forall(Data_t * | { unsigned long long ts(Data_t & this); })389 static inline unsigned long long calc_cutoff(390 const unsigned long long ctsc,391 const processor * proc,392 size_t count,393 Data_t * data,394 __timestamp_t * tscs,395 const unsigned shard_factor396 ) {397 unsigned start = proc->rdq.id;398 unsigned long long max = 0;399 for(i; shard_factor) {400 unsigned long long ptsc = ts(data[start + i]);401 if(ptsc != -1ull) {402 /* paranoid */ verify( start + i < count );403 unsigned long long tsc = moving_average(ctsc, ptsc, tscs[start + i].ma);404 if(tsc > max) max = tsc;405 }406 }407 return (max + 2 * max) / 2;408 }409 410 static struct {411 const unsigned readyq;412 } __shard_factor __attribute__((unused)) = { 2 };413 367 414 368 // Local Variables: // -
libcfa/src/concurrency/ready_queue.cfa
rc7f2d9b r623d1c8 20 20 21 21 22 // #define USE_RELAXED_FIFO 23 // #define USE_WORK_STEALING 24 // #define USE_CPU_WORK_STEALING 22 25 #define USE_AWARE_STEALING 23 26 … … 26 29 #include "kernel_private.hfa" 27 30 31 #include "stdlib.hfa" 28 32 #include "limits.hfa" 29 30 // #include <errno.h> 31 // #include <unistd.h> 33 #include "math.hfa" 34 35 #include <errno.h> 36 #include <unistd.h> 37 38 extern "C" { 39 #include <sys/syscall.h> // __NR_xxx 40 } 32 41 33 42 #include "ready_subqueue.hfa" … … 41 50 #endif 42 51 52 // No overriden function, no environment variable, no define 53 // fall back to a magic number 54 #ifndef __CFA_MAX_PROCESSORS__ 55 #define __CFA_MAX_PROCESSORS__ 1024 56 #endif 57 58 #if defined(USE_AWARE_STEALING) 59 #define READYQ_SHARD_FACTOR 2 60 #define SEQUENTIAL_SHARD 2 61 #elif defined(USE_CPU_WORK_STEALING) 62 #define READYQ_SHARD_FACTOR 2 63 #elif defined(USE_RELAXED_FIFO) 64 #define BIAS 4 65 #define READYQ_SHARD_FACTOR 4 66 #define SEQUENTIAL_SHARD 1 67 #elif defined(USE_WORK_STEALING) 68 #define READYQ_SHARD_FACTOR 2 69 #define SEQUENTIAL_SHARD 2 70 #else 71 #error no scheduling strategy selected 72 #endif 73 43 74 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)); 44 75 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)); 45 76 static inline struct thread$ * search(struct cluster * cltr); 77 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred); 78 79 80 // returns the maximum number of processors the RWLock support 81 __attribute__((weak)) unsigned __max_processors() { 82 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS"); 83 if(!max_cores_s) { 84 __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n"); 85 return __CFA_MAX_PROCESSORS__; 86 } 87 88 char * endptr = 0p; 89 long int max_cores_l = strtol(max_cores_s, &endptr, 10); 90 if(max_cores_l < 1 || max_cores_l 
> 65535) { 91 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l); 92 return __CFA_MAX_PROCESSORS__; 93 } 94 if('\0' != *endptr) { 95 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s); 96 return __CFA_MAX_PROCESSORS__; 97 } 98 99 return max_cores_l; 100 } 101 102 #if defined(CFA_HAVE_LINUX_LIBRSEQ) 103 // No forward declaration needed 104 #define __kernel_rseq_register rseq_register_current_thread 105 #define __kernel_rseq_unregister rseq_unregister_current_thread 106 #elif defined(CFA_HAVE_LINUX_RSEQ_H) 107 static void __kernel_raw_rseq_register (void); 108 static void __kernel_raw_rseq_unregister(void); 109 110 #define __kernel_rseq_register __kernel_raw_rseq_register 111 #define __kernel_rseq_unregister __kernel_raw_rseq_unregister 112 #else 113 // No forward declaration needed 114 // No initialization needed 115 static inline void noop(void) {} 116 117 #define __kernel_rseq_register noop 118 #define __kernel_rseq_unregister noop 119 #endif 120 121 //======================================================================= 122 // Cluster wide reader-writer lock 123 //======================================================================= 124 void ?{}(__scheduler_RWLock_t & this) { 125 this.max = __max_processors(); 126 this.alloc = 0; 127 this.ready = 0; 128 this.data = alloc(this.max); 129 this.write_lock = false; 130 131 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc)); 132 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready)); 133 134 } 135 void ^?{}(__scheduler_RWLock_t & this) { 136 free(this.data); 137 } 138 139 140 //======================================================================= 141 // Lock-Free registering/unregistering of threads 142 unsigned register_proc_id( void ) with(*__scheduler_lock) { 143 __kernel_rseq_register(); 144 145 bool * handle = (bool *)&kernelTLS().sched_lock; 146 147 // Step - 1 : check if 
there is already space in the data 148 uint_fast32_t s = ready; 149 150 // Check among all the ready 151 for(uint_fast32_t i = 0; i < s; i++) { 152 bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems 153 /* paranoid */ verify( handle != *cell ); 154 155 bool * null = 0p; // Re-write every loop since compare thrashes it 156 if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null 157 && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 158 /* paranoid */ verify(i < ready); 159 /* paranoid */ verify( (kernelTLS().sched_id = i, true) ); 160 return i; 161 } 162 } 163 164 if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock->max); 165 166 // Step - 2 : F&A to get a new spot in the array. 167 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST); 168 if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock->max); 169 170 // Step - 3 : Mark space as used and then publish it. 171 data[n] = handle; 172 while() { 173 unsigned copy = n; 174 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n 175 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) 176 break; 177 Pause(); 178 } 179 180 // Return new spot. 
181 /* paranoid */ verify(n < ready); 182 /* paranoid */ verify( (kernelTLS().sched_id = n, true) ); 183 return n; 184 } 185 186 void unregister_proc_id( unsigned id ) with(*__scheduler_lock) { 187 /* paranoid */ verify(id < ready); 188 /* paranoid */ verify(id == kernelTLS().sched_id); 189 /* paranoid */ verify(data[id] == &kernelTLS().sched_lock); 190 191 bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems 192 193 __atomic_store_n(cell, 0p, __ATOMIC_RELEASE); 194 195 __kernel_rseq_unregister(); 196 } 197 198 //----------------------------------------------------------------------- 199 // Writer side : acquire when changing the ready queue, e.g. adding more 200 // queues or removing them. 201 uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) { 202 /* paranoid */ verify( ! __preemption_enabled() ); 203 204 // Step 1 : lock global lock 205 // It is needed to avoid processors that register mid Critical-Section 206 // to simply lock their own lock and enter. 207 __atomic_acquire( &write_lock ); 208 209 // Make sure we won't deadlock ourself 210 // Checking before acquiring the writer lock isn't safe 211 // because someone else could have locked us. 212 /* paranoid */ verify( ! kernelTLS().sched_lock ); 213 214 // Step 2 : lock per-proc lock 215 // Processors that are currently being registered aren't counted 216 // but can't be in read_lock or in the critical section. 217 // All other processors are counted 218 uint_fast32_t s = ready; 219 for(uint_fast32_t i = 0; i < s; i++) { 220 volatile bool * llock = data[i]; 221 if(llock) __atomic_acquire( llock ); 222 } 223 224 /* paranoid */ verify( ! __preemption_enabled() ); 225 return s; 226 } 227 228 void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) { 229 /* paranoid */ verify( ! 
__preemption_enabled() ); 230 231 // Step 1 : release local locks 232 // This must be done while the global lock is held to avoid 233 // threads that where created mid critical section 234 // to race to lock their local locks and have the writer 235 // immidiately unlock them 236 // Alternative solution : return s in write_lock and pass it to write_unlock 237 for(uint_fast32_t i = 0; i < last_s; i++) { 238 volatile bool * llock = data[i]; 239 if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE); 240 } 241 242 // Step 2 : release global lock 243 /*paranoid*/ assert(true == write_lock); 244 __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE); 245 246 /* paranoid */ verify( ! __preemption_enabled() ); 247 } 248 249 //======================================================================= 250 // caches handling 251 252 struct __attribute__((aligned(128))) __ready_queue_caches_t { 253 // Count States: 254 // - 0 : No one is looking after this cache 255 // - 1 : No one is looking after this cache, BUT it's not empty 256 // - 2+ : At least one processor is looking after this cache 257 volatile unsigned count; 258 }; 259 260 void ?{}(__ready_queue_caches_t & this) { this.count = 0; } 261 void ^?{}(__ready_queue_caches_t & this) {} 262 263 static inline void depart(__ready_queue_caches_t & cache) { 264 /* paranoid */ verify( cache.count > 1); 265 __atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST); 266 /* paranoid */ verify( cache.count != 0); 267 /* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird. 268 } 269 270 static inline void arrive(__ready_queue_caches_t & cache) { 271 // for() { 272 // unsigned expected = cache.count; 273 // unsigned desired = 0 == expected ? 
2 : expected + 1; 274 // } 275 } 46 276 47 277 //======================================================================= 48 278 // Cforall Ready Queue used for scheduling 49 279 //======================================================================= 50 // void ?{}(__ready_queue_t & this) with (this) { 51 // lanes.data = 0p; 52 // lanes.tscs = 0p; 53 // lanes.caches = 0p; 54 // lanes.count = 0; 55 // } 56 57 // void ^?{}(__ready_queue_t & this) with (this) { 58 // free(lanes.data); 59 // free(lanes.tscs); 60 // free(lanes.caches); 61 // } 280 unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) { 281 /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc ); 282 /* paranoid */ verifyf( instsc < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc ); 283 /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg ); 284 285 const unsigned long long new_val = currtsc > instsc ? 
currtsc - instsc : 0; 286 const unsigned long long total_weight = 16; 287 const unsigned long long new_weight = 4; 288 const unsigned long long old_weight = total_weight - new_weight; 289 const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight; 290 return ret; 291 } 292 293 void ?{}(__ready_queue_t & this) with (this) { 294 #if defined(USE_CPU_WORK_STEALING) 295 lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR; 296 lanes.data = alloc( lanes.count ); 297 lanes.tscs = alloc( lanes.count ); 298 lanes.help = alloc( cpu_info.hthrd_count ); 299 300 for( idx; (size_t)lanes.count ) { 301 (lanes.data[idx]){}; 302 lanes.tscs[idx].tv = rdtscl(); 303 lanes.tscs[idx].ma = rdtscl(); 304 } 305 for( idx; (size_t)cpu_info.hthrd_count ) { 306 lanes.help[idx].src = 0; 307 lanes.help[idx].dst = 0; 308 lanes.help[idx].tri = 0; 309 } 310 #else 311 lanes.data = 0p; 312 lanes.tscs = 0p; 313 lanes.caches = 0p; 314 lanes.help = 0p; 315 lanes.count = 0; 316 #endif 317 } 318 319 void ^?{}(__ready_queue_t & this) with (this) { 320 #if !defined(USE_CPU_WORK_STEALING) 321 verify( SEQUENTIAL_SHARD == lanes.count ); 322 #endif 323 324 free(lanes.data); 325 free(lanes.tscs); 326 free(lanes.caches); 327 free(lanes.help); 328 } 62 329 63 330 //----------------------------------------------------------------------- 64 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->sched) { 65 processor * const proc = kernelTLS().this_processor; 66 const bool external = (!proc) || (cltr != proc->cltr); 67 const bool remote = hint == UNPARK_REMOTE; 68 const size_t lanes_count = readyQ.count; 69 70 /* paranoid */ verify( __shard_factor.readyq > 0 ); 71 /* paranoid */ verify( lanes_count > 0 ); 72 73 unsigned i; 74 if( external || remote ) { 75 // Figure out where thread was last time and make sure it's valid 76 /* paranoid */ verify(thrd->preferred >= 0); 77 unsigned start = thrd->preferred * __shard_factor.readyq; 
78 if(start < lanes_count) { 79 do { 80 unsigned r = __tls_rand(); 81 i = start + (r % __shard_factor.readyq); 82 /* paranoid */ verify( i < lanes_count ); 83 // If we can't lock it retry 84 } while( !__atomic_try_acquire( &readyQ.data[i].lock ) ); 331 #if defined(USE_AWARE_STEALING) 332 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 333 processor * const proc = kernelTLS().this_processor; 334 const bool external = (!proc) || (cltr != proc->cltr); 335 const bool remote = hint == UNPARK_REMOTE; 336 337 unsigned i; 338 if( external || remote ) { 339 // Figure out where thread was last time and make sure it's valid 340 /* paranoid */ verify(thrd->preferred >= 0); 341 if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) { 342 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count); 343 unsigned start = thrd->preferred * READYQ_SHARD_FACTOR; 344 do { 345 unsigned r = __tls_rand(); 346 i = start + (r % READYQ_SHARD_FACTOR); 347 /* paranoid */ verify( i < lanes.count ); 348 // If we can't lock it retry 349 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 350 } else { 351 do { 352 i = __tls_rand() % lanes.count; 353 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 354 } 85 355 } else { 86 356 do { 87 i = __tls_rand() % lanes_count; 88 } while( !__atomic_try_acquire( &readyQ.data[i].lock ) ); 89 } 90 } else { 357 unsigned r = proc->rdq.its++; 358 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR); 359 /* paranoid */ verify( i < lanes.count ); 360 // If we can't lock it retry 361 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 362 } 363 364 // Actually push it 365 push(lanes.data[i], thrd); 366 367 // Unlock and return 368 __atomic_unlock( &lanes.data[i].lock ); 369 370 #if !defined(__CFA_NO_STATISTICS__) 371 if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED); 372 else 
__tls_stats()->ready.push.local.success++; 373 #endif 374 } 375 376 static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) { 377 unsigned start = proc->rdq.id; 378 unsigned long long max = 0; 379 for(i; READYQ_SHARD_FACTOR) { 380 unsigned long long ptsc = ts(rdq.lanes.data[start + i]); 381 if(ptsc != -1ull) { 382 /* paranoid */ verify( start + i < rdq.lanes.count ); 383 unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma); 384 if(tsc > max) max = tsc; 385 } 386 } 387 return (max + 2 * max) / 2; 388 } 389 390 __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 391 /* paranoid */ verify( lanes.count > 0 ); 392 /* paranoid */ verify( kernelTLS().this_processor ); 393 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count ); 394 395 processor * const proc = kernelTLS().this_processor; 396 unsigned this = proc->rdq.id; 397 /* paranoid */ verify( this < lanes.count ); 398 __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this); 399 400 // Figure out the current cpu and make sure it is valid 401 const int cpu = __kernel_getcpu(); 402 /* paranoid */ verify(cpu >= 0); 403 /* paranoid */ verify(cpu < cpu_info.hthrd_count); 404 unsigned this_cache = cpu_info.llc_map[cpu].cache; 405 406 // Super important: don't write the same value over and over again 407 // We want to maximise our chances that his particular values stays in cache 408 if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache) 409 __atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED); 410 411 const unsigned long long ctsc = rdtscl(); 412 413 if(proc->rdq.target == MAX) { 414 uint64_t chaos = __tls_rand(); 415 unsigned ext = chaos & 0xff; 416 unsigned other = (chaos >> 8) % (lanes.count); 417 418 if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) 
{ 419 proc->rdq.target = other; 420 } 421 } 422 else { 423 const unsigned target = proc->rdq.target; 424 __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv); 425 /* paranoid */ verify( lanes.tscs[target].tv != MAX ); 426 if(target < lanes.count) { 427 const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue); 428 const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma); 429 __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no"); 430 if(age > cutoff) { 431 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 432 if(t) return t; 433 } 434 } 435 proc->rdq.target = MAX; 436 } 437 438 for(READYQ_SHARD_FACTOR) { 439 unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR); 440 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 441 } 442 443 // All lanes where empty return 0p 444 return 0p; 445 446 } 447 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) { 448 unsigned i = __tls_rand() % lanes.count; 449 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 450 } 451 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) { 452 return search(cltr); 453 } 454 #endif 455 #if defined(USE_CPU_WORK_STEALING) 456 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 457 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 458 459 processor * const proc = kernelTLS().this_processor; 460 const bool external = (!proc) || (cltr != proc->cltr); 461 462 // Figure out the current cpu and make sure it is valid 463 const int cpu = __kernel_getcpu(); 464 /* paranoid */ verify(cpu >= 0); 465 /* paranoid */ verify(cpu < 
cpu_info.hthrd_count); 466 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count); 467 468 // Figure out where thread was last time and make sure it's 469 /* paranoid */ verify(thrd->preferred >= 0); 470 /* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count); 471 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count); 472 const int prf = thrd->preferred * READYQ_SHARD_FACTOR; 473 474 const cpu_map_entry_t & map; 475 choose(hint) { 476 case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu]; 477 case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf]; 478 } 479 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count); 480 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count); 481 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR); 482 483 const int start = map.self * READYQ_SHARD_FACTOR; 484 unsigned i; 91 485 do { 92 unsigned r = proc->rdq.its++; 93 i = proc->rdq.id + (r % __shard_factor.readyq); 94 /* paranoid */ verify( i < lanes_count ); 486 unsigned r; 487 if(unlikely(external)) { r = __tls_rand(); } 488 else { r = proc->rdq.its++; } 489 choose(hint) { 490 case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR); 491 case UNPARK_REMOTE: i = prf + (r % READYQ_SHARD_FACTOR); 492 } 95 493 // If we can't lock it retry 96 } while( !__atomic_try_acquire( &readyQ.data[i].lock ) ); 97 } 98 99 // Actually push it 100 push(readyQ.data[i], thrd); 101 102 // Unlock and return 103 __atomic_unlock( &readyQ.data[i].lock ); 104 105 #if !defined(__CFA_NO_STATISTICS__) 106 if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED); 107 else __tls_stats()->ready.push.local.success++; 108 #endif 109 } 110 111 __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->sched) { 112 const size_t lanes_count = readyQ.count; 
113 114 /* paranoid */ verify( __shard_factor.readyq > 0 ); 115 /* paranoid */ verify( lanes_count > 0 ); 116 /* paranoid */ verify( kernelTLS().this_processor ); 117 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes_count ); 118 119 processor * const proc = kernelTLS().this_processor; 120 unsigned this = proc->rdq.id; 121 /* paranoid */ verify( this < lanes_count ); 122 __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this); 123 124 // Figure out the current cpu and make sure it is valid 125 const int cpu = __kernel_getcpu(); 126 /* paranoid */ verify(cpu >= 0); 127 /* paranoid */ verify(cpu < cpu_info.hthrd_count); 128 unsigned this_cache = cpu_info.llc_map[cpu].cache; 129 130 // Super important: don't write the same value over and over again 131 // We want to maximise our chances that his particular values stays in cache 132 if(caches[this / __shard_factor.readyq].id != this_cache) 133 __atomic_store_n(&caches[this / __shard_factor.readyq].id, this_cache, __ATOMIC_RELAXED); 134 135 const unsigned long long ctsc = rdtscl(); 136 137 if(proc->rdq.target == MAX) { 138 uint64_t chaos = __tls_rand(); 139 unsigned ext = chaos & 0xff; 140 unsigned other = (chaos >> 8) % (lanes_count); 141 142 if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.readyq].id, __ATOMIC_RELAXED) == this_cache) { 143 proc->rdq.target = other; 144 } 145 } 146 else { 147 const unsigned target = proc->rdq.target; 148 __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, readyQ.tscs[target].tv); 149 /* paranoid */ verify( readyQ.tscs[target].tv != MAX ); 150 if(target < lanes_count) { 151 const unsigned long long cutoff = calc_cutoff(ctsc, proc, lanes_count, cltr->sched.readyQ.data, cltr->sched.readyQ.tscs, __shard_factor.readyq); 152 const unsigned long long age = moving_average(ctsc, readyQ.tscs[target].tv, readyQ.tscs[target].ma); 153 __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs 
cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no"); 154 if(age > cutoff) { 494 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 495 496 // Actually push it 497 push(lanes.data[i], thrd); 498 499 // Unlock and return 500 __atomic_unlock( &lanes.data[i].lock ); 501 502 #if !defined(__CFA_NO_STATISTICS__) 503 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED); 504 else __tls_stats()->ready.push.local.success++; 505 #endif 506 507 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first); 508 509 } 510 511 // Pop from the ready queue from a given cluster 512 __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 513 /* paranoid */ verify( lanes.count > 0 ); 514 /* paranoid */ verify( kernelTLS().this_processor ); 515 516 processor * const proc = kernelTLS().this_processor; 517 const int cpu = __kernel_getcpu(); 518 /* paranoid */ verify(cpu >= 0); 519 /* paranoid */ verify(cpu < cpu_info.hthrd_count); 520 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count); 521 522 const cpu_map_entry_t & map = cpu_info.llc_map[cpu]; 523 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count); 524 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count); 525 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR); 526 527 const int start = map.self * READYQ_SHARD_FACTOR; 528 const unsigned long long ctsc = rdtscl(); 529 530 // Did we already have a help target 531 if(proc->rdq.target == MAX) { 532 unsigned long long max = 0; 533 for(i; READYQ_SHARD_FACTOR) { 534 unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 535 if(tsc > max) max = tsc; 536 } 537 // proc->rdq.cutoff = 
(max + 2 * max) / 2; 538 /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores. 539 /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores. 540 541 if(0 == (__tls_rand() % 100)) { 542 proc->rdq.target = __tls_rand() % lanes.count; 543 } else { 544 unsigned cpu_chaos = map.start + (__tls_rand() % map.count); 545 proc->rdq.target = (cpu_chaos * READYQ_SHARD_FACTOR) + (__tls_rand() % READYQ_SHARD_FACTOR); 546 /* paranoid */ verify(proc->rdq.target >= (map.start * READYQ_SHARD_FACTOR)); 547 /* paranoid */ verify(proc->rdq.target < ((map.start + map.count) * READYQ_SHARD_FACTOR)); 548 } 549 550 /* paranoid */ verify(proc->rdq.target != MAX); 551 } 552 else { 553 unsigned long long max = 0; 554 for(i; READYQ_SHARD_FACTOR) { 555 unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 556 if(tsc > max) max = tsc; 557 } 558 const unsigned long long cutoff = (max + 2 * max) / 2; 559 { 560 unsigned target = proc->rdq.target; 561 proc->rdq.target = MAX; 562 lanes.help[target / READYQ_SHARD_FACTOR].tri++; 563 if(moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) { 564 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 565 proc->rdq.last = target; 566 if(t) return t; 567 } 568 proc->rdq.target = MAX; 569 } 570 571 unsigned last = proc->rdq.last; 572 if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) { 573 thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help)); 574 if(t) return t; 575 } 576 else { 577 proc->rdq.last = MAX; 578 } 579 } 580 581 for(READYQ_SHARD_FACTOR) { 582 unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR); 583 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 584 } 585 586 // All lanes where empty return 0p 587 return 0p; 588 } 589 590 __attribute__((hot)) struct thread$ * pop_slow(struct 
cluster * cltr) with (cltr->ready_queue) { 591 processor * const proc = kernelTLS().this_processor; 592 unsigned last = proc->rdq.last; 593 if(last != MAX) { 594 struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal)); 595 if(t) return t; 596 proc->rdq.last = MAX; 597 } 598 599 unsigned i = __tls_rand() % lanes.count; 600 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 601 } 602 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) { 603 return search(cltr); 604 } 605 #endif 606 #if defined(USE_RELAXED_FIFO) 607 //----------------------------------------------------------------------- 608 // get index from random number with or without bias towards queues 609 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) { 610 unsigned i; 611 bool local; 612 unsigned rlow = r % BIAS; 613 unsigned rhigh = r / BIAS; 614 if((0 != rlow) && preferred >= 0) { 615 // (BIAS - 1) out of BIAS chances 616 // Use perferred queues 617 i = preferred + (rhigh % READYQ_SHARD_FACTOR); 618 local = true; 619 } 620 else { 621 // 1 out of BIAS chances 622 // Use all queues 623 i = rhigh; 624 local = false; 625 } 626 return [i, local]; 627 } 628 629 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 630 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 631 632 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 633 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 634 635 bool local; 636 int preferred = external ? 
-1 : kernelTLS().this_processor->rdq.id; 637 638 // Try to pick a lane and lock it 639 unsigned i; 640 do { 641 // Pick the index of a lane 642 unsigned r = __tls_rand_fwd(); 643 [i, local] = idx_from_r(r, preferred); 644 645 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED ); 646 647 #if !defined(__CFA_NO_STATISTICS__) 648 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED); 649 else if(local) __tls_stats()->ready.push.local.attempt++; 650 else __tls_stats()->ready.push.share.attempt++; 651 #endif 652 653 // If we can't lock it retry 654 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 655 656 // Actually push it 657 push(lanes.data[i], thrd); 658 659 // Unlock and return 660 __atomic_unlock( &lanes.data[i].lock ); 661 662 // Mark the current index in the tls rng instance as having an item 663 __tls_rand_advance_bck(); 664 665 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first); 666 667 // Update statistics 668 #if !defined(__CFA_NO_STATISTICS__) 669 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED); 670 else if(local) __tls_stats()->ready.push.local.success++; 671 else __tls_stats()->ready.push.share.success++; 672 #endif 673 } 674 675 // Pop from the ready queue from a given cluster 676 __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 677 /* paranoid */ verify( lanes.count > 0 ); 678 /* paranoid */ verify( kernelTLS().this_processor ); 679 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count ); 680 681 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED ); 682 int preferred = kernelTLS().this_processor->rdq.id; 683 684 685 // As long as the list is not empty, try finding a lane that isn't empty and pop from it 686 for(25) { 687 // Pick two lists at random 688 unsigned ri = 
__tls_rand_bck(); 689 unsigned rj = __tls_rand_bck(); 690 691 unsigned i, j; 692 __attribute__((unused)) bool locali, localj; 693 [i, locali] = idx_from_r(ri, preferred); 694 [j, localj] = idx_from_r(rj, preferred); 695 696 i %= count; 697 j %= count; 698 699 // try popping from the 2 picked lists 700 struct thread$ * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help))); 701 if(thrd) { 702 return thrd; 703 } 704 } 705 706 // All lanes where empty return 0p 707 return 0p; 708 } 709 710 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) { return pop_fast(cltr); } 711 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) { 712 return search(cltr); 713 } 714 #endif 715 #if defined(USE_WORK_STEALING) 716 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 717 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 718 719 // #define USE_PREFERRED 720 #if !defined(USE_PREFERRED) 721 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 722 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 723 #else 724 unsigned preferred = thrd->preferred; 725 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr; 726 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count ); 727 728 unsigned r = preferred % READYQ_SHARD_FACTOR; 729 const unsigned start = preferred - r; 730 #endif 731 732 // Try to pick a lane and lock it 733 unsigned i; 734 do { 735 #if !defined(__CFA_NO_STATISTICS__) 736 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED); 737 else 
__tls_stats()->ready.push.local.attempt++; 738 #endif 739 740 if(unlikely(external)) { 741 i = __tls_rand() % lanes.count; 742 } 743 else { 744 #if !defined(USE_PREFERRED) 745 processor * proc = kernelTLS().this_processor; 746 unsigned r = proc->rdq.its++; 747 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR); 748 #else 749 i = start + (r++ % READYQ_SHARD_FACTOR); 750 #endif 751 } 752 // If we can't lock it retry 753 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 754 755 // Actually push it 756 push(lanes.data[i], thrd); 757 758 // Unlock and return 759 __atomic_unlock( &lanes.data[i].lock ); 760 761 #if !defined(__CFA_NO_STATISTICS__) 762 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED); 763 else __tls_stats()->ready.push.local.success++; 764 #endif 765 766 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first); 767 } 768 769 // Pop from the ready queue from a given cluster 770 __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 771 /* paranoid */ verify( lanes.count > 0 ); 772 /* paranoid */ verify( kernelTLS().this_processor ); 773 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count ); 774 775 processor * proc = kernelTLS().this_processor; 776 777 if(proc->rdq.target == MAX) { 778 unsigned long long min = ts(lanes.data[proc->rdq.id]); 779 for(int i = 0; i < READYQ_SHARD_FACTOR; i++) { 780 unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]); 781 if(tsc < min) min = tsc; 782 } 783 proc->rdq.cutoff = min; 784 proc->rdq.target = __tls_rand() % lanes.count; 785 } 786 else { 787 unsigned target = proc->rdq.target; 788 proc->rdq.target = MAX; 789 const unsigned long long bias = 0; //2_500_000_000; 790 const unsigned long long cutoff = proc->rdq.cutoff > bias ? 
proc->rdq.cutoff - bias : proc->rdq.cutoff; 791 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) { 155 792 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 156 793 if(t) return t; 157 794 } 158 795 } 159 proc->rdq.target = MAX; 160 } 161 162 for(__shard_factor.readyq) { 163 unsigned i = this + (proc->rdq.itr++ % __shard_factor.readyq); 164 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 165 } 166 167 // All lanes where empty return 0p 168 return 0p; 169 170 } 171 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) { 172 unsigned i = __tls_rand() % (cltr->sched.readyQ.count); 173 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 174 } 175 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) { 176 return search(cltr); 177 } 796 797 for(READYQ_SHARD_FACTOR) { 798 unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR); 799 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 800 } 801 return 0p; 802 } 803 804 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) { 805 unsigned i = __tls_rand() % lanes.count; 806 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 807 } 808 809 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) with (cltr->ready_queue) { 810 return search(cltr); 811 } 812 #endif 178 813 179 814 //======================================================================= … … 185 820 //----------------------------------------------------------------------- 186 821 // try to pop from a lane given by index w 187 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr-> sched) {188 /* paranoid */ verify( w < readyQ.count );822 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & 
stats)) with (cltr->ready_queue) { 823 /* paranoid */ verify( w < lanes.count ); 189 824 __STATS( stats.attempt++; ) 190 825 191 826 // Get relevant elements locally 192 __intrusive_lane_t & lane = readyQ.data[w];827 __intrusive_lane_t & lane = lanes.data[w]; 193 828 194 829 // If list looks empty retry … … 210 845 // Actually pop the list 211 846 struct thread$ * thrd; 212 unsigned long long tsc_before = ts(lane); 847 #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 848 unsigned long long tsc_before = ts(lane); 849 #endif 213 850 unsigned long long tsv; 214 851 [thrd, tsv] = pop(lane); … … 224 861 __STATS( stats.success++; ) 225 862 226 if (tsv != MAX) { 227 unsigned long long now = rdtscl(); 228 unsigned long long pma = __atomic_load_n(&readyQ.tscs[w].ma, __ATOMIC_RELAXED); 229 __atomic_store_n(&readyQ.tscs[w].tv, tsv, __ATOMIC_RELAXED); 230 __atomic_store_n(&readyQ.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED); 231 } 232 233 thrd->preferred = w / __shard_factor.readyq; 863 #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 864 if (tsv != MAX) { 865 unsigned long long now = rdtscl(); 866 unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED); 867 __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED); 868 __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED); 869 } 870 #endif 871 872 #if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING) 873 thrd->preferred = w / READYQ_SHARD_FACTOR; 874 #else 875 thrd->preferred = w; 876 #endif 234 877 235 878 // return the popped thread … … 240 883 // try to pop from any lanes making sure you don't miss any threads push 241 884 // before the start of the function 242 static inline struct thread$ * search(struct cluster * cltr) { 243 const size_t lanes_count = cltr->sched.readyQ.count; 244 /* paranoid */ verify( lanes_count > 0 ); 245 
unsigned count = __atomic_load_n( &lanes_count, __ATOMIC_RELAXED ); 885 static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) { 886 /* paranoid */ verify( lanes.count > 0 ); 887 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED ); 246 888 unsigned offset = __tls_rand(); 247 889 for(i; count) { … … 260 902 // get preferred ready for new thread 261 903 unsigned ready_queue_new_preferred() { 262 unsigned pref = MAX;904 unsigned pref = 0; 263 905 if(struct thread$ * thrd = publicTLS_get( this_thread )) { 264 906 pref = thrd->preferred; 265 907 } 908 else { 909 #if defined(USE_CPU_WORK_STEALING) 910 pref = __kernel_getcpu(); 911 #endif 912 } 913 914 #if defined(USE_CPU_WORK_STEALING) 915 /* paranoid */ verify(pref >= 0); 916 /* paranoid */ verify(pref < cpu_info.hthrd_count); 917 #endif 266 918 267 919 return pref; 920 } 921 922 //----------------------------------------------------------------------- 923 // Check that all the intrusive queues in the data structure are still consistent 924 static void check( __ready_queue_t & q ) with (q) { 925 #if defined(__CFA_WITH_VERIFY__) 926 { 927 for( idx ; lanes.count ) { 928 __intrusive_lane_t & sl = lanes.data[idx]; 929 assert(!lanes.data[idx].lock); 930 931 if(is_empty(sl)) { 932 assert( sl.anchor.next == 0p ); 933 assert( sl.anchor.ts == -1llu ); 934 assert( mock_head(sl) == sl.prev ); 935 } else { 936 assert( sl.anchor.next != 0p ); 937 assert( sl.anchor.ts != -1llu ); 938 assert( mock_head(sl) != sl.prev ); 939 } 940 } 941 } 942 #endif 268 943 } 269 944 270 945 //----------------------------------------------------------------------- 271 946 // Given 2 indexes, pick the list with the oldest push an try to pop from it 272 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr-> sched) {947 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, 
__stats_readyQ_pop_t & stats)) with (cltr->ready_queue) { 273 948 // Pick the bet list 274 949 int w = i; 275 if( __builtin_expect(!is_empty( readyQ.data[j]), true) ) {276 w = (ts( readyQ.data[i]) < ts(readyQ.data[j])) ? i : j;950 if( __builtin_expect(!is_empty(lanes.data[j]), true) ) { 951 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j; 277 952 } 278 953 279 954 return try_pop(cltr, w __STATS(, stats)); 280 955 } 956 957 // Call this function of the intrusive list was moved using memcpy 958 // fixes the list so that the pointers back to anchors aren't left dangling 959 static inline void fix(__intrusive_lane_t & ll) { 960 if(is_empty(ll)) { 961 verify(ll.anchor.next == 0p); 962 ll.prev = mock_head(ll); 963 } 964 } 965 966 static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) { 967 processor * it = &list`first; 968 for(unsigned i = 0; i < count; i++) { 969 /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count); 970 it->rdq.id = value; 971 it->rdq.target = MAX; 972 value += READYQ_SHARD_FACTOR; 973 it = &(*it)`next; 974 } 975 } 976 977 static void reassign_cltr_id(struct cluster * cltr) { 978 unsigned preferred = 0; 979 assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle); 980 assign_list(preferred, cltr->procs.idles , cltr->procs.idle ); 981 } 982 983 static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) { 984 #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) 985 lanes.tscs = alloc(lanes.count, lanes.tscs`realloc); 986 for(i; lanes.count) { 987 lanes.tscs[i].tv = rdtscl(); 988 lanes.tscs[i].ma = 0; 989 } 990 #endif 991 } 992 993 #if defined(USE_CPU_WORK_STEALING) 994 // ready_queue size is fixed in this case 995 void ready_queue_grow(struct cluster * cltr) {} 996 void ready_queue_shrink(struct cluster * cltr) {} 997 #else 998 // Grow the ready queue 999 void ready_queue_grow(struct cluster * cltr) { 1000 size_t ncount; 1001 int target = 
cltr->procs.total; 1002 1003 /* paranoid */ verify( ready_mutate_islocked() ); 1004 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n"); 1005 1006 // Make sure that everything is consistent 1007 /* paranoid */ check( cltr->ready_queue ); 1008 1009 // grow the ready queue 1010 with( cltr->ready_queue ) { 1011 // Find new count 1012 // Make sure we always have atleast 1 list 1013 if(target >= 2) { 1014 ncount = target * READYQ_SHARD_FACTOR; 1015 } else { 1016 ncount = SEQUENTIAL_SHARD; 1017 } 1018 1019 // Allocate new array (uses realloc and memcpies the data) 1020 lanes.data = alloc( ncount, lanes.data`realloc ); 1021 1022 // Fix the moved data 1023 for( idx; (size_t)lanes.count ) { 1024 fix(lanes.data[idx]); 1025 } 1026 1027 // Construct new data 1028 for( idx; (size_t)lanes.count ~ ncount) { 1029 (lanes.data[idx]){}; 1030 } 1031 1032 // Update original 1033 lanes.count = ncount; 1034 1035 lanes.caches = alloc( target, lanes.caches`realloc ); 1036 } 1037 1038 fix_times(cltr); 1039 1040 reassign_cltr_id(cltr); 1041 1042 // Make sure that everything is consistent 1043 /* paranoid */ check( cltr->ready_queue ); 1044 1045 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n"); 1046 1047 /* paranoid */ verify( ready_mutate_islocked() ); 1048 } 1049 1050 // Shrink the ready queue 1051 void ready_queue_shrink(struct cluster * cltr) { 1052 /* paranoid */ verify( ready_mutate_islocked() ); 1053 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n"); 1054 1055 // Make sure that everything is consistent 1056 /* paranoid */ check( cltr->ready_queue ); 1057 1058 int target = cltr->procs.total; 1059 1060 with( cltr->ready_queue ) { 1061 // Remember old count 1062 size_t ocount = lanes.count; 1063 1064 // Find new count 1065 // Make sure we always have atleast 1 list 1066 lanes.count = target >= 2 ? 
target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD; 1067 /* paranoid */ verify( ocount >= lanes.count ); 1068 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 ); 1069 1070 // for printing count the number of displaced threads 1071 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__) 1072 __attribute__((unused)) size_t displaced = 0; 1073 #endif 1074 1075 // redistribute old data 1076 for( idx; (size_t)lanes.count ~ ocount) { 1077 // Lock is not strictly needed but makes checking invariants much easier 1078 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock); 1079 verify(locked); 1080 1081 // As long as we can pop from this lane to push the threads somewhere else in the queue 1082 while(!is_empty(lanes.data[idx])) { 1083 struct thread$ * thrd; 1084 unsigned long long _; 1085 [thrd, _] = pop(lanes.data[idx]); 1086 1087 push(cltr, thrd, true); 1088 1089 // for printing count the number of displaced threads 1090 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__) 1091 displaced++; 1092 #endif 1093 } 1094 1095 // Unlock the lane 1096 __atomic_unlock(&lanes.data[idx].lock); 1097 1098 // TODO print the queue statistics here 1099 1100 ^(lanes.data[idx]){}; 1101 } 1102 1103 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced); 1104 1105 // Allocate new array (uses realloc and memcpies the data) 1106 lanes.data = alloc( lanes.count, lanes.data`realloc ); 1107 1108 // Fix the moved data 1109 for( idx; (size_t)lanes.count ) { 1110 fix(lanes.data[idx]); 1111 } 1112 1113 lanes.caches = alloc( target, lanes.caches`realloc ); 1114 } 1115 1116 fix_times(cltr); 1117 1118 1119 reassign_cltr_id(cltr); 1120 1121 // Make sure that everything is consistent 1122 /* paranoid */ check( cltr->ready_queue ); 1123 1124 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n"); 1125 /* paranoid */ verify( 
ready_mutate_islocked() ); 1126 } 1127 #endif 1128 1129 #if !defined(__CFA_NO_STATISTICS__) 1130 unsigned cnt(const __ready_queue_t & this, unsigned idx) { 1131 /* paranoid */ verify(this.lanes.count > idx); 1132 return this.lanes.data[idx].cnt; 1133 } 1134 #endif 1135 1136 1137 #if defined(CFA_HAVE_LINUX_LIBRSEQ) 1138 // No definition needed 1139 #elif defined(CFA_HAVE_LINUX_RSEQ_H) 1140 1141 #if defined( __x86_64 ) || defined( __i386 ) 1142 #define RSEQ_SIG 0x53053053 1143 #elif defined( __ARM_ARCH ) 1144 #ifdef __ARMEB__ 1145 #define RSEQ_SIG 0xf3def5e7 /* udf #24035 ; 0x5de3 (ARMv6+) */ 1146 #else 1147 #define RSEQ_SIG 0xe7f5def3 /* udf #24035 ; 0x5de3 */ 1148 #endif 1149 #endif 1150 1151 extern void __disable_interrupts_hard(); 1152 extern void __enable_interrupts_hard(); 1153 1154 static void __kernel_raw_rseq_register (void) { 1155 /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED ); 1156 1157 // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8); 1158 int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG); 1159 if(ret != 0) { 1160 int e = errno; 1161 switch(e) { 1162 case EINVAL: abort("KERNEL ERROR: rseq register invalid argument"); 1163 case ENOSYS: abort("KERNEL ERROR: rseq register no supported"); 1164 case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument"); 1165 case EBUSY : abort("KERNEL ERROR: rseq register already registered"); 1166 case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration"); 1167 default: abort("KERNEL ERROR: rseq register unexpected return %d", e); 1168 } 1169 } 1170 } 1171 1172 static void __kernel_raw_rseq_unregister(void) { 1173 /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 ); 1174 1175 // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8); 1176 int ret = syscall(__NR_rseq, 
&__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG); 1177 if(ret != 0) { 1178 int e = errno; 1179 switch(e) { 1180 case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument"); 1181 case ENOSYS: abort("KERNEL ERROR: rseq unregister no supported"); 1182 case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument"); 1183 case EBUSY : abort("KERNEL ERROR: rseq unregister already registered"); 1184 case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration"); 1185 default: abort("KERNEL ERROR: rseq unregisteunexpected return %d", e); 1186 } 1187 } 1188 } 1189 #else 1190 // No definition needed 1191 #endif -
libcfa/src/concurrency/ready_subqueue.hfa
rc7f2d9b r623d1c8 25 25 ); 26 26 return rhead; 27 } 28 29 // Ctor 30 void ?{}( __intrusive_lane_t & this ) { 31 this.lock = false; 32 this.prev = mock_head(this); 33 this.anchor.next = 0p; 34 this.anchor.ts = -1llu; 35 #if !defined(__CFA_NO_STATISTICS__) 36 this.cnt = 0; 37 #endif 38 39 // We add a boat-load of assertions here because the anchor code is very fragile 40 /* paranoid */ _Static_assert( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) ); 41 /* paranoid */ verify( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) ); 42 /* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.anchor) ); 43 /* paranoid */ verify( &mock_head(this)->link.next == &this.anchor.next ); 44 /* paranoid */ verify( &mock_head(this)->link.ts == &this.anchor.ts ); 45 /* paranoid */ verify( mock_head(this)->link.next == 0p ); 46 /* paranoid */ verify( mock_head(this)->link.ts == -1llu ); 47 /* paranoid */ verify( mock_head(this) == this.prev ); 48 /* paranoid */ verify( __alignof__(__intrusive_lane_t) == 128 ); 49 /* paranoid */ verify( __alignof__(this) == 128 ); 50 /* paranoid */ verifyf( ((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128) ); 51 } 52 53 // Dtor is trivial 54 void ^?{}( __intrusive_lane_t & this ) { 55 // Make sure the list is empty 56 /* paranoid */ verify( this.anchor.next == 0p ); 57 /* paranoid */ verify( this.anchor.ts == -1llu ); 58 /* paranoid */ verify( mock_head(this) == this.prev ); 27 59 } 28 60
Note:
See TracChangeset
for help on using the changeset viewer.