#pragma once #include "assert.hpp" #include "utils.hpp" template struct _LinksFields_t { node_t * prev = nullptr; node_t * next = nullptr; volatile unsigned long long ts = 0; unsigned hint = (unsigned)-1; }; template class __attribute__((aligned(128))) intrusive_queue_t { public: typedef spinlock_t lock_t; struct stat { ssize_t diff = 0; size_t push = 0; size_t pop = 0; }; private: struct sentinel_t { _LinksFields_t _links; }; public: lock_t lock; private: sentinel_t before; sentinel_t after; #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Winvalid-offsetof" static constexpr auto fields_offset = offsetof( node_t, _links ); #pragma GCC diagnostic pop public: intrusive_queue_t() : before{{ nullptr, tail() }} , after {{ head(), nullptr }} { /* paranoid */ assert((reinterpret_cast( head() ) + fields_offset) == reinterpret_cast(&before)); /* paranoid */ assert((reinterpret_cast( tail() ) + fields_offset) == reinterpret_cast(&after )); /* paranoid */ assert(head()->_links.prev == nullptr); /* paranoid */ assert(head()->_links.next == tail() ); /* paranoid */ assert(tail()->_links.next == nullptr); /* paranoid */ assert(tail()->_links.prev == head() ); /* paranoid */ assert(sizeof(*this) == 128); /* paranoid */ assert((intptr_t(this) % 128) == 0); } ~intrusive_queue_t() = default; inline node_t * head() const { node_t * rhead = reinterpret_cast( reinterpret_cast( &before ) - fields_offset ); assert(rhead); return rhead; } inline node_t * tail() const { node_t * rtail = reinterpret_cast( reinterpret_cast( &after ) - fields_offset ); assert(rtail); return rtail; } inline bool push(node_t * node) { assert(lock); assert(node->_links.ts != 0); node_t * tail = this->tail(); node_t * prev = tail->_links.prev; // assertf(node->_links.ts >= prev->_links.ts, // "New node has smaller timestamp: %llu < %llu", node->_links.ts, prev->_links.ts); node->_links.next = tail; node->_links.prev = prev; prev->_links.next = node; tail->_links.prev = node; if(before._links.ts == 0l) { before._links.ts = node->_links.ts; assert(node->_links.prev == this->head()); return true; } return false; } inline std::pair pop() { assert(lock); node_t * head = this->head(); node_t * tail = this->tail(); node_t * node = head->_links.next; node_t * next = node->_links.next; if(node == tail) return {nullptr, false}; head->_links.next = next; next->_links.prev = head; if(next == tail) { before._links.ts = 0l; return {node, true}; } else { assert(next->_links.ts != 0); before._links.ts = next->_links.ts; assert(before._links.ts != 0); return {node, false}; } } unsigned long long ts() const { return before._links.ts; } };