Changes in / [8b58bae:7f9968ad]
Files:
- 18 deleted
- 23 edited
Legend: changes are described from r8b58bae to r7f9968ad; "removed"/"dropped" means the code is present only in r8b58bae, "added"/"gained" means it is present only in r7f9968ad; everything else is unmodified.
benchmark/io/readv.cfa
- The include of ../benchcltr.hfa and the shared benchmark harness go away: BenchCluster and BenchProc are replaced by a plain cluster cl = { "IO Cluster", flags }, published through a new global cluster * the_cluster, plus a small my_processor wrapper that constructs an "I/O Processor" on that cluster.
- The Reader thread loses its __attribute__((aligned(128))) and no longer starts parked: instead of park( __cfaabi_dbg_ctx ), an assert on run, and the matching unpark loop in the driver, each reader spins with while(!__atomic_load_n(&run, __ATOMIC_RELAXED)) yield();.
- The driver replaces wait(duration, start, end, is_tty) and the isatty check with a do/while loop that sleeps 500`ms at a time and re-reads the clock until (end - start) < duration`s no longer holds, then clears run (the start/stop protocol is sketched below); print_stats_at_exit now takes the cluster directly rather than cl.self, and the closing message is printed as "Done" without the leading blank line.

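The new start/stop protocol described above (readers spin-yield until run is raised, the driver sleeps in 500 ms slices until the requested duration has elapsed, then lowers the flag) is easy to mirror outside Cforall. A minimal C++ sketch of just that protocol; the thread count and duration are illustrative stand-ins for the benchmark's command-line options, and the read operation itself is elided:

    #include <atomic>
    #include <chrono>
    #include <cstdio>
    #include <thread>
    #include <vector>

    std::atomic<bool> run{false};   // start/stop flag, as in the benchmark

    void reader_body() {
        // Spin until the driver raises the flag (mirrors the yield() loop above).
        while (!run.load(std::memory_order_relaxed)) std::this_thread::yield();
        while (run.load(std::memory_order_relaxed)) {
            /* ... one preadv2/io_uring read would go here ... */
        }
    }

    int main() {
        using namespace std::chrono;
        const auto duration = seconds(5);        // stand-in for the -d option
        std::vector<std::thread> readers;
        for (int i = 0; i < 4; i++) readers.emplace_back(reader_body);

        std::printf("Starting\n");
        const auto start = steady_clock::now();
        run = true;
        // Re-check the clock every 500 ms until the run is long enough.
        while (steady_clock::now() - start < duration)
            std::this_thread::sleep_for(milliseconds(500));
        run = false;

        for (auto & t : readers) t.join();
        std::printf("Done\n");
        return 0;
    }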
doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
- The LIST_VARIANT_HPP selection preamble is replaced by a plain #include "relaxed_list.hpp", every use of LIST_VARIANT<Node> becomes relaxed_list<Node>, and the "Relaxed list variant: ..." banner is no longer printed.
- The static ticket counter is replaced by a static relaxed_list<Node>::head pointer used to chain instances for statistics.
- local_stat_t and global_stat_t lose their comp/subm sub-structs, along with the corresponding tallying and the "Submit count/average, Complete count/avg" printout; the "Max runs"/"Min runs" lines move after the throughput totals.
- The progress printer drops its isatty(STDOUT_FILENO) check and always rewrites the elapsed-time line.
- Every benchmark constructs the list as { nthread * nqueues } instead of { nthread, nqueues }.
- The whole Producer benchmark is removed: the aligned Slot array, runProducer_body (which parked popped nodes into randomly probed slots with __atomic_compare_exchange_n and swept them back into the list whenever node 0 was popped), runProducer itself, its command-line parsing, its usage line, and its case in the benchmark switch.
- PingPong now also forces nslots = 1, and its too-many-arguments message says 2 instead of 1.
- runFairness's call to save_fairness is re-enabled, and the previously commented-out save_fairness (which writes the fairness matrix as an HTML table) plus the #include <png.h> / <setjmp.h> lines become live code again.

doc/theses/thierry_delisle_PhD/code/relaxed_list.hpp
- The compile-time variant machinery disappears: the LIST_VARIANT define, the VANILLA/SNZI/BITMASK/DISCOVER/SNZM/BIAS selection macros, the name() table, and every #if VARIANT branch are gone, leaving only the bitmask scheme (a compressed sketch of that scheme follows this list).
- The includes of cmath, links.hpp, snzi.hpp and snzm.hpp are dropped. In their place the header now defines spinlock_t and the bts/btr bit-test-and-set/reset helpers (inline "lock btsq"/"lock btrq", with a fetch_or/fetch_and alternative left in a comment), declares extern bool enable_stats, and defines _LinksFields_t (prev, next, ts) itself.
- pick_stat loses its push.local, pop.mask_reset and pop.local counters.
- The constructor takes a single numLists argument instead of (numThreads, numQueues), asserts the 448-sublist limit (seven 64-bit mask words), and chains each instance onto a static relaxed_list<Node>::head list used for statistics; the static ticket counter is gone.
- push() always bumps numNonEmpty and sets the sublist's bit in list_mask when a sublist goes non-empty; the SNZI/SNZM/DISCOVER/BIAS arrival paths and the BIAS local-queue selection are removed. pop() keeps only the bitmask search: while numNonEmpty is non-zero it derives two candidate sublists from list_mask, keeps the preferred one ("Pick the bet list"), try-locks it, and on emptying a sublist clears its bit and decrements numNonEmpty.
- intrusive_queue_t becomes a 128-byte-aligned nested class of relaxed_list (spinlock, before/after sentinels built from _LinksFields_t, a push that reports when the queue went non-empty, a pop that reports when it emptied, and a ts() accessor for the head timestamp) instead of the external templated type from links.hpp.
- The per-thread TLS block loses my_queue and the per-thread discovery mask; numNonEmpty and the seven-word list_mask become unconditional, public members.
- Statistics: stats_print() now walks the static head list and calls a per-instance stats_print_local(), which additionally reports per-sublist push/pop counts and the average and maximum queue-length difference; the "local" and mask-reset counters and their printout are removed, and push/pop pick results are reported as success percentages with raw counts rather than as average search lengths.

doc/theses/thierry_delisle_PhD/code/utils.hpp
- The __attribute__((artificial)) forward declaration of rand_bit is dropped (its definition stays).
- spinlock_t and the bts/btr atomic bit-test helpers are removed from this header; they now live in relaxed_list.hpp (the spinlock is transcribed below).

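For reference, the lock that moves out of this header is a classic test-and-test-and-set spinlock: one exchange to try to acquire, then plain relaxed loads with an x86 pause hint while waiting, and a release store to unlock. A C++ transcription of the moved spinlock_t, with the GCC/Clang pause builtin standing in for the original asm volatile("pause"):

    #include <atomic>

    // x86-specific, like the original (the pause hint has no portable equivalent).
    struct spinlock_t {
        std::atomic<bool> ll{false};

        void lock() {
            // One exchange to try to acquire; while held, spin on relaxed loads only.
            while (__builtin_expect(ll.exchange(true), false)) {
                while (ll.load(std::memory_order_relaxed))
                    __builtin_ia32_pause();
            }
        }

        bool try_lock() { return false == ll.exchange(true); }

        void unlock() { ll.store(false, std::memory_order_release); }

        explicit operator bool() { return ll.load(std::memory_order_relaxed); }
    };

Spinning on the relaxed load keeps waiting cores reading a shared cache line instead of bouncing it with repeated exchanges, so the eventual unlock store propagates promptly.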
libcfa/src/Makefile.am
- thread_libsrc no longer lists concurrency/ready_queue.cfa and concurrency/stats.cfa.

libcfa/src/Makefile.in
- The generated makefile tracks the Makefile.am change: ready_queue.cfa and stats.cfa leave the libcfathread source list, their .lo objects leave the BUILDLIB object list, and their dirstamp-dependent build rules are dropped.

libcfa/src/bits/debug.hfa
- __CFA_DEBUG_PRINT_READY_QUEUE__ is removed from the chain of debug-print macros that pull in <stdio.h> and <unistd.h>.

libcfa/src/bits/defs.hfa
- The __atomic_bts and __atomic_btr inline helpers are removed entirely: the __i386 and __x86_64 versions (lock btsl/btrl and lock btsq/btrq, each with a __CFA_NO_BIT_TEST_AND_SET__ fallback built on __atomic_fetch_or/__atomic_fetch_and), the ARM #error stub, and the unknown-architecture #error (a portable equivalent of the fallback is sketched below).

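The removed helpers already carried their own portable alternative behind __CFA_NO_BIT_TEST_AND_SET__: a fetch_or/fetch_and whose return value says whether the bit was previously set. The same idea expressed over std::atomic, as a C++ sketch of what the lock bts/btr instructions were buying (a single atomic read-modify-write per bit operation); the function names are illustrative:

    #include <atomic>
    #include <cstddef>

    // Set bit 'bit' of 'target' and report whether it was already set
    // (mirrors the fallback path of the removed __atomic_bts).
    inline bool atomic_bts(std::atomic<std::size_t> & target, std::size_t bit) {
        const std::size_t mask = std::size_t{1} << bit;
        return (target.fetch_or(mask, std::memory_order_relaxed) & mask) != 0;
    }

    // Clear bit 'bit' of 'target' and report whether it was set before
    // (mirrors the fallback path of the removed __atomic_btr).
    inline bool atomic_btr(std::atomic<std::size_t> & target, std::size_t bit) {
        const std::size_t mask = std::size_t{1} << bit;
        return (target.fetch_and(~mask, std::memory_order_relaxed) & mask) != 0;
    }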
libcfa/src/concurrency/invoke.h
- KernelThreadData drops its this_stats pointer and its rand_seed goes from __uint128_t back to uint32_t.
- The coroutine state enum is again named coroutine_state and regains the Rerun value; the __thread_desc_link struct (next/prev pointers, timestamp ts, preferred sublist index) is removed (both layouts are rendered below).
- $thread loses its ticket field, its state becomes a plain volatile int and preempted a plain enum rather than 8-bit bitfields, its members are reordered so the coroutine, monitor and cluster fields come before the link, and the link itself is a single struct $thread * next, which is what get_next() returns again.
- The debug park/unpark bookkeeping stores park_result/unpark_result as coroutine_state values instead of a separate int result plus a saved state.

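The structural effect of this hunk is easiest to see side by side: r8b58bae threads carry a full __thread_desc_link (forward and backward pointers plus a timestamp and a preferred-sublist hint, the richer shape the per-cluster ready queue in r8b58bae appears to rely on), while r7f9968ad goes back to a single intrusive next pointer that only supports pushing and popping at known ends. A C++ rendering of the two layouts, with illustrative type names and a forward declaration standing in for $thread:

    struct thread_desc;                      // stand-in for $thread

    // r7f9968ad: plain singly linked intrusive field; get_next() returns this.
    struct thread_link_single {
        thread_desc * next = nullptr;
    };

    // r8b58bae: the removed __thread_desc_link, sized for a doubly linked
    // sublist with an enqueue timestamp and a preferred-sublist hint.
    struct thread_link_full {
        thread_desc * next = nullptr;
        thread_desc * prev = nullptr;
        volatile unsigned long long ts = 0;  // enqueue timestamp
        int preferred;                       // hint; left uninitialized, as in the original
    };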
libcfa/src/concurrency/io.cfa
- Per-ring statistics come back into the io_uring data structures: submit_q gains submit_avg, look_avg and alloc_avg counter blocks and completion_q gains completed_avg, all zero-initialized when the cluster's rings are created, replacing the __tls_stats()-based counters of r8b58bae.
- The shutdown path prints a "----- I/O uRing Stats -----" report when print_stats is set: total submit calls, average ready/submitted/available entries, ready-search and alloc-search totals with their average lengths and blocks, and total wait calls split into slow and fast with average completions per wait.
- The poller loses its __processor_id_t: __io_poller_slow no longer sets up a thread-local __stats_t, no longer registers and unregisters an id, and both of its __unpark calls (for completions drained in kernel context and for waking the fast poller) drop the id argument.
- When a cluster with a user-thread poller shuts down, the preempted fast poller is pulled off the cluster by resetting this.ready_queue.head and thrd.next directly, with paranoid checks that it is the only queued thread, instead of taking the ready-scheduler lock, calling remove_head and resetting the ticket.
- The submit path gains verify( (shead + ret) == *ring.submit_q.head ) after submitting, drops the clearing of the submitted sqe's user_data, and no longer wraps the random-offset pick, the fast poller's drain and the whole submission in disable_interrupts/enable_interrupts; SQE allocation and submission update the ring's own statistics (the search counters via __atomic_fetch_add) rather than __tls_stats() (the random-offset claim idiom is sketched after this list).

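Both the removed Producer benchmark in relaxed_list.cpp and the SQE allocation loop above use the same search idiom: start at a per-thread random offset, walk the array once looking for a free entry, and rescan if nothing was free. The Producer benchmark claims its slot with a compare-and-swap on a pointer, shown below; the exact claim step of the SQE allocator is elided in this extract, so treat this as a generic C++ sketch of the idiom rather than the io.cfa code itself (which counts the scan length into alloc_avg):

    #include <atomic>
    #include <random>

    struct Slot { std::atomic<void *> node{nullptr}; };   // free when null

    // Claim a free slot for 'p', rescanning from fresh random offsets until one is found.
    int claim_slot(Slot * slots, int nslots, void * p) {
        static thread_local std::minstd_rand rng{std::random_device{}()};
        for (;;) {
            const unsigned off = rng();                    // random start point, like __tls_rand()
            for (int i = 0; i < nslots; i++) {
                const int idx = (i + off) % nslots;
                // Cheap load first so already-claimed slots are skipped without a CAS.
                if (slots[idx].node.load(std::memory_order_relaxed) != nullptr) continue;
                void * expected = nullptr;
                if (slots[idx].node.compare_exchange_strong(expected, p))
                    return idx;                            // claimed
            }
        }
    }

Starting each scan at a random offset spreads contending threads across the array, so most claims succeed on the first probe instead of piling up on slot 0.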
libcfa/src/concurrency/kernel.cfa
- The scheduling forward declarations change: __has_next_thread disappears, __halt now returns the thread to run ($thread * __halt(processor *)), and __wake_one takes (cluster *, bool was_empty) instead of a __processor_id_t plus a cluster.
- The kernel storage for the scheduler reader-writer lock (__scheduler_RWLock_t and the global __scheduler_lock pointer) and for mainProcStats is removed, and the kernelTLS initializer drops the this_stats slot.
- The $thread constructor no longer initializes a ticket and links through a single next pointer instead of link.next/link.prev.
- The processor constructor drops the id field and no longer adjusts cltr->nprocessors; the cluster constructor re-creates ready_queue_lock and the procs/idles intrusive lists and drops nprocessors and the heap-allocated stats block, and the cluster destructor no longer prints or frees stats.
- The processor main loop registers itself with doregister(this->cltr, this) and unregister(this->cltr, this) instead of taking the ready-queue RW lock to grow and shrink the ready queue and remove itself from the idles list on exit; when __next_thread finds nothing it now blocks in __halt(this) and runs whatever that returns, and the __builtin_prefetch of the ready thread's stack pointer is gone.
- __run_thread distinguishes preempted threads (expected to be Active or Rerun) from normally scheduled ones (Blocked or Ready) when marking the thread Active, and a preempted thread is rescheduled with __schedule_thread( thrd_dst ) without the processor-id argument.
The thread has halted, it should never be scheduled/run again455 // We may need to wake someone up here since456 unpark( this->destroyer __cfaabi_dbg_ctx2 );457 this->destroyer = 0p;458 break RUNNING;459 }460 461 /* paranoid */ verify( thrd_dst->state == Active );462 thrd_dst->state = Blocked;463 464 404 // set state of processor coroutine to active and the thread to inactive 465 int old_ticket = __atomic_fetch_sub(&thrd_dst->ticket, 1, __ATOMIC_SEQ_CST); 466 __cfaabi_dbg_debug_do( thrd_dst->park_result = old_ticket; ) 467 switch(old_ticket) { 468 case 1: 405 static_assert(sizeof(thrd_dst->state) == sizeof(int)); 406 enum coroutine_state old_state = __atomic_exchange_n(&thrd_dst->state, Blocked, __ATOMIC_SEQ_CST); 407 __cfaabi_dbg_debug_do( thrd_dst->park_result = old_state; ) 408 switch(old_state) { 409 case Halted: 410 // The thread has halted, it should never be scheduled/run again, leave it back to Halted and move on 411 thrd_dst->state = Halted; 412 413 // We may need to wake someone up here since 414 unpark( this->destroyer __cfaabi_dbg_ctx2 ); 415 this->destroyer = 0p; 416 break RUNNING; 417 case Active: 469 418 // This is case 1, the regular case, nothing more is needed 470 419 break RUNNING; 471 case 2:420 case Rerun: 472 421 // This is case 2, the racy case, someone tried to run this thread before it finished blocking 473 422 // In this case, just run it again. … … 475 424 default: 476 425 // This makes no sense, something is wrong abort 477 abort( );426 abort("Finished running a thread that was Blocked/Start/Primed %d\n", old_state); 478 427 } 479 428 } … … 489 438 $coroutine * proc_cor = get_coroutine(kernelTLS.this_processor->runner); 490 439 $thread * thrd_src = kernelTLS.this_thread; 491 492 #if !defined(__CFA_NO_STATISTICS__)493 struct processor * last_proc = kernelTLS.this_processor;494 #endif495 440 496 441 // Run the thread on this processor … … 508 453 } 509 454 510 #if !defined(__CFA_NO_STATISTICS__)511 if(last_proc != kernelTLS.this_processor) {512 __tls_stats()->ready.threads.migration++;513 }514 #endif515 516 455 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 517 456 /* paranoid */ verifyf( ((uintptr_t)thrd_src->context.SP) < ((uintptr_t)__get_stack(thrd_src->curr_cor)->base ), "ERROR : Returning $thread %p has been corrupted.\n StackPointer too small.\n", thrd_src ); … … 524 463 // It effectively constructs a coroutine by stealing the pthread stack 525 464 static void * __invoke_processor(void * arg) { 526 #if !defined( __CFA_NO_STATISTICS__ )527 __stats_t local_stats;528 __init_stats( &local_stats );529 kernelTLS.this_stats = &local_stats;530 #endif531 532 465 processor * proc = (processor *) arg; 533 466 kernelTLS.this_processor = proc; … … 561 494 __cfadbg_print_safe(runtime_core, "Kernel : core %p main ended (%p)\n", proc, &proc->runner); 562 495 563 #if !defined(__CFA_NO_STATISTICS__)564 __tally_stats(proc->cltr->stats, &local_stats);565 #endif566 567 496 return 0p; 568 497 } … … 662 591 // Scheduler routines 663 592 // KERNEL ONLY 664 void __schedule_thread( struct __processor_id_t * id, $thread * thrd ) { 665 /* paranoid */ verify( thrd ); 666 /* paranoid */ verify( thrd->state != Halted ); 593 void __schedule_thread( $thread * thrd ) with( *thrd->curr_cluster ) { 667 594 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled ); 668 595 /* paranoid */ #if defined( __CFA_WITH_VERIFY__ ) 669 /* paranoid */ 670 671 /* paranoid */ if( thrd->preempted != __NO_PREEMPTION ) assertf(thrd->state == Active,672 596 /* paranoid */ if( thrd->state == Blocked || thrd->state == Start ) assertf( thrd->preempted == __NO_PREEMPTION, 597 "Error inactive thread marked as preempted, state %d, preemption %d\n", thrd->state, thrd->preempted ); 598 /* paranoid */ if( thrd->preempted != __NO_PREEMPTION ) assertf(thrd->state == Active || thrd->state == Rerun, 599 "Error preempted thread marked as not currently running, state %d, preemption %d\n", thrd->state, thrd->preempted ); 673 600 /* paranoid */ #endif 674 /* paranoid */ verifyf( thrd-> link.next == 0p, "Expected null got %p", thrd->link.next );601 /* paranoid */ verifyf( thrd->next == 0p, "Expected null got %p", thrd->next ); 675 602 676 603 if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready; 677 604 678 ready_schedule_lock ( id ); 679 push( thrd->curr_cluster, thrd ); 680 681 #if !defined(__CFA_NO_STATISTICS__) 682 bool woke = 683 #endif 684 __wake_one(id, thrd->curr_cluster); 685 686 #if !defined(__CFA_NO_STATISTICS__) 687 if(woke) __tls_stats()->ready.sleep.wakes++; 688 #endif 689 ready_schedule_unlock( id ); 605 lock ( ready_queue_lock __cfaabi_dbg_ctx2 ); 606 bool was_empty = !(ready_queue != 0); 607 append( ready_queue, thrd ); 608 unlock( ready_queue_lock ); 609 610 __wake_one(thrd->curr_cluster, was_empty); 690 611 691 612 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); … … 696 617 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 697 618 698 ready_schedule_lock ( (__processor_id_t*)kernelTLS.this_processor);699 $thread * head = pop( this);700 ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor);619 lock( ready_queue_lock __cfaabi_dbg_ctx2 ); 620 $thread * head = pop_head( ready_queue ); 621 unlock( ready_queue_lock ); 701 622 702 623 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); … … 704 625 } 705 626 706 // KERNEL ONLY707 static bool __has_next_thread(cluster * this) with( *this ) {708 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );709 710 ready_schedule_lock ( (__processor_id_t*)kernelTLS.this_processor );711 bool not_empty = query( this );712 ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor );713 714 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );715 return not_empty;716 }717 718 627 // KERNEL ONLY unpark with out disabling interrupts 719 void __unpark( struct __processor_id_t * id, $thread * thrd __cfaabi_dbg_ctx_param2 ) { 628 void __unpark( $thread * thrd __cfaabi_dbg_ctx_param2 ) { 629 static_assert(sizeof(thrd->state) == sizeof(int)); 630 720 631 // record activity 721 632 __cfaabi_dbg_debug_do( char * old_caller = thrd->unpark_caller; ) 722 633 __cfaabi_dbg_record_thrd( *thrd, false, caller ); 723 634 724 int old_ticket = __atomic_fetch_add(&thrd->ticket, 1, __ATOMIC_SEQ_CST);725 __cfaabi_dbg_debug_do( thrd->unpark_result = old_ ticket; thrd->unpark_state = thrd->state; )726 switch(old_ ticket) {727 case 1:635 enum coroutine_state old_state = __atomic_exchange_n(&thrd->state, Rerun, __ATOMIC_SEQ_CST); 636 __cfaabi_dbg_debug_do( thrd->unpark_result = old_state; ) 637 switch(old_state) { 638 case Active: 728 639 // Wake won the race, the thread will reschedule/rerun itself 729 640 break; 730 case 0:641 case Blocked: 731 642 /* paranoid */ verify( ! 
thrd->preempted != __NO_PREEMPTION ); 732 /* paranoid */ verify( thrd->state == Blocked );733 643 734 644 // Wake lost the race, 735 __schedule_thread( id, thrd ); 645 thrd->state = Blocked; 646 __schedule_thread( thrd ); 736 647 break; 648 case Rerun: 649 abort("More than one thread attempted to schedule thread %p\n", thrd); 650 break; 651 case Halted: 652 case Start: 653 case Primed: 737 654 default: 738 655 // This makes no sense, something is wrong abort … … 745 662 746 663 disable_interrupts(); 747 __unpark( (__processor_id_t*)kernelTLS.this_processor,thrd __cfaabi_dbg_ctx_fwd2 );664 __unpark( thrd __cfaabi_dbg_ctx_fwd2 ); 748 665 enable_interrupts( __cfaabi_dbg_ctx ); 749 666 } … … 780 697 781 698 $thread * thrd = kernelTLS.this_thread; 782 /* paranoid */ verify(thrd->state == Active );699 /* paranoid */ verify(thrd->state == Active || thrd->state == Rerun); 783 700 784 701 // SKULLDUGGERY: It is possible that we are preempting this thread just before … … 787 704 // If that is the case, abandon the preemption. 788 705 bool preempted = false; 789 if(thrd-> link.next == 0p) {706 if(thrd->next == 0p) { 790 707 preempted = true; 791 708 thrd->preempted = reason; … … 813 730 __cfa_dbg_global_clusters.list{ __get }; 814 731 __cfa_dbg_global_clusters.lock{}; 815 816 // Initialize the global scheduler lock817 __scheduler_lock = (__scheduler_RWLock_t*)&storage___scheduler_lock;818 (*__scheduler_lock){};819 732 820 733 // Initialize the main cluster … … 851 764 pending_preemption = false; 852 765 kernel_thread = pthread_self(); 853 id = -1u;854 766 855 767 runner{ &this }; 856 768 __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner); 857 858 __atomic_fetch_add( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );859 769 } 860 770 … … 864 774 (*mainProcessor){}; 865 775 866 mainProcessor->id = doregister( (__processor_id_t*)mainProcessor);867 868 776 //initialize the global state variables 869 777 kernelTLS.this_processor = mainProcessor; 870 778 kernelTLS.this_thread = mainThread; 871 779 872 #if !defined( __CFA_NO_STATISTICS__ )873 kernelTLS.this_stats = (__stats_t *)& storage_mainProcStats;874 __init_stats( kernelTLS.this_stats );875 #endif876 877 780 // Enable preemption 878 781 kernel_start_preemption(); … … 880 783 // Add the main thread to the ready queue 881 784 // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread 882 __schedule_thread( (__processor_id_t *)mainProcessor,mainThread);785 __schedule_thread(mainThread); 883 786 884 787 // SKULLDUGGERY: Force a context switch to the main processor to set the main thread's context to the current UNIX … … 924 827 kernel_stop_preemption(); 925 828 926 unregister((__processor_id_t*)mainProcessor);927 928 829 // Destroy the main processor and its context in reverse order of construction 929 830 // These were manually constructed so we need manually destroy them 930 831 void ^?{}(processor & this) with( this ){ 931 832 /* paranoid */ verify( this.do_terminate == true ); 932 __atomic_fetch_sub( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );933 __cfaabi_dbg_print_safe("Kernel : destroyed main processor context %p\n", &runner);934 833 } 935 834 … … 937 836 938 837 // Final step, destroy the main thread since it is no longer needed 939 940 838 // Since we provided a stack to this taxk it will not destroy anything 941 839 /* paranoid */ verify(mainThread->self_cor.stack.storage == (__stack_t*)(((uintptr_t)&storage_mainThreadCtx)| 0x1)); … … 944 842 ^(*mainCluster){}; 
945 843 946 ^(*__scheduler_lock){};947 948 844 ^(__cfa_dbg_global_clusters.list){}; 949 845 ^(__cfa_dbg_global_clusters.lock){}; … … 955 851 // Kernel Idle Sleep 956 852 //============================================================================================= 853 static $thread * __halt(processor * this) with( *this ) { 854 if( do_terminate ) return 0p; 855 856 // First, lock the cluster idle 857 lock( cltr->idle_lock __cfaabi_dbg_ctx2 ); 858 859 // Check if we can find a thread 860 if( $thread * found = __next_thread( cltr ) ) { 861 unlock( cltr->idle_lock ); 862 return found; 863 } 864 865 // Move this processor from the active list to the idle list 866 move_to_front(cltr->procs, cltr->idles, *this); 867 868 // Unlock the idle lock so we don't go to sleep with a lock 869 unlock (cltr->idle_lock); 870 871 // We are ready to sleep 872 __cfadbg_print_safe(runtime_core, "Kernel : Processor %p ready to sleep\n", this); 873 wait( idle ); 874 875 // We have woken up 876 __cfadbg_print_safe(runtime_core, "Kernel : Processor %p woke up and ready to run\n", this); 877 878 // Get ourself off the idle list 879 with( *cltr ) { 880 lock (idle_lock __cfaabi_dbg_ctx2); 881 move_to_front(idles, procs, *this); 882 unlock(idle_lock); 883 } 884 885 // Don't check the ready queue again, we may not be in a position to run a thread 886 return 0p; 887 } 888 957 889 // Wake a thread from the front if there are any 958 static bool __wake_one(struct __processor_id_t * id, cluster * this) { 959 /* paranoid */ verify( ready_schedule_islocked( id ) ); 960 961 // Check if there is a sleeping processor 962 processor * p = pop(this->idles); 963 964 // If no one is sleeping, we are done 965 if( 0p == p ) return false; 966 967 // We found a processor, wake it up 968 post( p->idle ); 969 890 static bool __wake_one(cluster * this, __attribute__((unused)) bool force) { 891 // if we don't want to force check if we know it's false 892 // if( !this->idles.head && !force ) return false; 893 894 // First, lock the cluster idle 895 lock( this->idle_lock __cfaabi_dbg_ctx2 ); 896 897 // Check if there is someone to wake up 898 if( !this->idles.head ) { 899 // Nope unlock and return false 900 unlock( this->idle_lock ); 901 return false; 902 } 903 904 // Wake them up 905 __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this->idles.head); 906 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled ); 907 post( this->idles.head->idle ); 908 909 // Unlock and return true 910 unlock( this->idle_lock ); 970 911 return true; 971 912 } … … 981 922 982 923 return ret; 983 }984 985 static void __halt(processor * this) with( *this ) {986 if( do_terminate ) return;987 988 #if !defined(__CFA_NO_STATISTICS__)989 __tls_stats()->ready.sleep.halts++;990 #endif991 // Push self to queue992 push(cltr->idles, *this);993 994 // Makre sure we don't miss a thread995 if( __has_next_thread(cltr) ) {996 // A thread was posted, make sure a processor is woken up997 struct __processor_id_t *id = (struct __processor_id_t *) this;998 ready_schedule_lock ( id );999 __wake_one( id, cltr );1000 ready_schedule_unlock( id );1001 #if !defined(__CFA_NO_STATISTICS__)1002 __tls_stats()->ready.sleep.cancels++;1003 #endif1004 }1005 1006 wait( idle );1007 924 } 1008 925 … … 1161 1078 cltr->nthreads -= 1; 1162 1079 unlock(cltr->thread_list_lock); 1080 } 1081 1082 void doregister( cluster * cltr, processor * proc ) { 1083 lock (cltr->idle_lock __cfaabi_dbg_ctx2); 1084 cltr->nprocessors += 1; 1085 push_front(cltr->procs, *proc); 1086 unlock (cltr->idle_lock); 1087 } 1088 1089 void unregister( cluster * cltr, processor * proc ) { 1090 lock (cltr->idle_lock __cfaabi_dbg_ctx2); 1091 remove(cltr->procs, *proc ); 1092 cltr->nprocessors -= 1; 1093 unlock(cltr->idle_lock); 1163 1094 } 1164 1095 -
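The __run_thread and __unpark hunks above both close the same window: a thread that is still in the middle of parking can be unparked before it has finished switching off its stack. One side of this diff resolves the race with a per-thread ticket counter, the other by atomically exchanging the thread's state word (Blocked/Active/Rerun). A minimal C sketch of the ticket handshake follows; the names are illustrative, not the runtime's API.

#include <stdatomic.h>
#include <stdlib.h>

struct thread_desc {
    _Atomic int ticket;            // starts at 1: "running, no wake-up pending"
};

// Run by the kernel once the parking thread has switched off its stack.
// Returns 1 if the thread must be run again immediately.
static int finish_park(struct thread_desc * t) {
    int old = atomic_fetch_sub(&t->ticket, 1);
    if (old == 1) return 0;        // park won the race: the thread stays blocked
    if (old == 2) return 1;        // an unpark already arrived: rerun the thread
    abort();                       // any other value is a protocol violation
}

// Run by whoever wants to wake the thread.
// Returns 1 if the caller is responsible for putting it back on a ready queue.
static int do_unpark(struct thread_desc * t) {
    int old = atomic_fetch_add(&t->ticket, 1);
    if (old == 1) return 0;        // thread has not finished parking: it reruns itself
    if (old == 0) return 1;        // thread is fully parked: caller schedules it
    abort();                       // e.g. two concurrent unparks of the same thread
}

Whichever of the two operations observes the other's footprint takes responsibility for the reschedule, so the thread is never lost and never scheduled twice.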
libcfa/src/concurrency/kernel.hfa
r8b58bae r7f9968ad 22 22 #include "time_t.hfa" 23 23 #include "coroutine.hfa" 24 25 #include "containers/stackLockFree.hfa"26 24 27 25 extern "C" { … … 49 47 extern struct cluster * mainCluster; 50 48 51 // Processor id, required for scheduling threads 52 struct __processor_id_t { 53 unsigned id; 54 55 #if !defined(__CFA_NO_STATISTICS__) 56 struct __stats_t * stats; 57 #endif 58 }; 59 49 // Processor 60 50 coroutine processorCtx_t { 61 51 struct processor * proc; … … 63 53 64 54 // Wrapper around kernel threads 65 struct __attribute__((aligned(128)))processor {55 struct processor { 66 56 // Main state 67 inline __processor_id_t; 57 // Coroutine ctx who does keeps the state of the processor 58 struct processorCtx_t runner; 68 59 69 60 // Cluster from which to get threads 70 61 struct cluster * cltr; 71 72 // Set to true to notify the processor should terminate73 volatile bool do_terminate;74 75 // Coroutine ctx who does keeps the state of the processor76 struct processorCtx_t runner;77 62 78 63 // Name of the processor … … 96 81 __bin_sem_t idle; 97 82 83 // Termination 84 // Set to true to notify the processor should terminate 85 volatile bool do_terminate; 86 98 87 // Termination synchronisation (user semaphore) 99 88 semaphore terminated; … … 103 92 104 93 // Link lists fields 105 Link(processor) link; 94 struct __dbg_node_proc { 95 struct processor * next; 96 struct processor * prev; 97 } node; 106 98 107 99 #ifdef __CFA_DEBUG__ … … 118 110 static inline void ?{}(processor & this, const char name[]) { this{name, *mainCluster }; } 119 111 120 static inline Link(processor) * ?`next( processor * this ) { return &this->link; }112 static inline [processor *&, processor *& ] __get( processor & this ) __attribute__((const)) { return this.node.[next, prev]; } 121 113 122 114 //----------------------------------------------------------------------------- … … 129 121 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET 16 130 122 131 132 //-----------------------------------------------------------------------------133 // Cluster Tools134 135 // Intrusives lanes which are used by the relaxed ready queue136 struct __attribute__((aligned(128))) __intrusive_lane_t;137 void ?{}(__intrusive_lane_t & this);138 void ^?{}(__intrusive_lane_t & this);139 140 // Counter used for wether or not the lanes are all empty141 struct __attribute__((aligned(128))) __snzi_node_t;142 struct __snzi_t {143 unsigned mask;144 int root;145 __snzi_node_t * nodes;146 };147 148 void ?{}( __snzi_t & this, unsigned depth );149 void ^?{}( __snzi_t & this );150 151 //TODO adjust cache size to ARCHITECTURE152 // Structure holding the relaxed ready queue153 struct __ready_queue_t {154 // Data tracking how many/which lanes are used155 // Aligned to 128 for cache locality156 __snzi_t snzi;157 158 // Data tracking the actual lanes159 // On a seperate cacheline from the used struct since160 // used can change on each push/pop but this data161 // only changes on shrink/grow162 struct {163 // Arary of lanes164 __intrusive_lane_t * volatile data;165 166 // Number of lanes (empty or not)167 volatile size_t count;168 } lanes;169 };170 171 void ?{}(__ready_queue_t & this);172 void ^?{}(__ready_queue_t & this);173 174 123 //----------------------------------------------------------------------------- 175 124 // Cluster 176 struct __attribute__((aligned(128))) cluster { 125 struct cluster { 126 // Ready queue locks 127 __spinlock_t ready_queue_lock; 128 177 129 // Ready queue for threads 178 __ ready_queue_tready_queue;130 __queue_t($thread) ready_queue; 179 
131 180 132 // Name of the cluster … … 184 136 Duration preemption_rate; 185 137 186 // List of idle processors 187 StackLF(processor) idles; 188 volatile unsigned int nprocessors; 138 // List of processors 139 __spinlock_t idle_lock; 140 __dllist_t(struct processor) procs; 141 __dllist_t(struct processor) idles; 142 unsigned int nprocessors; 189 143 190 144 // List of threads … … 203 157 #if !defined(__CFA_NO_STATISTICS__) 204 158 bool print_stats; 205 struct __stats_t * stats;206 159 #endif 207 160 }; -
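One side of this header declares __ready_queue_t, an array of __intrusive_lane_t lanes plus a scalable non-zero indicator (__snzi_t) that lets idle processors cheaply check whether any lane holds work; the other side keeps a single __queue_t protected by one spinlock. A rough C sketch of the lane idea, with per-lane mutexes and random lane selection and the snzi left out; the types and names here are illustrative only.

#include <pthread.h>
#include <stdlib.h>
#include <string.h>

struct task { struct task * next; };

struct lane {                                    // one FIFO per lane, padded to its own cache line(s)
    pthread_mutex_t lock;
    struct task * head, * tail;
} __attribute__((aligned(128)));

struct ready_queue { struct lane * lanes; size_t count; };

static void rq_init(struct ready_queue * q, size_t n) {
    q->lanes = aligned_alloc(128, n * sizeof(struct lane));
    memset(q->lanes, 0, n * sizeof(struct lane));
    q->count = n;
    for (size_t i = 0; i < n; i++) pthread_mutex_init(&q->lanes[i].lock, NULL);
}

static void rq_push(struct ready_queue * q, struct task * t) {
    struct lane * l = &q->lanes[rand() % q->count];   // a real runtime would use a per-processor RNG
    pthread_mutex_lock(&l->lock);
    t->next = NULL;
    if (l->tail) l->tail->next = t; else l->head = t;
    l->tail = t;
    pthread_mutex_unlock(&l->lock);
}

static struct task * rq_pop(struct ready_queue * q) {
    size_t start = (size_t)rand();
    for (size_t i = 0; i < q->count; i++) {           // probe lanes from a random starting point
        struct lane * l = &q->lanes[(start + i) % q->count];
        pthread_mutex_lock(&l->lock);
        struct task * t = l->head;
        if (t) { l->head = t->next; if (!l->head) l->tail = NULL; }
        pthread_mutex_unlock(&l->lock);
        if (t) return t;
    }
    return NULL;                                       // every lane was empty
}

Spreading pushes and pops across many lanes keeps contention on any one lock low, and the 128-byte alignment keeps each lane's hot fields off its neighbours' cache lines.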
libcfa/src/concurrency/kernel_private.hfa
r8b58bae r7f9968ad 20 20 21 21 #include "alarm.hfa" 22 #include "stats.hfa"23 24 #include "bits/random.hfa"25 22 26 23 27 24 //----------------------------------------------------------------------------- 28 25 // Scheduler 29 30 struct __attribute__((aligned(128))) __scheduler_lock_id_t;31 26 32 27 extern "C" { … … 36 31 } 37 32 38 void __schedule_thread( struct __processor_id_t *, $thread * ) __attribute__((nonnull (2)));33 void __schedule_thread( $thread * ) __attribute__((nonnull (1))); 39 34 40 35 //Block current thread and release/wake-up the following resources … … 78 73 79 74 // KERNEL ONLY unpark with out disabling interrupts 80 void __unpark( struct __processor_id_t *,$thread * thrd __cfaabi_dbg_ctx_param2 );75 void __unpark( $thread * thrd __cfaabi_dbg_ctx_param2 ); 81 76 82 77 //----------------------------------------------------------------------------- … … 89 84 //----------------------------------------------------------------------------- 90 85 // Utils 91 #define KERNEL_STORAGE(T,X) __attribute((aligned(__alignof__(T))))static char storage_##X[sizeof(T)]86 #define KERNEL_STORAGE(T,X) static char storage_##X[sizeof(T)] 92 87 93 static inline uint64_t __tls_rand() { 94 // kernelTLS.rand_seed ^= kernelTLS.rand_seed << 6; 95 // kernelTLS.rand_seed ^= kernelTLS.rand_seed >> 21; 96 // kernelTLS.rand_seed ^= kernelTLS.rand_seed << 7; 97 // return kernelTLS.rand_seed; 98 return __lehmer64( kernelTLS.rand_seed ); 88 static inline uint32_t __tls_rand() { 89 kernelTLS.rand_seed ^= kernelTLS.rand_seed << 6; 90 kernelTLS.rand_seed ^= kernelTLS.rand_seed >> 21; 91 kernelTLS.rand_seed ^= kernelTLS.rand_seed << 7; 92 return kernelTLS.rand_seed; 99 93 } 100 94 … … 106 100 void unregister( struct cluster * cltr, struct $thread & thrd ); 107 101 108 //======================================================================= 109 // Cluster lock API 110 //======================================================================= 111 // Cells use by the reader writer lock 112 // while not generic it only relies on a opaque pointer 113 struct __attribute__((aligned(128))) __scheduler_lock_id_t { 114 // Spin lock used as the underlying lock 115 volatile bool lock; 116 117 // Handle pointing to the proc owning this cell 118 // Used for allocating cells and debugging 119 __processor_id_t * volatile handle; 120 121 #ifdef __CFA_WITH_VERIFY__ 122 // Debug, check if this is owned for reading 123 bool owned; 124 #endif 125 }; 126 127 static_assert( sizeof(struct __scheduler_lock_id_t) <= __alignof(struct __scheduler_lock_id_t)); 128 129 // Lock-Free registering/unregistering of threads 130 // Register a processor to a given cluster and get its unique id in return 131 unsigned doregister( struct __processor_id_t * proc ); 132 133 // Unregister a processor from a given cluster using its id, getting back the original pointer 134 void unregister( struct __processor_id_t * proc ); 135 136 //======================================================================= 137 // Reader-writer lock implementation 138 // Concurrent with doregister/unregister, 139 // i.e., threads can be added at any point during or between the entry/exit 140 141 //----------------------------------------------------------------------- 142 // simple spinlock underlying the RWLock 143 // Blocking acquire 144 static inline void __atomic_acquire(volatile bool * ll) { 145 while( __builtin_expect(__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST), false) ) { 146 while(__atomic_load_n(ll, (int)__ATOMIC_RELAXED)) 147 asm volatile("pause"); 148 } 
149 /* paranoid */ verify(*ll); 150 } 151 152 // Non-Blocking acquire 153 static inline bool __atomic_try_acquire(volatile bool * ll) { 154 return !__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST); 155 } 156 157 // Release 158 static inline void __atomic_unlock(volatile bool * ll) { 159 /* paranoid */ verify(*ll); 160 __atomic_store_n(ll, (bool)false, __ATOMIC_RELEASE); 161 } 162 163 //----------------------------------------------------------------------- 164 // Reader-Writer lock protecting the ready-queues 165 // while this lock is mostly generic some aspects 166 // have been hard-coded to for the ready-queue for 167 // simplicity and performance 168 struct __scheduler_RWLock_t { 169 // total cachelines allocated 170 unsigned int max; 171 172 // cachelines currently in use 173 volatile unsigned int alloc; 174 175 // cachelines ready to itereate over 176 // (!= to alloc when thread is in second half of doregister) 177 volatile unsigned int ready; 178 179 // writer lock 180 volatile bool lock; 181 182 // data pointer 183 __scheduler_lock_id_t * data; 184 }; 185 186 void ?{}(__scheduler_RWLock_t & this); 187 void ^?{}(__scheduler_RWLock_t & this); 188 189 extern __scheduler_RWLock_t * __scheduler_lock; 190 191 //----------------------------------------------------------------------- 192 // Reader side : acquire when using the ready queue to schedule but not 193 // creating/destroying queues 194 static inline void ready_schedule_lock( struct __processor_id_t * proc) with(*__scheduler_lock) { 195 unsigned iproc = proc->id; 196 /*paranoid*/ verify(data[iproc].handle == proc); 197 /*paranoid*/ verify(iproc < ready); 198 199 // Step 1 : make sure no writer are in the middle of the critical section 200 while(__atomic_load_n(&lock, (int)__ATOMIC_RELAXED)) 201 asm volatile("pause"); 202 203 // Fence needed because we don't want to start trying to acquire the lock 204 // before we read a false. 205 // Not needed on x86 206 // std::atomic_thread_fence(std::memory_order_seq_cst); 207 208 // Step 2 : acquire our local lock 209 __atomic_acquire( &data[iproc].lock ); 210 /*paranoid*/ verify(data[iproc].lock); 211 212 #ifdef __CFA_WITH_VERIFY__ 213 // Debug, check if this is owned for reading 214 data[iproc].owned = true; 215 #endif 216 } 217 218 static inline void ready_schedule_unlock( struct __processor_id_t * proc) with(*__scheduler_lock) { 219 unsigned iproc = proc->id; 220 /*paranoid*/ verify(data[iproc].handle == proc); 221 /*paranoid*/ verify(iproc < ready); 222 /*paranoid*/ verify(data[iproc].lock); 223 /*paranoid*/ verify(data[iproc].owned); 224 #ifdef __CFA_WITH_VERIFY__ 225 // Debug, check if this is owned for reading 226 data[iproc].owned = false; 227 #endif 228 __atomic_unlock(&data[iproc].lock); 229 } 230 231 #ifdef __CFA_WITH_VERIFY__ 232 static inline bool ready_schedule_islocked( struct __processor_id_t * proc) { 233 return __scheduler_lock->data[proc->id].owned; 234 } 235 236 static inline bool ready_mutate_islocked() { 237 return __scheduler_lock->lock; 238 } 239 #endif 240 241 //----------------------------------------------------------------------- 242 // Writer side : acquire when changing the ready queue, e.g. adding more 243 // queues or removing them. 
244 uint_fast32_t ready_mutate_lock( void ); 245 246 void ready_mutate_unlock( uint_fast32_t /* value returned by lock */ ); 247 248 //======================================================================= 249 // Ready-Queue API 250 //----------------------------------------------------------------------- 251 // pop thread from the ready queue of a cluster 252 // returns 0p if empty 253 __attribute__((hot)) bool query(struct cluster * cltr); 254 255 //----------------------------------------------------------------------- 256 // push thread onto a ready queue for a cluster 257 // returns true if the list was previously empty, false otherwise 258 __attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd); 259 260 //----------------------------------------------------------------------- 261 // pop thread from the ready queue of a cluster 262 // returns 0p if empty 263 __attribute__((hot)) struct $thread * pop(struct cluster * cltr); 264 265 //----------------------------------------------------------------------- 266 // remove thread from the ready queue of a cluster 267 // returns bool if it wasn't found 268 bool remove_head(struct cluster * cltr, struct $thread * thrd); 269 270 //----------------------------------------------------------------------- 271 // Increase the width of the ready queue (number of lanes) by 4 272 void ready_queue_grow (struct cluster * cltr); 273 274 //----------------------------------------------------------------------- 275 // Decrease the width of the ready queue (number of lanes) by 4 276 void ready_queue_shrink(struct cluster * cltr); 277 278 //----------------------------------------------------------------------- 279 // Statics call at the end of each thread to register statistics 280 #if !defined(__CFA_NO_STATISTICS__) 281 static inline struct __stats_t * __tls_stats() { 282 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 283 /* paranoid */ verify( kernelTLS.this_stats ); 284 return kernelTLS.this_stats; 285 } 286 #endif 102 void doregister( struct cluster * cltr, struct processor * proc ); 103 void unregister( struct cluster * cltr, struct processor * proc ); 287 104 288 105 // Local Variables: // -
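ready_schedule_lock and ready_schedule_unlock above form the reader side of a distributed reader-writer lock: each processor owns one cache-line-sized __scheduler_lock_id_t cell and only ever spins on that cell, so uncontended readers never touch shared state. The writer side (ready_mutate_lock) is only declared in this hunk; the sketch below assumes it raises the global flag and then claims every reader cell, which is one standard way to pair with the reader protocol shown. The fixed reader count and all names are illustrative.

#include <stdatomic.h>
#include <stdbool.h>

#define NREADERS 64

struct cell { _Atomic bool lock; } __attribute__((aligned(128)));   // one cell per reader, no false sharing

static struct cell cells[NREADERS];
static _Atomic bool writer_flag;

static void read_lock(unsigned id) {                 // id = this processor's index
    // step 1: back off while a writer is (or is about to be) sweeping the cells
    while (atomic_load_explicit(&writer_flag, memory_order_relaxed)) /* spin */ ;
    // step 2: claim our private cell; this exchange is what provides mutual exclusion
    while (atomic_exchange(&cells[id].lock, true)) /* spin */ ;
}

static void read_unlock(unsigned id) {
    atomic_store_explicit(&cells[id].lock, false, memory_order_release);
}

static void write_lock(void) {
    while (atomic_exchange(&writer_flag, true)) /* spin */ ;   // serialise writers, warn readers off
    for (unsigned i = 0; i < NREADERS; i++)                    // then claim every reader cell
        while (atomic_exchange(&cells[i].lock, true)) /* spin */ ;
}

static void write_unlock(void) {
    for (unsigned i = 0; i < NREADERS; i++)
        atomic_store_explicit(&cells[i].lock, false, memory_order_release);
    atomic_store_explicit(&writer_flag, false, memory_order_release);
}

A reader in its critical section always holds its own cell, and a writer only enters after holding every cell, so the two can never overlap; the preliminary check of writer_flag exists only to keep readers from starving a sweeping writer.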
libcfa/src/concurrency/monitor.cfa
r8b58bae r7f9968ad 114 114 115 115 // Some one else has the monitor, wait in line for it 116 /* paranoid */ verify( thrd-> link.next == 0p );116 /* paranoid */ verify( thrd->next == 0p ); 117 117 append( this->entry_queue, thrd ); 118 /* paranoid */ verify( thrd-> link.next == 1p );118 /* paranoid */ verify( thrd->next == 1p ); 119 119 120 120 unlock( this->lock ); … … 199 199 200 200 // Some one else has the monitor, wait in line for it 201 /* paranoid */ verify( thrd-> link.next == 0p );201 /* paranoid */ verify( thrd->next == 0p ); 202 202 append( this->entry_queue, thrd ); 203 /* paranoid */ verify( thrd-> link.next == 1p );203 /* paranoid */ verify( thrd->next == 1p ); 204 204 unlock( this->lock ); 205 205 … … 761 761 $thread * new_owner = pop_head( this->entry_queue ); 762 762 /* paranoid */ verifyf( !this->owner || kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this ); 763 /* paranoid */ verify( !new_owner || new_owner-> link.next == 0p );763 /* paranoid */ verify( !new_owner || new_owner->next == 0p ); 764 764 __set_owner( this, new_owner ); 765 765 … … 883 883 } 884 884 885 __cfaabi_dbg_print_safe( "Kernel : Runing %i (%p)\n", ready2run, ready2run ? (thread*)node->waiting_thread : (thread*)0p );885 __cfaabi_dbg_print_safe( "Kernel : Runing %i (%p)\n", ready2run, ready2run ? node->waiting_thread : 0p ); 886 886 return ready2run ? node->waiting_thread : 0p; 887 887 } … … 907 907 // For each thread in the entry-queue 908 908 for( $thread ** thrd_it = &entry_queue.head; 909 (*thrd_it)!= 1p;910 thrd_it = &(*thrd_it)-> link.next909 *thrd_it != 1p; 910 thrd_it = &(*thrd_it)->next 911 911 ) { 912 912 // For each acceptable check if it matches -
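The waitfor scan near the end of this hunk walks the monitor's entry queue with a pointer-to-pointer cursor (thrd_it = &(*thrd_it)->link.next on one side, ->next on the other), which lets the matching thread be unlinked in place without tracking a separate prev pointer; the removal itself is outside this hunk. The same idiom in generic C, with a hypothetical node type and a NULL terminator instead of the queue's 1p sentinel:

#include <stddef.h>

struct node { struct node * next; int key; };

// Remove and return the first node whose key matches, or NULL if none does.
static struct node * unlink_first_match(struct node ** head, int key) {
    for (struct node ** it = head; *it != NULL; it = &(*it)->next) {
        if ((*it)->key == key) {
            struct node * found = *it;
            *it = found->next;        // unlink by rewriting whichever pointer led us here
            found->next = NULL;
            return found;
        }
    }
    return NULL;
}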
libcfa/src/concurrency/preemption.cfa
r8b58bae r7f9968ad 37 37 // FwdDeclarations : timeout handlers 38 38 static void preempt( processor * this ); 39 static void timeout( struct __processor_id_t * id,$thread * this );39 static void timeout( $thread * this ); 40 40 41 41 // FwdDeclarations : Signal handlers … … 88 88 89 89 // Tick one frame of the Discrete Event Simulation for alarms 90 static void tick_preemption( struct __processor_id_t * id) {90 static void tick_preemption() { 91 91 alarm_node_t * node = 0p; // Used in the while loop but cannot be declared in the while condition 92 92 alarm_list_t * alarms = &event_kernel->alarms; // Local copy for ease of reading … … 106 106 } 107 107 else { 108 timeout( id,node->thrd );108 timeout( node->thrd ); 109 109 } 110 110 … … 119 119 // If there are still alarms pending, reset the timer 120 120 if( & (*alarms)`first ) { 121 __cfa dbg_print_buffer_decl(preemption," KERNEL: @%ju(%ju) resetting alarm to %ju.\n", currtime.tv, __kernel_get_time().tv, (alarms->head->alarm - currtime).tv);121 __cfaabi_dbg_print_buffer_decl( " KERNEL: @%ju(%ju) resetting alarm to %ju.\n", currtime.tv, __kernel_get_time().tv, (alarms->head->alarm - currtime).tv); 122 122 Duration delta = (*alarms)`first.alarm - currtime; 123 123 Duration capped = max(delta, 50`us); … … 266 266 267 267 // reserved for future use 268 static void timeout( struct __processor_id_t * id, $thread * this ) { 269 #if !defined( __CFA_NO_STATISTICS__ ) 270 kernelTLS.this_stats = this->curr_cluster->stats; 271 #endif 272 __unpark( id, this __cfaabi_dbg_ctx2 ); 268 static void timeout( $thread * this ) { 269 __unpark( this __cfaabi_dbg_ctx2 ); 273 270 } 274 271 … … 406 403 // Waits on SIGALRM and send SIGUSR1 to whom ever needs it 407 404 static void * alarm_loop( __attribute__((unused)) void * args ) { 408 __processor_id_t id;409 id.id = doregister(&id);410 411 405 // Block sigalrms to control when they arrive 412 406 sigset_t mask; … … 453 447 // __cfaabi_dbg_print_safe( "Kernel : Preemption thread tick\n" ); 454 448 lock( event_kernel->lock __cfaabi_dbg_ctx2 ); 455 tick_preemption( &id);449 tick_preemption(); 456 450 unlock( event_kernel->lock ); 457 451 break; … … 466 460 EXIT: 467 461 __cfaabi_dbg_print_safe( "Kernel : Preemption thread stopping\n" ); 468 unregister(&id);469 462 return 0p; 470 463 } -
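tick_preemption() above drains every expired node from a deadline-ordered alarm list and then re-arms the timer for the earliest remaining deadline, clamped to a 50us floor. The sketch below follows that shape in plain C; the re-insertion of periodic alarms and the clock/timer calls are stand-ins and assumptions, not the runtime's API.

#include <stdint.h>
#include <stddef.h>

typedef uint64_t nsec_t;

struct alarm {
    struct alarm * next;
    nsec_t deadline;                                     // absolute time at which the alarm fires
    nsec_t period;                                       // 0 for one-shot alarms
    void (*fire)(struct alarm *);
};

static nsec_t fake_now;                                  // stand-in clock, only for the sketch
static nsec_t now(void) { return fake_now += 1000000; }
static void arm_timer(nsec_t delay) { (void)delay; }     // a real kernel would program an interval timer here

#define MIN_DELAY 50000ull                               // never arm the timer for less than 50us (in ns)

static void insert_sorted(struct alarm ** head, struct alarm * a) {
    struct alarm ** it = head;
    while (*it && (*it)->deadline <= a->deadline) it = &(*it)->next;
    a->next = *it;
    *it = a;
}

static void tick(struct alarm ** head) {
    nsec_t t = now();
    while (*head && (*head)->deadline <= t) {            // fire everything that is due
        struct alarm * a = *head;
        *head = a->next;
        a->fire(a);
        if (a->period) {                                 // periodic alarms go back on the list
            a->deadline = t + a->period;
            insert_sorted(head, a);
        }
    }
    if (*head) {                                         // re-arm for the next deadline, clamped
        nsec_t delta = (*head)->deadline - t;
        arm_timer(delta < MIN_DELAY ? MIN_DELAY : delta);
    }
}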
libcfa/src/concurrency/thread.cfa
r8b58bae r7f9968ad 28 28 context{ 0p, 0p }; 29 29 self_cor{ name, storage, storageSize }; 30 ticket = 1;31 30 state = Start; 32 31 preempted = __NO_PREEMPTION; … … 36 35 self_mon_p = &self_mon; 37 36 curr_cluster = &cl; 38 link.next = 0p; 39 link.prev = 0p; 40 link.preferred = -1; 37 next = 0p; 41 38 42 39 node.next = 0p; … … 64 61 verify( this_thrd->context.SP ); 65 62 66 __schedule_thread( (__processor_id_t *)kernelTLS.this_processor,this_thrd);63 __schedule_thread(this_thrd); 67 64 enable_interrupts( __cfaabi_dbg_ctx ); 68 65 } -
libcfa/src/containers/stackLockFree.hfa
r8b58bae r7f9968ad 1 // 1 // 2 2 // Cforall Version 1.0.0 Copyright (C) 2017 University of Waterloo 3 3 // The contents of this file are covered under the licence agreement in the 4 4 // file "LICENCE" distributed with Cforall. 5 5 // 6 // stackLockFree.hfa -- 7 // 6 // stackLockFree.hfa -- 7 // 8 8 // Author : Peter A. Buhr 9 9 // Created On : Wed May 13 20:58:58 2020 … … 11 11 // Last Modified On : Sun Jun 14 13:25:09 2020 12 12 // Update Count : 64 13 // 13 // 14 14 15 15 #pragma once … … 31 31 }; // Link 32 32 33 forall( otype T | sized(T) | { Link(T) * ?`next( T * ); } ) {34 33 forall( dtype T | sized(T) | { Link(T) * getNext( T * ); } ) { 34 struct StackLF { 35 35 Link(T) stack; 36 36 }; // StackLF … … 42 42 43 43 void push( StackLF(T) & this, T & n ) with(this) { 44 * ( &n )`next = stack;// atomic assignment unnecessary, or use CAA44 *getNext( &n ) = stack; // atomic assignment unnecessary, or use CAA 45 45 for () { // busy wait 46 if ( __atomic_compare_exchange_n( &stack.atom, & ( &n )`next->atom, (Link(T))@{ {&n, ( &n )`next->count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node46 if ( __atomic_compare_exchange_n( &stack.atom, &getNext( &n )->atom, (Link(T))@{ {&n, getNext( &n )->count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node 47 47 } // for 48 48 } // push … … 52 52 for () { // busy wait 53 53 if ( t.top == 0p ) return 0p; // empty stack ? 54 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ { ( t.top )`next->top, t.count} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.top; // attempt to update top node54 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ {getNext( t.top )->top, t.count} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.top; // attempt to update top node 55 55 } // for 56 56 } // pop 57 58 bool unsafe_remove( StackLF(T) & this, T * node ) with(this) {59 Link(T) * link = &stack;60 for() {61 T * next = link->top;62 if( next == node ) {63 link->top = ( node )`next->top;64 return true;65 }66 if( next == 0p ) return false;67 link = (next)`next;68 }69 }70 57 } // distribution 71 58 } // distribution -
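push and pop above implement a Treiber stack whose top is a {pointer, counter} pair updated with one double-width compare-and-swap, so a pop/push cycle that restores the same pointer still changes the counter and cannot trigger the ABA problem. A plain-C analogue using the same union trick is sketched below; it relies on GCC/Clang builtins, 16-byte CAS support (-mcx16 on x86-64 or libatomic), and illustrative names.

#include <stdint.h>
#include <stddef.h>

typedef union link {
    struct { struct node * top; uintptr_t count; };     // pointer plus ABA counter
    __int128 atom;                                       // the unit the wide CAS operates on
} link_t;

typedef struct node {
    link_t next;                  // intrusive link inside the element, like Link(T) inside T
    int value;
} node_t;

typedef struct { link_t stack; } stacklf_t;              // zero-initialised means empty

static void push(stacklf_t * s, node_t * n) {
    n->next = s->stack;                                  // snapshot the current top
    for (;;) {                                           // retry until our CAS wins
        link_t desired = { { n, n->next.count + 1 } };
        if (__atomic_compare_exchange_n(&s->stack.atom, &n->next.atom, desired.atom,
                                        false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
            return;                                      // on failure the builtin refreshes n->next
    }
}

static node_t * pop(stacklf_t * s) {
    link_t t = s->stack;                                 // snapshot; refreshed by the CAS on failure
    for (;;) {
        if (t.top == NULL) return NULL;                  // empty stack
        link_t desired = { { t.top->next.top, t.count } };
        if (__atomic_compare_exchange_n(&s->stack.atom, &t.atom, desired.atom,
                                        false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
            return t.top;
    }
}

As with the original, nodes must not be freed while other threads may still be racing on the stack; intrusive, never-freed elements (processors, heap storage blocks) sidestep that reclamation problem.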
libcfa/src/heap.cfa
r8b58bae r7f9968ad 209 209 #if BUCKETLOCK == LOCKFREE 210 210 static inline { 211 Link(HeapManager.Storage) * ?`next( HeapManager.Storage * this ) { return &this->header.kind.real.next; }211 Link(HeapManager.Storage) * getNext( HeapManager.Storage * this ) { return &this->header.kind.real.next; } 212 212 void ?{}( HeapManager.FreeHeader & ) {} 213 213 void ^?{}( HeapManager.FreeHeader & ) {} … … 667 667 #else 668 668 for ( HeapManager.Storage * p = top( freeLists[i].freeList ); p != 0p; /* p = getNext( p )->top */) { 669 typeof(p) temp = ( p )`next->top; // FIX ME: direct assignent fails, initialization works669 typeof(p) temp = getNext( p )->top; // FIX ME: direct assignent fails, initialization works 670 670 p = temp; 671 671 #endif // BUCKETLOCK … … 903 903 return oaddr; 904 904 } // if 905 905 906 906 // change size, DO NOT preserve STICKY PROPERTIES. 907 907 free( oaddr ); -
libcfa/src/stdhdr/assert.h
r8b58bae r7f9968ad 33 33 #define verify(x) assert(x) 34 34 #define verifyf(x, ...) assertf(x, __VA_ARGS__) 35 #define verifyfail(...)36 35 #define __CFA_WITH_VERIFY__ 37 36 #else 38 37 #define verify(x) 39 38 #define verifyf(x, ...) 40 #define verifyfail(...)41 39 #endif 42 40 -
tests/concurrent/examples/datingService.cfa
r8b58bae r7f9968ad 35 35 signal_block( Boys[ccode] ); // restart boy to set phone number 36 36 } // if 37 // 37 //sout | "Girl:" | PhoneNo | "is dating Boy at" | BoyPhoneNo | "with ccode" | ccode; 38 38 return BoyPhoneNo; 39 39 } // DatingService girl … … 47 47 signal_block( Girls[ccode] ); // restart girl to set phone number 48 48 } // if 49 // 49 //sout | " Boy:" | PhoneNo | "is dating Girl" | GirlPhoneNo | "with ccode" | ccode; 50 50 return GirlPhoneNo; 51 51 } // DatingService boy -
tests/concurrent/signal/disjoint.cfa
r8b58bae r7f9968ad 21 21 #endif 22 22 23 // This tests checks what happens when someone barges in the midle of the release24 // of a bulk of monitors.25 26 23 enum state_t { WAIT, SIGNAL, BARGE }; 27 24 28 25 monitor global_t {}; 26 global_t mut; 29 27 30 28 monitor global_data_t; … … 35 33 int counter; 36 34 state_t state; 37 }; 38 39 // Use a global struct because the order needs to match with Signaller thread 40 struct { 41 global_t mut; 42 global_data_t data; 43 } globals; 35 } data; 44 36 45 37 condition cond; … … 48 40 49 41 void ?{}( global_data_t & this ) { 50 this.counter = 0;42 this.counter == 0; 51 43 this.state = BARGE; 52 44 } … … 61 53 62 54 thread Barger {}; 63 void ?{}( Barger & this ) {64 ((thread&)this){ "Barger Thread" };65 }66 55 67 56 void main( Barger & this ) { 68 57 while( !all_done ) { 69 barge( globals.data );58 barge( data ); 70 59 yield(); 71 60 } … … 89 78 90 79 thread Waiter {}; 91 void ?{}( Waiter & this ) {92 ((thread&)this){ "Waiter Thread" };93 }94 80 95 81 void main( Waiter & this ) { 96 while( wait( globals.mut, globals.data ) ) { KICK_WATCHDOG; yield(); }82 while( wait( mut, data ) ) { KICK_WATCHDOG; yield(); } 97 83 } 98 84 … … 106 92 107 93 void logic( global_t & mutex a ) { 108 signal( cond, a, globals.data );94 signal( cond, a, data ); 109 95 110 96 yield( random( 10 ) ); 111 97 112 98 //This is technically a mutual exclusion violation but the mutex monitor protects us 113 bool running = TEST( globals.data.counter < N) && globals.data.counter > 0;114 if( globals.data.state != SIGNAL && running ) {115 sout | "ERROR Eager signal" | globals.data.state;99 bool running = TEST(data.counter < N) && data.counter > 0; 100 if( data.state != SIGNAL && running ) { 101 sout | "ERROR Eager signal" | data.state; 116 102 } 117 103 } 118 104 119 105 thread Signaller {}; 120 void ?{}( Signaller & this ) {121 ((thread&)this){ "Signaller Thread" };122 }123 106 124 107 void main( Signaller & this ) { 125 108 while( !all_done ) { 126 logic( globals.mut );109 logic( mut ); 127 110 yield(); 128 111 } -
tests/concurrent/waitfor/when.cfa
r8b58bae r7f9968ad 57 57 58 58 void arbiter( global_t & mutex this ) { 59 // There is a race at start where callers can get in before the arbiter.60 // It doesn't really matter here so just restart the loop correctly and move on61 this.last_call = 6;62 63 59 for( int i = 0; i < N; i++ ) { 64 60 when( this.last_call == 6 ) waitfor( call1 : this ) { if( this.last_call != 1) { serr | "Expected last_call to be 1 got" | this.last_call; } } -
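The arbiter above uses when(this.last_call == N) guards on waitfor so the numbered calls (call1 is the one visible in this hunk) are only accepted in a fixed cycle. With plain mutexes and condition variables the equivalent turn-taking is usually expressed from the caller's side (internal scheduling rather than CFA's external scheduling); a minimal pthread analogue, with illustrative names, might look like this.

#include <pthread.h>

static pthread_mutex_t m    = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  turn = PTHREAD_COND_INITIALIZER;
static int last_call = 6;                    // 6 means "call1 goes next"

// id is this call's number (1..6); prev is the call that must have been accepted before it
static void call(int id, int prev) {
    pthread_mutex_lock(&m);
    while (last_call != prev)                // the condition the when clause guards on
        pthread_cond_wait(&turn, &m);
    last_call = id;                          // this call has now been "accepted"
    pthread_cond_broadcast(&turn);
    pthread_mutex_unlock(&m);
}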
tools/gdb/utils-gdb.py
r8b58bae r7f9968ad 59 59 thread_ptr = gdb.lookup_type('struct $thread').pointer(), 60 60 int_ptr = gdb.lookup_type('int').pointer(), 61 thread_state = gdb.lookup_type('enum __Coroutine_State'))61 thread_state = gdb.lookup_type('enum coroutine_state')) 62 62 63 63 def get_addr(addr):