#pragma once

// Lock-free containers whose elements supply their own link through a
// user-provided `next operator (see the forall assertions below).
//
// NOTE(review): the header names of the three #include directives below are
// missing in this copy of the file and must be restored before it can build.
// The code in this file relies on declarations of verify()/verifyf(),
// uintptr_t/uint64_t and Pause() coming from somewhere.
#include
#include
#include

forall( T & ) {
	//------------------------------------------------------------
	// Queue based on the MCS lock
	// It is a Multi-Producer/Single-Consumer queue: threads pushing
	// elements must hold on to the elements they push.
	// Not appropriate for an async message queue, for example.
	//
	// Only the tail is stored here; the consumer must remember the head
	// element itself and pass it to advance().
	struct mcs_queue {
		T * volatile tail;	// last element pushed, 0p when the queue is empty
	};

	// Construct an empty queue (null tail).
	static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }

	// True when the tail is null, i.e. no element is currently queued.
	static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }

	// The operations below require a `next operator that, given an element,
	// returns a reference to that element's link pointer.
	static inline forall(| { T * volatile & ?`next ( T * ); }) {
		// Adds an element to the list
		// Multi-Thread Safe, Lock-Free
		//
		// elem : element to append; its next link must be null on entry.
		// Returns the previous tail, or 0p if the queue was empty.
		T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
		T * push(mcs_queue(T) & this, T * elem) {
			/* paranoid */ verify(!(elem`next));
			// Race to add to the tail: whoever wins the exchange is the new tail
			T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
			// If we aren't the first, we need to tell the element before us
			// about its successor. Nothing to link when the queue was empty.
			if (prev) prev`next = elem;
			return prev;
		}

		// Advances the head of the list, dropping the element given.
		// Passing an element that is not the head is undefined behavior
		// NOT Multi-Thread Safe, concurrent pushes are safe
		//
		// elem : current head element.
		// Returns the element following elem, or 0p if elem was the last one
		// (in which case the tail has been reset to 0p).
		T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
		T * advance(mcs_queue(T) & this, T * elem) {
			T * expected = elem;
			// Check if this is already the last item: if the tail is still elem,
			// swing it to 0p and report that nothing follows
			if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;

			// If not, a push has already replaced the tail but may not have
			// written our next link yet: wait for it to show up
			while (!(elem`next)) Pause();

			// the link is now known to be non-null, read the successor
			T * ret = elem`next;

			// invalidate link to reset elem to its initial state
			elem`next = 0p;
			return ret;
		}
	}

	//------------------------------------------------------------
	// Queue based on the MCS lock
	// Extension of the above queue which supports 'blind' pops,
	// i.e., popping a value from the head without knowing what the head is.
	// Has no extra guarantees beyond the mcs_queue.
	struct mpsc_queue {
		inline mcs_queue(T);	// embedded base queue, provides the tail
		T * volatile head;	// next element to pop, 0p when none is known
	};

	// Construct an empty queue: default-construct the base and null the head.
	static inline void ?{}(mpsc_queue(T) & this) {
		((mcs_queue(T)&)this){};
		this.head = 0p;
	}

	static inline forall(| { T * volatile & ?`next ( T * ); }) {
		// Adds a new element to the queue
		// Multi-Thread Safe, Lock-Free
		//
		// Returns the previous tail, or 0p if the queue was empty; in the
		// latter case elem also becomes the head.
		T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
		T * push(mpsc_queue(T) & this, T * elem) {
			T * prev = push((mcs_queue(T)&)this, elem);
			// first element of an empty queue: publish it as the head
			if (!prev) this.head = elem;
			return prev;
		}

		// Pop an element from the queue
		// return the element that was removed (0p if head is null)
		// next is set to the new head of the queue; it is left untouched
		// when head is null on entry
		// NOT Multi-Thread Safe
		T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
		T * pop(mpsc_queue(T) & this, T *& next) {
			T * elem = this.head;
			// If head is empty just return
			if (!elem) return 0p;

			// If there is already someone in the list, then it's easy
			if (elem`next) {
				this.head = next = elem`next;
				// force memory sync
				__atomic_thread_fence(__ATOMIC_SEQ_CST);

				// invalidate link to reset to initial state
				elem`next = 0p;
			}
			// Otherwise, elem only looks like the last element: someone may be
			// in the middle of enqueuing behind it
			else {
				// null out head here, because we linearize with push
				// at the CAS in advance and therefore can write to head
				// after that point, it could overwrite the write in push
				this.head = 0p;
				next = advance((mcs_queue(T)&)this, elem);

				// Only write to the head if there is a next element
				// it is the only way we can guarantee we are not overwriting
				// a write made in push
				if (next) this.head = next;
			}

			// return removed element
			return elem;
		}

		// Same as previous function, for callers that do not need the new head
		T * pop(mpsc_queue(T) & this) {
			T * _ = 0p;
			return pop(this, _);
		}
	}

	//------------------------------------------------------------
	// List based on the MCS lock with poisoning
	// It is a Multi-Producer/Single-Consumer list: threads pushing
	// elements must hold on to the elements they push.
	// Not appropriate for an async message queue, for example.
	// Poisoning the list prevents any new elements from being pushed.
	//
	// The pointer value 1p is used as a marker, both in head (poisoned) and
	// in an element's next link (push still in progress):
	// enum(void*) poison_state {
	// 	EMPTY = 0p,
	// 	POISON = 1p,
	// 	IN_PROGRESS = 1p
	// };

	struct poison_list {
		T * volatile head;	// most recently pushed element, 0p if empty, 1p if poisoned
	};

	// Construct an empty, non-poisoned list.
	static inline void ?{}(poison_list(T) & this) { this.head = 0p; }

	// True once head holds the 1p poison marker.
	static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; }

	static inline forall(| { T * volatile & ?`next ( T * ); }) {
		// Adds an element to the list
		// Multi-Thread Safe, Lock-Free
		//
		// elem : element to add; its next link must be 0p on entry.
		// Returns true if elem became the new head, false if the list is
		// poisoned. Note that on the false path elem's next link has already
		// been set to 1p and is not reset here.
		bool push(poison_list(T) & this, T * elem) __attribute__((artificial));
		bool push(poison_list(T) & this, T * elem) {
			/* paranoid */ verify(0p == (elem`next));
			// mark the link as "in progress" so advance() waits for the real value
			__atomic_store_n( &elem`next, (T*)1p, __ATOMIC_RELAXED );

			// read the head up-front
			T * expected = this.head;
			for() {
				// check if it's poisoned
				if(expected == 1p) return false;

				// try to CAS the elem in; on failure expected is refreshed with
				// the current head and the loop re-checks for poison
				if(__atomic_compare_exchange_n(&this.head, &expected, elem, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
					// We managed to exchange in, we are done

					// We should never succeed the CAS if it's poisoned, and the elem's link should still be 1p.
					/* paranoid */ verify( expected != 1p );
					/* paranoid */ verify( elem`next == 1p );

					// Replace the 1p marker with the real link: the previous
					// head, which is 0p when the list was empty
					elem`next = expected;
					return true;
				}
			}
		}

		// Returns the element linked after elem.
		// Spins while elem's next link still holds the 1p marker stored by
		// push, i.e. until the pusher has written the real link.
		// Existing contract notes: passing an element that is not the head is
		// undefined behavior; NOT Multi-Thread Safe, concurrent pushes are safe
		T * advance(T * elem) __attribute__((artificial));
		T * advance(T * elem) {
			T * ret;

			// Wait for next item to show-up, filled by push
			while (1p == (ret = __atomic_load_n(&elem`next, __ATOMIC_RELAXED))) Pause();

			return ret;
		}

		// Poison the list, preventing new pushes and returning the head
		// that was in place before poisoning (0p if the list was empty).
		// Poisoning an already poisoned list trips the verifyf check below.
		T * poison(poison_list(T) & this) __attribute__((artificial));
		T * poison(poison_list(T) & this) {
			T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
			/* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this );
			return ret;
		}
	}
}

// Payload of a stack link: the pointer plus a counter that is bumped by every
// push so that the pair can be compared and swapped as a single value.
forall( T & )
struct LinkData {
	T * volatile top;	// pointer to stack top
	uintptr_t count;	// count each push
};

// A link viewed either as its two fields (data) or as one integer (atom) that
// is handed to the atomic compare-exchange builtin.
// NOTE(review): atom presumably has to cover both fields of LinkData on the
// target; confirm this holds on targets that take the uint64_t branch.
forall( T & )
union Link {
	LinkData(T) data;
	#if __SIZEOF_INT128__ == 16
	__int128	// gcc, 128-bit integer
	#else
	uint64_t	// 64-bit integer
	#endif // __SIZEOF_INT128__ == 16
	atom;
}; // Link

// Lock-free stack. The element type must provide a `next operator returning a
// pointer to the Link stored for that element.
forall( T /*| sized(T)*/ | { Link(T) * ?`next( T * ); } ) {
	struct StackLF {
		Link(T) stack;	// current top pointer and push counter
	}; // StackLF

	static inline {
		// Construct an empty stack: zeroes the whole link through atom.
		void ?{}( StackLF(T) & this ) with(this) { stack.atom = 0; }

		// Returns the current top element without removing it (0p if empty).
		T * top( StackLF(T) & this ) with(this) { return stack.data.top; }

		// Pushes node n on the stack.
		// n's own link is used as the expected value of the compare-exchange:
		// it first receives a snapshot of the stack link, and on every failed
		// attempt the builtin overwrites it with the current stack link.
		void push( StackLF(T) & this, T & n ) with(this) {
			*( &n )`next = stack;	// atomic assignment unnecessary, or use CAA
			for () {	// busy wait
				// new link = { &n, snapshot count + 1 }
				if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ (LinkData(T))@{ &n, ( &n )`next->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
			} // for
		} // push

		// Pops and returns the top element, or 0p if the stack is empty.
		// The push counter is carried over unchanged; only push increments it.
		T * pop( StackLF(T) & this ) with(this) {
			Link(T) t @= stack;	// atomic assignment unnecessary, or use CAA
			for () {	// busy wait
				if ( t.data.top == 0p ) return 0p;	// empty stack ?
				Link(T) * next = ( t.data.top )`next;
				// new link = { successor of the snapshot top, same count };
				// on failure t is refreshed with the current stack link
				if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node
			} // for
		} // pop

		// Walks the links starting at the stack link looking for node and, when
		// found, unlinks it by plain (non-atomic) assignment.
		// Returns true if node was unlinked, false if the walk hit a null link.
		// No atomic operations are used here, hence "unsafe".
		bool unsafe_remove( StackLF(T) & this, T * node ) with(this) {
			Link(T) * link = &stack;
			for () {
				// TODO: Avoiding some problems with double fields access.
				LinkData(T) * data = &link->data;
				// NOTE(review): as written this reads as the address of the top
				// field cast to T *, not the pointer stored in it; the
				// comparisons below look like they expect the stored pointer.
				// Confirm this is the intended workaround for the TODO above.
				T * next = (T *)&(*data).top;
				if ( next == node ) {
					// bypass node: the current link now points at node's successor
					data->top = ( node )`next->data.top;
					return true;
				}
				if ( next == 0p ) return false;
				link = ( next )`next;
			}
		}
	} // distribution
} // distribution