#pragma once #include forall( T &) { //------------------------------------------------------------ // Queue based on the MCS lock // It is a Multi-Producer/Single-Consumer queue threads pushing // elements must hold on to the elements they push // Not appropriate for an async message queue for example, struct mcs_queue { T * volatile tail; }; static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; } static inline bool empty(const mcs_queue(T) & this) { return !this.tail; } static inline forall(| { T * volatile & ?`next ( T * ); }) { // Adds an element to the list // Multi-Thread Safe, Lock-Free T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial)); T * push(mcs_queue(T) & this, T * elem) { /* paranoid */ verify(!(elem`next)); // Race to add to the tail T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST); // If we aren't the first, we need to tell the person before us // No need to if (prev) prev`next = elem; return prev; } // Advances the head of the list, dropping the element given. // Passing an element that is not the head is undefined behavior // NOT Multi-Thread Safe, concurrent pushes are safe T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial)); T * advance(mcs_queue(T) & this, T * elem) { T * expected = elem; // Check if this is already the last item if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p; // If not wait for next item to show-up, filled by push while (!(elem`next)) Pause(); // we need to return if the next link was empty T * ret = elem`next; // invalidate link to reset to initial state elem`next = 0p; return ret; } } //------------------------------------------------------------ // Queue based on the MCS lock // Extension of the above lock which supports 'blind' pops. // i.e., popping a value from the head without knowing what the head is // has no extra guarantees beyond the mcs_queue struct mpsc_queue { inline mcs_queue(T); T * volatile head; }; static inline void ?{}(mpsc_queue(T) & this) { ((mcs_queue(T)&)this){}; this.head = 0p; } static inline forall(| { T * volatile & ?`next ( T * ); }) { // Added a new element to the queue // Multi-Thread Safe, Lock-Free T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial)); T * push(mpsc_queue(T) & this, T * elem) { T * prev = push((mcs_queue(T)&)this, elem); if (!prev) this.head = elem; return prev; } // Pop an element from the queue // return the element that was removed // next is set to the new head of the queue // NOT Multi-Thread Safe T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial)); T * pop(mpsc_queue(T) & this, T *& next) { T * elem = this.head; // If head is empty just return if (!elem) return 0p; // If there is already someone in the list, then it's easy if (elem`next) { this.head = next = elem`next; // force memory sync __atomic_thread_fence(__ATOMIC_SEQ_CST); // invalidate link to reset to initial state elem`next = 0p; } // Otherwise, there might be a race where it only looks but someone is enqueuing else { // null out head here, because we linearize with push // at the CAS in advance and therefore can write to head // after that point, it could overwrite the write in push this.head = 0p; next = advance((mcs_queue(T)&)this, elem); // Only write to the head if there is a next element // it is the only way we can guarantee we are not overwriting // a write made in push if (next) this.head = next; } // return removed element return elem; } // Same as previous function T * pop(mpsc_queue(T) & this) { T * _ = 0p; return pop(this, _); } } }