Changes in / [7f9968ad:8b58bae]
Files:
- 18 added
- 23 edited
benchmark/io/readv.cfa
- Include the new shared benchmark helpers from ../benchcltr.hfa.
- Drop the file-local `the_cluster` global and the `my_processor` wrapper; the benchmark now uses `BenchCluster`/`BenchProc` and `*the_benchmark_cluster`, and passes `cl.self` to `print_stats_at_exit`.
- Align the `Reader` thread type to 128 bytes. Readers now `park()` at startup (with a paranoid assert that `run` is set when they resume) instead of spinning on `yield()`.
- The driver sets `run = true`, explicitly `unpark()`s every reader, then calls the shared `wait(duration, start, end, is_tty)` helper in place of the hand-rolled 500 ms sleep loop; progress is only animated when stdout is a TTY, and the final message becomes "\nDone\n".
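The driver change boils down to a release-then-time loop. A minimal C++ sketch of that pattern is below; the names (`run`, `drive`) are illustrative and not the benchcltr.hfa API, which is not shown in this changeset.

```cpp
// Sketch of the wait()-style driver loop: release the parked workers, then
// poll until the duration elapses, animating progress only on a terminal.
#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <unistd.h>

std::atomic<bool> run{false};

void drive(double duration_s) {
	bool is_tty = isatty(STDOUT_FILENO);
	auto start = std::chrono::steady_clock::now();
	run.store(true, std::memory_order_relaxed);     // workers were waiting on this flag

	for (;;) {
		std::this_thread::sleep_for(std::chrono::milliseconds(500));
		double elapsed = std::chrono::duration<double>(
			std::chrono::steady_clock::now() - start).count();
		if (elapsed >= duration_s) break;
		if (is_tty) { printf("\r%.1f", elapsed); fflush(stdout); }  // only animate on a TTY
	}

	run.store(false, std::memory_order_relaxed);
	printf("\nDone\n");
}
```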
doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
- The benchmark no longer hard-codes `relaxed_list`: it includes `LIST_VARIANT_HPP` (defaulting to "relaxed_list.hpp"), errors out if `LIST_VARIANT` is undefined, and uses `LIST_VARIANT<Node>` for the thread-local TLS, the global stats, list construction and all worker signatures. The per-type static `head` instance list is replaced by a `std::atomic_uint32_t ticket` counter, and the chosen variant is printed at startup via `LIST_VARIANT<Node>::name()`.
- List construction now takes `{ nthread, nqueues }` instead of a single `nthread * nqueues` count.
- Local and global statistics gain `comp` and `subm` value/count pairs; `tally_stats` accumulates them and `print_stats` reports submit/complete counts and averages (the max/min-runs block moves ahead of the duration line).
- The progress indicator only redraws when stdout is a TTY.
- A new "Producer" benchmark is added: a cache-line-aligned `Slot` array (4 slots per node), `runProducer_body`/`runProducer`, a `Producer` case in the benchmark enum, parsing of an optional NNODES argument (default 32), a usage line, and the `-b producer` dispatch. The node with id 0 sweeps the slots, recycling whatever it finds and counting completions; every other popped node is parked in a random empty slot with a compare-and-swap, counting submission attempts.
- PingPong no longer resets `nslots` and its error message now says it accepts at most 1 extra argument.
- The fairness HTML dump is disabled: the `save_fairness` call, its definition and the `<png.h>`/`<setjmp.h>` includes are commented out.
doc/theses/thierry_delisle_PhD/code/relaxed_list.hpp
- The header now names itself as a variant: `LIST_VARIANT` is defined to `relaxed_list` and a `VARIANT` macro selects one of VANILLA, SNZI, BITMASK, DISCOVER, SNZM or BIAS (default VANILLA). New includes: <cmath>, "links.hpp", "snzi.hpp" and "snzm.hpp".
- `spinlock_t` and the `bts`/`btr` helpers move out of this header (into utils.hpp), and the local `_LinksFields_t` and nested `intrusive_queue_t` definitions are dropped in favour of the shared `intrusive_queue_t<node_t>` from links.hpp.
- `pick_stat` gains `push.local`, `pop.mask_reset` and `pop.local` counters.
- The constructor now takes `(numThreads, numQueues)` with `numLists = numThreads * numQueues`, initializes a `snzi_t` (SNZI/BIAS), a `snzm_t` (SNZM/DISCOVER) or the old `numNonEmpty`/`list_mask` fields depending on the variant, and no longer maintains a static linked list of instances. A static `name()` returns a human-readable variant name and a static `ticket` counter hands out per-thread queue ranges.
- `push()` gains a BIAS path that keeps a thread on one of its own 4 sub-queues about 15 times out of 16, and the non-empty transition is recorded per variant (snzi.arrive, snzm.arrive, the bitmask bts, or numNonEmpty++).
- `pop()` is now variant-specific: DISCOVER keeps a per-thread mask of queues seen non-empty (resetting it when it runs out), SNZI and BIAS loop while `snzi.query()` holds (BIAS with the same local-queue bias), SNZM picks queues through the snzm masks with an optional BMI2 `_pext_u64` fast path, and BITMASK keeps the old behaviour. `try_pop()` mirrors this on the empty transition (snzi/snzm depart, btr, or numNonEmpty--).
- The per-thread TLS adds `my_queue = (ticket++) * 4` and a 64-byte-aligned discovered mask.
- Statistics printing drops the per-instance local queue dump and now reports pick success rates together with average search lengths, mask resets and local push/pop counts.
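The BIAS selection is easy to lift out of the diff. A short C++ sketch of that pick, under the assumption (matching the new TLS field) that each thread owns a contiguous block of 4 sub-queues:

```cpp
// BIAS queue selection from push()/pop(): ~15/16 of picks stay on the
// thread's own 4 sub-queues, 1/16 go anywhere so work still mixes.
#include <atomic>
#include <cstdint>

static std::atomic<uint32_t> ticket{0};
thread_local unsigned my_queue = ticket.fetch_add(1) * 4;  // 4 sub-queues per thread

unsigned pick_queue(unsigned rnd, unsigned numLists) {
	unsigned i;
	if ((rnd & 0xF) == 0) {
		i = rnd >> 4;                       // 1 in 16: fully random queue
	} else {
		i = my_queue + ((rnd >> 4) % 4);    // 15 in 16: one of this thread's queues
	}
	return i % numLists;
}
```

The low 4 bits of one random draw decide local versus global, and the remaining bits pick the queue, so a single RNG call covers both decisions.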
doc/theses/thierry_delisle_PhD/code/utils.hpp
- `rand_bit` is now also declared `__attribute__((artificial))`.
- `spinlock_t` and the inline-assembly `bts`/`btr` bit test-and-set/reset helpers move here from relaxed_list.hpp so every list variant can share them.
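The relocated lock is a test-and-test-and-set spinlock with a pause hint. This is an equivalent portable C++ sketch (the original issues `asm volatile("pause")` directly and keeps the GCC builtins):

```cpp
// Test-and-test-and-set spinlock: only attempt the exchange when the lock
// looks free, and pause while spinning to ease cache-line contention.
#include <atomic>
#if defined(__x86_64__) || defined(__i386__)
#include <immintrin.h>   // _mm_pause
#endif

struct spinlock {
	std::atomic<bool> ll{false};

	void lock() {
		while (__builtin_expect(ll.exchange(true, std::memory_order_acquire), false)) {
			while (ll.load(std::memory_order_relaxed)) {
#if defined(__x86_64__) || defined(__i386__)
				_mm_pause();
#endif
			}
		}
	}

	bool try_lock() { return !ll.exchange(true, std::memory_order_acquire); }
	void unlock()   { ll.store(false, std::memory_order_release); }
};
```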
libcfa/src/Makefile.am
- `thread_libsrc` gains concurrency/ready_queue.cfa and concurrency/stats.cfa.
libcfa/src/Makefile.in
- Regenerated to match Makefile.am: the thread library sources, libtool objects and build rules now include concurrency/ready_queue and concurrency/stats.
libcfa/src/bits/debug.hfa
- The debug-print guard now also pulls in <stdio.h>/<unistd.h> when __CFA_DEBUG_PRINT_READY_QUEUE__ is defined.
libcfa/src/bits/defs.hfa
- Adds `__atomic_bts` and `__atomic_btr` atomic bit test-and-set/reset helpers for __i386 and __x86_64, implemented with `lock btsl/btsq` and `lock btrl/btrq` inline assembly; defining __CFA_NO_BIT_TEST_AND_SET__ selects a fallback built on `__atomic_fetch_or`/`__atomic_fetch_and`. ARM and unknown architectures hit an #error for now.
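The fallback path in the patch amounts to the following portable C++ sketch, which returns the bit's previous value just like the assembly versions:

```cpp
// Portable equivalents of the __CFA_NO_BIT_TEST_AND_SET__ fallback.
#include <atomic>
#include <cstddef>

// Set bit `bit`, return whether it was already set.
inline bool atomic_bts(std::atomic<size_t>& target, size_t bit) {
	size_t mask = size_t(1) << bit;
	return (target.fetch_or(mask, std::memory_order_relaxed) & mask) != 0;
}

// Clear bit `bit`, return whether it was set beforehand.
inline bool atomic_btr(std::atomic<size_t>& target, size_t bit) {
	size_t mask = size_t(1) << bit;
	return (target.fetch_and(~mask, std::memory_order_relaxed) & mask) != 0;
}
```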
libcfa/src/concurrency/invoke.h
- `kernelTLS` gains a `struct __stats_t * volatile this_stats` pointer and its `rand_seed` widens from uint32_t to __uint128_t.
- `enum coroutine_state` becomes `__Coroutine_State` and loses the `Rerun` value.
- A new `struct __thread_desc_link { next, prev, ts, preferred }` replaces the bare `next` pointer in `$thread`; `get_next()` now returns `this.link.next`.
- `$thread` gains a `volatile int ticket`, packs `state` and `preempted` into 8-bit enum bitfields, and moves `curr_cluster` and the link field next to the core scheduling data. The debug park/unpark bookkeeping now records an int result plus a separate `__Coroutine_State`.
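For readers unfamiliar with the layout, a minimal C++ sketch of the embedded link record; `thread_desc` stands in for `$thread` and only the fields shown in this hunk are reproduced:

```cpp
// Intrusive link record embedded in each thread descriptor: the ready queue
// threads descriptors together without any separate allocation.
struct thread_desc;

struct thread_link {
	thread_desc* next = nullptr;
	thread_desc* prev = nullptr;
	volatile unsigned long long ts = 0;   // enqueue timestamp
	int preferred = 0;                    // preferred sub-queue hint
};

struct thread_desc {
	// ... coroutine context, state, ticket ...
	thread_link link;                     // lives inside the descriptor
};

// Mirrors the updated get_next() accessor in invoke.h.
inline thread_desc*& get_next(thread_desc& t) { return t.link.next; }
```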
libcfa/src/concurrency/io.cfa
- The per-ring statistics blocks in the submit and completion queues are removed, along with the end-of-run "I/O uRing Stats" printout; all counters now go through the per-thread `__tls_stats()->io.*` fields (bumped with interrupts disabled on the fast paths) and are tallied with the rest of the cluster statistics.
- The slow poller gains a `__processor_id_t id`, sets up its own thread-local `__stats_t`, and registers/unregisters itself with doregister/unregister; `__unpark` calls now pass the poller's id.
- Tearing down the user-thread fast poller now takes the ready-schedule lock, removes the preempted poller thread with `remove_head( &this, &thrd )`, resets its `ticket` and preemption state, and checks `this.nprocessors == 0` instead of walking the old procs/idles lists.
- `__submit` clears the consumed sqe's `user_data`, and the random offset used when allocating sqes is drawn with interrupts disabled.
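The statistics rework follows a common pattern: bump plain thread-local counters on the hot path and fold them into shared atomics once per thread. A C++ sketch of that pattern, with illustrative field names rather than the real `__stats_t` layout:

```cpp
// Thread-local counters on the fast path, one atomic tally when the
// kernel/poller thread exits.
#include <atomic>
#include <cstdint>

struct io_stats { uint64_t submit_cnt = 0, completed = 0; };

struct global_io_stats {
	std::atomic<uint64_t> submit_cnt{0}, completed{0};

	void tally(const io_stats& local) {            // called once per exiting thread
		submit_cnt.fetch_add(local.submit_cnt, std::memory_order_relaxed);
		completed.fetch_add(local.completed, std::memory_order_relaxed);
	}
};

thread_local io_stats tls_io_stats;                 // cheap to bump, no cache-line ping-pong
inline io_stats* tls_stats() { return &tls_io_stats; }

inline void note_submit() { tls_stats()->submit_cnt += 1; }   // fast path: no atomics
```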
libcfa/src/concurrency/kernel.cfa
- The scheduler forward declarations change: `__wake_one` now takes a `__processor_id_t *`, `__halt` no longer returns a thread, and a `__has_next_thread` helper is declared.
- New kernel storage: a global `__scheduler_RWLock_t * __scheduler_lock` plus statistics storage for the main processor; the `kernelTLS` initializer gains the stats pointer.
- Thread initialization sets `ticket = 1` and the new `link.next`/`link.prev` fields.
- Processors get a cluster-assigned `id`, atomically bump/drop `cltr->nprocessors`, and on startup register with `doregister((__processor_id_t*)this)` and grow the ready queue under `ready_mutate_lock`; on shutdown they shrink it, remove themselves from the idle list with `unsafe_remove`, and unregister. Clusters lose the procs/idles registration and the ready_queue_lock, and instead carry `nprocessors` and a heap-allocated `__stats_t` that is printed and freed at destruction.
- The scheduling loop prefetches `readyThread->context.SP`, halts only when `__next_thread` finds nothing, and counts thread migrations between processors.
- `__run_thread` resolves the park/unpark race with the new `ticket` counter (fetch-sub on block: an old value of 1 means stay blocked, 2 means an unpark raced in, so run the thread again) instead of exchanging the old Blocked/Rerun state, and handles Halted threads before the blocking path.
- `__invoke_processor` sets up a thread-local `__stats_t` and tallies it into the cluster statistics on exit.
- `__schedule_thread` now takes the scheduling `__processor_id_t *` as an argument (the diff is cut off at this point in the changeset view).
kernelTLS.preemption_state.enabled ); 595 668 /* paranoid */ #if defined( __CFA_WITH_VERIFY__ ) 596 /* paranoid */ if( thrd->state == Blocked || thrd->state == Start ) assertf( thrd->preempted == __NO_PREEMPTION,597 598 /* paranoid */ if( thrd->preempted != __NO_PREEMPTION ) assertf(thrd->state == Active || thrd->state == Rerun,599 669 /* paranoid */ if( thrd->state == Blocked || thrd->state == Start ) assertf( thrd->preempted == __NO_PREEMPTION, 670 "Error inactive thread marked as preempted, state %d, preemption %d\n", thrd->state, thrd->preempted ); 671 /* paranoid */ if( thrd->preempted != __NO_PREEMPTION ) assertf(thrd->state == Active, 672 "Error preempted thread marked as not currently running, state %d, preemption %d\n", thrd->state, thrd->preempted ); 600 673 /* paranoid */ #endif 601 /* paranoid */ verifyf( thrd-> next == 0p, "Expected null got %p", thrd->next );674 /* paranoid */ verifyf( thrd->link.next == 0p, "Expected null got %p", thrd->link.next ); 602 675 603 676 if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready; 604 677 605 lock ( ready_queue_lock __cfaabi_dbg_ctx2 ); 606 bool was_empty = !(ready_queue != 0); 607 append( ready_queue, thrd ); 608 unlock( ready_queue_lock ); 609 610 __wake_one(thrd->curr_cluster, was_empty); 678 ready_schedule_lock ( id ); 679 push( thrd->curr_cluster, thrd ); 680 681 #if !defined(__CFA_NO_STATISTICS__) 682 bool woke = 683 #endif 684 __wake_one(id, thrd->curr_cluster); 685 686 #if !defined(__CFA_NO_STATISTICS__) 687 if(woke) __tls_stats()->ready.sleep.wakes++; 688 #endif 689 ready_schedule_unlock( id ); 611 690 612 691 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); … … 617 696 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 618 697 619 lock( ready_queue_lock __cfaabi_dbg_ctx2);620 $thread * head = pop_head( ready_queue);621 unlock( ready_queue_lock);698 ready_schedule_lock ( (__processor_id_t*)kernelTLS.this_processor ); 699 $thread * head = pop( this ); 700 ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor ); 622 701 623 702 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); … … 625 704 } 626 705 706 // KERNEL ONLY 707 static bool __has_next_thread(cluster * this) with( *this ) { 708 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 709 710 ready_schedule_lock ( (__processor_id_t*)kernelTLS.this_processor ); 711 bool not_empty = query( this ); 712 ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor ); 713 714 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled ); 715 return not_empty; 716 } 717 627 718 // KERNEL ONLY unpark with out disabling interrupts 628 void __unpark( $thread * thrd __cfaabi_dbg_ctx_param2 ) { 629 static_assert(sizeof(thrd->state) == sizeof(int)); 630 719 void __unpark( struct __processor_id_t * id, $thread * thrd __cfaabi_dbg_ctx_param2 ) { 631 720 // record activity 632 721 __cfaabi_dbg_debug_do( char * old_caller = thrd->unpark_caller; ) 633 722 __cfaabi_dbg_record_thrd( *thrd, false, caller ); 634 723 635 enum coroutine_state old_state = __atomic_exchange_n(&thrd->state, Rerun, __ATOMIC_SEQ_CST);636 __cfaabi_dbg_debug_do( thrd->unpark_result = old_ state; )637 switch(old_ state) {638 case Active:724 int old_ticket = __atomic_fetch_add(&thrd->ticket, 1, __ATOMIC_SEQ_CST); 725 __cfaabi_dbg_debug_do( thrd->unpark_result = old_ticket; thrd->unpark_state = thrd->state; ) 726 switch(old_ticket) { 727 case 1: 639 728 // Wake won the race, the thread will reschedule/rerun itself 640 729 break; 641 case Blocked:730 case 0: 642 731 /* paranoid */ verify( ! thrd->preempted != __NO_PREEMPTION ); 732 /* paranoid */ verify( thrd->state == Blocked ); 643 733 644 734 // Wake lost the race, 645 thrd->state = Blocked; 646 __schedule_thread( thrd ); 735 __schedule_thread( id, thrd ); 647 736 break; 648 case Rerun:649 abort("More than one thread attempted to schedule thread %p\n", thrd);650 break;651 case Halted:652 case Start:653 case Primed:654 737 default: 655 738 // This makes no sense, something is wrong abort … … 662 745 663 746 disable_interrupts(); 664 __unpark( thrd __cfaabi_dbg_ctx_fwd2 );747 __unpark( (__processor_id_t*)kernelTLS.this_processor, thrd __cfaabi_dbg_ctx_fwd2 ); 665 748 enable_interrupts( __cfaabi_dbg_ctx ); 666 749 } … … 697 780 698 781 $thread * thrd = kernelTLS.this_thread; 699 /* paranoid */ verify(thrd->state == Active || thrd->state == Rerun);782 /* paranoid */ verify(thrd->state == Active); 700 783 701 784 // SKULLDUGGERY: It is possible that we are preempting this thread just before … … 704 787 // If that is the case, abandon the preemption. 
705 788 bool preempted = false; 706 if(thrd-> next == 0p) {789 if(thrd->link.next == 0p) { 707 790 preempted = true; 708 791 thrd->preempted = reason; … … 730 813 __cfa_dbg_global_clusters.list{ __get }; 731 814 __cfa_dbg_global_clusters.lock{}; 815 816 // Initialize the global scheduler lock 817 __scheduler_lock = (__scheduler_RWLock_t*)&storage___scheduler_lock; 818 (*__scheduler_lock){}; 732 819 733 820 // Initialize the main cluster … … 764 851 pending_preemption = false; 765 852 kernel_thread = pthread_self(); 853 id = -1u; 766 854 767 855 runner{ &this }; 768 856 __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner); 857 858 __atomic_fetch_add( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST ); 769 859 } 770 860 … … 774 864 (*mainProcessor){}; 775 865 866 mainProcessor->id = doregister( (__processor_id_t*)mainProcessor); 867 776 868 //initialize the global state variables 777 869 kernelTLS.this_processor = mainProcessor; 778 870 kernelTLS.this_thread = mainThread; 779 871 872 #if !defined( __CFA_NO_STATISTICS__ ) 873 kernelTLS.this_stats = (__stats_t *)& storage_mainProcStats; 874 __init_stats( kernelTLS.this_stats ); 875 #endif 876 780 877 // Enable preemption 781 878 kernel_start_preemption(); … … 783 880 // Add the main thread to the ready queue 784 881 // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread 785 __schedule_thread( mainThread);882 __schedule_thread((__processor_id_t *)mainProcessor, mainThread); 786 883 787 884 // SKULLDUGGERY: Force a context switch to the main processor to set the main thread's context to the current UNIX … … 827 924 kernel_stop_preemption(); 828 925 926 unregister((__processor_id_t*)mainProcessor); 927 829 928 // Destroy the main processor and its context in reverse order of construction 830 929 // These were manually constructed so we need manually destroy them 831 930 void ^?{}(processor & this) with( this ){ 832 931 /* paranoid */ verify( this.do_terminate == true ); 932 __atomic_fetch_sub( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST ); 933 __cfaabi_dbg_print_safe("Kernel : destroyed main processor context %p\n", &runner); 833 934 } 834 935 … … 836 937 837 938 // Final step, destroy the main thread since it is no longer needed 939 838 940 // Since we provided a stack to this taxk it will not destroy anything 839 941 /* paranoid */ verify(mainThread->self_cor.stack.storage == (__stack_t*)(((uintptr_t)&storage_mainThreadCtx)| 0x1)); … … 842 944 ^(*mainCluster){}; 843 945 946 ^(*__scheduler_lock){}; 947 844 948 ^(__cfa_dbg_global_clusters.list){}; 845 949 ^(__cfa_dbg_global_clusters.lock){}; … … 851 955 // Kernel Idle Sleep 852 956 //============================================================================================= 853 static $thread * __halt(processor * this) with( *this ) {854 if( do_terminate ) return 0p;855 856 // First, lock the cluster idle857 lock( cltr->idle_lock __cfaabi_dbg_ctx2 );858 859 // Check if we can find a thread860 if( $thread * found = __next_thread( cltr ) ) {861 unlock( cltr->idle_lock );862 return found;863 }864 865 // Move this processor from the active list to the idle list866 move_to_front(cltr->procs, cltr->idles, *this);867 868 // Unlock the idle lock so we don't go to sleep with a lock869 unlock (cltr->idle_lock);870 871 // We are ready to sleep872 __cfadbg_print_safe(runtime_core, "Kernel : Processor %p ready to sleep\n", this);873 wait( idle );874 875 // We have woken up876 __cfadbg_print_safe(runtime_core, "Kernel : 
Processor %p woke up and ready to run\n", this);877 878 // Get ourself off the idle list879 with( *cltr ) {880 lock (idle_lock __cfaabi_dbg_ctx2);881 move_to_front(idles, procs, *this);882 unlock(idle_lock);883 }884 885 // Don't check the ready queue again, we may not be in a position to run a thread886 return 0p;887 }888 889 957 // Wake a thread from the front if there are any 890 static bool __wake_one(cluster * this, __attribute__((unused)) bool force) { 891 // if we don't want to force check if we know it's false 892 // if( !this->idles.head && !force ) return false; 893 894 // First, lock the cluster idle 895 lock( this->idle_lock __cfaabi_dbg_ctx2 ); 896 897 // Check if there is someone to wake up 898 if( !this->idles.head ) { 899 // Nope unlock and return false 900 unlock( this->idle_lock ); 901 return false; 902 } 903 904 // Wake them up 905 __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this->idles.head); 906 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 907 post( this->idles.head->idle ); 908 909 // Unlock and return true 910 unlock( this->idle_lock ); 958 static bool __wake_one(struct __processor_id_t * id, cluster * this) { 959 /* paranoid */ verify( ready_schedule_islocked( id ) ); 960 961 // Check if there is a sleeping processor 962 processor * p = pop(this->idles); 963 964 // If no one is sleeping, we are done 965 if( 0p == p ) return false; 966 967 // We found a processor, wake it up 968 post( p->idle ); 969 911 970 return true; 912 971 } … … 922 981 923 982 return ret; 983 } 984 985 static void __halt(processor * this) with( *this ) { 986 if( do_terminate ) return; 987 988 #if !defined(__CFA_NO_STATISTICS__) 989 __tls_stats()->ready.sleep.halts++; 990 #endif 991 // Push self to queue 992 push(cltr->idles, *this); 993 994 // Makre sure we don't miss a thread 995 if( __has_next_thread(cltr) ) { 996 // A thread was posted, make sure a processor is woken up 997 struct __processor_id_t *id = (struct __processor_id_t *) this; 998 ready_schedule_lock ( id ); 999 __wake_one( id, cltr ); 1000 ready_schedule_unlock( id ); 1001 #if !defined(__CFA_NO_STATISTICS__) 1002 __tls_stats()->ready.sleep.cancels++; 1003 #endif 1004 } 1005 1006 wait( idle ); 924 1007 } 925 1008 … … 1078 1161 cltr->nthreads -= 1; 1079 1162 unlock(cltr->thread_list_lock); 1080 }1081 1082 void doregister( cluster * cltr, processor * proc ) {1083 lock (cltr->idle_lock __cfaabi_dbg_ctx2);1084 cltr->nprocessors += 1;1085 push_front(cltr->procs, *proc);1086 unlock (cltr->idle_lock);1087 }1088 1089 void unregister( cluster * cltr, processor * proc ) {1090 lock (cltr->idle_lock __cfaabi_dbg_ctx2);1091 remove(cltr->procs, *proc );1092 cltr->nprocessors -= 1;1093 unlock(cltr->idle_lock);1094 1163 } 1095 1164 -
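The kernel.cfa hunks above replace the old enum-based park/unpark handshake (Blocked/Rerun/Active exchanged on thrd->state) with a ticket counter: the processor finishing a park does an atomic fetch-sub on thrd->ticket, the waker does an atomic fetch-add, and whichever side observes that it arrived second is responsible for rescheduling the thread. The following is a minimal plain-C sketch of that protocol only; the names (task, finish_park, do_unpark, enqueue_ready) are illustrative and are not the Cforall runtime API.

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdlib.h>

    // ticket values: 1 = running with no pending wake, 0 = parked, 2 = wake arrived early
    struct task { atomic_int ticket; /* starts at 1 when the task is created */ };

    extern void enqueue_ready(struct task *);      // assumed: pushes the task onto a ready queue

    // Called by the scheduler once the task has context-switched away.
    // Returns true when a wake raced with the park and the task must run again.
    static bool finish_park(struct task * t) {
        int old = atomic_fetch_sub(&t->ticket, 1);
        if (old == 1) return false;                // regular case: task is now parked (ticket == 0)
        if (old == 2) return true;                 // racy case: an unpark got in first, rerun it
        abort();                                   // any other value means the protocol was violated
    }

    // Called by whoever wants to wake the task; safe even while the task is still blocking.
    static void do_unpark(struct task * t) {
        int old = atomic_fetch_add(&t->ticket, 1);
        if (old == 1) return;                      // wake won the race: finish_park() sees 2 and reruns
        else if (old == 0) enqueue_ready(t);       // wake lost the race: task is parked, schedule it
        else abort();                              // a second concurrent unpark is a fatal error
    }

As in the updated __unpark above, any ticket value other than 0 or 1 on the wake side indicates a double wake-up and is treated as a fatal error rather than a coroutine state to decode.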
libcfa/src/concurrency/kernel.hfa
r7f9968ad r8b58bae 23 23 #include "coroutine.hfa" 24 24 25 #include "containers/stackLockFree.hfa" 26 25 27 extern "C" { 26 28 #include <pthread.h> … … 47 49 extern struct cluster * mainCluster; 48 50 49 // Processor 51 // Processor id, required for scheduling threads 52 struct __processor_id_t { 53 unsigned id; 54 55 #if !defined(__CFA_NO_STATISTICS__) 56 struct __stats_t * stats; 57 #endif 58 }; 59 50 60 coroutine processorCtx_t { 51 61 struct processor * proc; … … 53 63 54 64 // Wrapper around kernel threads 55 struct processor {65 struct __attribute__((aligned(128))) processor { 56 66 // Main state 67 inline __processor_id_t; 68 69 // Cluster from which to get threads 70 struct cluster * cltr; 71 72 // Set to true to notify the processor should terminate 73 volatile bool do_terminate; 74 57 75 // Coroutine ctx who does keeps the state of the processor 58 76 struct processorCtx_t runner; 59 60 // Cluster from which to get threads61 struct cluster * cltr;62 77 63 78 // Name of the processor … … 81 96 __bin_sem_t idle; 82 97 83 // Termination84 // Set to true to notify the processor should terminate85 volatile bool do_terminate;86 87 98 // Termination synchronisation (user semaphore) 88 99 semaphore terminated; … … 92 103 93 104 // Link lists fields 94 struct __dbg_node_proc { 95 struct processor * next; 96 struct processor * prev; 97 } node; 105 Link(processor) link; 98 106 99 107 #ifdef __CFA_DEBUG__ … … 110 118 static inline void ?{}(processor & this, const char name[]) { this{name, *mainCluster }; } 111 119 112 static inline [processor *&, processor *& ] __get( processor & this ) __attribute__((const)) { return this.node.[next, prev]; }120 static inline Link(processor) * ?`next( processor * this ) { return &this->link; } 113 121 114 122 //----------------------------------------------------------------------------- … … 121 129 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET 16 122 130 131 132 //----------------------------------------------------------------------------- 133 // Cluster Tools 134 135 // Intrusives lanes which are used by the relaxed ready queue 136 struct __attribute__((aligned(128))) __intrusive_lane_t; 137 void ?{}(__intrusive_lane_t & this); 138 void ^?{}(__intrusive_lane_t & this); 139 140 // Counter used for wether or not the lanes are all empty 141 struct __attribute__((aligned(128))) __snzi_node_t; 142 struct __snzi_t { 143 unsigned mask; 144 int root; 145 __snzi_node_t * nodes; 146 }; 147 148 void ?{}( __snzi_t & this, unsigned depth ); 149 void ^?{}( __snzi_t & this ); 150 151 //TODO adjust cache size to ARCHITECTURE 152 // Structure holding the relaxed ready queue 153 struct __ready_queue_t { 154 // Data tracking how many/which lanes are used 155 // Aligned to 128 for cache locality 156 __snzi_t snzi; 157 158 // Data tracking the actual lanes 159 // On a seperate cacheline from the used struct since 160 // used can change on each push/pop but this data 161 // only changes on shrink/grow 162 struct { 163 // Arary of lanes 164 __intrusive_lane_t * volatile data; 165 166 // Number of lanes (empty or not) 167 volatile size_t count; 168 } lanes; 169 }; 170 171 void ?{}(__ready_queue_t & this); 172 void ^?{}(__ready_queue_t & this); 173 123 174 //----------------------------------------------------------------------------- 124 175 // Cluster 125 struct cluster { 126 // Ready queue locks 127 __spinlock_t ready_queue_lock; 128 176 struct __attribute__((aligned(128))) cluster { 129 177 // Ready queue for threads 130 __ queue_t($thread)ready_queue;178 __ready_queue_t 
ready_queue; 131 179 132 180 // Name of the cluster … … 136 184 Duration preemption_rate; 137 185 138 // List of processors 139 __spinlock_t idle_lock; 140 __dllist_t(struct processor) procs; 141 __dllist_t(struct processor) idles; 142 unsigned int nprocessors; 186 // List of idle processors 187 StackLF(processor) idles; 188 volatile unsigned int nprocessors; 143 189 144 190 // List of threads … … 157 203 #if !defined(__CFA_NO_STATISTICS__) 158 204 bool print_stats; 205 struct __stats_t * stats; 159 206 #endif 160 207 }; -
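The new __ready_queue_t above replaces the single lock-protected __queue_t with an array of intrusive lanes plus an SNZI (scalable non-zero indicator) that tracks whether any lane is non-empty, and it pads the hot structures to 128 bytes so neighbouring processors do not share cache lines. The sketch below shows only the sharding idea in plain C, under the assumption of one spinlock per lane and random lane selection; the lane type, the SNZI, and the grow/shrink logic of the real queue are omitted, and all names are illustrative.

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    struct task { struct task * next; };

    struct lane {                                      // one FIFO shard of the ready queue
        atomic_bool    lock;
        struct task *  head;
        struct task ** tail;
    } __attribute__((aligned(128)));                   // cache-line padding against false sharing

    struct ready_queue {
        struct lane * data;                            // array of lanes
        size_t        count;                           // number of lanes (empty or not)
    };

    static void lane_init(struct lane * l)    { atomic_init(&l->lock, false); l->head = NULL; l->tail = &l->head; }
    static void lane_acquire(struct lane * l) { while (atomic_exchange(&l->lock, true)) { /* spin */ } }
    static void lane_release(struct lane * l) { atomic_store(&l->lock, false); }

    // Push onto a randomly chosen lane: contention is spread over 'count' independent locks.
    static void rq_push(struct ready_queue * rq, struct task * t, uint64_t rnd) {
        struct lane * l = &rq->data[rnd % rq->count];
        lane_acquire(l);
        t->next  = NULL;
        *l->tail = t;
        l->tail  = &t->next;
        lane_release(l);
    }

    // Pop by sampling a random lane; a real scheduler retries other lanes (or consults
    // the SNZI) instead of giving up after a single empty lane.
    static struct task * rq_pop(struct ready_queue * rq, uint64_t rnd) {
        struct lane * l = &rq->data[rnd % rq->count];
        lane_acquire(l);
        struct task * t = l->head;
        if (t) { l->head = t->next; if (l->head == NULL) l->tail = &l->head; }
        lane_release(l);
        return t;
    }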
libcfa/src/concurrency/kernel_private.hfa
r7f9968ad r8b58bae 20 20 21 21 #include "alarm.hfa" 22 #include "stats.hfa" 23 24 #include "bits/random.hfa" 22 25 23 26 24 27 //----------------------------------------------------------------------------- 25 28 // Scheduler 29 30 struct __attribute__((aligned(128))) __scheduler_lock_id_t; 26 31 27 32 extern "C" { … … 31 36 } 32 37 33 void __schedule_thread( $thread * ) __attribute__((nonnull (1)));38 void __schedule_thread( struct __processor_id_t *, $thread * ) __attribute__((nonnull (2))); 34 39 35 40 //Block current thread and release/wake-up the following resources … … 73 78 74 79 // KERNEL ONLY unpark with out disabling interrupts 75 void __unpark( $thread * thrd __cfaabi_dbg_ctx_param2 );80 void __unpark( struct __processor_id_t *, $thread * thrd __cfaabi_dbg_ctx_param2 ); 76 81 77 82 //----------------------------------------------------------------------------- … … 84 89 //----------------------------------------------------------------------------- 85 90 // Utils 86 #define KERNEL_STORAGE(T,X) static char storage_##X[sizeof(T)] 87 88 static inline uint32_t __tls_rand() { 89 kernelTLS.rand_seed ^= kernelTLS.rand_seed << 6; 90 kernelTLS.rand_seed ^= kernelTLS.rand_seed >> 21; 91 kernelTLS.rand_seed ^= kernelTLS.rand_seed << 7; 92 return kernelTLS.rand_seed; 91 #define KERNEL_STORAGE(T,X) __attribute((aligned(__alignof__(T)))) static char storage_##X[sizeof(T)] 92 93 static inline uint64_t __tls_rand() { 94 // kernelTLS.rand_seed ^= kernelTLS.rand_seed << 6; 95 // kernelTLS.rand_seed ^= kernelTLS.rand_seed >> 21; 96 // kernelTLS.rand_seed ^= kernelTLS.rand_seed << 7; 97 // return kernelTLS.rand_seed; 98 return __lehmer64( kernelTLS.rand_seed ); 93 99 } 94 100 … … 100 106 void unregister( struct cluster * cltr, struct $thread & thrd ); 101 107 102 void doregister( struct cluster * cltr, struct processor * proc ); 103 void unregister( struct cluster * cltr, struct processor * proc ); 108 //======================================================================= 109 // Cluster lock API 110 //======================================================================= 111 // Cells use by the reader writer lock 112 // while not generic it only relies on a opaque pointer 113 struct __attribute__((aligned(128))) __scheduler_lock_id_t { 114 // Spin lock used as the underlying lock 115 volatile bool lock; 116 117 // Handle pointing to the proc owning this cell 118 // Used for allocating cells and debugging 119 __processor_id_t * volatile handle; 120 121 #ifdef __CFA_WITH_VERIFY__ 122 // Debug, check if this is owned for reading 123 bool owned; 124 #endif 125 }; 126 127 static_assert( sizeof(struct __scheduler_lock_id_t) <= __alignof(struct __scheduler_lock_id_t)); 128 129 // Lock-Free registering/unregistering of threads 130 // Register a processor to a given cluster and get its unique id in return 131 unsigned doregister( struct __processor_id_t * proc ); 132 133 // Unregister a processor from a given cluster using its id, getting back the original pointer 134 void unregister( struct __processor_id_t * proc ); 135 136 //======================================================================= 137 // Reader-writer lock implementation 138 // Concurrent with doregister/unregister, 139 // i.e., threads can be added at any point during or between the entry/exit 140 141 //----------------------------------------------------------------------- 142 // simple spinlock underlying the RWLock 143 // Blocking acquire 144 static inline void __atomic_acquire(volatile bool * ll) { 145 while( 
__builtin_expect(__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST), false) ) { 146 while(__atomic_load_n(ll, (int)__ATOMIC_RELAXED)) 147 asm volatile("pause"); 148 } 149 /* paranoid */ verify(*ll); 150 } 151 152 // Non-Blocking acquire 153 static inline bool __atomic_try_acquire(volatile bool * ll) { 154 return !__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST); 155 } 156 157 // Release 158 static inline void __atomic_unlock(volatile bool * ll) { 159 /* paranoid */ verify(*ll); 160 __atomic_store_n(ll, (bool)false, __ATOMIC_RELEASE); 161 } 162 163 //----------------------------------------------------------------------- 164 // Reader-Writer lock protecting the ready-queues 165 // while this lock is mostly generic some aspects 166 // have been hard-coded to for the ready-queue for 167 // simplicity and performance 168 struct __scheduler_RWLock_t { 169 // total cachelines allocated 170 unsigned int max; 171 172 // cachelines currently in use 173 volatile unsigned int alloc; 174 175 // cachelines ready to itereate over 176 // (!= to alloc when thread is in second half of doregister) 177 volatile unsigned int ready; 178 179 // writer lock 180 volatile bool lock; 181 182 // data pointer 183 __scheduler_lock_id_t * data; 184 }; 185 186 void ?{}(__scheduler_RWLock_t & this); 187 void ^?{}(__scheduler_RWLock_t & this); 188 189 extern __scheduler_RWLock_t * __scheduler_lock; 190 191 //----------------------------------------------------------------------- 192 // Reader side : acquire when using the ready queue to schedule but not 193 // creating/destroying queues 194 static inline void ready_schedule_lock( struct __processor_id_t * proc) with(*__scheduler_lock) { 195 unsigned iproc = proc->id; 196 /*paranoid*/ verify(data[iproc].handle == proc); 197 /*paranoid*/ verify(iproc < ready); 198 199 // Step 1 : make sure no writer are in the middle of the critical section 200 while(__atomic_load_n(&lock, (int)__ATOMIC_RELAXED)) 201 asm volatile("pause"); 202 203 // Fence needed because we don't want to start trying to acquire the lock 204 // before we read a false. 205 // Not needed on x86 206 // std::atomic_thread_fence(std::memory_order_seq_cst); 207 208 // Step 2 : acquire our local lock 209 __atomic_acquire( &data[iproc].lock ); 210 /*paranoid*/ verify(data[iproc].lock); 211 212 #ifdef __CFA_WITH_VERIFY__ 213 // Debug, check if this is owned for reading 214 data[iproc].owned = true; 215 #endif 216 } 217 218 static inline void ready_schedule_unlock( struct __processor_id_t * proc) with(*__scheduler_lock) { 219 unsigned iproc = proc->id; 220 /*paranoid*/ verify(data[iproc].handle == proc); 221 /*paranoid*/ verify(iproc < ready); 222 /*paranoid*/ verify(data[iproc].lock); 223 /*paranoid*/ verify(data[iproc].owned); 224 #ifdef __CFA_WITH_VERIFY__ 225 // Debug, check if this is owned for reading 226 data[iproc].owned = false; 227 #endif 228 __atomic_unlock(&data[iproc].lock); 229 } 230 231 #ifdef __CFA_WITH_VERIFY__ 232 static inline bool ready_schedule_islocked( struct __processor_id_t * proc) { 233 return __scheduler_lock->data[proc->id].owned; 234 } 235 236 static inline bool ready_mutate_islocked() { 237 return __scheduler_lock->lock; 238 } 239 #endif 240 241 //----------------------------------------------------------------------- 242 // Writer side : acquire when changing the ready queue, e.g. adding more 243 // queues or removing them. 
244 uint_fast32_t ready_mutate_lock( void ); 245 246 void ready_mutate_unlock( uint_fast32_t /* value returned by lock */ ); 247 248 //======================================================================= 249 // Ready-Queue API 250 //----------------------------------------------------------------------- 251 // pop thread from the ready queue of a cluster 252 // returns 0p if empty 253 __attribute__((hot)) bool query(struct cluster * cltr); 254 255 //----------------------------------------------------------------------- 256 // push thread onto a ready queue for a cluster 257 // returns true if the list was previously empty, false otherwise 258 __attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd); 259 260 //----------------------------------------------------------------------- 261 // pop thread from the ready queue of a cluster 262 // returns 0p if empty 263 __attribute__((hot)) struct $thread * pop(struct cluster * cltr); 264 265 //----------------------------------------------------------------------- 266 // remove thread from the ready queue of a cluster 267 // returns bool if it wasn't found 268 bool remove_head(struct cluster * cltr, struct $thread * thrd); 269 270 //----------------------------------------------------------------------- 271 // Increase the width of the ready queue (number of lanes) by 4 272 void ready_queue_grow (struct cluster * cltr); 273 274 //----------------------------------------------------------------------- 275 // Decrease the width of the ready queue (number of lanes) by 4 276 void ready_queue_shrink(struct cluster * cltr); 277 278 //----------------------------------------------------------------------- 279 // Statics call at the end of each thread to register statistics 280 #if !defined(__CFA_NO_STATISTICS__) 281 static inline struct __stats_t * __tls_stats() { 282 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 283 /* paranoid */ verify( kernelTLS.this_stats ); 284 return kernelTLS.this_stats; 285 } 286 #endif 104 287 105 288 // Local Variables: // -
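The __scheduler_RWLock_t introduced above gives every registered __processor_id_t its own 128-byte reader cell: ready_schedule_lock spins only on that private cell (after checking the global writer flag), while ready_mutate_lock, the rare grow/shrink path, raises the writer flag and then sweeps every cell. Below is a condensed plain-C sketch of that pattern, with a fixed-size table and illustrative names instead of the dynamic allocation and verification code of the real lock.

    #include <stdatomic.h>
    #include <stdbool.h>

    #define MAX_IDS 128

    struct reader_cell { atomic_bool lock; } __attribute__((aligned(128)));

    struct sched_rwlock {
        atomic_bool        writer;                 // set while a grow/shrink is in progress
        unsigned           ready;                  // number of registered reader cells
        struct reader_cell data[MAX_IDS];
    };

    // Reader side: taken around every push/pop so the queue cannot be resized underneath it.
    static void read_lock(struct sched_rwlock * l, unsigned id) {
        while (atomic_load_explicit(&l->writer, memory_order_relaxed)) { /* wait out writers  */ }
        while (atomic_exchange(&l->data[id].lock, true))                { /* spin on own cell  */ }
    }

    static void read_unlock(struct sched_rwlock * l, unsigned id) {
        atomic_store_explicit(&l->data[id].lock, false, memory_order_release);
    }

    // Writer side: exclude other writers, then drain every reader one cell at a time.
    static void write_lock(struct sched_rwlock * l) {
        while (atomic_exchange(&l->writer, true)) { /* spin */ }
        for (unsigned i = 0; i < l->ready; i += 1)
            while (atomic_exchange(&l->data[i].lock, true)) { /* spin */ }
    }

    static void write_unlock(struct sched_rwlock * l) {
        for (unsigned i = 0; i < l->ready; i += 1)
            atomic_store_explicit(&l->data[i].lock, false, memory_order_release);
        atomic_store_explicit(&l->writer, false, memory_order_release);
    }

A reader that slipped past the writer flag still holds its own cell, so the writer blocks on that cell until the reader finishes; readers that arrive later spin on the flag itself, so a resize is never concurrent with a scheduling operation.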
libcfa/src/concurrency/monitor.cfa
r7f9968ad r8b58bae 114 114 115 115 // Some one else has the monitor, wait in line for it 116 /* paranoid */ verify( thrd-> next == 0p );116 /* paranoid */ verify( thrd->link.next == 0p ); 117 117 append( this->entry_queue, thrd ); 118 /* paranoid */ verify( thrd-> next == 1p );118 /* paranoid */ verify( thrd->link.next == 1p ); 119 119 120 120 unlock( this->lock ); … … 199 199 200 200 // Some one else has the monitor, wait in line for it 201 /* paranoid */ verify( thrd-> next == 0p );201 /* paranoid */ verify( thrd->link.next == 0p ); 202 202 append( this->entry_queue, thrd ); 203 /* paranoid */ verify( thrd-> next == 1p );203 /* paranoid */ verify( thrd->link.next == 1p ); 204 204 unlock( this->lock ); 205 205 … … 761 761 $thread * new_owner = pop_head( this->entry_queue ); 762 762 /* paranoid */ verifyf( !this->owner || kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this ); 763 /* paranoid */ verify( !new_owner || new_owner-> next == 0p );763 /* paranoid */ verify( !new_owner || new_owner->link.next == 0p ); 764 764 __set_owner( this, new_owner ); 765 765 … … 883 883 } 884 884 885 __cfaabi_dbg_print_safe( "Kernel : Runing %i (%p)\n", ready2run, ready2run ? node->waiting_thread :0p );885 __cfaabi_dbg_print_safe( "Kernel : Runing %i (%p)\n", ready2run, ready2run ? (thread*)node->waiting_thread : (thread*)0p ); 886 886 return ready2run ? node->waiting_thread : 0p; 887 887 } … … 907 907 // For each thread in the entry-queue 908 908 for( $thread ** thrd_it = &entry_queue.head; 909 *thrd_it!= 1p;910 thrd_it = &(*thrd_it)-> next909 (*thrd_it) != 1p; 910 thrd_it = &(*thrd_it)->link.next 911 911 ) { 912 912 // For each acceptable check if it matches -
libcfa/src/concurrency/preemption.cfa
r7f9968ad r8b58bae 37 37 // FwdDeclarations : timeout handlers 38 38 static void preempt( processor * this ); 39 static void timeout( $thread * this );39 static void timeout( struct __processor_id_t * id, $thread * this ); 40 40 41 41 // FwdDeclarations : Signal handlers … … 88 88 89 89 // Tick one frame of the Discrete Event Simulation for alarms 90 static void tick_preemption( ) {90 static void tick_preemption( struct __processor_id_t * id ) { 91 91 alarm_node_t * node = 0p; // Used in the while loop but cannot be declared in the while condition 92 92 alarm_list_t * alarms = &event_kernel->alarms; // Local copy for ease of reading … … 106 106 } 107 107 else { 108 timeout( node->thrd );108 timeout( id, node->thrd ); 109 109 } 110 110 … … 119 119 // If there are still alarms pending, reset the timer 120 120 if( & (*alarms)`first ) { 121 __cfa abi_dbg_print_buffer_decl(" KERNEL: @%ju(%ju) resetting alarm to %ju.\n", currtime.tv, __kernel_get_time().tv, (alarms->head->alarm - currtime).tv);121 __cfadbg_print_buffer_decl(preemption, " KERNEL: @%ju(%ju) resetting alarm to %ju.\n", currtime.tv, __kernel_get_time().tv, (alarms->head->alarm - currtime).tv); 122 122 Duration delta = (*alarms)`first.alarm - currtime; 123 123 Duration capped = max(delta, 50`us); … … 266 266 267 267 // reserved for future use 268 static void timeout( $thread * this ) { 269 __unpark( this __cfaabi_dbg_ctx2 ); 268 static void timeout( struct __processor_id_t * id, $thread * this ) { 269 #if !defined( __CFA_NO_STATISTICS__ ) 270 kernelTLS.this_stats = this->curr_cluster->stats; 271 #endif 272 __unpark( id, this __cfaabi_dbg_ctx2 ); 270 273 } 271 274 … … 403 406 // Waits on SIGALRM and send SIGUSR1 to whom ever needs it 404 407 static void * alarm_loop( __attribute__((unused)) void * args ) { 408 __processor_id_t id; 409 id.id = doregister(&id); 410 405 411 // Block sigalrms to control when they arrive 406 412 sigset_t mask; … … 447 453 // __cfaabi_dbg_print_safe( "Kernel : Preemption thread tick\n" ); 448 454 lock( event_kernel->lock __cfaabi_dbg_ctx2 ); 449 tick_preemption( );455 tick_preemption( &id ); 450 456 unlock( event_kernel->lock ); 451 457 break; … … 460 466 EXIT: 461 467 __cfaabi_dbg_print_safe( "Kernel : Preemption thread stopping\n" ); 468 unregister(&id); 462 469 return 0p; 463 470 } -
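The preemption.cfa changes thread a __processor_id_t through tick_preemption and timeout because __unpark now goes through the per-id reader lock: the alarm loop therefore registers an id of its own (doregister(&id)) before it can wake any thread, and unregisters it on exit. Below is a rough sketch of such a registration table in plain C; the real doregister/unregister are lock-free, whereas this sketch simply serializes registration with a mutex since it only happens when a processor (or the alarm thread) starts or stops. All names here are assumptions.

    #include <pthread.h>
    #include <stddef.h>
    #include <stdlib.h>

    enum { MAX_IDS = 128 };

    static pthread_mutex_t reg_lock = PTHREAD_MUTEX_INITIALIZER;
    static void *          handles[MAX_IDS];     // owner of each id slot, NULL when free

    // Claim the first free slot; the returned index is what read_lock()/unpark() use.
    static unsigned id_register(void * owner) {
        pthread_mutex_lock(&reg_lock);
        unsigned slot = 0;
        while (slot < MAX_IDS && handles[slot] != NULL) slot += 1;
        if (slot == MAX_IDS) abort();            // table full; the real lock grows instead
        handles[slot] = owner;
        pthread_mutex_unlock(&reg_lock);
        return slot;
    }

    static void id_unregister(unsigned slot) {
        pthread_mutex_lock(&reg_lock);
        handles[slot] = NULL;                    // slot becomes reusable
        pthread_mutex_unlock(&reg_lock);
    }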
libcfa/src/concurrency/thread.cfa
r7f9968ad r8b58bae 28 28 context{ 0p, 0p }; 29 29 self_cor{ name, storage, storageSize }; 30 ticket = 1; 30 31 state = Start; 31 32 preempted = __NO_PREEMPTION; … … 35 36 self_mon_p = &self_mon; 36 37 curr_cluster = &cl; 37 next = 0p; 38 link.next = 0p; 39 link.prev = 0p; 40 link.preferred = -1; 38 41 39 42 node.next = 0p; … … 61 64 verify( this_thrd->context.SP ); 62 65 63 __schedule_thread( this_thrd);66 __schedule_thread( (__processor_id_t *)kernelTLS.this_processor, this_thrd); 64 67 enable_interrupts( __cfaabi_dbg_ctx ); 65 68 } -
libcfa/src/containers/stackLockFree.hfa
r7f9968ad r8b58bae 1 // 1 // 2 2 // Cforall Version 1.0.0 Copyright (C) 2017 University of Waterloo 3 3 // The contents of this file are covered under the licence agreement in the 4 4 // file "LICENCE" distributed with Cforall. 5 5 // 6 // stackLockFree.hfa -- 7 // 6 // stackLockFree.hfa -- 7 // 8 8 // Author : Peter A. Buhr 9 9 // Created On : Wed May 13 20:58:58 2020 … … 11 11 // Last Modified On : Sun Jun 14 13:25:09 2020 12 12 // Update Count : 64 13 // 13 // 14 14 15 15 #pragma once … … 31 31 }; // Link 32 32 33 forall( dtype T | sized(T) | { Link(T) * getNext( T * ); } ) {34 33 forall( otype T | sized(T) | { Link(T) * ?`next( T * ); } ) { 34 struct StackLF { 35 35 Link(T) stack; 36 36 }; // StackLF … … 42 42 43 43 void push( StackLF(T) & this, T & n ) with(this) { 44 * getNext( &n ) = stack;// atomic assignment unnecessary, or use CAA44 *( &n )`next = stack; // atomic assignment unnecessary, or use CAA 45 45 for () { // busy wait 46 if ( __atomic_compare_exchange_n( &stack.atom, & getNext( &n )->atom, (Link(T))@{ {&n, getNext( &n )->count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node46 if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ {&n, ( &n )`next->count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node 47 47 } // for 48 48 } // push … … 52 52 for () { // busy wait 53 53 if ( t.top == 0p ) return 0p; // empty stack ? 54 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ { getNext( t.top )->top, t.count} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.top; // attempt to update top node54 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ {( t.top )`next->top, t.count} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.top; // attempt to update top node 55 55 } // for 56 56 } // pop 57 58 bool unsafe_remove( StackLF(T) & this, T * node ) with(this) { 59 Link(T) * link = &stack; 60 for() { 61 T * next = link->top; 62 if( next == node ) { 63 link->top = ( node )`next->top; 64 return true; 65 } 66 if( next == 0p ) return false; 67 link = (next)`next; 68 } 69 } 57 70 } // distribution 58 71 } // distribution -
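The stackLockFree.hfa changes switch the hook from a getNext function to the ?`next postfix operator and add unsafe_remove, which the kernel uses to pull a specific processor off the idle stack. The push/pop themselves are a classic Treiber stack whose top is a {pointer, counter} pair updated with one double-width compare-and-swap, so a node that was popped and pushed again cannot be confused with the old top (the ABA problem). Below is a plain-C rendering of just push/pop under GCC on x86-64 (compile with -mcx16); the generic trait machinery and unsafe_remove are left out, and the names are illustrative.

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    struct node;
    union link {
        struct { struct node * top; uintptr_t count; };   // counter bumps on every push
        unsigned __int128 atom;                            // double-width unit handed to the CAS
    };
    struct node  { union link next; };
    struct stack { union link head; };

    static void push(struct stack * s, struct node * n) {
        n->next = s->head;                                 // snapshot {top, count}; a torn read only makes the CAS fail
        for (;;) {
            union link wanted = { .top = n, .count = n->next.count + 1 };
            if (__atomic_compare_exchange_n(&s->head.atom, &n->next.atom, wanted.atom,
                                            false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
                return;                                    // on failure n->next now holds the current head; retry
        }
    }

    static struct node * pop(struct stack * s) {
        union link t = s->head;                            // snapshot, revalidated by the CAS
        for (;;) {
            if (t.top == NULL) return NULL;                // empty stack
            union link wanted = { .top = t.top->next.top, .count = t.count };
            if (__atomic_compare_exchange_n(&s->head.atom, &t.atom, wanted.atom,
                                            false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
                return t.top;                              // on failure t was reloaded; loop again
        }
    }

unsafe_remove, as its name says, is not safe against concurrent operations on the same node; in this changeset it is only used on a processor's shutdown path.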
libcfa/src/heap.cfa
r7f9968ad r8b58bae 209 209 #if BUCKETLOCK == LOCKFREE 210 210 static inline { 211 Link(HeapManager.Storage) * getNext( HeapManager.Storage * this ) { return &this->header.kind.real.next; }211 Link(HeapManager.Storage) * ?`next( HeapManager.Storage * this ) { return &this->header.kind.real.next; } 212 212 void ?{}( HeapManager.FreeHeader & ) {} 213 213 void ^?{}( HeapManager.FreeHeader & ) {} … … 667 667 #else 668 668 for ( HeapManager.Storage * p = top( freeLists[i].freeList ); p != 0p; /* p = getNext( p )->top */) { 669 typeof(p) temp = getNext( p )->top; // FIX ME: direct assignent fails, initialization works669 typeof(p) temp = ( p )`next->top; // FIX ME: direct assignent fails, initialization works 670 670 p = temp; 671 671 #endif // BUCKETLOCK … … 903 903 return oaddr; 904 904 } // if 905 905 906 906 // change size, DO NOT preserve STICKY PROPERTIES. 907 907 free( oaddr ); -
libcfa/src/stdhdr/assert.h
r7f9968ad r8b58bae 33 33 #define verify(x) assert(x) 34 34 #define verifyf(x, ...) assertf(x, __VA_ARGS__) 35 #define verifyfail(...) 35 36 #define __CFA_WITH_VERIFY__ 36 37 #else 37 38 #define verify(x) 38 39 #define verifyf(x, ...) 40 #define verifyfail(...) 39 41 #endif 40 42 -
tests/concurrent/examples/datingService.cfa
r7f9968ad r8b58bae 35 35 signal_block( Boys[ccode] ); // restart boy to set phone number 36 36 } // if 37 // sout | "Girl:" | PhoneNo | "is dating Boy at" | BoyPhoneNo | "with ccode" | ccode;37 // sout | "Girl:" | PhoneNo | "is dating Boy at" | BoyPhoneNo | "with ccode" | ccode; 38 38 return BoyPhoneNo; 39 39 } // DatingService girl … … 47 47 signal_block( Girls[ccode] ); // restart girl to set phone number 48 48 } // if 49 // sout | " Boy:" | PhoneNo | "is dating Girl" | GirlPhoneNo | "with ccode" | ccode;49 // sout | " Boy:" | PhoneNo | "is dating Girl" | GirlPhoneNo | "with ccode" | ccode; 50 50 return GirlPhoneNo; 51 51 } // DatingService boy -
tests/concurrent/signal/disjoint.cfa
r7f9968ad r8b58bae 21 21 #endif 22 22 23 // This tests checks what happens when someone barges in the midle of the release 24 // of a bulk of monitors. 25 23 26 enum state_t { WAIT, SIGNAL, BARGE }; 24 27 25 28 monitor global_t {}; 26 global_t mut;27 29 28 30 monitor global_data_t; … … 33 35 int counter; 34 36 state_t state; 35 } data; 37 }; 38 39 // Use a global struct because the order needs to match with Signaller thread 40 struct { 41 global_t mut; 42 global_data_t data; 43 } globals; 36 44 37 45 condition cond; … … 40 48 41 49 void ?{}( global_data_t & this ) { 42 this.counter = =0;50 this.counter = 0; 43 51 this.state = BARGE; 44 52 } … … 53 61 54 62 thread Barger {}; 63 void ?{}( Barger & this ) { 64 ((thread&)this){ "Barger Thread" }; 65 } 55 66 56 67 void main( Barger & this ) { 57 68 while( !all_done ) { 58 barge( data );69 barge( globals.data ); 59 70 yield(); 60 71 } … … 78 89 79 90 thread Waiter {}; 91 void ?{}( Waiter & this ) { 92 ((thread&)this){ "Waiter Thread" }; 93 } 80 94 81 95 void main( Waiter & this ) { 82 while( wait( mut,data ) ) { KICK_WATCHDOG; yield(); }96 while( wait( globals.mut, globals.data ) ) { KICK_WATCHDOG; yield(); } 83 97 } 84 98 … … 92 106 93 107 void logic( global_t & mutex a ) { 94 signal( cond, a, data );108 signal( cond, a, globals.data ); 95 109 96 110 yield( random( 10 ) ); 97 111 98 112 //This is technically a mutual exclusion violation but the mutex monitor protects us 99 bool running = TEST( data.counter < N) &&data.counter > 0;100 if( data.state != SIGNAL && running ) {101 sout | "ERROR Eager signal" | data.state;113 bool running = TEST(globals.data.counter < N) && globals.data.counter > 0; 114 if( globals.data.state != SIGNAL && running ) { 115 sout | "ERROR Eager signal" | globals.data.state; 102 116 } 103 117 } 104 118 105 119 thread Signaller {}; 120 void ?{}( Signaller & this ) { 121 ((thread&)this){ "Signaller Thread" }; 122 } 106 123 107 124 void main( Signaller & this ) { 108 125 while( !all_done ) { 109 logic( mut );126 logic( globals.mut ); 110 127 yield(); 111 128 } -
tests/concurrent/waitfor/when.cfa
r7f9968ad r8b58bae 57 57 58 58 void arbiter( global_t & mutex this ) { 59 // There is a race at start where callers can get in before the arbiter. 60 // It doesn't really matter here so just restart the loop correctly and move on 61 this.last_call = 6; 62 59 63 for( int i = 0; i < N; i++ ) { 60 64 when( this.last_call == 6 ) waitfor( call1 : this ) { if( this.last_call != 1) { serr | "Expected last_call to be 1 got" | this.last_call; } } -
tools/gdb/utils-gdb.py
r7f9968ad r8b58bae 59 59 thread_ptr = gdb.lookup_type('struct $thread').pointer(), 60 60 int_ptr = gdb.lookup_type('int').pointer(), 61 thread_state = gdb.lookup_type('enum coroutine_state'))61 thread_state = gdb.lookup_type('enum __Coroutine_State')) 62 62 63 63 def get_addr(addr):