Changeset 50aeb6f
- Timestamp: Sep 26, 2019, 4:25:04 PM (5 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 1e24d13
- Parents: b2a37b0
- Location: doc/theses/thierry_delisle_PhD/code
- Files: 5 edited
doc/theses/thierry_delisle_PhD/code/assert.hpp
(rb2a37b0 → r50aeb6f)

 #pragma once

+#ifndef NDEBUG
 #include <cassert>
 #include <cstdlib>
…
 	} \
 })
+#else
+#define assertf(cond, ...)
+#endif
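This change compiles assertf() away entirely in release builds: with NDEBUG defined the macro expands to nothing, so neither the condition nor its message arguments are evaluated. A minimal caller sketch, assuming assertf(cond, fmt, ...) behaves like a formatted assert (print the message and abort on failure); the function and message below are illustrative, not part of the changeset:

	// Illustrative caller only; assumes assertf(cond, fmt, ...) from assert.hpp
	// prints the formatted message and aborts when cond is false and NDEBUG is
	// not defined. With -DNDEBUG the macro now expands to nothing, so the
	// condition and its arguments are not evaluated at all.
	#include "assert.hpp"

	static void check_size(int expected, int actual) {
		assertf(expected == actual,
		        "size mismatch: expected %d, got %d", expected, actual);
	}

	int main() {
		check_size(4, 4);
		return 0;
	}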
doc/theses/thierry_delisle_PhD/code/processor_list.hpp
(rb2a37b0 → r50aeb6f)

 	}

+	//-----------------------------------------------------------------------
+	// Checking support
+	uint_fast32_t epoch_check() {
+		// Step 1 : lock global lock
+		// It is needed to avoid processors that register mid Critical-Section
+		//   to simply lock their own lock and enter.
+		while(lock.load(std::memory_order_relaxed))
+			asm volatile("pause");
+
+		// Step 2 : lock per-proc lock
+		// Processors that are currently being registered aren't counted
+		//   but can't be in read_lock or in the critical section.
+		// All other processors are counted
+		uint_fast32_t s = ready;
+		for(uint_fast32_t i = 0; i < s; i++) {
+			while(data[i].lock.load(std::memory_order_relaxed))
+				asm volatile("pause");
+		}
+
+		return s;
+	}
+
 public:
 };
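The new epoch_check() spins until the global writer lock and then every registered per-processor lock appear free, and returns how many processors were checked; it lets the benchmark observe a quiescent state without taking the write lock itself. Below is a simplified, self-contained sketch of that idea; the reader_slot type and wait_for_quiescence() helper are illustrative stand-ins, not the thesis class:

	// Simplified sketch of the idea behind epoch_check(), not the thesis class:
	// wait until every registered reader's flag is clear, so any reader that was
	// inside the critical section when the check started has since left it.
	#include <atomic>
	#include <cstddef>
	#include <vector>

	struct reader_slot {
		std::atomic_bool in_section = { false };
	};

	// Illustrative helper: spin until all currently registered readers are
	// outside their critical section, then report how many slots were checked.
	inline std::size_t wait_for_quiescence(std::vector<reader_slot> & slots) {
		std::size_t checked = 0;
		for(auto & s : slots) {
			while(s.in_section.load(std::memory_order_relaxed))
				asm volatile("pause");
			checked++;
		}
		return checked;
	}

	int main() {
		std::vector<reader_slot> slots(8);   // e.g. one slot per registered processor
		return int(wait_for_quiescence(slots)) - 8;
	}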
doc/theses/thierry_delisle_PhD/code/processor_list_fast.cpp
(rb2a37b0 → r50aeb6f)

 	unsigned id;
 };
-void run(unsigned nthread, double duration, unsigned writes) {
+void run(unsigned nthread, double duration, unsigned writes, unsigned epochs) {
 	assert(writes < 100);
…
 	// Data to check everything is OK
 	size_t write_committed = 0ul;
-	std::atomic_size_t lock_cnt_write = { 0ul };
-	std::atomic_size_t lock_cnt_read  = { 0ul };
+	struct {
+		std::atomic_size_t write = { 0ul };
+		std::atomic_size_t read  = { 0ul };
+		std::atomic_size_t epoch = { 0ul };
+	} lock_cnt;

 	// Flag to signal termination
…
 	unsigned i = 1;
 	for(auto & t : threads) {
-		t = new std::thread([&done, &list, &barrier, &write_committed, &lock_cnt_write, &lock_cnt_read, writes](unsigned tid) {
+		t = new std::thread([&done, &list, &barrier, &write_committed, &lock_cnt, writes, epochs](unsigned tid) {
 			Random rand(tid + rdtscl());
 			processor proc;
…
 			size_t writes_cnt = 0;
 			size_t reads_cnt  = 0;
+			size_t epoch_cnt  = 0;

 			affinity(tid);
…
 			while(__builtin_expect(!done, true)) {
-				if ((rand.next() % 100) < writes) {
+				auto r = rand.next() % 100;
+				if (r < writes) {
 					auto n = list.write_lock();
 					write_committed++;
…
 					assert(writes_cnt < -2ul);
 					list.write_unlock(n);
+				}
+				else if(r < epochs) {
+					list.epoch_check();
+					epoch_cnt++;
 				}
 				else {
…
 			auto p = list.unregister(proc.id);
 			assert(&proc == p);
-			lock_cnt_write += writes_cnt;
-			lock_cnt_read  += reads_cnt;
+			lock_cnt.write += writes_cnt;
+			lock_cnt.read  += reads_cnt;
+			lock_cnt.epoch += epoch_cnt;
 		}, i++);
 	}
…
 	}

-	assert(write_committed == lock_cnt_write);
+	assert(write_committed == lock_cnt.write);

-	size_t ops_sec = size_t(double(lock_cnt_read + lock_cnt_write) / duration);
+	size_t totalop = lock_cnt.read + lock_cnt.write + lock_cnt.epoch;
+	size_t ops_sec = size_t(double(totalop) / duration);
 	size_t ops_thread = ops_sec / nthread;
 	double dur_nano = duration_cast<std::nano>(1.0);

 	std::cout << "Duration      : " << duration << "s\n";
-	std::cout << "Total ops     : " << (lock_cnt_read + lock_cnt_write) << "(" << lock_cnt_read << "r, " << lock_cnt_write << "w)\n";
+	std::cout << "Total ops     : " << totalop << "(" << lock_cnt.read << "r, " << lock_cnt.write << "w, " << lock_cnt.epoch << "e)\n";
 	std::cout << "Ops/sec       : " << ops_sec << "\n";
 	std::cout << "Ops/sec/thread: " << ops_thread << "\n";
…
 	unsigned nthreads = 2;
 	unsigned writes = 0;
+	unsigned epochs = 0;

 	std::cout.imbue(std::locale(""));
…
 	switch (argc)
 	{
+	case 5:
+		epochs = std::stoul(argv[4]);
+		[[fallthrough]];
 	case 4:
 		writes = std::stoul(argv[3]);
-		if( writes >= 100 ) {
-			std::cerr << "Writes must be valid percentage, was " << argv[3] << "(" << writes << ")" << std::endl;
+		if( (writes + epochs) > 100 ) {
+			std::cerr << "Writes + Epochs must be valid percentage, was " << argv[3] << " + " << argv[4] << "(" << writes << " + " << epochs << ")" << std::endl;
 			usage(argv);
 		}
…
 	check_cache_line_size();

-	std::cout << "Running " << nthreads << " threads for " << duration << " seconds with " << writes << "% writes" << std::endl;
-	run(nthreads, duration, writes);
+	std::cout << "Running " << nthreads << " threads for " << duration << " seconds with " << writes << "% writes and " << epochs << "% epochs" << std::endl;
+	run(nthreads, duration, writes, epochs + writes);

 	return 0;
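The benchmark now takes a fourth argument: each loop iteration draws r = rand.next() % 100 and uses the if/else-if ladder to choose between the write path, the new epoch_check() path, and the read path, which is why main() passes epochs + writes (the cumulative threshold) to run() while still reporting the two percentages separately. A small self-contained sketch of that partition, with illustrative values:

	#include <cassert>

	// Sketch of the split the worker loop relies on: r is uniform in [0, 100)
	// and the if/else-if ladder partitions it as
	//   [0, writes)              -> write_lock path
	//   [writes, writes+epochs)  -> epoch_check path
	//   [writes+epochs, 100)     -> read_lock path
	// which is why main() passes `epochs + writes` as run()'s `epochs` argument.
	int main() {
		unsigned writes = 20, epochs = 10;      // e.g. "20% writes", "10% epochs"
		unsigned threshold = writes + epochs;   // the value run() receives
		for(unsigned r = 0; r < 100; r++) {
			bool is_write = r < writes;
			bool is_epoch = !is_write && r < threshold;
			bool is_read  = !is_write && !is_epoch;
			assert(is_write + is_epoch + is_read == 1);
		}
		return 0;
	}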
doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
(rb2a37b0 → r50aeb6f)

 #include "utils.hpp"

-struct Node {
+struct __attribute__((aligned(64))) Node {
 	static std::atomic_size_t creates;
 	static std::atomic_size_t destroys;
…

 static const constexpr int nodes_per_threads = 128;
+struct NodeArray {
+	__attribute__((aligned(64))) Node * array[nodes_per_threads];
+	__attribute__((aligned(64))) char pad;
+};

 bool enable_stats = false;

-__attribute__((aligned(64))) thread_local pick_stat local_pick;
-
-void run(unsigned nthread, double duration) {
+struct local_stat_t {
+	size_t in  = 0;
+	size_t out = 0;
+	size_t empty = 0;
+	size_t crc_in  = 0;
+	size_t crc_out = 0;
+};
+
+__attribute__((noinline)) void run_body(
+	std::atomic<bool>& done,
+	Random & rand,
+	Node * (&my_nodes)[128],
+	local_stat_t & local,
+	relaxed_list<Node> & list
+) {
+	while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
+		int idx = rand.next() % nodes_per_threads;
+		if (auto node = my_nodes[idx]) {
+			local.crc_in += node->value;
+			list.push(node);
+			my_nodes[idx] = nullptr;
+			local.in++;
+		}
+		else if(auto node = list.pop()) {
+			local.crc_out += node->value;
+			my_nodes[idx] = node;
+			local.out++;
+		}
+		else {
+			local.empty++;
+		}
+	}
+}
+
+void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
 	// List being tested
-	relaxed_list<Node> list = { nthread * 2 };
+	relaxed_list<Node> list = { nthread * nqueues };

 	// Barrier for synchronization
…
 		std::atomic_size_t crc_in  = { 0 };
 		std::atomic_size_t crc_out = { 0 };
-		std::atomic_size_t pick_at = { 0 };
-		std::atomic_size_t pick_su = { 0 };
+		struct {
+			struct {
+				std::atomic_size_t attempt = { 0 };
+				std::atomic_size_t success = { 0 };
+			} push;
+			struct {
+				std::atomic_size_t attempt = { 0 };
+				std::atomic_size_t success = { 0 };
+			} pop;
+		} pick;
 	} global;
…
 	// Prep nodes
 	std::cout << "Initializing" << std::endl;
-	std::vector<Node *> all_nodes[nthread];
+	NodeArray all_nodes[nthread];
 	for(auto & nodes : all_nodes) {
 		Random rand(rdtscl());
-		nodes.resize(nodes_per_threads);
-		for(auto & node : nodes) {
-			node = new Node(rand.next() % 100);
+		for(auto & node : nodes.array) {
+			auto r = rand.next() % 100;
+			if(r < fill)
+				node = new Node(rand.next() % 100);
 		}

 		for(int i = 0; i < 10; i++) {
 			int idx = rand.next() % nodes_per_threads;
-			if (auto node = nodes[idx]) {
+			if (auto node = nodes.array[idx]) {
 				global.crc_in += node->value;
 				list.push(node);
-				nodes[idx] = nullptr;
+				nodes.array[idx] = nullptr;
 			}
 		}
…
 	unsigned i = 1;
 	for(auto & t : threads) {
-		auto & my_nodes = all_nodes[i - 1];
+		auto & my_nodes = all_nodes[i - 1].array;
 		t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
 			Random rand(tid + rdtscl());

-			size_t local_in  = 0;
-			size_t local_out = 0;
-			size_t local_empty = 0;
-			size_t local_crc_in  = 0;
-			size_t local_crc_out = 0;
-
-			affinity(tid);
+			local_stat_t local;
+
+			// affinity(tid);

 			barrier.wait(tid);
…
 			// EXPERIMENT START

-			while(__builtin_expect(!done, true)) {
-				int idx = rand.next() % nodes_per_threads;
-				if (auto node = my_nodes[idx]) {
-					local_crc_in += node->value;
-					list.push(node);
-					my_nodes[idx] = nullptr;
-					local_in++;
-				}
-				else if(auto node = list.pop2()) {
-					local_crc_out += node->value;
-					my_nodes[idx] = node;
-					local_out++;
-				}
-				else {
-					local_empty++;
-				}
-			}
+			run_body(done, rand, my_nodes, local, list);

 			// EXPERIMENT END
…
 			barrier.wait(tid);

-			global.in    += local_in;
-			global.out   += local_out;
-			global.empty += local_empty;
+			global.in    += local.in;
+			global.out   += local.out;
+			global.empty += local.empty;

 			for(auto node : my_nodes) {
…
 			}

-			global.crc_in  += local_crc_in;
-			global.crc_out += local_crc_out;
-
-			global.pick_at += local_pick.attempt;
-			global.pick_su += local_pick.success;
+			global.crc_in  += local.crc_in;
+			global.crc_out += local.crc_out;
+
+			global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
+			global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
+			global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
+			global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
 		}, i++);
 	}
…
 	while(true) {
-		usleep(1000);
+		usleep(100000);
 		auto now = Clock::now();
 		duration_t durr = now - before;
…
 			break;
 		}
+		std::cout << "\r" << durr.count();
+		std::cout.flush();
 	}
…
 	duration_t durr = after - before;
 	duration = durr.count();
-	std::cout << "Closing down" << std::endl;
+	std::cout << "\nClosing down" << std::endl;

 	for(auto t : threads) {
…
 	std::cout << "Duration      : " << duration << "s\n";
+	std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
+	std::cout << "Ops/sec/thread: " << ops_thread << "\n";
+	std::cout << "Ops/sec       : " << ops_sec << "\n";
 	std::cout << "Total ops     : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
-	std::cout << "Ops/sec       : " << ops_sec << "\n";
-	std::cout << "Ops/sec/thread: " << ops_thread << "\n";
-	std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
-	std::cout << "Pick %        : " << (100.0 * double(global.pick_su) / global.pick_at) << "(" << global.pick_su << " / " << global.pick_at << ")\n";
+#ifndef NO_STATS
+	double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
+	double pop_sur  = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
+	std::cout << "Push Pick %   : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
+	std::cout << "Pop  Pick %   : " << pop_sur  << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
+#endif
 }

 void usage(char * argv[]) {
-	std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS]" << std::endl;;
+	std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;;
 	std::exit(1);
 }
…
 	double duration = 5.0;
 	unsigned nthreads = 2;
+	unsigned nqueues = 2;
+	unsigned fill = 100;

 	std::cout.imbue(std::locale(""));

 	switch (argc)
 	{
+	case 5:
+		nqueues = std::stoul(argv[4]);
+		[[fallthrough]];
+	case 4:
+		nqueues = std::stoul(argv[3]);
+		[[fallthrough]];
 	case 3:
 		nthreads = std::stoul(argv[2]);
…
 	check_cache_line_size();

-	std::cout << "Running " << nthreads << " threads for " << duration << " seconds" << std::endl;
-	run(nthreads, duration);
+	std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
+	run(nthreads, nqueues, duration, fill);

 	return 0;
…

 template<>
-thread_local Random relaxed_list<Node>::rng_g = { int(rdtscl()) };
+thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
+
+template<>
+relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
+
+const char * __my_progname = "Relaxed List";
doc/theses/thierry_delisle_PhD/code/relaxed_list.hpp
(rb2a37b0 → r50aeb6f)

 #pragma once

+#ifndef NO_STATS
 #include <iostream>
+#endif
+
 #include <memory>
 #include <mutex>
…
 using namespace std;

+struct spinlock_t {
+	std::atomic_bool ll = { false };
+
+	inline void lock() {
+		while( __builtin_expect(ll.exchange(true),false) ) {
+			while(ll.load(std::memory_order_relaxed))
+				asm volatile("pause");
+		}
+	}
+
+	inline bool try_lock() {
+		return false == ll.exchange(true);
+	}
+
+	inline void unlock() {
+		ll.store(false, std::memory_order_release);
+	}
+
+	inline explicit operator bool() {
+		return ll.load(std::memory_order_relaxed);
+	}
+};
+
+
 extern bool enable_stats;

 struct pick_stat {
-	size_t attempt = 0;
-	size_t success = 0;
-};
-
-extern __attribute__((aligned(64))) thread_local pick_stat local_pick;
+	struct {
+		size_t attempt = 0;
+		size_t success = 0;
+	} push;
+	struct {
+		size_t attempt = 0;
+		size_t success = 0;
+	} pop;
+};

 template<typename node_t>
…
 };

-struct spinlock_t {
-	std::atomic_bool ll = { false };
-
-	inline void lock() {
-		while( __builtin_expect(ll.exchange(true),false) ) {
-			while(ll.load(std::memory_order_relaxed))
-				asm volatile("pause");
-		}
-	}
-
-	inline void unlock() {
-		ll.store(false, std::memory_order_release);
-	}
-};
-
 template<typename node_t>
-class relaxed_list {
+class __attribute__((aligned(128))) relaxed_list {
 	static_assert(std::is_same<decltype(node_t::_links), _LinksFields_t<node_t>>::value, "Node must have a links field");
…
 public:
 	relaxed_list(unsigned numLists)
-		: numLists(numLists)
+		: numNonEmpty{0}
 		, lists(new intrusive_queue_t[numLists])
-		, numNonEmpty(0)
+		, numLists(numLists)
 	{}

-	void push(node_t * node) {
-		int i = rng_g.next() % numLists;
-		lists[i].push(node, numNonEmpty);
+	~relaxed_list() {
+		lists.reset();
+		#ifndef NO_STATS
+			std::cout << "Difference : "
+				<< size_t(double(intrusive_queue_t::stat::dif.value) / intrusive_queue_t::stat::dif.num ) << " avg\t"
+				<< intrusive_queue_t::stat::dif.max << "max" << std::endl;
+		#endif
+	}
+
+	__attribute__((noinline, hot)) void push(node_t * node) {
+		node->_links.ts = rdtscl();
+
+		while(true) {
+			// Pick a random list
+			int i = tls.rng.next() % numLists;
+
+			#ifndef NO_STATS
+				tls.pick.push.attempt++;
+			#endif
+
+			// If we can't lock it retry
+			if( !lists[i].lock.try_lock() ) continue;
+
+			// Actually push it
+			lists[i].push(node, numNonEmpty);
+			assert(numNonEmpty <= (int)numLists);
+
+			// Unlock and return
+			lists[i].lock.unlock();
+
+			#ifndef NO_STATS
+				tls.pick.push.success++;
+			#endif
+			return;
+		}
 	}

-	node_t * pop() {
-		int i = pickRandomly(-1);
-		int j = pickRandomly(i);
-
-		if(i == -1) {
-			return nullptr;
-		}
-
-		auto guard = lock(i, j);
-		auto & list = best(i, j);
-		return list.pop(numNonEmpty);
+	__attribute__((noinline, hot)) node_t * pop() {
+		while(numNonEmpty != 0) {
+			// Pick two lists at random
+			int i = tls.rng.next() % numLists;
+			int j = tls.rng.next() % numLists;
+
+			#ifndef NO_STATS
+				tls.pick.pop.attempt++;
+			#endif
+
+			// Pick the bet list
+			int w = i;
+			if( __builtin_expect(lists[j].ts() != 0, true) ) {
+				w = (lists[i].ts() < lists[j].ts()) ? i : j;
+			}
+
+			auto & list = lists[w];
+			// If list looks empty retry
+			if( list.ts() == 0 ) continue;
+
+			// If we can't get the lock retry
+			if( !list.lock.try_lock() ) continue;
+
+			// If list is empty, unlock and retry
+			if( list.ts() == 0 ) {
+				list.lock.unlock();
+				continue;
+			}
+
+			// Actually pop the list
+			auto node = list.pop(numNonEmpty);
+			assert(node);
+
+			// Unlock and return
+			list.lock.unlock();
+			assert(numNonEmpty >= 0);
+			#ifndef NO_STATS
+				tls.pick.pop.success++;
+			#endif
+			return node;
+		}
+
+		return nullptr;
 	}

-	node_t * pop2() {
-		int i = pickRandomly(-1);
-		int j = pickRandomly(i);
-
-		if(i == -1) {
-			return nullptr;
-		}
-
-		auto & list = best2(i, j);
-		return list.pop2(numNonEmpty);
-	}
-
 private:

-	class intrusive_queue_t {
+	class __attribute__((aligned(128))) intrusive_queue_t {
 	public:
 		typedef spinlock_t lock_t;

 		friend class relaxed_list<node_t>;
+
+		struct stat {
+			ssize_t diff = 0;
+
+			static struct Dif {
+				ssize_t value = 0;
+				size_t  num   = 0;
+				ssize_t max   = 0;
+			} dif;
+		};

 	private:
…
 		};

-		struct stat {
-			size_t push = 0;
-			size_t pop  = 0;
-		};
-
-		__attribute__((aligned(64))) lock_t lock;
-		__attribute__((aligned(64))) bool empty;
-		stat s;
+		lock_t lock;
 		sentinel_t before;
 		sentinel_t after;
+		stat s;

 		static constexpr auto fields_offset = offsetof( node_t, _links );
 	public:
 		intrusive_queue_t()
-			: empty(true)
-			, before{{ nullptr, tail() }}
+			: before{{ nullptr, tail() }}
 			, after {{ head(), nullptr }}
 		{
…
 			assert(tail()->_links.next == nullptr);
 			assert(tail()->_links.prev == head() );
+			assert(sizeof(*this) == 128);
+			assert((intptr_t(this) % 128) == 0);
 		}

 		~intrusive_queue_t() {
-			std::cout << " Push: " << s.push << "\tPop: " << s.pop << "\t(this: " << this << ")" << std::endl;
-		}
-
-		node_t * head() const {
-			return reinterpret_cast<node_t *>(
+			#ifndef NO_STATS
+				stat::dif.value+= s.diff;
+				stat::dif.num ++;
+				stat::dif.max = std::max(stat::dif.max, s.diff);
+			#endif
+		}
+
+		inline node_t * head() const {
+			node_t * rhead = reinterpret_cast<node_t *>(
 				reinterpret_cast<uintptr_t>( &before ) - fields_offset
 			);
-		}
-
-		node_t * tail() const {
-			return reinterpret_cast<node_t *>(
+			assert(rhead);
+			return rhead;
+		}
+
+		inline node_t * tail() const {
+			node_t * rtail = reinterpret_cast<node_t *>(
 				reinterpret_cast<uintptr_t>( &after ) - fields_offset
 			);
-		}
-
-		void push(node_t * node, volatile int & nonEmpty) {
+			assert(rtail);
+			return rtail;
+		}
+
+		inline void push(node_t * node, std::atomic_int & nonEmpty) {
+			assert(lock);
+			assert(node->_links.ts != 0);
 			node_t * tail = this->tail();
-			std::lock_guard<lock_t> guard(lock);
-			node->_links.ts = rdtscl();

 			node_t * prev = tail->_links.prev;
 			// assertf(node->_links.ts >= prev->_links.ts,
-			//	"New node has smaller timestamp: %llu < %llu", node->_links.ts, prev->_links.ts);
+			// 	"New node has smaller timestamp: %llu < %llu", node->_links.ts, prev->_links.ts);
 			node->_links.next = tail;
 			node->_links.prev = prev;
 			prev->_links.next = node;
 			tail->_links.prev = node;
-			if(empty) {
-				__atomic_fetch_add(&nonEmpty, 1, __ATOMIC_SEQ_CST);
-				empty = false;
-			}
-			if(enable_stats) s.push++;
-		}
-
-		node_t * pop(volatile int & nonEmpty) {
+			if(before._links.ts == 0l) {
+				nonEmpty += 1;
+				before._links.ts = node->_links.ts;
+			}
+			#ifndef NO_STATS
+				if(enable_stats) s.diff++;
+			#endif
+		}
+
+		inline node_t * pop(std::atomic_int & nonEmpty) {
+			assert(lock);
 			node_t * head = this->head();
 			node_t * tail = this->tail();
…

 			if(next == tail) {
-				empty = true;
-				__atomic_fetch_sub(&nonEmpty, 1, __ATOMIC_SEQ_CST);
-			}
-			if(enable_stats) s.pop++;
+				before._links.ts = 0l;
+				nonEmpty -= 1;
+			}
+			else {
+				assert(next->_links.ts != 0);
+				before._links.ts = next->_links.ts;
+				assert(before._links.ts != 0);
+			}
+			#ifndef NO_STATS
+				if(enable_stats) s.diff--;
+			#endif
 			return node;
 		}

-		node_t * pop2(volatile int & nonEmpty) {
-			node_t * head = this->head();
-			node_t * tail = this->tail();
-
-			std::lock_guard<lock_t> guard(lock);
-			node_t * node = head->_links.next;
-			node_t * next = node->_links.next;
-			if(node == tail) return nullptr;
-
-			head->_links.next = next;
-			next->_links.prev = head;
-
-			if(next == tail) {
-				empty = true;
-				__atomic_fetch_sub(&nonEmpty, 1, __ATOMIC_SEQ_CST);
-			}
-			if(enable_stats) s.pop++;
-			return node;
-		}
-
-		static intrusive_queue_t & best(intrusive_queue_t & lhs, intrusive_queue_t & rhs) {
-			bool lhs_empty = lhs.empty;
-			bool rhs_empty = rhs.empty;
-
-			if(lhs_empty && rhs_empty) return lhs;
-			if(!lhs_empty && rhs_empty) return lhs;
-			if(lhs_empty && !rhs_empty) return rhs;
-			node_t * lhs_head = lhs.head()->_links.next;
-			node_t * rhs_head = rhs.head()->_links.next;
-
-			assert(lhs_head != lhs.tail());
-			assert(rhs_head != rhs.tail());
-
-			if(lhs_head->_links.ts < lhs_head->_links.ts) {
-				return lhs;
-			} else {
-				return rhs;
-			}
-		}
-
-		static intrusive_queue_t & best2(intrusive_queue_t & lhs, intrusive_queue_t & rhs) {
-			node_t * lhs_head = lhs.head()->_links.next;
-			node_t * rhs_head = rhs.head()->_links.next;
-
-			bool lhs_empty = lhs_head != lhs.tail();
-			bool rhs_empty = rhs_head != rhs.tail();
-			if(lhs_empty && rhs_empty) return lhs;
-			if(!lhs_empty && rhs_empty) return lhs;
-			if(lhs_empty && !rhs_empty) return rhs;
-
-			if(lhs_head->_links.ts < lhs_head->_links.ts) {
-				return lhs;
-			} else {
-				return rhs;
-			}
+		long long ts() const {
+			return before._links.ts;
 		}
 	};


+public:
+
+	static __attribute__((aligned(128))) thread_local struct TLS {
+		Random    rng = { int(rdtscl()) };
+		pick_stat pick;
+	} tls;
+
 private:
-
-	static thread_local Random rng_g;
-	__attribute__((aligned(64))) const unsigned numLists;
-	std::unique_ptr<intrusive_queue_t []> lists;
-	__attribute__((aligned(64))) volatile int numNonEmpty; // number of non-empty lists
-
-private:
-	int pickRandomly(const int avoid) {
-		int j;
-		do {
-			local_pick.attempt++;
-			j = rng_g.next() % numLists;
-			if (numNonEmpty < 1 + (avoid != -1)) return -1;
-		} while (j == avoid || lists[j].empty);
-		local_pick.success++;
-		return j;
-	}
-
-private:
-
-	struct queue_guard {
-		intrusive_queue_t * lists;
-		int i, j;
-
-		queue_guard(intrusive_queue_t * lists, int i, int j)
-			: lists(lists), i(i), j(j)
-		{
-			if(i >= 0) lists[i].lock.lock();
-			if(j >= 0) lists[j].lock.lock();
-		}
-
-		queue_guard(const queue_guard &) = delete;
-		queue_guard(queue_guard &&) = default;
-
-		~queue_guard() {
-			if(i >= 0) lists[i].lock.unlock();
-			if(j >= 0) lists[j].lock.unlock();
-		}
-	};
-
-	auto lock(int i, int j) {
-		assert(i >= 0);
-		assert(i != j);
-		if(j < i) return queue_guard(lists.get(), j, i);
-		return queue_guard(lists.get(), i, j);
-	}
-
-	intrusive_queue_t & best(int i, int j) {
-		assert(i != -1);
-		if(j == -1) return lists[i];
-		return intrusive_queue_t::best(lists[i], lists[j]);
-	}
-
-	intrusive_queue_t & best2(int i, int j) {
-		assert(i != -1);
-		if(j == -1) return lists[i];
-		return intrusive_queue_t::best2(lists[i], lists[j]);
-	}
-};
+	std::atomic_int numNonEmpty; // number of non-empty lists
+	__attribute__((aligned(64))) std::unique_ptr<intrusive_queue_t []> lists;
+	const unsigned numLists;
+
+public:
+	static const constexpr size_t sizeof_queue = sizeof(intrusive_queue_t);
+};