#pragma once #ifndef NO_STATS #include #endif #include #include #include #include "assert.hpp" #include "utils.hpp" using namespace std; struct spinlock_t { std::atomic_bool ll = { false }; inline void lock() { while( __builtin_expect(ll.exchange(true),false) ) { while(ll.load(std::memory_order_relaxed)) asm volatile("pause"); } } inline bool try_lock() { return false == ll.exchange(true); } inline void unlock() { ll.store(false, std::memory_order_release); } inline explicit operator bool() { return ll.load(std::memory_order_relaxed); } }; static inline bool bts(std::atomic_size_t & target, size_t bit ) { //* int result = 0; asm volatile( "LOCK btsq %[bit], %[target]\n\t" :"=@ccc" (result) : [target] "m" (target), [bit] "r" (bit) ); return result != 0; /*/ size_t mask = 1ul << bit; size_t ret = target.fetch_or(mask, std::memory_order_relaxed); return (ret & mask) != 0; //*/ } static inline bool btr(std::atomic_size_t & target, size_t bit ) { //* int result = 0; asm volatile( "LOCK btrq %[bit], %[target]\n\t" :"=@ccc" (result) : [target] "m" (target), [bit] "r" (bit) ); return result != 0; /*/ size_t mask = 1ul << bit; size_t ret = target.fetch_and(~mask, std::memory_order_relaxed); return (ret & mask) != 0; //*/ } extern bool enable_stats; struct pick_stat { struct { size_t attempt = 0; size_t success = 0; } push; struct { size_t attempt = 0; size_t success = 0; size_t mask_attempt = 0; } pop; }; struct empty_stat { struct { size_t value = 0; size_t count = 0; } push; struct { size_t value = 0; size_t count = 0; } pop; }; template struct _LinksFields_t { node_t * prev = nullptr; node_t * next = nullptr; unsigned long long ts = 0; }; template class __attribute__((aligned(128))) relaxed_list { static_assert(std::is_same>::value, "Node must have a links field"); public: relaxed_list(unsigned numLists) : lists(new intrusive_queue_t[numLists]) , numLists(numLists) { assertf(7 * 8 * 8 >= numLists, "List currently only supports 448 sublists"); // assert(sizeof(*this) == 128); std::cout << "Constructing Relaxed List with " << numLists << std::endl; #ifndef NO_STATS if(head) this->next = head; head = this; #endif } ~relaxed_list() { std::cout << "Destroying Relaxed List" << std::endl; lists.reset(); } __attribute__((noinline, hot)) void push(node_t * node) { node->_links.ts = rdtscl(); while(true) { // Pick a random list unsigned i = tls.rng.next() % numLists; #ifndef NO_STATS tls.pick.push.attempt++; #endif // If we can't lock it retry if( !lists[i].lock.try_lock() ) continue; __attribute__((unused)) int num = numNonEmpty; // Actually push it if(lists[i].push(node)) { numNonEmpty++; size_t qword = i >> 6ull; size_t bit = i & 63ull; assertf((list_mask[qword] & (1ul << bit)) == 0, "Before set %zu:%zu (%u), %zx & %zx", qword, bit, i, list_mask[qword].load(), (1ul << bit)); __attribute__((unused)) bool ret = bts(list_mask[qword], bit); assert(!ret); assertf((list_mask[qword] & (1ul << bit)) != 0, "After set %zu:%zu (%u), %zx & %zx", qword, bit, i, list_mask[qword].load(), (1ul << bit)); } assert(numNonEmpty <= (int)numLists); // Unlock and return lists[i].lock.unlock(); #ifndef NO_STATS tls.pick.push.success++; tls.empty.push.value += num; tls.empty.push.count += 1; #endif return; } } __attribute__((noinline, hot)) node_t * pop() { #if !defined(NO_BITMASK) // for(int r = 0; r < 10 && numNonEmpty != 0; r++) { // // Pick two lists at random // unsigned i = tls.rng.next() % numLists; // unsigned j = tls.rng.next() % numLists; // if(auto node = try_pop(i, j)) return node; // } int nnempty; while(0 != (nnempty = numNonEmpty)) { tls.pick.pop.mask_attempt++; unsigned i, j; // if( numLists < 4 || (numLists / nnempty) < 4 ) { // // Pick two lists at random // i = tls.rng.next() % numLists; // j = tls.rng.next() % numLists; // } else { #ifndef NO_STATS // tls.pick.push.mask_attempt++; #endif // Pick two lists at random unsigned num = ((numLists - 1) >> 6) + 1; unsigned ri = tls.rng.next(); unsigned rj = tls.rng.next(); unsigned wdxi = (ri >> 6u) % num; unsigned wdxj = (rj >> 6u) % num; size_t maski = list_mask[wdxi].load(std::memory_order_relaxed); size_t maskj = list_mask[wdxj].load(std::memory_order_relaxed); if(maski == 0 && maskj == 0) continue; unsigned bi = rand_bit(ri, maski); unsigned bj = rand_bit(rj, maskj); assertf(bi < 64, "%zu %u", maski, bi); assertf(bj < 64, "%zu %u", maskj, bj); i = bi | (wdxi << 6); j = bj | (wdxj << 6); assertf(i < numLists, "%u", wdxi << 6); assertf(j < numLists, "%u", wdxj << 6); } if(auto node = try_pop(i, j)) return node; } #else while(numNonEmpty != 0) { // Pick two lists at random int i = tls.rng.next() % numLists; int j = tls.rng.next() % numLists; if(auto node = try_pop(i, j)) return node; } #endif return nullptr; } private: node_t * try_pop(unsigned i, unsigned j) { #ifndef NO_STATS tls.pick.pop.attempt++; #endif // Pick the bet list int w = i; if( __builtin_expect(lists[j].ts() != 0, true) ) { w = (lists[i].ts() < lists[j].ts()) ? i : j; } auto & list = lists[w]; // If list looks empty retry if( list.ts() == 0 ) return nullptr; // If we can't get the lock retry if( !list.lock.try_lock() ) return nullptr; __attribute__((unused)) int num = numNonEmpty; // If list is empty, unlock and retry if( list.ts() == 0 ) { list.lock.unlock(); return nullptr; } // Actually pop the list node_t * node; bool emptied; std::tie(node, emptied) = list.pop(); assert(node); if(emptied) { numNonEmpty--; size_t qword = w >> 6ull; size_t bit = w & 63ull; assert((list_mask[qword] & (1ul << bit)) != 0); __attribute__((unused)) bool ret = btr(list_mask[qword], bit); assert(ret); assert((list_mask[qword] & (1ul << bit)) == 0); } // Unlock and return list.lock.unlock(); assert(numNonEmpty >= 0); #ifndef NO_STATS tls.pick.pop.success++; tls.empty.pop.value += num; tls.empty.pop.count += 1; #endif return node; } private: class __attribute__((aligned(128))) intrusive_queue_t { public: typedef spinlock_t lock_t; friend class relaxed_list; struct stat { ssize_t diff = 0; size_t push = 0; size_t pop = 0; // size_t value = 0; // size_t count = 0; }; private: struct sentinel_t { _LinksFields_t _links; }; lock_t lock; sentinel_t before; sentinel_t after; #ifndef NO_STATS stat s; #endif #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Winvalid-offsetof" static constexpr auto fields_offset = offsetof( node_t, _links ); #pragma GCC diagnostic pop public: intrusive_queue_t() : before{{ nullptr, tail() }} , after {{ head(), nullptr }} { /* paranoid */ assert((reinterpret_cast( head() ) + fields_offset) == reinterpret_cast(&before)); /* paranoid */ assert((reinterpret_cast( tail() ) + fields_offset) == reinterpret_cast(&after )); /* paranoid */ assert(head()->_links.prev == nullptr); /* paranoid */ assert(head()->_links.next == tail() ); /* paranoid */ assert(tail()->_links.next == nullptr); /* paranoid */ assert(tail()->_links.prev == head() ); /* paranoid */ assert(sizeof(*this) == 128); /* paranoid */ assert((intptr_t(this) % 128) == 0); } ~intrusive_queue_t() = default; inline node_t * head() const { node_t * rhead = reinterpret_cast( reinterpret_cast( &before ) - fields_offset ); assert(rhead); return rhead; } inline node_t * tail() const { node_t * rtail = reinterpret_cast( reinterpret_cast( &after ) - fields_offset ); assert(rtail); return rtail; } inline bool push(node_t * node) { assert(lock); assert(node->_links.ts != 0); node_t * tail = this->tail(); node_t * prev = tail->_links.prev; // assertf(node->_links.ts >= prev->_links.ts, // "New node has smaller timestamp: %llu < %llu", node->_links.ts, prev->_links.ts); node->_links.next = tail; node->_links.prev = prev; prev->_links.next = node; tail->_links.prev = node; #ifndef NO_STATS if(enable_stats) { s.diff++; s.push++; } #endif if(before._links.ts == 0l) { before._links.ts = node->_links.ts; assert(node->_links.prev == this->head()); return true; } return false; } inline std::pair pop() { assert(lock); node_t * head = this->head(); node_t * tail = this->tail(); node_t * node = head->_links.next; node_t * next = node->_links.next; if(node == tail) return {nullptr, false}; head->_links.next = next; next->_links.prev = head; #ifndef NO_STATS if(enable_stats) { s.diff--; s.pop ++; } #endif if(next == tail) { before._links.ts = 0l; return {node, true}; } else { assert(next->_links.ts != 0); before._links.ts = next->_links.ts; assert(before._links.ts != 0); return {node, false}; } } long long ts() const { return before._links.ts; } }; public: static __attribute__((aligned(128))) thread_local struct TLS { Random rng = { int(rdtscl()) }; pick_stat pick; empty_stat empty; } tls; public: std::atomic_int numNonEmpty = { 0 }; // number of non-empty lists std::atomic_size_t list_mask[7] = { {0}, {0}, {0}, {0}, {0}, {0}, {0} }; // which queues are empty private: __attribute__((aligned(64))) std::unique_ptr lists; const unsigned numLists; public: static const constexpr size_t sizeof_queue = sizeof(intrusive_queue_t); #ifndef NO_STATS static void stats_print(std::ostream & os) { auto it = head; while(it) { it->stats_print_local(os); it = it->next; } } static void stats_tls_tally() { global_stats.pick.push.attempt += tls.pick.push.attempt; global_stats.pick.push.success += tls.pick.push.success; global_stats.pick.pop .attempt += tls.pick.pop.attempt; global_stats.pick.pop .success += tls.pick.pop.success; global_stats.pick.pop .mask_attempt += tls.pick.pop.mask_attempt; global_stats.qstat.push.value += tls.empty.push.value; global_stats.qstat.push.count += tls.empty.push.count; global_stats.qstat.pop .value += tls.empty.pop .value; global_stats.qstat.pop .count += tls.empty.pop .count; } private: static struct GlobalStats { struct { struct { std::atomic_size_t attempt = { 0 }; std::atomic_size_t success = { 0 }; } push; struct { std::atomic_size_t attempt = { 0 }; std::atomic_size_t success = { 0 }; std::atomic_size_t mask_attempt = { 0 }; } pop; } pick; struct { struct { std::atomic_size_t value = { 0 }; std::atomic_size_t count = { 0 }; } push; struct { std::atomic_size_t value = { 0 }; std::atomic_size_t count = { 0 }; } pop; } qstat; } global_stats; // Link list of all lists for stats __attribute__((aligned(64))) relaxed_list * next = nullptr; static relaxed_list * head; void stats_print_local(std::ostream & os ) { std::cout << "----- Relaxed List Stats -----" << std::endl; { ssize_t diff = 0; size_t num = 0; ssize_t max = 0; for(size_t i = 0; i < numLists; i++) { const auto & list = lists[i]; diff+= list.s.diff; num ++; max = std::abs(max) > std::abs(list.s.diff) ? max : list.s.diff; os << "Local Q ops : " << (list.s.push + list.s.pop) << "(" << list.s.push << "i, " << list.s.pop << "o)\n"; } os << "Difference : " << ssize_t(double(diff) / num ) << " avg\t" << max << "max" << std::endl; } const auto & global = global_stats; double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt); double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt); double mpop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .mask_attempt); os << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n"; os << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n"; os << "TryPop Pick % : " << mpop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .mask_attempt << ")\n"; double avgQ_push = double(global.qstat.push.value) / global.qstat.push.count; double avgQ_pop = double(global.qstat.pop .value) / global.qstat.pop .count; double avgQ = double(global.qstat.push.value + global.qstat.pop .value) / (global.qstat.push.count + global.qstat.pop .count); os << "Push Avg Qs : " << avgQ_push << " (" << global.qstat.push.count << "ops)\n"; os << "Pop Avg Qs : " << avgQ_pop << " (" << global.qstat.pop .count << "ops)\n"; os << "Global Avg Qs : " << avgQ << " (" << (global.qstat.push.count + global.qstat.pop .count) << "ops)\n"; } #endif };