- File:
-
- 1 edited
-
libcfa/src/concurrency/ready_queue.cfa (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/ready_queue.cfa
r07b4970 r43784ac 24 24 25 25 #include "bits/defs.hfa" 26 #include "device/cpu.hfa"27 26 #include "kernel_private.hfa" 28 27 … … 48 47 #endif 49 48 50 #if defined(USE_CPU_WORK_STEALING) 51 #define READYQ_SHARD_FACTOR 2 52 #elif defined(USE_RELAXED_FIFO) 49 #if defined(USE_RELAXED_FIFO) 53 50 #define BIAS 4 54 51 #define READYQ_SHARD_FACTOR 4 … … 218 215 //======================================================================= 219 216 void ?{}(__ready_queue_t & this) with (this) { 220 #if defined(USE_CPU_WORK_STEALING) 221 lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR; 222 lanes.data = alloc( lanes.count ); 223 lanes.tscs = alloc( lanes.count ); 224 225 for( idx; (size_t)lanes.count ) { 226 (lanes.data[idx]){}; 227 lanes.tscs[idx].tv = rdtscl(); 228 } 229 #else 230 lanes.data = 0p; 231 lanes.tscs = 0p; 232 lanes.count = 0; 233 #endif 217 lanes.data = 0p; 218 lanes.tscs = 0p; 219 lanes.count = 0; 234 220 } 235 221 236 222 void ^?{}(__ready_queue_t & this) with (this) { 237 #if !defined(USE_CPU_WORK_STEALING) 238 verify( SEQUENTIAL_SHARD == lanes.count ); 239 #endif 240 223 verify( SEQUENTIAL_SHARD == lanes.count ); 241 224 free(lanes.data); 242 225 free(lanes.tscs); … … 244 227 245 228 //----------------------------------------------------------------------- 246 #if defined(USE_CPU_WORK_STEALING)247 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {248 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);249 250 processor * const proc = kernelTLS().this_processor;251 const bool external = !push_local || (!proc) || (cltr != proc->cltr);252 253 const int cpu = __kernel_getcpu();254 /* paranoid */ verify(cpu >= 0);255 /* paranoid */ verify(cpu < cpu_info.hthrd_count);256 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);257 258 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];259 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);260 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);261 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);262 263 const int start = map.self * READYQ_SHARD_FACTOR;264 unsigned i;265 do {266 unsigned r;267 if(unlikely(external)) { r = __tls_rand(); }268 else { r = proc->rdq.its++; }269 i = start + (r % READYQ_SHARD_FACTOR);270 // If we can't lock it retry271 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );272 273 // Actually push it274 push(lanes.data[i], thrd);275 276 // Unlock and return277 __atomic_unlock( &lanes.data[i].lock );278 279 #if !defined(__CFA_NO_STATISTICS__)280 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);281 else __tls_stats()->ready.push.local.success++;282 #endif283 284 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);285 286 }287 288 // Pop from the ready queue from a given cluster289 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {290 /* paranoid */ verify( lanes.count > 0 );291 /* paranoid */ verify( kernelTLS().this_processor );292 293 const int cpu = __kernel_getcpu();294 /* paranoid */ verify(cpu >= 0);295 /* paranoid */ verify(cpu < cpu_info.hthrd_count);296 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);297 298 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];299 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);300 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);301 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);302 303 processor * const proc = kernelTLS().this_processor;304 const int start = map.self * READYQ_SHARD_FACTOR;305 306 // Did we already have a help target307 if(proc->rdq.target == -1u) {308 // if We don't have a309 unsigned long long min = ts(lanes.data[start]);310 for(i; READYQ_SHARD_FACTOR) {311 unsigned long long tsc = ts(lanes.data[start + i]);312 if(tsc < min) min = tsc;313 }314 proc->rdq.cutoff = min;315 proc->rdq.target = (map.start * READYQ_SHARD_FACTOR) + (__tls_rand() % (map.count* READYQ_SHARD_FACTOR));316 }317 else {318 const unsigned long long bias = 0; //2_500_000_000;319 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;320 {321 unsigned target = proc->rdq.target;322 proc->rdq.target = -1u;323 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {324 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));325 proc->rdq.last = target;326 if(t) return t;327 }328 }329 330 unsigned last = proc->rdq.last;331 if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {332 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));333 if(t) return t;334 }335 else {336 proc->rdq.last = -1u;337 }338 }339 340 for(READYQ_SHARD_FACTOR) {341 unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);342 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;343 }344 345 // All lanes where empty return 0p346 return 0p;347 }348 349 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {350 processor * const proc = kernelTLS().this_processor;351 unsigned last = proc->rdq.last;352 353 unsigned i = __tls_rand() % lanes.count;354 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));355 }356 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {357 return search(cltr);358 }359 #endif360 229 #if defined(USE_RELAXED_FIFO) 361 230 //----------------------------------------------------------------------- … … 711 580 } 712 581 713 #if defined(USE_CPU_WORK_STEALING) 714 // ready_queue size is fixed in this case 715 void ready_queue_grow(struct cluster * cltr) {} 716 void ready_queue_shrink(struct cluster * cltr) {} 717 #else 718 // Grow the ready queue 719 void ready_queue_grow(struct cluster * cltr) { 720 size_t ncount; 721 int target = cltr->procs.total; 722 723 /* paranoid */ verify( ready_mutate_islocked() ); 724 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n"); 725 726 // Make sure that everything is consistent 727 /* paranoid */ check( cltr->ready_queue ); 728 729 // grow the ready queue 730 with( cltr->ready_queue ) { 731 // Find new count 732 // Make sure we always have atleast 1 list 733 if(target >= 2) { 734 ncount = target * READYQ_SHARD_FACTOR; 735 } else { 736 ncount = SEQUENTIAL_SHARD; 582 // Grow the ready queue 583 void ready_queue_grow(struct cluster * cltr) { 584 size_t ncount; 585 int target = cltr->procs.total; 586 587 /* paranoid */ verify( ready_mutate_islocked() ); 588 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n"); 589 590 // Make sure that everything is consistent 591 /* paranoid */ check( cltr->ready_queue ); 592 593 // grow the ready queue 594 with( cltr->ready_queue ) { 595 // Find new count 596 // Make sure we always have atleast 1 list 597 if(target >= 2) { 598 ncount = target * READYQ_SHARD_FACTOR; 599 } else { 600 ncount = SEQUENTIAL_SHARD; 601 } 602 603 // Allocate new array (uses realloc and memcpies the data) 604 lanes.data = alloc( ncount, lanes.data`realloc ); 605 606 // Fix the moved data 607 for( idx; (size_t)lanes.count ) { 608 fix(lanes.data[idx]); 609 } 610 611 // Construct new data 612 for( idx; (size_t)lanes.count ~ ncount) { 613 (lanes.data[idx]){}; 614 } 615 616 // Update original 617 lanes.count = ncount; 618 } 619 620 fix_times(cltr); 621 622 reassign_cltr_id(cltr); 623 624 // Make sure that everything is consistent 625 /* paranoid */ check( cltr->ready_queue ); 626 627 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n"); 628 629 /* paranoid */ verify( ready_mutate_islocked() ); 630 } 631 632 // Shrink the ready queue 633 void ready_queue_shrink(struct cluster * cltr) { 634 /* paranoid */ verify( ready_mutate_islocked() ); 635 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n"); 636 637 // Make sure that everything is consistent 638 /* paranoid */ check( cltr->ready_queue ); 639 640 int target = cltr->procs.total; 641 642 with( cltr->ready_queue ) { 643 // Remember old count 644 size_t ocount = lanes.count; 645 646 // Find new count 647 // Make sure we always have atleast 1 list 648 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD; 649 /* paranoid */ verify( ocount >= lanes.count ); 650 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 ); 651 652 // for printing count the number of displaced threads 653 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__) 654 __attribute__((unused)) size_t displaced = 0; 655 #endif 656 657 // redistribute old data 658 for( idx; (size_t)lanes.count ~ ocount) { 659 // Lock is not strictly needed but makes checking invariants much easier 660 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock); 661 verify(locked); 662 663 // As long as we can pop from this lane to push the threads somewhere else in the queue 664 while(!is_empty(lanes.data[idx])) { 665 struct $thread * thrd; 666 unsigned long long _; 667 [thrd, _] = pop(lanes.data[idx]); 668 669 push(cltr, thrd, true); 670 671 // for printing count the number of displaced threads 672 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__) 673 displaced++; 674 #endif 737 675 } 738 676 739 // Allocate new array (uses realloc and memcpies the data) 740 lanes.data = alloc( ncount, lanes.data`realloc ); 741 742 // Fix the moved data 743 for( idx; (size_t)lanes.count ) { 744 fix(lanes.data[idx]); 745 } 746 747 // Construct new data 748 for( idx; (size_t)lanes.count ~ ncount) { 749 (lanes.data[idx]){}; 750 } 751 752 // Update original 753 lanes.count = ncount; 754 } 755 756 fix_times(cltr); 757 758 reassign_cltr_id(cltr); 759 760 // Make sure that everything is consistent 761 /* paranoid */ check( cltr->ready_queue ); 762 763 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n"); 764 765 /* paranoid */ verify( ready_mutate_islocked() ); 766 } 767 768 // Shrink the ready queue 769 void ready_queue_shrink(struct cluster * cltr) { 770 /* paranoid */ verify( ready_mutate_islocked() ); 771 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n"); 772 773 // Make sure that everything is consistent 774 /* paranoid */ check( cltr->ready_queue ); 775 776 int target = cltr->procs.total; 777 778 with( cltr->ready_queue ) { 779 // Remember old count 780 size_t ocount = lanes.count; 781 782 // Find new count 783 // Make sure we always have atleast 1 list 784 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD; 785 /* paranoid */ verify( ocount >= lanes.count ); 786 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 ); 787 788 // for printing count the number of displaced threads 789 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__) 790 __attribute__((unused)) size_t displaced = 0; 791 #endif 792 793 // redistribute old data 794 for( idx; (size_t)lanes.count ~ ocount) { 795 // Lock is not strictly needed but makes checking invariants much easier 796 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock); 797 verify(locked); 798 799 // As long as we can pop from this lane to push the threads somewhere else in the queue 800 while(!is_empty(lanes.data[idx])) { 801 struct $thread * thrd; 802 unsigned long long _; 803 [thrd, _] = pop(lanes.data[idx]); 804 805 push(cltr, thrd, true); 806 807 // for printing count the number of displaced threads 808 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__) 809 displaced++; 810 #endif 811 } 812 813 // Unlock the lane 814 __atomic_unlock(&lanes.data[idx].lock); 815 816 // TODO print the queue statistics here 817 818 ^(lanes.data[idx]){}; 819 } 820 821 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced); 822 823 // Allocate new array (uses realloc and memcpies the data) 824 lanes.data = alloc( lanes.count, lanes.data`realloc ); 825 826 // Fix the moved data 827 for( idx; (size_t)lanes.count ) { 828 fix(lanes.data[idx]); 829 } 830 } 831 832 fix_times(cltr); 833 834 reassign_cltr_id(cltr); 835 836 // Make sure that everything is consistent 837 /* paranoid */ check( cltr->ready_queue ); 838 839 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n"); 840 /* paranoid */ verify( ready_mutate_islocked() ); 841 } 842 #endif 677 // Unlock the lane 678 __atomic_unlock(&lanes.data[idx].lock); 679 680 // TODO print the queue statistics here 681 682 ^(lanes.data[idx]){}; 683 } 684 685 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced); 686 687 // Allocate new array (uses realloc and memcpies the data) 688 lanes.data = alloc( lanes.count, lanes.data`realloc ); 689 690 // Fix the moved data 691 for( idx; (size_t)lanes.count ) { 692 fix(lanes.data[idx]); 693 } 694 } 695 696 fix_times(cltr); 697 698 reassign_cltr_id(cltr); 699 700 // Make sure that everything is consistent 701 /* paranoid */ check( cltr->ready_queue ); 702 703 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n"); 704 /* paranoid */ verify( ready_mutate_islocked() ); 705 } 843 706 844 707 #if !defined(__CFA_NO_STATISTICS__)
Note:
See TracChangeset
for help on using the changeset viewer.