Changeset 12daa43 for libcfa/src
- Timestamp: Jun 17, 2021, 3:05:54 PM (3 years ago)
- Branches: ADT, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: cf85f96
- Parents: fde879b3
- Location: libcfa/src/concurrency
- Files: 4 edited
Legend:
- Unmodified (prefix: space)
- Added (prefix: +)
- Removed (prefix: -)

libcfa/src/concurrency/kernel.cfa
--- rfde879b3
+++ r12daa43

 
      // Spin a little on I/O, just in case
      for(5) {
          __maybe_io_drain( this );
          readyThread = pop_fast( this->cltr );
…
 
      // no luck, try stealing a few times
      for(5) {
          if( __maybe_io_drain( this ) ) {
              readyThread = pop_fast( this->cltr );

libcfa/src/concurrency/kernel.hfa
--- rfde879b3
+++ r12daa43

          unsigned id;
          unsigned target;
+         unsigned last;
          unsigned long long int cutoff;
      } rdq;

libcfa/src/concurrency/kernel/startup.cfa
--- rfde879b3
+++ r12daa43

      this.rdq.id = -1u;
      this.rdq.target = -1u;
+     this.rdq.last = -1u;
      this.rdq.cutoff = 0ull;
      do_terminate = false;

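Taken together, the kernel.hfa and startup.cfa hunks above add one more piece of per-processor scheduler state and give it the same -1u "unset" sentinel as the existing target field. The plain-C mirror below is only an illustration of the fields visible in those hunks; the real struct in kernel.hfa contains further members, and the field roles are inferred from how pop_fast uses them in the ready_queue.cfa diff that follows. It is not the actual Cforall declaration.

#include <stdio.h>

/* Illustrative stand-in for the processor's rdq sub-struct (not the real kernel.hfa type). */
struct rdq_state {
    unsigned id;                    /* index this processor was assigned into the lane array   */
    unsigned target;                /* lane currently chosen for "helping"; -1u means none     */
    unsigned last;                  /* new in this changeset: the lane helped most recently,   */
                                    /* retried while it still looks stale; -1u means none      */
    unsigned long long int cutoff;  /* timestamp threshold used to decide whether a lane is    */
                                    /* old enough to be worth helping                          */
};

int main(void) {
    /* Same sentinel initialisation as the startup.cfa hunk above. */
    struct rdq_state rdq = { .id = -1u, .target = -1u, .last = -1u, .cutoff = 0ull };
    printf("target=%u last=%u cutoff=%llu\n", rdq.target, rdq.last, rdq.cutoff);
    return 0;
}

Initialising last to -1u lets pop_fast distinguish "no lane helped yet" from lane 0.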
libcfa/src/concurrency/ready_queue.cfa
--- rfde879b3
+++ r12daa43

 
 
- #define USE_RELAXED_FIFO
+ // #define USE_RELAXED_FIFO
  // #define USE_WORK_STEALING
 
  #include "bits/defs.hfa"
+ #include "device/cpu.hfa"
  #include "kernel_private.hfa"
 
…
  #endif
 
- #if defined(USE_RELAXED_FIFO)
+ #if defined(USE_CPU_WORK_STEALING)
+ #define READYQ_SHARD_FACTOR 2
+ #elif defined(USE_RELAXED_FIFO)
  #define BIAS 4
  #define READYQ_SHARD_FACTOR 4
…
  //=======================================================================
  void ?{}(__ready_queue_t & this) with (this) {
-     lanes.data = 0p;
-     lanes.tscs = 0p;
-     lanes.count = 0;
+     #if defined(USE_CPU_WORK_STEALING)
+         lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
+         lanes.data = alloc( lanes.count );
+         lanes.tscs = alloc( lanes.count );
+
+         for( idx; (size_t)lanes.count ) {
+             (lanes.data[idx]){};
+             lanes.tscs[idx].tv = rdtscl();
+         }
+     #else
+         lanes.data = 0p;
+         lanes.tscs = 0p;
+         lanes.count = 0;
+     #endif
  }
 
  void ^?{}(__ready_queue_t & this) with (this) {
-     verify( SEQUENTIAL_SHARD == lanes.count );
+     #if !defined(USE_CPU_WORK_STEALING)
+         verify( SEQUENTIAL_SHARD == lanes.count );
+     #endif
+
      free(lanes.data);
      free(lanes.tscs);
…
 
  //-----------------------------------------------------------------------
+ #if defined(USE_CPU_WORK_STEALING)
+     __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
+         __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
+
+         processor * const proc = kernelTLS().this_processor;
+         const bool external = !push_local || (!proc) || (cltr != proc->cltr);
+
+         const int cpu = __kernel_getcpu();
+         /* paranoid */ verify(cpu >= 0);
+         /* paranoid */ verify(cpu < cpu_info.hthrd_count);
+         /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
+
+         const int start = cpu * READYQ_SHARD_FACTOR;
+         unsigned i;
+         do {
+             unsigned r;
+             if(unlikely(external)) { r = __tls_rand(); }
+             else { r = proc->rdq.its++; }
+             i = start + (r % READYQ_SHARD_FACTOR);
+             // If we can't lock it retry
+         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+
+         // Actually push it
+         push(lanes.data[i], thrd);
+
+         // Unlock and return
+         __atomic_unlock( &lanes.data[i].lock );
+
+         #if !defined(__CFA_NO_STATISTICS__)
+             if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
+             else __tls_stats()->ready.push.local.success++;
+         #endif
+
+         __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
+
+     }
+
+     // Pop from the ready queue from a given cluster
+     __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
+         /* paranoid */ verify( lanes.count > 0 );
+         /* paranoid */ verify( kernelTLS().this_processor );
+
+         const int cpu = __kernel_getcpu();
+         /* paranoid */ verify(cpu >= 0);
+         /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
+         /* paranoid */ verify(cpu < cpu_info.hthrd_count);
+
+         processor * const proc = kernelTLS().this_processor;
+         const int start = cpu * READYQ_SHARD_FACTOR;
+
+         // Did we already have a help target
+         if(proc->rdq.target == -1u) {
+             // if We don't have a
+             unsigned long long min = ts(lanes.data[start]);
+             for(i; READYQ_SHARD_FACTOR) {
+                 unsigned long long tsc = ts(lanes.data[start + i]);
+                 if(tsc < min) min = tsc;
+             }
+             proc->rdq.cutoff = min;
+             proc->rdq.target = __tls_rand() % lanes.count;
+         }
+         else {
+             const unsigned long long bias = 0; //2_500_000_000;
+             const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
+             {
+                 unsigned target = proc->rdq.target;
+                 proc->rdq.target = -1u;
+                 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
+                     $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
+                     proc->rdq.last = target;
+                     if(t) return t;
+                 }
+             }
+
+             unsigned last = proc->rdq.last;
+             if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
+                 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
+                 if(t) return t;
+             }
+             else {
+                 proc->rdq.last = -1u;
+             }
+         }
+
+         for(READYQ_SHARD_FACTOR) {
+             unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
+             if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
+         }
+
+         // All lanes where empty return 0p
+         return 0p;
+     }
+
+     __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
+         processor * const proc = kernelTLS().this_processor;
+         unsigned last = proc->rdq.last;
+
+         unsigned i = __tls_rand() % lanes.count;
+         return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
+     }
+     __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
+         return search(cltr);
+     }
+ #endif
  #if defined(USE_RELAXED_FIFO)
      //-----------------------------------------------------------------------
…
  }
 
- // Grow the ready queue
- void ready_queue_grow(struct cluster * cltr) {
-     size_t ncount;
-     int target = cltr->procs.total;
-
-     /* paranoid */ verify( ready_mutate_islocked() );
-     __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
-
-     // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
-
-     // grow the ready queue
-     with( cltr->ready_queue ) {
-         // Find new count
-         // Make sure we always have atleast 1 list
-         if(target >= 2) {
-             ncount = target * READYQ_SHARD_FACTOR;
-         } else {
-             ncount = SEQUENTIAL_SHARD;
-         }
-
-         // Allocate new array (uses realloc and memcpies the data)
-         lanes.data = alloc( ncount, lanes.data`realloc );
-
-         // Fix the moved data
-         for( idx; (size_t)lanes.count ) {
-             fix(lanes.data[idx]);
-         }
-
-         // Construct new data
-         for( idx; (size_t)lanes.count ~ ncount) {
-             (lanes.data[idx]){};
-         }
-
-         // Update original
-         lanes.count = ncount;
-     }
-
-     fix_times(cltr);
-
-     reassign_cltr_id(cltr);
-
-     // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
-
-     __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
-
-     /* paranoid */ verify( ready_mutate_islocked() );
- }
-
- // Shrink the ready queue
- void ready_queue_shrink(struct cluster * cltr) {
-     /* paranoid */ verify( ready_mutate_islocked() );
-     __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
-
-     // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
-
-     int target = cltr->procs.total;
-
-     with( cltr->ready_queue ) {
-         // Remember old count
-         size_t ocount = lanes.count;
-
-         // Find new count
-         // Make sure we always have atleast 1 list
-         lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
-         /* paranoid */ verify( ocount >= lanes.count );
-         /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
-
-         // for printing count the number of displaced threads
-         #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
-             __attribute__((unused)) size_t displaced = 0;
-         #endif
-
-         // redistribute old data
-         for( idx; (size_t)lanes.count ~ ocount) {
-             // Lock is not strictly needed but makes checking invariants much easier
-             __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
-             verify(locked);
-
-             // As long as we can pop from this lane to push the threads somewhere else in the queue
-             while(!is_empty(lanes.data[idx])) {
-                 struct $thread * thrd;
-                 unsigned long long _;
-                 [thrd, _] = pop(lanes.data[idx]);
-
-                 push(cltr, thrd, true);
-
-                 // for printing count the number of displaced threads
-                 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
-                     displaced++;
-                 #endif
-             }
-
-             // Unlock the lane
-             __atomic_unlock(&lanes.data[idx].lock);
-
-             // TODO print the queue statistics here
-
-             ^(lanes.data[idx]){};
-         }
-
-         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
-
-         // Allocate new array (uses realloc and memcpies the data)
-         lanes.data = alloc( lanes.count, lanes.data`realloc );
-
-         // Fix the moved data
-         for( idx; (size_t)lanes.count ) {
-             fix(lanes.data[idx]);
-         }
-     }
-
-     fix_times(cltr);
-
-     reassign_cltr_id(cltr);
-
-     // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
-
-     __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
-     /* paranoid */ verify( ready_mutate_islocked() );
- }
+ #if defined(USE_CPU_WORK_STEALING)
+     // ready_queue size is fixed in this case
+     void ready_queue_grow(struct cluster * cltr) {}
+     void ready_queue_shrink(struct cluster * cltr) {}
+ #else
+     // Grow the ready queue
+     void ready_queue_grow(struct cluster * cltr) {
+         size_t ncount;
+         int target = cltr->procs.total;
+
+         /* paranoid */ verify( ready_mutate_islocked() );
+         __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
+
+         // Make sure that everything is consistent
+         /* paranoid */ check( cltr->ready_queue );
+
+         // grow the ready queue
+         with( cltr->ready_queue ) {
+             // Find new count
+             // Make sure we always have atleast 1 list
+             if(target >= 2) {
+                 ncount = target * READYQ_SHARD_FACTOR;
+             } else {
+                 ncount = SEQUENTIAL_SHARD;
+             }
+
+             // Allocate new array (uses realloc and memcpies the data)
+             lanes.data = alloc( ncount, lanes.data`realloc );
+
+             // Fix the moved data
+             for( idx; (size_t)lanes.count ) {
+                 fix(lanes.data[idx]);
+             }
+
+             // Construct new data
+             for( idx; (size_t)lanes.count ~ ncount) {
+                 (lanes.data[idx]){};
+             }
+
+             // Update original
+             lanes.count = ncount;
+         }
+
+         fix_times(cltr);
+
+         reassign_cltr_id(cltr);
+
+         // Make sure that everything is consistent
+         /* paranoid */ check( cltr->ready_queue );
+
+         __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
+
+         /* paranoid */ verify( ready_mutate_islocked() );
+     }
+
+     // Shrink the ready queue
+     void ready_queue_shrink(struct cluster * cltr) {
+         /* paranoid */ verify( ready_mutate_islocked() );
+         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
+
+         // Make sure that everything is consistent
+         /* paranoid */ check( cltr->ready_queue );
+
+         int target = cltr->procs.total;
+
+         with( cltr->ready_queue ) {
+             // Remember old count
+             size_t ocount = lanes.count;
+
+             // Find new count
+             // Make sure we always have atleast 1 list
+             lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
+             /* paranoid */ verify( ocount >= lanes.count );
+             /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
+
+             // for printing count the number of displaced threads
+             #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
+                 __attribute__((unused)) size_t displaced = 0;
+             #endif
+
+             // redistribute old data
+             for( idx; (size_t)lanes.count ~ ocount) {
+                 // Lock is not strictly needed but makes checking invariants much easier
+                 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
+                 verify(locked);
+
+                 // As long as we can pop from this lane to push the threads somewhere else in the queue
+                 while(!is_empty(lanes.data[idx])) {
+                     struct $thread * thrd;
+                     unsigned long long _;
+                     [thrd, _] = pop(lanes.data[idx]);
+
+                     push(cltr, thrd, true);
+
+                     // for printing count the number of displaced threads
+                     #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
+                         displaced++;
+                     #endif
+                 }
+
+                 // Unlock the lane
+                 __atomic_unlock(&lanes.data[idx].lock);
+
+                 // TODO print the queue statistics here
+
+                 ^(lanes.data[idx]){};
+             }
+
+             __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
+
+             // Allocate new array (uses realloc and memcpies the data)
+             lanes.data = alloc( lanes.count, lanes.data`realloc );
+
+             // Fix the moved data
+             for( idx; (size_t)lanes.count ) {
+                 fix(lanes.data[idx]);
+             }
+         }
+
+         fix_times(cltr);
+
+         reassign_cltr_id(cltr);
+
+         // Make sure that everything is consistent
+         /* paranoid */ check( cltr->ready_queue );
+
+         __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
+         /* paranoid */ verify( ready_mutate_islocked() );
+     }
+ #endif
 
  #if !defined(__CFA_NO_STATISTICS__)

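The core of the new USE_CPU_WORK_STEALING path is the lane-selection arithmetic: the constructor sizes the queue as cpu_info.hthrd_count * READYQ_SHARD_FACTOR lanes, push stays inside the READYQ_SHARD_FACTOR lanes owned by the calling CPU, and pop_fast scans only that shard before considering a help target. The single-threaded toy below is a hypothetical sketch in plain C that reproduces just this indexing (the Lane type, toy_push, toy_pop, NCPUS and LANE_CAP are invented for the example); it leaves out the per-lane locks, timestamps, statistics, helping and stealing that the real push, pop_fast and pop_slow perform.

/* Hypothetical, single-threaded sketch of the shard indexing used by the CPU
 * work-stealing mode: every hardware thread owns READYQ_SHARD_FACTOR lanes,
 * pushes pick one lane inside the pusher's own shard, and the fast pop scans
 * only that shard.  The real code lives in libcfa/src/concurrency/ready_queue.cfa. */
#include <stdio.h>

#define NCPUS               4          /* stand-in for cpu_info.hthrd_count */
#define READYQ_SHARD_FACTOR 2          /* value chosen by the changeset     */
#define NLANES              (NCPUS * READYQ_SHARD_FACTOR)
#define LANE_CAP            8

typedef struct {
    int items[LANE_CAP];
    int count;
} Lane;

static Lane lanes[NLANES];

/* Push: stay inside the shard owned by 'cpu'; pick one of its lanes pseudo-randomly. */
static void toy_push(int cpu, unsigned *tick, int thrd) {
    int start = cpu * READYQ_SHARD_FACTOR;
    int i = start + (int)((*tick)++ % READYQ_SHARD_FACTOR);
    if (lanes[i].count < LANE_CAP) lanes[i].items[lanes[i].count++] = thrd;
}

/* Pop: scan only the shard owned by 'cpu'; return -1 if its lanes are empty
 * (the real pop_fast would then fall back to its help target, and pop_slow
 * would steal from a random lane anywhere in the array). */
static int toy_pop(int cpu) {
    int start = cpu * READYQ_SHARD_FACTOR;
    for (int k = 0; k < READYQ_SHARD_FACTOR; k++) {
        Lane *l = &lanes[start + k];
        if (l->count > 0) return l->items[--l->count];
    }
    return -1;
}

int main(void) {
    unsigned tick = 0;
    for (int t = 0; t < 6; t++) toy_push(t % NCPUS, &tick, t);   /* spread work over cpus */
    for (int cpu = 0; cpu < NCPUS; cpu++)
        printf("cpu %d pops %d\n", cpu, toy_pop(cpu));
    return 0;
}

With READYQ_SHARD_FACTOR set to 2, as in the changeset, each hardware thread owns exactly two lanes, so pushes and fast-path pops only contend with threads running on the same CPU; cross-CPU traffic is confined to the help target in pop_fast and the random steal in pop_slow.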