Files: 1 edited
libcfa/src/concurrency/ready_queue.cfa (modified) (11 diffs)
Legend:
- unmodified lines are shown with a leading space
- removed lines are prefixed with "-"
- added lines are prefixed with "+"
libcfa/src/concurrency/ready_queue.cfa

--- libcfa/src/concurrency/ready_queue.cfa (r6ba6846)
+++ libcfa/src/concurrency/ready_queue.cfa (rb808625)

@@ -15 +15 @@

 #define __cforall_thread__
-#define _GNU_SOURCE
-
 // #define __CFA_DEBUG_PRINT_READY_QUEUE__

@@ -22 +20 @@
 #define USE_RELAXED_FIFO
 // #define USE_WORK_STEALING
-// #define USE_CPU_WORK_STEALING

 #include "bits/defs.hfa"
-#include "device/cpu.hfa"
 #include "kernel_private.hfa"

+#define _GNU_SOURCE
 #include "stdlib.hfa"
 #include "math.hfa"

-#include <errno.h>
 #include <unistd.h>
-
-extern "C" {
-	#include <sys/syscall.h>  // __NR_xxx
-}

 #include "ready_subqueue.hfa"

@@ -54 +46 @@
 #endif

-#if defined(USE_CPU_WORK_STEALING)
-	#define READYQ_SHARD_FACTOR 2
-#elif defined(USE_RELAXED_FIFO)
+#if defined(USE_RELAXED_FIFO)
 	#define BIAS 4
 	#define READYQ_SHARD_FACTOR 4

@@ -95 +85 @@
 }

-#if defined(CFA_HAVE_LINUX_LIBRSEQ)
-	// No forward declaration needed
-	#define __kernel_rseq_register rseq_register_current_thread
-	#define __kernel_rseq_unregister rseq_unregister_current_thread
-#elif defined(CFA_HAVE_LINUX_RSEQ_H)
-	void __kernel_raw_rseq_register (void);
-	void __kernel_raw_rseq_unregister(void);
-
-	#define __kernel_rseq_register __kernel_raw_rseq_register
-	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
-#else
-	// No forward declaration needed
-	// No initialization needed
-	static inline void noop(void) {}
-
-	#define __kernel_rseq_register noop
-	#define __kernel_rseq_unregister noop
-#endif
-
 //=======================================================================
 // Cluster wide reader-writer lock

@@ -136 +107 @@
 // Lock-Free registering/unregistering of threads
 unsigned register_proc_id( void ) with(*__scheduler_lock) {
-	__kernel_rseq_register();
-
 	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
 	bool * handle = (bool *)&kernelTLS().sched_lock;

@@ -192 +161 @@

 	__cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
-
-	__kernel_rseq_unregister();
 }

@@ -247 +214 @@
 //=======================================================================
 void ?{}(__ready_queue_t & this) with (this) {
-	#if defined(USE_CPU_WORK_STEALING)
-		lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
-		lanes.data = alloc( lanes.count );
-		lanes.tscs = alloc( lanes.count );
-
-		for( idx; (size_t)lanes.count ) {
-			(lanes.data[idx]){};
-			lanes.tscs[idx].tv = rdtscl();
-		}
-	#else
-		lanes.data = 0p;
-		lanes.tscs = 0p;
-		lanes.count = 0;
-	#endif
+	lanes.data = 0p;
+	lanes.tscs = 0p;
+	lanes.count = 0;
 }

 void ^?{}(__ready_queue_t & this) with (this) {
-	#if !defined(USE_CPU_WORK_STEALING)
-		verify( SEQUENTIAL_SHARD == lanes.count );
-	#endif
-
+	verify( SEQUENTIAL_SHARD == lanes.count );
 	free(lanes.data);
 	free(lanes.tscs);

@@ -273 +226 @@

 //-----------------------------------------------------------------------
-#if defined(USE_CPU_WORK_STEALING)
-	__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
-
-		processor * const proc = kernelTLS().this_processor;
-		const bool external = !push_local || (!proc) || (cltr != proc->cltr);
-
-		const int cpu = __kernel_getcpu();
-		/* paranoid */ verify(cpu >= 0);
-		/* paranoid */ verify(cpu < cpu_info.hthrd_count);
-		/* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
-
-		const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
-		/* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
-
-		const int start = map.self * READYQ_SHARD_FACTOR;
-		unsigned i;
-		do {
-			unsigned r;
-			if(unlikely(external)) { r = __tls_rand(); }
-			else { r = proc->rdq.its++; }
-			i = start + (r % READYQ_SHARD_FACTOR);
-			// If we can't lock it retry
-		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
-
-		// Actually push it
-		push(lanes.data[i], thrd);
-
-		// Unlock and return
-		__atomic_unlock( &lanes.data[i].lock );
-
-		#if !defined(__CFA_NO_STATISTICS__)
-			if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
-			else __tls_stats()->ready.push.local.success++;
-		#endif
-
-		__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
-
-	}
-
-	// Pop from the ready queue from a given cluster
-	__attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
-		/* paranoid */ verify( lanes.count > 0 );
-		/* paranoid */ verify( kernelTLS().this_processor );
-
-		const int cpu = __kernel_getcpu();
-		/* paranoid */ verify(cpu >= 0);
-		/* paranoid */ verify(cpu < cpu_info.hthrd_count);
-		/* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
-
-		const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
-		/* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
-		/* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %zu lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
-
-		processor * const proc = kernelTLS().this_processor;
-		const int start = map.self * READYQ_SHARD_FACTOR;
-
-		// Did we already have a help target
-		if(proc->rdq.target == -1u) {
-			// if We don't have a
-			unsigned long long min = ts(lanes.data[start]);
-			for(i; READYQ_SHARD_FACTOR) {
-				unsigned long long tsc = ts(lanes.data[start + i]);
-				if(tsc < min) min = tsc;
-			}
-			proc->rdq.cutoff = min;
-
-			/* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
-			/* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
-			uint64_t chaos = __tls_rand();
-			uint64_t high_chaos = (chaos >> 32);
-			uint64_t mid_chaos = (chaos >> 16) & 0xffff;
-			uint64_t low_chaos = chaos & 0xffff;
-
-			unsigned me = map.self;
-			unsigned cpu_chaos = map.start + (mid_chaos % map.count);
-			bool global = cpu_chaos == me;
-
-			if(global) {
-				proc->rdq.target = high_chaos % lanes.count;
-			} else {
-				proc->rdq.target = (cpu_chaos * READYQ_SHARD_FACTOR) + (low_chaos % READYQ_SHARD_FACTOR);
-				/* paranoid */ verify(proc->rdq.target >= (map.start * READYQ_SHARD_FACTOR));
-				/* paranoid */ verify(proc->rdq.target < ((map.start + map.count) * READYQ_SHARD_FACTOR));
-			}
-
-			/* paranoid */ verify(proc->rdq.target != -1u);
-		}
-		else {
-			const unsigned long long bias = 0; //2_500_000_000;
-			const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
-			{
-				unsigned target = proc->rdq.target;
-				proc->rdq.target = -1u;
-				if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
-					$thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
-					proc->rdq.last = target;
-					if(t) return t;
-				}
-			}
-
-			unsigned last = proc->rdq.last;
-			if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
-				$thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
-				if(t) return t;
-			}
-			else {
-				proc->rdq.last = -1u;
-			}
-		}
-
-		for(READYQ_SHARD_FACTOR) {
-			unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
-			if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
-		}
-
-		// All lanes where empty return 0p
-		return 0p;
-	}
-
-	__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
-		processor * const proc = kernelTLS().this_processor;
-		unsigned last = proc->rdq.last;
-		if(last != -1u) {
-			struct $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));
-			if(t) return t;
-			proc->rdq.last = -1u;
-		}
-
-		unsigned i = __tls_rand() % lanes.count;
-		return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
-	}
-	__attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
-		return search(cltr);
-	}
-#endif
 #if defined(USE_RELAXED_FIFO)
 	//-----------------------------------------------------------------------

@@ -705 +519 @@
 	if(is_empty(sl)) {
 		assert( sl.anchor.next == 0p );
-		assert( sl.anchor.ts == -1llu);
+		assert( sl.anchor.ts == 0 );
 		assert( mock_head(sl) == sl.prev );
 	} else {
 		assert( sl.anchor.next != 0p );
-		assert( sl.anchor.ts != -1llu);
+		assert( sl.anchor.ts != 0 );
 		assert( mock_head(sl) != sl.prev );
 	}

@@ -759 +573 @@
 		lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
 		for(i; lanes.count) {
-			unsigned long long tsc1 = ts(lanes.data[i]);
-			unsigned long long tsc2 = rdtscl();
-			lanes.tscs[i].tv = min(tsc1, tsc2);
+			unsigned long long tsc = ts(lanes.data[i]);
+			lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
 		}
 	#endif
 }

-#if defined(USE_CPU_WORK_STEALING)
-	// ready_queue size is fixed in this case
-	void ready_queue_grow(struct cluster * cltr) {}
-	void ready_queue_shrink(struct cluster * cltr) {}
-#else
-	// Grow the ready queue
-	void ready_queue_grow(struct cluster * cltr) {
-		size_t ncount;
-		int target = cltr->procs.total;
-
-		/* paranoid */ verify( ready_mutate_islocked() );
-		__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
-
-		// Make sure that everything is consistent
-		/* paranoid */ check( cltr->ready_queue );
-
-		// grow the ready queue
-		with( cltr->ready_queue ) {
-			// Find new count
-			// Make sure we always have atleast 1 list
-			if(target >= 2) {
-				ncount = target * READYQ_SHARD_FACTOR;
-			} else {
-				ncount = SEQUENTIAL_SHARD;
-			}
-
-			// Allocate new array (uses realloc and memcpies the data)
-			lanes.data = alloc( ncount, lanes.data`realloc );
-
-			// Fix the moved data
-			for( idx; (size_t)lanes.count ) {
-				fix(lanes.data[idx]);
-			}
-
-			// Construct new data
-			for( idx; (size_t)lanes.count ~ ncount) {
-				(lanes.data[idx]){};
-			}
-
-			// Update original
-			lanes.count = ncount;
-		}
-
-		fix_times(cltr);
-
-		reassign_cltr_id(cltr);
-
-		// Make sure that everything is consistent
-		/* paranoid */ check( cltr->ready_queue );
-
-		__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
-
-		/* paranoid */ verify( ready_mutate_islocked() );
-	}
-
-	// Shrink the ready queue
-	void ready_queue_shrink(struct cluster * cltr) {
-		/* paranoid */ verify( ready_mutate_islocked() );
-		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
-
-		// Make sure that everything is consistent
-		/* paranoid */ check( cltr->ready_queue );
-
-		int target = cltr->procs.total;
-
-		with( cltr->ready_queue ) {
-			// Remember old count
-			size_t ocount = lanes.count;
-
-			// Find new count
-			// Make sure we always have atleast 1 list
-			lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
-			/* paranoid */ verify( ocount >= lanes.count );
-			/* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
-
-			// for printing count the number of displaced threads
-			#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
-				__attribute__((unused)) size_t displaced = 0;
-			#endif
-
-			// redistribute old data
-			for( idx; (size_t)lanes.count ~ ocount) {
-				// Lock is not strictly needed but makes checking invariants much easier
-				__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
-				verify(locked);
-
-				// As long as we can pop from this lane to push the threads somewhere else in the queue
-				while(!is_empty(lanes.data[idx])) {
-					struct $thread * thrd;
-					unsigned long long _;
-					[thrd, _] = pop(lanes.data[idx]);
-
-					push(cltr, thrd, true);
-
-					// for printing count the number of displaced threads
-					#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
-						displaced++;
-					#endif
-				}
-
-				// Unlock the lane
-				__atomic_unlock(&lanes.data[idx].lock);
-
-				// TODO print the queue statistics here
-
-				^(lanes.data[idx]){};
-			}
-
-			__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
-
-			// Allocate new array (uses realloc and memcpies the data)
-			lanes.data = alloc( lanes.count, lanes.data`realloc );
-
-			// Fix the moved data
-			for( idx; (size_t)lanes.count ) {
-				fix(lanes.data[idx]);
-			}
-		}
-
-		fix_times(cltr);
-
-		reassign_cltr_id(cltr);
-
-		// Make sure that everything is consistent
-		/* paranoid */ check( cltr->ready_queue );
-
-		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
-		/* paranoid */ verify( ready_mutate_islocked() );
-	}
-#endif
+// Grow the ready queue
+void ready_queue_grow(struct cluster * cltr) {
+	size_t ncount;
+	int target = cltr->procs.total;
+
+	/* paranoid */ verify( ready_mutate_islocked() );
+	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	// grow the ready queue
+	with( cltr->ready_queue ) {
+		// Find new count
+		// Make sure we always have atleast 1 list
+		if(target >= 2) {
+			ncount = target * READYQ_SHARD_FACTOR;
+		} else {
+			ncount = SEQUENTIAL_SHARD;
+		}
+
+		// Allocate new array (uses realloc and memcpies the data)
+		lanes.data = alloc( ncount, lanes.data`realloc );
+
+		// Fix the moved data
+		for( idx; (size_t)lanes.count ) {
+			fix(lanes.data[idx]);
+		}
+
+		// Construct new data
+		for( idx; (size_t)lanes.count ~ ncount) {
+			(lanes.data[idx]){};
+		}
+
+		// Update original
+		lanes.count = ncount;
+	}
+
+	fix_times(cltr);
+
+	reassign_cltr_id(cltr);
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
+
+	/* paranoid */ verify( ready_mutate_islocked() );
+}
+
+// Shrink the ready queue
+void ready_queue_shrink(struct cluster * cltr) {
+	/* paranoid */ verify( ready_mutate_islocked() );
+	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	int target = cltr->procs.total;
+
+	with( cltr->ready_queue ) {
+		// Remember old count
+		size_t ocount = lanes.count;
+
+		// Find new count
+		// Make sure we always have atleast 1 list
+		lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
+		/* paranoid */ verify( ocount >= lanes.count );
+		/* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
+
+		// for printing count the number of displaced threads
+		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
+			__attribute__((unused)) size_t displaced = 0;
+		#endif
+
+		// redistribute old data
+		for( idx; (size_t)lanes.count ~ ocount) {
+			// Lock is not strictly needed but makes checking invariants much easier
+			__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
+			verify(locked);
+
+			// As long as we can pop from this lane to push the threads somewhere else in the queue
+			while(!is_empty(lanes.data[idx])) {
+				struct $thread * thrd;
+				unsigned long long _;
+				[thrd, _] = pop(lanes.data[idx]);
+
+				push(cltr, thrd, true);
+
+				// for printing count the number of displaced threads
+				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
+					displaced++;
+				#endif
+			}
+
+			// Unlock the lane
+			__atomic_unlock(&lanes.data[idx].lock);
+
+			// TODO print the queue statistics here
+
+			^(lanes.data[idx]){};
+		}
+
+		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
+
+		// Allocate new array (uses realloc and memcpies the data)
+		lanes.data = alloc( lanes.count, lanes.data`realloc );
+
+		// Fix the moved data
+		for( idx; (size_t)lanes.count ) {
+			fix(lanes.data[idx]);
+		}
+	}
+
+	fix_times(cltr);
+
+	reassign_cltr_id(cltr);
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
+	/* paranoid */ verify( ready_mutate_islocked() );
+}

 #if !defined(__CFA_NO_STATISTICS__)

@@ -903 +710 @@
 }
 #endif
-
-
-#if defined(CFA_HAVE_LINUX_LIBRSEQ)
-	// No definition needed
-#elif defined(CFA_HAVE_LINUX_RSEQ_H)
-
-	#if defined( __x86_64 ) || defined( __i386 )
-		#define RSEQ_SIG 0x53053053
-	#elif defined( __ARM_ARCH )
-		#ifdef __ARMEB__
-			#define RSEQ_SIG 0xf3def5e7 /* udf #24035 ; 0x5de3 (ARMv6+) */
-		#else
-			#define RSEQ_SIG 0xe7f5def3 /* udf #24035 ; 0x5de3 */
-		#endif
-	#endif
-
-	extern void __disable_interrupts_hard();
-	extern void __enable_interrupts_hard();
-
-	void __kernel_raw_rseq_register (void) {
-		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );
-
-		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
-		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
-		if(ret != 0) {
-			int e = errno;
-			switch(e) {
-			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
-			case ENOSYS: abort("KERNEL ERROR: rseq register no supported");
-			case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
-			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
-			case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
-			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
-			}
-		}
-	}
-
-	void __kernel_raw_rseq_unregister(void) {
-		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );
-
-		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
-		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
-		if(ret != 0) {
-			int e = errno;
-			switch(e) {
-			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
-			case ENOSYS: abort("KERNEL ERROR: rseq unregister no supported");
-			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
-			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
-			case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
-			default: abort("KERNEL ERROR: rseq unregisteunexpected return %d", e);
-			}
-		}
-	}
-#else
-	// No definition needed
-#endif
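A side note on the last two hunks: the empty-lane sentinel stored in the subqueue anchor changes from -1llu to 0, which is why the assertions flip and why fix_times() can now keep a lane's own timestamp and only fall back to rdtscl() for empty lanes, instead of taking min(ts, rdtscl()) as before. The sketch below is a minimal, hypothetical illustration of that convention in plain C, not the project's Cforall code; the names lane_t, lane_tsc_t and now_ts() are invented for the example, now_ts() stands in for rdtscl(), and this fix_times() takes a plain array rather than a cluster.

// Hypothetical sketch (plain C, not the actual ready_queue.cfa code): the
// "0 means empty" timestamp convention the hunks above move to.
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

// Stand-in for rdtscl(): any monotonically increasing, never-zero clock works.
static uint64_t now_ts(void) {
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec + 1;
}

// One ready-queue lane, reduced to the only field this sketch needs.
typedef struct {
	uint64_t anchor_ts;   // timestamp of the head element; 0 when the lane is empty
} lane_t;

typedef struct {
	uint64_t tv;          // cached timestamp consulted by the stealing heuristics
} lane_tsc_t;

// Mirrors the new fix_times() logic: keep a lane's own timestamp when it has
// one, otherwise stamp the cache with "now" so an empty lane is not mistaken
// for stale (or freshly updated) work.
static void fix_times(lane_t * lanes, lane_tsc_t * tscs, size_t count) {
	for (size_t i = 0; i < count; i++) {
		uint64_t tsc = lanes[i].anchor_ts;
		tscs[i].tv = (tsc != 0) ? tsc : now_ts();
	}
}

int main(void) {
	lane_t lanes[4] = { { 0 }, { 0 }, { 0 }, { 0 } };
	lane_tsc_t tscs[4];

	lanes[1].anchor_ts = now_ts();   // two lanes pretend to hold work
	lanes[3].anchor_ts = now_ts();

	fix_times(lanes, tscs, 4);

	for (size_t i = 0; i < 4; i++)
		printf("lane %zu: anchor_ts=%llu cached=%llu\n", i,
		       (unsigned long long)lanes[i].anchor_ts,
		       (unsigned long long)tscs[i].tv);
	return 0;
}

Reserving 0 for "empty" lets the cache-rebuild loop test the lane directly, where the old -1llu (maximum) sentinel forced the extra min() against the current clock to avoid treating an empty lane as the newest work available.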