Changeset a2a4566
- Timestamp: Jan 14, 2022, 7:24:45 PM
- Branches: ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
- Children: 0fc447c
- Parents: c90db2d
- Location: libcfa/src/concurrency
- Files: 2 edited
Legend:
- Unmodified (no prefix)
- Added (+)
- Removed (-)
libcfa/src/concurrency/kernel.hfa
--- libcfa/src/concurrency/kernel.hfa (c90db2d)
+++ libcfa/src/concurrency/kernel.hfa (a2a4566)
@@ -67,6 +67,6 @@
 		unsigned target;
 		unsigned last;
-		unsigned cnt;
-		unsigned long long int cutoff;
+		signed   cpu;
+		// unsigned long long int cutoff;
 	} rdq;
 
@@ -152,4 +152,8 @@
 	volatile unsigned long long tv;
 	volatile unsigned long long ma;
+};
+
+struct __attribute__((aligned(128))) __cache_id_t {
+	volatile unsigned id;
 };
 
@@ -164,6 +168,10 @@
 static inline void ^?{}(__timestamp_t & this) {}
 
+struct __attribute__((aligned(128))) __ready_queue_caches_t;
+void ?{}(__ready_queue_caches_t & this);
+void ^?{}(__ready_queue_caches_t & this);
+
 //TODO adjust cache size to ARCHITECTURE
-// Structure holding the relaxed ready queue
+// Structure holding the ready queue
 struct __ready_queue_t {
 	// Data tracking the actual lanes
@@ -177,4 +185,6 @@
 	// Array of times
 	__timestamp_t * volatile tscs;
+
+	__cache_id_t * volatile caches;
 
 	// Array of stats
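The new `__cache_id_t` is a 128-byte-aligned cell recording which last-level cache last serviced a group of ready-queue shards, and `__ready_queue_t` now carries one such cell per group (`caches`) next to the existing `__timestamp_t` array. Below is a minimal standalone C sketch of the same idea, not the Cforall runtime itself; the names `cache_id`, `NSHARDS`, `SHARD_FACTOR`, `publish_cache`, and `same_cache` are illustrative only. The point of the over-alignment is that each cell sits on its own cache line (128 bytes commonly covers the adjacent-line prefetcher pair), so relaxed stores from one processor do not false-share with neighbouring groups.

```c
#include <stdatomic.h>
#include <stdio.h>

#define NSHARDS      8
#define SHARD_FACTOR 2   /* shards owned by one processor, like READYQ_SHARD_FACTOR */

/* One cell per group of SHARD_FACTOR shards, padded to its own cache line
 * so writers on different processors do not invalidate each other's lines. */
struct cache_id {
	_Atomic unsigned id;
} __attribute__((aligned(128)));

static struct cache_id caches[NSHARDS / SHARD_FACTOR];

/* A processor popping from shard `s` records which LLC it currently runs on. */
static void publish_cache(unsigned s, unsigned llc) {
	atomic_store_explicit(&caches[s / SHARD_FACTOR].id, llc, memory_order_relaxed);
}

/* A pusher or would-be helper can prefer shards last touched by its own LLC. */
static int same_cache(unsigned s, unsigned my_llc) {
	return atomic_load_explicit(&caches[s / SHARD_FACTOR].id, memory_order_relaxed) == my_llc;
}

int main(void) {
	publish_cache(3, 1);              /* shard 3 last popped by a processor on LLC 1 */
	printf("%d\n", same_cache(2, 1)); /* shard 2 is in the same group -> 1 */
	printf("%d\n", same_cache(4, 1)); /* different group, still id 0 -> 0 */
	return 0;
}
```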
libcfa/src/concurrency/ready_queue.cfa
--- libcfa/src/concurrency/ready_queue.cfa (c90db2d)
+++ libcfa/src/concurrency/ready_queue.cfa (a2a4566)
@@ -20,7 +20,8 @@
 
 
-#define USE_RELAXED_FIFO
+// #define USE_RELAXED_FIFO
 // #define USE_WORK_STEALING
 // #define USE_CPU_WORK_STEALING
+#define USE_AWARE_STEALING
 
 #include "bits/defs.hfa"
@@ -29,4 +30,5 @@
 
 #include "stdlib.hfa"
+#include "limits.hfa"
 #include "math.hfa"
 
@@ -54,5 +56,8 @@
 #endif
 
-#if defined(USE_CPU_WORK_STEALING)
+#if defined(USE_AWARE_STEALING)
+	#define READYQ_SHARD_FACTOR 2
+	#define SEQUENTIAL_SHARD 2
+#elif defined(USE_CPU_WORK_STEALING)
 	#define READYQ_SHARD_FACTOR 2
 #elif defined(USE_RELAXED_FIFO)
@@ -138,5 +143,4 @@
 	__kernel_rseq_register();
 
-	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
 	bool * handle = (bool *)&kernelTLS().sched_lock;
 
@@ -174,6 +178,4 @@
 	}
 
-	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
-
 	// Return new spot.
 	/* paranoid */ verify(n < ready);
@@ -190,6 +192,4 @@
 
 	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
-
-	__cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
 
 	__kernel_rseq_unregister();
@@ -244,11 +244,45 @@
 
 //=======================================================================
+// caches handling
+
+struct __attribute__((aligned(128))) __ready_queue_caches_t {
+	// Count States:
+	// - 0  : No one is looking after this cache
+	// - 1  : No one is looking after this cache, BUT it's not empty
+	// - 2+ : At least one processor is looking after this cache
+	volatile unsigned count;
+};
+
+void ?{}(__ready_queue_caches_t & this) { this.count = 0; }
+void ^?{}(__ready_queue_caches_t & this) {}
+
+static inline void depart(__ready_queue_caches_t & cache) {
+	/* paranoid */ verify( cache.count > 1);
+	__atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);
+	/* paranoid */ verify( cache.count != 0);
+	/* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.
+}
+
+static inline void arrive(__ready_queue_caches_t & cache) {
+	// for() {
+	//	unsigned expected = cache.count;
+	//	unsigned desired = 0 == expected ? 2 : expected + 1;
+	// }
+}
+
+//=======================================================================
 // Cforall Ready Queue used for scheduling
 //=======================================================================
-unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
-	const unsigned long long tw = 16;
-	const unsigned long long nw = 4;
-	const unsigned long long ow = tw - nw;
-	return ((nw * nval) + (ow * oval)) / tw;
+unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
+	/* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
+	/* paranoid */ verifyf( instsc < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
+	/* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );
+
+	const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
+	const unsigned long long total_weight = 16;
+	const unsigned long long new_weight = 4;
+	const unsigned long long old_weight = total_weight - new_weight;
+	const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
+	return ret;
 }
 
@@ -270,9 +304,15 @@
 			lanes.help[idx].tri = 0;
 		}
+
+		caches = alloc( cpu_info.llc_count );
+		for( idx; (size_t)cpu_info.llc_count ) {
+			(caches[idx]){};
+		}
 	#else
-		lanes.data = 0p;
-		lanes.tscs = 0p;
-		lanes.help = 0p;
-		lanes.count = 0;
+		lanes.data = 0p;
+		lanes.tscs = 0p;
+		lanes.caches = 0p;
+		lanes.help = 0p;
+		lanes.count = 0;
 	#endif
 }
@@ -285,8 +325,129 @@
 	free(lanes.data);
 	free(lanes.tscs);
+	free(lanes.caches);
 	free(lanes.help);
 }
 
 //-----------------------------------------------------------------------
+#if defined(USE_AWARE_STEALING)
+	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
+		processor * const proc = kernelTLS().this_processor;
+		const bool external = (!proc) || (cltr != proc->cltr);
+		const bool remote = hint == UNPARK_REMOTE;
+
+		unsigned i;
+		if( external || remote ) {
+			// Figure out where thread was last time and make sure it's valid
+			/* paranoid */ verify(thrd->preferred >= 0);
+			if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
+				/* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
+				unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
+				do {
+					unsigned r = __tls_rand();
+					i = start + (r % READYQ_SHARD_FACTOR);
+					/* paranoid */ verify( i < lanes.count );
+					// If we can't lock it retry
+				} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+			} else {
+				do {
+					i = __tls_rand() % lanes.count;
+				} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+			}
+		} else {
+			do {
+				unsigned r = proc->rdq.its++;
+				i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
+				/* paranoid */ verify( i < lanes.count );
+				// If we can't lock it retry
+			} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+		}
+
+		// Actually push it
+		push(lanes.data[i], thrd);
+
+		// Unlock and return
+		__atomic_unlock( &lanes.data[i].lock );
+
+		#if !defined(__CFA_NO_STATISTICS__)
+			if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
+			else __tls_stats()->ready.push.local.success++;
+		#endif
+	}
+
+	static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
+		unsigned start = proc->rdq.id;
+		unsigned long long max = 0;
+		for(i; READYQ_SHARD_FACTOR) {
+			unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
+			if(ptsc != -1ull) {
+				/* paranoid */ verify( start + i < rdq.lanes.count );
+				unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
+				if(tsc > max) max = tsc;
+			}
+		}
+		return (max + 2 * max) / 2;
+	}
+
+	__attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
+		/* paranoid */ verify( lanes.count > 0 );
+		/* paranoid */ verify( kernelTLS().this_processor );
+		/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
+
+		processor * const proc = kernelTLS().this_processor;
+		unsigned this = proc->rdq.id;
+		/* paranoid */ verify( this < lanes.count );
+		__cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
+
+		// Figure out the current cpu and make sure it is valid
+		const int cpu = __kernel_getcpu();
+		/* paranoid */ verify(cpu >= 0);
+		/* paranoid */ verify(cpu < cpu_info.hthrd_count);
+		unsigned this_cache = cpu_info.llc_map[cpu].cache;
+		__atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);
+
+		const unsigned long long ctsc = rdtscl();
+
+		if(proc->rdq.target == MAX) {
+			uint64_t chaos = __tls_rand();
+			unsigned ext = chaos & 0xff;
+			unsigned other = (chaos >> 8) % (lanes.count);
+
+			if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
+				proc->rdq.target = other;
+			}
+		}
+		else {
+			const unsigned target = proc->rdq.target;
+			__cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
+			/* paranoid */ verify( lanes.tscs[target].tv != MAX );
+			if(target < lanes.count) {
+				const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
+				const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
+				__cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
+				if(age > cutoff) {
+					thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
+					if(t) return t;
+				}
+			}
+			proc->rdq.target = MAX;
+		}
+
+		for(READYQ_SHARD_FACTOR) {
+			unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
+			if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
+		}
+
+		// All lanes where empty return 0p
+		return 0p;
+
+	}
+	__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
+		unsigned i = __tls_rand() % lanes.count;
+		return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
+	}
+	__attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
+		return search(cltr);
+	}
+#endif
 #if defined(USE_CPU_WORK_STEALING)
 	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
@@ -345,4 +506,22 @@
 	}
 
+	static inline int pop_getcpu(processor * proc, __ready_queue_caches_t * caches) {
+		const int prv = proc->rdq.cpu;
+		const int cpu = __kernel_getcpu();
+		if( prv != proc->rdq.cpu ) {
+			unsigned pidx = cpu_info.llc_map[prv].cache;
+			/* paranoid */ verify(pidx < cpu_info.llc_count);
+
+			unsigned nidx = cpu_info.llc_map[cpu].cache;
+			/* paranoid */ verify(pidx < cpu_info.llc_count);
+
+			depart(caches[pidx]);
+			arrive(caches[nidx]);
+
+			__STATS( /* cpu migs++ */ )
+		}
+		return proc->rdq.cpu = cpu;
+	}
+
 	// Pop from the ready queue from a given cluster
 	__attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
@@ -350,5 +529,7 @@
 		/* paranoid */ verify( kernelTLS().this_processor );
 
-		const int cpu = __kernel_getcpu();
+		processor * const proc = kernelTLS().this_processor;
+		const int cpu = pop_getcpu( proc, caches );
+		// const int cpu = __kernel_getcpu();
 		/* paranoid */ verify(cpu >= 0);
 		/* paranoid */ verify(cpu < cpu_info.hthrd_count);
@@ -360,10 +541,9 @@
 		/* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
 
-		processor * const proc = kernelTLS().this_processor;
 		const int start = map.self * READYQ_SHARD_FACTOR;
 		const unsigned long long ctsc = rdtscl();
 
 		// Did we already have a help target
-		if(proc->rdq.target == -1u) {
+		if(proc->rdq.target == MAX) {
 			unsigned long long max = 0;
 			for(i; READYQ_SHARD_FACTOR) {
@@ -371,5 +551,5 @@
 				if(tsc > max) max = tsc;
 			}
-			proc->rdq.cutoff = (max + 2 * max) / 2;
+			// proc->rdq.cutoff = (max + 2 * max) / 2;
 			/* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
 			/* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
@@ -384,5 +564,5 @@
 			}
 
-			/* paranoid */ verify(proc->rdq.target != -1u);
+			/* paranoid */ verify(proc->rdq.target != MAX);
 		}
 		else {
@@ -395,22 +575,23 @@
 			{
 				unsigned target = proc->rdq.target;
-				proc->rdq.target = -1u;
+				proc->rdq.target = MAX;
 				lanes.help[target / READYQ_SHARD_FACTOR].tri++;
 				if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
+					__STATS( __tls_stats()->ready.pop.helped[target]++; )
 					thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
 					proc->rdq.last = target;
 					if(t) return t;
-					else proc->rdq.target = -1u;
 				}
-				else proc->rdq.target = -1u;
+				proc->rdq.target = MAX;
 			}
 
 			unsigned last = proc->rdq.last;
-			if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
+			if(last != MAX && moving_average(ctsc - lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) {
+				__STATS( __tls_stats()->ready.pop.helped[last]++; )
 				thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
 				if(t) return t;
 			}
 			else {
-				proc->rdq.last = -1u;
+				proc->rdq.last = MAX;
 			}
 
@@ -428,8 +609,8 @@
 		processor * const proc = kernelTLS().this_processor;
 		unsigned last = proc->rdq.last;
-		if(last != -1u) {
+		if(last != MAX) {
 			struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
 			if(t) return t;
-			proc->rdq.last = -1u;
+			proc->rdq.last = MAX;
 		}
 
@@ -560,5 +741,5 @@
 	#else
 		unsigned preferred = thrd->preferred;
-		const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
+		const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr;
 		/* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
 
@@ -612,5 +793,5 @@
 		processor * proc = kernelTLS().this_processor;
 
-		if(proc->rdq.target == -1u) {
+		if(proc->rdq.target == MAX) {
 			unsigned long long min = ts(lanes.data[proc->rdq.id]);
 			for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
@@ -623,5 +804,5 @@
 		else {
 			unsigned target = proc->rdq.target;
-			proc->rdq.target = -1u;
+			proc->rdq.target = MAX;
 			const unsigned long long bias = 0; //2_500_000_000;
 			const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
@@ -658,4 +839,5 @@
 	// try to pop from a lane given by index w
 	static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
+		/* paranoid */ verify( w < lanes.count );
 		__STATS( stats.attempt++; )
 
@@ -681,5 +863,5 @@
 		// Actually pop the list
 		struct thread$ * thrd;
-		#if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
+		#if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
 			unsigned long long tsc_before = ts(lane);
 		#endif
@@ -697,11 +879,14 @@
 		__STATS( stats.success++; )
 
-		#if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
-			unsigned long long now = rdtscl();
-			lanes.tscs[w].tv = tsv;
-			lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);
+		#if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
+			if (tsv != MAX) {
+				unsigned long long now = rdtscl();
+				unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
+				__atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
+				__atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
+			}
 		#endif
 
-		#if defined(USE_CPU_WORK_STEALING)
+		#if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)
 			thrd->preferred = w / READYQ_SHARD_FACTOR;
 		#else
@@ -802,5 +987,5 @@
 		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
 		it->rdq.id = value;
-		it->rdq.target = -1u;
+		it->rdq.target = MAX;
 		value += READYQ_SHARD_FACTOR;
 		it = &(*it)`next;
@@ -815,10 +1000,9 @@
 
 static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
-	#if defined(USE_WORK_STEALING)
+	#if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING)
 		lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
 		for(i; lanes.count) {
-			unsigned long long tsc1 = ts(lanes.data[i]);
-			unsigned long long tsc2 = rdtscl();
-			lanes.tscs[i].tv = min(tsc1, tsc2);
+			lanes.tscs[i].tv = rdtscl();
+			lanes.tscs[i].ma = 0;
 		}
 	#endif
@@ -866,4 +1050,6 @@
 		// Update original
 		lanes.count = ncount;
+
+		lanes.caches = alloc( target, lanes.caches`realloc );
 	}
 
@@ -942,7 +1128,10 @@
 			fix(lanes.data[idx]);
 		}
+
+		lanes.caches = alloc( target, lanes.caches`realloc );
 	}
 
 	fix_times(cltr);
+
 
 	reassign_cltr_id(cltr);
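The reworked `moving_average` keeps a per-lane estimate of how long threads sit in a lane: the latest sample (current tsc minus the insert tsc, clamped at zero) is blended with the previous average at a 4/16 weight, and `calc_cutoff` sets the helping threshold to 1.5 times the largest average over the processor's own shards, i.e. `(max + 2 * max) / 2`. The standalone C sketch below reproduces that arithmetic in isolation so the helping decision (`age > cutoff`) is easy to follow; the helper names and sample numbers are illustrative only and are not part of the changeset.

```c
#include <stdio.h>

/* Weighted moving average as in the changeset: new sample weighted 4/16,
 * previous average weighted 12/16.  A sample is the time a thread has
 * waited in a lane (current tsc - insert tsc), clamped at zero. */
static unsigned long long moving_average(unsigned long long currtsc,
                                         unsigned long long instsc,
                                         unsigned long long old_avg) {
	const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
	const unsigned long long total_weight = 16, new_weight = 4;
	const unsigned long long old_weight = total_weight - new_weight;
	return ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
}

/* Cutoff as in calc_cutoff: 1.5x the largest average over this
 * processor's own shards, computed as (max + 2*max) / 2. */
static unsigned long long calc_cutoff(const unsigned long long avgs[], unsigned n) {
	unsigned long long max = 0;
	for (unsigned i = 0; i < n; i++) if (avgs[i] > max) max = avgs[i];
	return (max + 2 * max) / 2;
}

int main(void) {
	/* Illustrative numbers, not real tsc readings. */
	unsigned long long ctsc = 10000;
	unsigned long long own_avgs[2] = {
		moving_average(ctsc, 9900, 120),   /* local shard 0: sample of 100  */
		moving_average(ctsc, 9600, 300),   /* local shard 1: sample of 400  */
	};
	unsigned long long cutoff = calc_cutoff(own_avgs, 2);

	/* A remote lane whose thread has waited much longer than our own lanes. */
	unsigned long long remote_age = moving_average(ctsc, 8000, 1500);
	printf("cutoff=%llu remote age=%llu -> help? %s\n",
	       cutoff, remote_age, remote_age > cutoff ? "yes" : "no");
	return 0;
}
```

In the changeset itself the averages live per lane in `__timestamp_t.ma`, are reset in `fix_times`, and are updated in `try_pop` with relaxed atomic loads and stores; the sketch only isolates the arithmetic behind the help decision.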