#pragma once #include #include "utils.hpp" //------------------------------------------------------------ // Queue based on the MCS lock // It is a Multi-Producer/Single-Consumer queue threads pushing // elements must hold on to the elements they push // Not appropriate for an async message queue for example, template class mcs_queue { node_t * volatile tail; public: mcs_queue(): tail(nullptr) {} inline bool empty() const { return !tail; } node_t * push( node_t * elem ) { /* paranoid */ assert(!elem->_links.next); // Race to add to the tail node_t * prev = __atomic_exchange_n(&tail, elem, __ATOMIC_SEQ_CST); // If we aren't the first, we need to tell the person before us // No need to if (prev) prev->_links.next = elem; return prev; } // Advances the head of the list, dropping the element given. // Passing an element that is not the head is undefined behavior // NOT Multi-Thread Safe, concurrent pushes are safe node_t * advance(node_t * elem) { node_t * expected = elem; // Check if this is already the last item if (__atomic_compare_exchange_n(&tail, &expected, nullptr, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return nullptr; // If not wait for next item to show-up, filled by push while (!elem->_links.next) Pause(); // we need to return if the next link was empty node_t * ret = elem->_links.next; // invalidate link to reset to initial state elem->_links.next = nullptr; return ret; } }; //------------------------------------------------------------ // Queue based on the MCS lock // Extension of the above lock which supports 'blind' pops. // i.e., popping a value from the head without knowing what the head is // has no extra guarantees beyond the mcs_queue template class mpsc_queue : private mcs_queue { node_t * volatile _head; public: mpsc_queue(): mcs_queue(), _head(nullptr) {} inline bool empty() const { return mcs_queue::empty(); } node_t * head() const { return _head; } // Added a new element to the queue // Multi-Thread Safe, Lock-Free inline node_t * push(node_t * elem) { node_t * prev = mcs_queue::push(elem); if (!prev) _head = elem; return prev; } // Pop an element from the queue // return the element that was removed // next is set to the new head of the queue // NOT Multi-Thread Safe inline node_t * pop(node_t *& next) { node_t * elem = _head; // If head is empty just return if (!elem) return nullptr; // If there is already someone in the list, then it's easy if (elem->_links.next) { _head = next = elem->_links.next; // force memory sync __atomic_thread_fence(__ATOMIC_SEQ_CST); // invalidate link to reset to initial state elem->_links.next = nullptr; } // Otherwise, there might be a race where it only looks but someone is enqueuing else { // null out head here, because we linearize with push // at the CAS in advance and therefore can write to head // after that point, it could overwrite the write in push _head = nullptr; next = mcs_queue::advance(elem); // Only write to the head if there is a next element // it is the only way we can guarantee we are not overwriting // a write made in push if (next) _head = next; } // return removed element return elem; } // Same as previous function inline node_t * pop() { node_t * _ = nullptr; return pop(_); } };