- File:
-
- 1 edited
-
libcfa/src/concurrency/ready_queue.cfa (modified) (26 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/ready_queue.cfa
r46bbcaf r078fb05 20 20 21 21 22 //#define USE_RELAXED_FIFO22 #define USE_RELAXED_FIFO 23 23 // #define USE_WORK_STEALING 24 24 // #define USE_CPU_WORK_STEALING 25 #define USE_AWARE_STEALING26 25 27 26 #include "bits/defs.hfa" … … 30 29 31 30 #include "stdlib.hfa" 32 #include "limits.hfa"33 31 #include "math.hfa" 34 32 … … 56 54 #endif 57 55 58 #if defined(USE_AWARE_STEALING) 59 #define READYQ_SHARD_FACTOR 2 60 #define SEQUENTIAL_SHARD 2 61 #elif defined(USE_CPU_WORK_STEALING) 56 #if defined(USE_CPU_WORK_STEALING) 62 57 #define READYQ_SHARD_FACTOR 2 63 58 #elif defined(USE_RELAXED_FIFO) … … 143 138 __kernel_rseq_register(); 144 139 140 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc); 145 141 bool * handle = (bool *)&kernelTLS().sched_lock; 146 142 … … 178 174 } 179 175 176 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n); 177 180 178 // Return new spot. 181 179 /* paranoid */ verify(n < ready); … … 192 190 193 191 __atomic_store_n(cell, 0p, __ATOMIC_RELEASE); 192 193 __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc); 194 194 195 195 __kernel_rseq_unregister(); … … 201 201 uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) { 202 202 /* paranoid */ verify( ! __preemption_enabled() ); 203 /* paranoid */ verify( ! kernelTLS().sched_lock ); 203 204 204 205 // Step 1 : lock global lock … … 206 207 // to simply lock their own lock and enter. 207 208 __atomic_acquire( &write_lock ); 208 209 // Make sure we won't deadlock ourself210 // Checking before acquiring the writer lock isn't safe211 // because someone else could have locked us.212 /* paranoid */ verify( ! kernelTLS().sched_lock );213 209 214 210 // Step 2 : lock per-proc lock … … 248 244 249 245 //======================================================================= 250 // caches handling251 252 struct __attribute__((aligned(128))) __ready_queue_caches_t {253 // Count States:254 // - 0 : No one is looking after this cache255 // - 1 : No one is looking after this cache, BUT it's not empty256 // - 2+ : At least one processor is looking after this cache257 volatile unsigned count;258 };259 260 void ?{}(__ready_queue_caches_t & this) { this.count = 0; }261 void ^?{}(__ready_queue_caches_t & this) {}262 263 static inline void depart(__ready_queue_caches_t & cache) {264 /* paranoid */ verify( cache.count > 1);265 __atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);266 /* paranoid */ verify( cache.count != 0);267 /* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.268 }269 270 static inline void arrive(__ready_queue_caches_t & cache) {271 // for() {272 // unsigned expected = cache.count;273 // unsigned desired = 0 == expected ? 2 : expected + 1;274 // }275 }276 277 //=======================================================================278 246 // Cforall Ready Queue used for scheduling 279 247 //======================================================================= 280 unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) { 281 /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc ); 282 /* paranoid */ verifyf( instsc < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc ); 283 /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg ); 284 285 const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0; 286 const unsigned long long total_weight = 16; 287 const unsigned long long new_weight = 4; 288 const unsigned long long old_weight = total_weight - new_weight; 289 const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight; 290 return ret; 248 unsigned long long moving_average(unsigned long long nval, unsigned long long oval) { 249 const unsigned long long tw = 16; 250 const unsigned long long nw = 4; 251 const unsigned long long ow = tw - nw; 252 return ((nw * nval) + (ow * oval)) / tw; 291 253 } 292 254 … … 309 271 } 310 272 #else 311 lanes.data = 0p; 312 lanes.tscs = 0p; 313 lanes.caches = 0p; 314 lanes.help = 0p; 315 lanes.count = 0; 273 lanes.data = 0p; 274 lanes.tscs = 0p; 275 lanes.help = 0p; 276 lanes.count = 0; 316 277 #endif 317 278 } … … 324 285 free(lanes.data); 325 286 free(lanes.tscs); 326 free(lanes.caches);327 287 free(lanes.help); 328 288 } 329 289 330 290 //----------------------------------------------------------------------- 331 #if defined(USE_AWARE_STEALING)332 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {333 processor * const proc = kernelTLS().this_processor;334 const bool external = (!proc) || (cltr != proc->cltr);335 const bool remote = hint == UNPARK_REMOTE;336 337 unsigned i;338 if( external || remote ) {339 // Figure out where thread was last time and make sure it's valid340 /* paranoid */ verify(thrd->preferred >= 0);341 if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {342 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);343 unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;344 do {345 unsigned r = __tls_rand();346 i = start + (r % READYQ_SHARD_FACTOR);347 /* paranoid */ verify( i < lanes.count );348 // If we can't lock it retry349 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );350 } else {351 do {352 i = __tls_rand() % lanes.count;353 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );354 }355 } else {356 do {357 unsigned r = proc->rdq.its++;358 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);359 /* paranoid */ verify( i < lanes.count );360 // If we can't lock it retry361 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );362 }363 364 // Actually push it365 push(lanes.data[i], thrd);366 367 // Unlock and return368 __atomic_unlock( &lanes.data[i].lock );369 370 #if !defined(__CFA_NO_STATISTICS__)371 if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);372 else __tls_stats()->ready.push.local.success++;373 #endif374 }375 376 static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {377 unsigned start = proc->rdq.id;378 unsigned long long max = 0;379 for(i; READYQ_SHARD_FACTOR) {380 unsigned long long ptsc = ts(rdq.lanes.data[start + i]);381 if(ptsc != -1ull) {382 /* paranoid */ verify( start + i < rdq.lanes.count );383 unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);384 if(tsc > max) max = tsc;385 }386 }387 return (max + 2 * max) / 2;388 }389 390 __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {391 /* paranoid */ verify( lanes.count > 0 );392 /* paranoid */ verify( kernelTLS().this_processor );393 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );394 395 processor * const proc = kernelTLS().this_processor;396 unsigned this = proc->rdq.id;397 /* paranoid */ verify( this < lanes.count );398 __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);399 400 // Figure out the current cpu and make sure it is valid401 const int cpu = __kernel_getcpu();402 /* paranoid */ verify(cpu >= 0);403 /* paranoid */ verify(cpu < cpu_info.hthrd_count);404 unsigned this_cache = cpu_info.llc_map[cpu].cache;405 406 // Super important: don't write the same value over and over again407 // We want to maximise our chances that his particular values stays in cache408 if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)409 __atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);410 411 const unsigned long long ctsc = rdtscl();412 413 if(proc->rdq.target == MAX) {414 uint64_t chaos = __tls_rand();415 unsigned ext = chaos & 0xff;416 unsigned other = (chaos >> 8) % (lanes.count);417 418 if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {419 proc->rdq.target = other;420 }421 }422 else {423 const unsigned target = proc->rdq.target;424 __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);425 /* paranoid */ verify( lanes.tscs[target].tv != MAX );426 if(target < lanes.count) {427 const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);428 const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);429 __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");430 if(age > cutoff) {431 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));432 if(t) return t;433 }434 }435 proc->rdq.target = MAX;436 }437 438 for(READYQ_SHARD_FACTOR) {439 unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);440 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;441 }442 443 // All lanes where empty return 0p444 return 0p;445 446 }447 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {448 unsigned i = __tls_rand() % lanes.count;449 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));450 }451 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {452 return search(cltr);453 }454 #endif455 291 #if defined(USE_CPU_WORK_STEALING) 456 292 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { … … 514 350 /* paranoid */ verify( kernelTLS().this_processor ); 515 351 516 processor * const proc = kernelTLS().this_processor;517 352 const int cpu = __kernel_getcpu(); 518 353 /* paranoid */ verify(cpu >= 0); … … 525 360 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR); 526 361 362 processor * const proc = kernelTLS().this_processor; 527 363 const int start = map.self * READYQ_SHARD_FACTOR; 528 364 const unsigned long long ctsc = rdtscl(); 529 365 530 366 // Did we already have a help target 531 if(proc->rdq.target == MAX) {367 if(proc->rdq.target == -1u) { 532 368 unsigned long long max = 0; 533 369 for(i; READYQ_SHARD_FACTOR) { 534 unsigned long long tsc = moving_average(ctsc ,ts(lanes.data[start + i]), lanes.tscs[start + i].ma);370 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 535 371 if(tsc > max) max = tsc; 536 372 } 537 //proc->rdq.cutoff = (max + 2 * max) / 2;373 proc->rdq.cutoff = (max + 2 * max) / 2; 538 374 /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores. 539 375 /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores. … … 548 384 } 549 385 550 /* paranoid */ verify(proc->rdq.target != MAX);386 /* paranoid */ verify(proc->rdq.target != -1u); 551 387 } 552 388 else { 553 389 unsigned long long max = 0; 554 390 for(i; READYQ_SHARD_FACTOR) { 555 unsigned long long tsc = moving_average(ctsc ,ts(lanes.data[start + i]), lanes.tscs[start + i].ma);391 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 556 392 if(tsc > max) max = tsc; 557 393 } … … 559 395 { 560 396 unsigned target = proc->rdq.target; 561 proc->rdq.target = MAX;397 proc->rdq.target = -1u; 562 398 lanes.help[target / READYQ_SHARD_FACTOR].tri++; 563 if(moving_average(ctsc ,lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {399 if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) { 564 400 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 565 401 proc->rdq.last = target; 566 402 if(t) return t; 403 else proc->rdq.target = -1u; 567 404 } 568 proc->rdq.target = MAX;405 else proc->rdq.target = -1u; 569 406 } 570 407 571 408 unsigned last = proc->rdq.last; 572 if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) >cutoff) {409 if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) { 573 410 thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help)); 574 411 if(t) return t; 575 412 } 576 413 else { 577 proc->rdq.last = MAX;414 proc->rdq.last = -1u; 578 415 } 579 416 } … … 591 428 processor * const proc = kernelTLS().this_processor; 592 429 unsigned last = proc->rdq.last; 593 if(last != MAX) {430 if(last != -1u) { 594 431 struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal)); 595 432 if(t) return t; 596 proc->rdq.last = MAX;433 proc->rdq.last = -1u; 597 434 } 598 435 … … 723 560 #else 724 561 unsigned preferred = thrd->preferred; 725 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX|| thrd->curr_cluster != cltr;562 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr; 726 563 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count ); 727 564 … … 775 612 processor * proc = kernelTLS().this_processor; 776 613 777 if(proc->rdq.target == MAX) {614 if(proc->rdq.target == -1u) { 778 615 unsigned long long min = ts(lanes.data[proc->rdq.id]); 779 616 for(int i = 0; i < READYQ_SHARD_FACTOR; i++) { … … 786 623 else { 787 624 unsigned target = proc->rdq.target; 788 proc->rdq.target = MAX;625 proc->rdq.target = -1u; 789 626 const unsigned long long bias = 0; //2_500_000_000; 790 627 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff; … … 821 658 // try to pop from a lane given by index w 822 659 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) { 823 /* paranoid */ verify( w < lanes.count );824 660 __STATS( stats.attempt++; ) 825 661 … … 845 681 // Actually pop the list 846 682 struct thread$ * thrd; 847 #if defined(USE_ AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)683 #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 848 684 unsigned long long tsc_before = ts(lane); 849 685 #endif … … 861 697 __STATS( stats.success++; ) 862 698 863 #if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 864 if (tsv != MAX) { 865 unsigned long long now = rdtscl(); 866 unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED); 867 __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED); 868 __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED); 869 } 699 #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 700 unsigned long long now = rdtscl(); 701 lanes.tscs[w].tv = tsv; 702 lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma); 870 703 #endif 871 704 872 #if defined(USE_ AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)705 #if defined(USE_CPU_WORK_STEALING) 873 706 thrd->preferred = w / READYQ_SHARD_FACTOR; 874 707 #else … … 969 802 /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count); 970 803 it->rdq.id = value; 971 it->rdq.target = MAX;804 it->rdq.target = -1u; 972 805 value += READYQ_SHARD_FACTOR; 973 806 it = &(*it)`next; … … 982 815 983 816 static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) { 984 #if defined(USE_ AWARE_STEALING) || defined(USE_WORK_STEALING)817 #if defined(USE_WORK_STEALING) 985 818 lanes.tscs = alloc(lanes.count, lanes.tscs`realloc); 986 819 for(i; lanes.count) { 987 lanes.tscs[i].tv = rdtscl(); 988 lanes.tscs[i].ma = 0; 820 unsigned long long tsc1 = ts(lanes.data[i]); 821 unsigned long long tsc2 = rdtscl(); 822 lanes.tscs[i].tv = min(tsc1, tsc2); 989 823 } 990 824 #endif … … 1032 866 // Update original 1033 867 lanes.count = ncount; 1034 1035 lanes.caches = alloc( target, lanes.caches`realloc );1036 868 } 1037 869 … … 1110 942 fix(lanes.data[idx]); 1111 943 } 1112 1113 lanes.caches = alloc( target, lanes.caches`realloc );1114 944 } 1115 945 1116 946 fix_times(cltr); 1117 1118 947 1119 948 reassign_cltr_id(cltr);
Note:
See TracChangeset
for help on using the changeset viewer.