Changeset 3c4bf05
- Timestamp:
- Mar 11, 2022, 12:36:30 PM (3 years ago)
- Branches:
- ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children:
- c42b8a1
- Parents:
- 630c4bb
- File:
- 1 edited
Legend:
- unprefixed lines: unmodified
- lines prefixed with "+": added in r3c4bf05
- lines prefixed with "-": removed from r630c4bb
- "@@ old N / new M @@": starting line of a hunk in the old and new revision
libcfa/src/concurrency/ready_queue.cfa
r630c4bb → r3c4bf05

@@ old 20 / new 20 @@


-// #define USE_RELAXED_FIFO
-// #define USE_WORK_STEALING
-// #define USE_CPU_WORK_STEALING
#define USE_AWARE_STEALING

@@ old 56 / new 53 @@
#endif

-#if defined(USE_AWARE_STEALING)
#define READYQ_SHARD_FACTOR 2
#define SEQUENTIAL_SHARD 2
-#elif defined(USE_CPU_WORK_STEALING)
-	#define READYQ_SHARD_FACTOR 2
-#elif defined(USE_RELAXED_FIFO)
-	#define BIAS 4
-	#define READYQ_SHARD_FACTOR 4
-	#define SEQUENTIAL_SHARD 1
-#elif defined(USE_WORK_STEALING)
-	#define READYQ_SHARD_FACTOR 2
-	#define SEQUENTIAL_SHARD 2
-#else
-	#error no scheduling strategy selected
-#endif

static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
static inline struct thread$ * search(struct cluster * cltr);
-static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);


@@ old 248 / new 231 @@

//=======================================================================
-// caches handling
-
-struct __attribute__((aligned(128))) __ready_queue_caches_t {
-	// Count States:
-	// - 0 : No one is looking after this cache
-	// - 1 : No one is looking after this cache, BUT it's not empty
-	// - 2+ : At least one processor is looking after this cache
-	volatile unsigned count;
-};
-
-void ?{}(__ready_queue_caches_t & this) { this.count = 0; }
-void ^?{}(__ready_queue_caches_t & this) {}
-
-static inline void depart(__ready_queue_caches_t & cache) {
-	/* paranoid */ verify( cache.count > 1);
-	__atomic_fetch_add(&cache.count, -1, __ATOMIC_SEQ_CST);
-	/* paranoid */ verify( cache.count != 0);
-	/* paranoid */ verify( cache.count < 65536 ); // This verify assumes no cluster will have more than 65000 kernel threads mapped to a single cache, which could be correct but is super weird.
-}
-
-static inline void arrive(__ready_queue_caches_t & cache) {
-	// for() {
-	//	unsigned expected = cache.count;
-	//	unsigned desired = 0 == expected ? 2 : expected + 1;
-	// }
-}
-
-//=======================================================================
// Cforall Ready Queue used for scheduling
//=======================================================================

@@ old 292 / new 247 @@

void ?{}(__ready_queue_t & this) with (this) {
-	#if defined(USE_CPU_WORK_STEALING)
-		lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
-		lanes.data = alloc( lanes.count );
-		lanes.tscs = alloc( lanes.count );
-		lanes.help = alloc( cpu_info.hthrd_count );
-
-		for( idx; (size_t)lanes.count ) {
-			(lanes.data[idx]){};
-			lanes.tscs[idx].tv = rdtscl();
-			lanes.tscs[idx].ma = rdtscl();
-		}
-		for( idx; (size_t)cpu_info.hthrd_count ) {
-			lanes.help[idx].src = 0;
-			lanes.help[idx].dst = 0;
-			lanes.help[idx].tri = 0;
-		}
-	#else
	lanes.data = 0p;
	lanes.tscs = 0p;
	lanes.caches = 0p;
	lanes.help = 0p;
	lanes.count = 0;
-	#endif
}

void ^?{}(__ready_queue_t & this) with (this) {
-	#if !defined(USE_CPU_WORK_STEALING)
-		verify( SEQUENTIAL_SHARD == lanes.count );
-	#endif
-
	free(lanes.data);
	free(lanes.tscs);

@@ old 329 / new 262 @@

//-----------------------------------------------------------------------
-#if defined(USE_AWARE_STEALING)
__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
	processor * const proc = kernelTLS().this_processor;
	const bool external = (!proc) || (cltr != proc->cltr);
	const bool remote = hint == UNPARK_REMOTE;

	unsigned i;
	if( external || remote ) {
		// Figure out where thread was last time and make sure it's valid
		/* paranoid */ verify(thrd->preferred >= 0);
		if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
			/* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
			unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
			do {
				unsigned r = __tls_rand();
				i = start + (r % READYQ_SHARD_FACTOR);
				/* paranoid */ verify( i < lanes.count );
				// If we can't lock it retry
			} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
		} else {
			do {
				i = __tls_rand() % lanes.count;
			} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
		}
	} else {
		do {
			unsigned r = proc->rdq.its++;
			i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
			/* paranoid */ verify( i < lanes.count );
			// If we can't lock it retry
		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
	}

	// Actually push it
	push(lanes.data[i], thrd);

	// Unlock and return
	__atomic_unlock( &lanes.data[i].lock );

	#if !defined(__CFA_NO_STATISTICS__)
		if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
		else __tls_stats()->ready.push.local.success++;
	#endif
}

static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
	unsigned start = proc->rdq.id;
	unsigned long long max = 0;
	for(i; READYQ_SHARD_FACTOR) {
		unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
		if(ptsc != -1ull) {
			/* paranoid */ verify( start + i < rdq.lanes.count );
			unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
			if(tsc > max) max = tsc;
		}
	}
	return (max + 2 * max) / 2;
}

__attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	/* paranoid */ verify( kernelTLS().this_processor );
	/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );

	processor * const proc = kernelTLS().this_processor;
	unsigned this = proc->rdq.id;
	/* paranoid */ verify( this < lanes.count );
	__cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);

	// Figure out the current cpu and make sure it is valid
	const int cpu = __kernel_getcpu();
	/* paranoid */ verify(cpu >= 0);
	/* paranoid */ verify(cpu < cpu_info.hthrd_count);
	unsigned this_cache = cpu_info.llc_map[cpu].cache;

	// Super important: don't write the same value over and over again
	// We want to maximise our chances that his particular values stays in cache
	if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)
		__atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);

	const unsigned long long ctsc = rdtscl();

	if(proc->rdq.target == MAX) {
		uint64_t chaos = __tls_rand();
		unsigned ext = chaos & 0xff;
		unsigned other = (chaos >> 8) % (lanes.count);

		if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
			proc->rdq.target = other;
		}
	}
	else {
		const unsigned target = proc->rdq.target;
		__cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
		/* paranoid */ verify( lanes.tscs[target].tv != MAX );
		if(target < lanes.count) {
			const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
			const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
			__cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
			if(age > cutoff) {
				thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
				if(t) return t;
			}
		}
		proc->rdq.target = MAX;
	}

	for(READYQ_SHARD_FACTOR) {
		unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
		if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
	}

	// All lanes where empty return 0p
	return 0p;

}
__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
	unsigned i = __tls_rand() % lanes.count;
	return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
}
__attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
	return search(cltr);
}
-#endif
-#if defined(USE_CPU_WORK_STEALING)
-	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
-
-		processor * const proc = kernelTLS().this_processor;
-		const bool external = (!proc) || (cltr != proc->cltr);
-
-		// Figure out the current cpu and make sure it is valid
-		const int cpu = __kernel_getcpu();
-		/* paranoid */ verify(cpu >= 0);
-		/* paranoid */ verify(cpu < cpu_info.hthrd_count);
-		/* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
-
-		// Figure out where thread was last time and make sure it's
-		/* paranoid */ verify(thrd->preferred >= 0);
-		/* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count);
-		/* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
-		const int prf = thrd->preferred * READYQ_SHARD_FACTOR;
-
-		const cpu_map_entry_t & map;
-		choose(hint) {
-			case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu];
-			case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf];
-		}
-		/* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
-
-		const int start = map.self * READYQ_SHARD_FACTOR;
-		unsigned i;
-		do {
-			unsigned r;
-			if(unlikely(external)) { r = __tls_rand(); }
-			else { r = proc->rdq.its++; }
-			choose(hint) {
-				case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR);
-				case UNPARK_REMOTE: i = prf + (r % READYQ_SHARD_FACTOR);
-			}
-			// If we can't lock it retry
-		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
-
-		// Actually push it
-		push(lanes.data[i], thrd);
-
-		// Unlock and return
-		__atomic_unlock( &lanes.data[i].lock );
-
-		#if !defined(__CFA_NO_STATISTICS__)
-			if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
-			else __tls_stats()->ready.push.local.success++;
-		#endif
-
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
-
-	}
-
-	// Pop from the ready queue from a given cluster
-	__attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
-		/* paranoid */ verify( lanes.count > 0 );
-		/* paranoid */ verify( kernelTLS().this_processor );
-
-		processor * const proc = kernelTLS().this_processor;
-		const int cpu = __kernel_getcpu();
-		/* paranoid */ verify(cpu >= 0);
-		/* paranoid */ verify(cpu < cpu_info.hthrd_count);
-		/* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
-
-		const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
-		/* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
-
-		const int start = map.self * READYQ_SHARD_FACTOR;
-		const unsigned long long ctsc = rdtscl();
-
-		// Did we already have a help target
-		if(proc->rdq.target == MAX) {
-			unsigned long long max = 0;
-			for(i; READYQ_SHARD_FACTOR) {
-				unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
-				if(tsc > max) max = tsc;
-			}
-			// proc->rdq.cutoff = (max + 2 * max) / 2;
-			/* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
-			/* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
-
-			if(0 == (__tls_rand() % 100)) {
-				proc->rdq.target = __tls_rand() % lanes.count;
-			} else {
-				unsigned cpu_chaos = map.start + (__tls_rand() % map.count);
-				proc->rdq.target = (cpu_chaos * READYQ_SHARD_FACTOR) + (__tls_rand() % READYQ_SHARD_FACTOR);
-				/* paranoid */ verify(proc->rdq.target >= (map.start * READYQ_SHARD_FACTOR));
-				/* paranoid */ verify(proc->rdq.target < ((map.start + map.count) * READYQ_SHARD_FACTOR));
-			}
-
-			/* paranoid */ verify(proc->rdq.target != MAX);
-		}
-		else {
-			unsigned long long max = 0;
-			for(i; READYQ_SHARD_FACTOR) {
-				unsigned long long tsc = moving_average(ctsc, ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
-				if(tsc > max) max = tsc;
-			}
-			const unsigned long long cutoff = (max + 2 * max) / 2;
-			{
-				unsigned target = proc->rdq.target;
-				proc->rdq.target = MAX;
-				lanes.help[target / READYQ_SHARD_FACTOR].tri++;
-				if(moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
-					thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
-					proc->rdq.last = target;
-					if(t) return t;
-				}
-				proc->rdq.target = MAX;
-			}
-
-			unsigned last = proc->rdq.last;
-			if(last != MAX && moving_average(ctsc, lanes.tscs[last].tv, lanes.tscs[last].ma) > cutoff) {
-				thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
-				if(t) return t;
-			}
-			else {
-				proc->rdq.last = MAX;
-			}
-		}
-
-		for(READYQ_SHARD_FACTOR) {
-			unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
-			if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
-		}
-
-		// All lanes where empty return 0p
-		return 0p;
-	}
-
-	__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
-		processor * const proc = kernelTLS().this_processor;
-		unsigned last = proc->rdq.last;
-		if(last != MAX) {
-			struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
-			if(t) return t;
-			proc->rdq.last = MAX;
-		}
-
-		unsigned i = __tls_rand() % lanes.count;
-		return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
-	}
-	__attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
-		return search(cltr);
-	}
-#endif
-#if defined(USE_RELAXED_FIFO)
-	//-----------------------------------------------------------------------
-	// get index from random number with or without bias towards queues
-	static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
-		unsigned i;
-		bool local;
-		unsigned rlow = r % BIAS;
-		unsigned rhigh = r / BIAS;
-		if((0 != rlow) && preferred >= 0) {
-			// (BIAS - 1) out of BIAS chances
-			// Use perferred queues
-			i = preferred + (rhigh % READYQ_SHARD_FACTOR);
-			local = true;
-		}
-		else {
-			// 1 out of BIAS chances
-			// Use all queues
-			i = rhigh;
-			local = false;
-		}
-		return [i, local];
-	}
-
-	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
-
-		const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
-		/* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
-
-		bool local;
-		int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
-
-		// Try to pick a lane and lock it
-		unsigned i;
-		do {
-			// Pick the index of a lane
-			unsigned r = __tls_rand_fwd();
-			[i, local] = idx_from_r(r, preferred);
-
-			i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
-
-			#if !defined(__CFA_NO_STATISTICS__)
-				if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
-				else if(local) __tls_stats()->ready.push.local.attempt++;
-				else __tls_stats()->ready.push.share.attempt++;
-			#endif
-
-			// If we can't lock it retry
-		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
-
-		// Actually push it
-		push(lanes.data[i], thrd);
-
-		// Unlock and return
-		__atomic_unlock( &lanes.data[i].lock );
-
-		// Mark the current index in the tls rng instance as having an item
-		__tls_rand_advance_bck();
-
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
-
-		// Update statistics
-		#if !defined(__CFA_NO_STATISTICS__)
-			if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
-			else if(local) __tls_stats()->ready.push.local.success++;
-			else __tls_stats()->ready.push.share.success++;
-		#endif
-	}
-
-	// Pop from the ready queue from a given cluster
-	__attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
-		/* paranoid */ verify( lanes.count > 0 );
-		/* paranoid */ verify( kernelTLS().this_processor );
-		/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
-
-		unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
-		int preferred = kernelTLS().this_processor->rdq.id;
-
-
-		// As long as the list is not empty, try finding a lane that isn't empty and pop from it
-		for(25) {
-			// Pick two lists at random
-			unsigned ri = __tls_rand_bck();
-			unsigned rj = __tls_rand_bck();
-
-			unsigned i, j;
-			__attribute__((unused)) bool locali, localj;
-			[i, locali] = idx_from_r(ri, preferred);
-			[j, localj] = idx_from_r(rj, preferred);
-
-			i %= count;
-			j %= count;
-
-			// try popping from the 2 picked lists
-			struct thread$ * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
-			if(thrd) {
-				return thrd;
-			}
-		}
-
-		// All lanes where empty return 0p
-		return 0p;
-	}
-
-	__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
-	__attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
-		return search(cltr);
-	}
-#endif
-#if defined(USE_WORK_STEALING)
-	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
-
-		// #define USE_PREFERRED
-		#if !defined(USE_PREFERRED)
-			const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
-			/* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
-		#else
-			unsigned preferred = thrd->preferred;
-			const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == MAX || thrd->curr_cluster != cltr;
-			/* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
-
-			unsigned r = preferred % READYQ_SHARD_FACTOR;
-			const unsigned start = preferred - r;
-		#endif
-
-		// Try to pick a lane and lock it
-		unsigned i;
-		do {
-			#if !defined(__CFA_NO_STATISTICS__)
-				if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
-				else __tls_stats()->ready.push.local.attempt++;
-			#endif
-
-			if(unlikely(external)) {
-				i = __tls_rand() % lanes.count;
-			}
-			else {
-				#if !defined(USE_PREFERRED)
-					processor * proc = kernelTLS().this_processor;
-					unsigned r = proc->rdq.its++;
-					i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
-				#else
-					i = start + (r++ % READYQ_SHARD_FACTOR);
-				#endif
-			}
-			// If we can't lock it retry
-		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
-
-		// Actually push it
-		push(lanes.data[i], thrd);
-
-		// Unlock and return
-		__atomic_unlock( &lanes.data[i].lock );
-
-		#if !defined(__CFA_NO_STATISTICS__)
-			if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
-			else __tls_stats()->ready.push.local.success++;
-		#endif
-
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
-	}
-
-	// Pop from the ready queue from a given cluster
-	__attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
-		/* paranoid */ verify( lanes.count > 0 );
-		/* paranoid */ verify( kernelTLS().this_processor );
-		/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
-
-		processor * proc = kernelTLS().this_processor;
-
-		if(proc->rdq.target == MAX) {
-			unsigned long long min = ts(lanes.data[proc->rdq.id]);
-			for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
-				unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
-				if(tsc < min) min = tsc;
-			}
-			proc->rdq.cutoff = min;
-			proc->rdq.target = __tls_rand() % lanes.count;
-		}
-		else {
-			unsigned target = proc->rdq.target;
-			proc->rdq.target = MAX;
-			const unsigned long long bias = 0; //2_500_000_000;
-			const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
-			if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
-				thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
-				if(t) return t;
-			}
-		}
-
-		for(READYQ_SHARD_FACTOR) {
-			unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
-			if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
-		}
-		return 0p;
-	}
-
-	__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
-		unsigned i = __tls_rand() % lanes.count;
-		return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
-	}
-
-	__attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
-		return search(cltr);
-	}
-#endif

//=======================================================================

@@ old 845 / new 418 @@
	// Actually pop the list
	struct thread$ * thrd;
-	#if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
	unsigned long long tsc_before = ts(lane);
-	#endif
	unsigned long long tsv;
	[thrd, tsv] = pop(lane);

@@ old 861 / new 432 @@
	__STATS( stats.success++; )

-	#if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
	if (tsv != MAX) {
		unsigned long long now = rdtscl();
		unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
		__atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
		__atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
	}
-	#endif

-	#if defined(USE_AWARE_STEALING) || defined(USE_CPU_WORK_STEALING)
	thrd->preferred = w / READYQ_SHARD_FACTOR;
-	#else
-		thrd->preferred = w;
-	#endif

	// return the popped thread

@@ old 902 / new 467 @@
// get preferred ready for new thread
unsigned ready_queue_new_preferred() {
-	unsigned pref = 0;
+	unsigned pref = MAX;
	if(struct thread$ * thrd = publicTLS_get( this_thread )) {
		pref = thrd->preferred;
	}
-	else {
-		#if defined(USE_CPU_WORK_STEALING)
-			pref = __kernel_getcpu();
-		#endif
-	}
-
-	#if defined(USE_CPU_WORK_STEALING)
-		/* paranoid */ verify(pref >= 0);
-		/* paranoid */ verify(pref < cpu_info.hthrd_count);
-	#endif

	return pref;
@@ old 982 / new 537 @@

static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
-	#if defined(USE_AWARE_STEALING) || defined(USE_WORK_STEALING)
	lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
	for(i; lanes.count) {
		lanes.tscs[i].tv = rdtscl();
		lanes.tscs[i].ma = 0;
	}
-	#endif
}

-#if defined(USE_CPU_WORK_STEALING)
-	// ready_queue size is fixed in this case
-	void ready_queue_grow(struct cluster * cltr) {}
-	void ready_queue_shrink(struct cluster * cltr) {}
-#else
// Grow the ready queue
void ready_queue_grow(struct cluster * cltr) {
	size_t ncount;
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	// grow the ready queue
	with( cltr->ready_queue ) {
		// Find new count
		// Make sure we always have atleast 1 list
		if(target >= 2) {
			ncount = target * READYQ_SHARD_FACTOR;
		} else {
			ncount = SEQUENTIAL_SHARD;
		}

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc( ncount, lanes.data`realloc );

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Construct new data
		for( idx; (size_t)lanes.count ~ ncount) {
			(lanes.data[idx]){};
		}

		// Update original
		lanes.count = ncount;

		lanes.caches = alloc( target, lanes.caches`realloc );
	}

	fix_times(cltr);

	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	int target = cltr->procs.total;

	with( cltr->ready_queue ) {
		// Remember old count
		size_t ocount = lanes.count;

		// Find new count
		// Make sure we always have atleast 1 list
		lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
		/* paranoid */ verify( ocount >= lanes.count );
		/* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );

		// for printing count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; (size_t)lanes.count ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
			verify(locked);

			// As long as we can pop from this lane to push the threads somewhere else in the queue
			while(!is_empty(lanes.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(lanes.data[idx]);

				push(cltr, thrd, true);

				// for printing count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&lanes.data[idx].lock);

			// TODO print the queue statistics here

			^(lanes.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc( lanes.count, lanes.data`realloc );

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		lanes.caches = alloc( target, lanes.caches`realloc );
	}

	fix_times(cltr);


	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}
-#endif

#if !defined(__CFA_NO_STATISTICS__)
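For context on the push path that remains after this changeset: each processor owns READYQ_SHARD_FACTOR consecutive lanes and pushes by try-locking one randomly chosen lane of a shard, retrying on contention, rather than serialising on a single shared lock. The stand-alone C sketch below mirrors that shape only; lane_t, NLANES, lane_push() and tls_rand() are invented for the example and are not the libcfa types or functions.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

#define READYQ_SHARD_FACTOR 2              /* lanes per processor, as in the diff */
#define NLANES (8 * READYQ_SHARD_FACTOR)   /* 8 processors, made up for the sketch */

typedef struct {
	atomic_bool lock;                      /* per-lane try-lock; false = unlocked */
	/* an intrusive FIFO of ready threads would live here */
} lane_t;

static lane_t lanes[NLANES];

/* Per-thread PRNG stand-in; libcfa uses __tls_rand() instead. */
static unsigned tls_rand(void) { return (unsigned)rand(); }

/* Placeholder for the real lane push, which appends to the lane's list. */
extern void lane_push(lane_t * lane, void * thrd);

void ready_push(unsigned shard_id, void * thrd) {
	unsigned start = shard_id * READYQ_SHARD_FACTOR;
	unsigned i;
	do {
		/* pick one of this shard's lanes at random ... */
		i = start + (tls_rand() % READYQ_SHARD_FACTOR);
		/* ... and retry with another lane if its lock is already held */
	} while (atomic_exchange_explicit(&lanes[i].lock, true, memory_order_acquire));

	lane_push(&lanes[i], thrd);            /* push while holding the lane lock */
	atomic_store_explicit(&lanes[i].lock, false, memory_order_release);
}

The pop side in the diff walks the same shard with proc->rdq.itr++ instead of a random pick, which keeps local pops roughly round-robin over the processor's own lanes before any helping or stealing is considered.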