#pragma once

#include
#include
#include

forall( T & ) {
	//------------------------------------------------------------
	// Queue based on the MCS lock
	// It is a Multi-Producer/Single-Consumer queue: threads pushing
	// elements must hold on to the elements they push.
	// Not appropriate for an async message queue, for example.
	struct mcs_queue {
		T * volatile tail;
	};

	static inline void ?{}( mcs_queue(T) & this ) { this.tail = 0p; }
	static inline bool empty( const mcs_queue(T) & this ) { return ! this.tail; }

	static inline forall( | { T * volatile & next ( T * ); }) {
		// Adds an element to the list
		// Multi-Thread Safe, Lock-Free
		T * push( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
		T * push( mcs_queue(T) & this, T * elem ) {
			/* paranoid */ verify( ! next( elem ) );

			// Race to add to the tail
			T * prev_val = __atomic_exchange_n( &this.tail, elem, __ATOMIC_SEQ_CST );

			// If we aren't the first, tell the thread before us about our element
			if ( prev_val ) next( prev_val ) = elem;
			return prev_val;
		}

		// Advances the head of the list, dropping the element given.
		// Passing an element that is not the head is undefined behavior.
		// NOT Multi-Thread Safe, but concurrent pushes are safe.
		T * advance( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
		T * advance( mcs_queue(T) & this, T * elem ) {
			T * expected = elem;
			// Check if this is already the last item
			if ( __atomic_compare_exchange_n( &this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return 0p;

			// If not, wait for the next item to show up, filled in by push
			while ( ! next( elem ) ) Pause();

			// The next element becomes the new head and is returned
			T * ret = next( elem );

			// invalidate link to reset to initial state
			next( elem ) = 0p;
			return ret;
		}
	}

	//------------------------------------------------------------
	// Queue based on the MCS lock
	// Extension of the above queue which supports 'blind' pops,
	//   i.e., popping a value from the head without knowing what the head is.
	// Has no extra guarantees beyond the mcs_queue.
	struct mpsc_queue {
		inline mcs_queue(T);
		T * volatile head;
	};

	static inline void ?{}( mpsc_queue(T) & this ) {
		((mcs_queue(T)&)this){};
		this.head = 0p;
	}

	static inline forall( | { T * volatile & next ( T * ); }) {
		// Adds a new element to the queue
		// Multi-Thread Safe, Lock-Free
		T * push( mpsc_queue(T) & this, T * elem ) __attribute__((artificial));
		T * push( mpsc_queue(T) & this, T * elem ) {
			T * prev_val = push( (mcs_queue(T)&)this, elem );
			if ( ! prev_val ) this.head = elem;
			return prev_val;
		}

		// Pop an element from the queue:
		// returns the element that was removed;
		// head is set to the new head of the queue.
		// NOT Multi-Thread Safe
		T * pop( mpsc_queue(T) & this, T *& head ) __attribute__((artificial));
		T * pop( mpsc_queue(T) & this, T *& head ) {
			T * elem = this.head;
			// If head is empty just return
			if ( ! elem ) return 0p;

			// If there is already another element in the list, then it's easy
			if ( next( elem ) ) {
				this.head = head = next( elem );

				// force memory sync
				__atomic_thread_fence( __ATOMIC_SEQ_CST );

				// invalidate link to reset to initial state
				next( elem ) = 0p;
			}
			// Otherwise, it might only look like elem is the last element while someone is still enqueuing
			else {
				// Null out head here: we linearize with push at the CAS in advance,
				// so head may only be written before that point; afterwards this
				// write could overwrite the write made in push.
				this.head = 0p;
				head = advance( (mcs_queue(T)&)this, elem );

				// Only write to the head if there is a next element;
				// it is the only way we can guarantee we are not overwriting
				// a write made in push.
				if ( head ) this.head = head;
			}

			// return removed element
			return elem;
		}

		// Same as the previous function, discarding the new head
		T * pop( mpsc_queue(T) & this ) {
			T * _ = 0p;
			return pop( this, _ );
		}
	}
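	// Usage sketch (illustrative only, not part of the queue implementation):
	// the element type must provide the `next` trait function over its own
	// link field; `my_node` below is a hypothetical client type.
	//
	//     struct my_node { int value; struct my_node * volatile link; };
	//     static inline my_node * volatile & next( my_node * n ) { return n->link; }
	//
	//     mpsc_queue(my_node) q;                  // implicitly constructed, empty
	//     my_node n;  n.value = 42;  n.link = 0p; // links must start at 0p
	//     push( q, &n );                          // safe from any producer thread
	//     my_node * e = pop( q );                 // single consumer thread only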
	//------------------------------------------------------------
	// Queue based on the MCS lock with poisoning
	// It is a Multi-Producer/Single-Consumer queue: threads pushing
	// elements must hold on to the elements they push.
	// Not appropriate for an async message queue, for example.
	// Poisoning the queue prevents any new elements from being pushed.

	// enum(void*) poison_state {
	// 	EMPTY = 0p,
	// 	POISON = 1p,
	// 	IN_PROGRESS = 1p
	// };

	struct poison_list {
		T * volatile head;
	};

	static inline void ?{}( poison_list(T) & this ) { this.head = 0p; }
	static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; }

	static inline forall( | { T * volatile & next( T * ); }) {
		// Adds an element to the list
		// Multi-Thread Safe, Lock-Free
		bool push( poison_list(T) & this, T * elem ) __attribute__((artificial));
		bool push( poison_list(T) & this, T * elem ) {
			/* paranoid */ verify( 0p == next( elem ) );
			__atomic_store_n( &next( elem ), (T *)1p, __ATOMIC_RELAXED );

			// read the head up-front
			T * expected = this.head;
			for() {
				// check if it's poisoned
				if( expected == 1p ) return false;

				// try to CAS the elem in
				if( __atomic_compare_exchange_n( &this.head, &expected, elem, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
					// We managed to exchange in, we are done.
					// The CAS never succeeds if the list is poisoned, and next( elem ) should still be 1p.
					/* paranoid */ verify( expected != 1p );
					/* paranoid */ verify( next( elem ) == 1p );

					// Link our element to the previous head, 0p if we were the first
					next( elem ) = expected;
					return true;
				}
			}
		}

		// Advances the head of the list, dropping the element given.
		// Passing an element that is not the head is undefined behavior.
		// NOT Multi-Thread Safe, but concurrent pushes are safe.
		T * advance( T * elem ) __attribute__((artificial));
		T * advance( T * elem ) {
			T * ret;

			// Wait for the next item to show up, filled in by push
			while ( 1p == (ret = __atomic_load_n( &next( elem ), __ATOMIC_RELAXED )) ) Pause();

			return ret;
		}

		// Poison the queue, preventing new pushes and returning the head
		T * poison( poison_list(T) & this ) __attribute__((artificial));
		T * poison( poison_list(T) & this ) {
			T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
			/* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this );
			return ret;
		}
	}
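	// Usage sketch (illustrative only): producers push nodes until the list is
	// poisoned; the consumer poisons once and then drains the returned LIFO
	// chain with advance. `my_node` and `next` are the same hypothetical
	// client definitions as in the sketch above.
	//
	//     poison_list(my_node) pl;
	//     my_node n;  n.value = 7;  n.link = 0p;
	//     if ( ! push( pl, &n ) ) { /* already poisoned, node was rejected */ }
	//
	//     my_node * it = poison( pl );            // stop all future pushes
	//     while ( it ) {
	//         my_node * nxt = advance( it );      // waits out in-progress pushes
	//         /* process it */
	//         it = nxt;
	//     }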
}

forall( T & ) struct LinkData {
	T * volatile top;								// pointer to stack top
	uintptr_t count;								// count each push
};

forall( T & ) union Link {
	LinkData(T) data;
	#if __SIZEOF_INT128__ == 16
	__int128										// gcc, 128-bit integer
	#else
	uint64_t										// 64-bit integer
	#endif // __SIZEOF_INT128__ == 16
	atom;
}; // Link

forall( T /*| sized(T)*/ | { Link(T) * next( T * ); } ) {
	struct StackLF {
		Link(T) stack;
	}; // StackLF

	static inline {
		void ?{}( StackLF(T) & this ) with(this) { stack.atom = 0; }

		T * top( StackLF(T) & this ) with(this) { return stack.data.top; }

		void push( StackLF(T) & this, T & n ) with(this) {
			*next( &n ) = stack;						// atomic assignment unnecessary, or use CAA
			for () {									// busy wait
				if ( __atomic_compare_exchange_n( &stack.atom, &next( &n )->atom, (Link(T))@{ (LinkData(T))@{ &n, next( &n )->data.count + 1 } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
			} // for
		} // push

		T * pop( StackLF(T) & this ) with(this) {
			Link(T) t @= stack;							// atomic assignment unnecessary, or use CAA
			for () {									// busy wait
				if ( t.data.top == 0p ) return 0p;		// empty stack ?
				Link(T) * next = next( t.data.top );
				if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node
			} // for
		} // pop

		bool unsafe_remove( StackLF(T) & this, T * node ) with(this) {
			Link(T) * link = &stack;
			for () {
				// TODO: Avoiding some problems with double fields access.
				LinkData(T) * data = &link->data;
				T * ntop = (T *)&(*data).top;
				if ( ntop == node ) {
					data->top = next( node )->data.top;
					return true;
				}
				if ( ntop == 0p ) return false;
				link = next( ntop );
			}
		}
	} // distribution
} // distribution
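// Usage sketch (illustrative only): the element type must embed a Link(T) and
// provide the `next` trait function returning its address; `work` below is a
// hypothetical client type.
//
//     struct work;
//     struct work { int id; Link(work) lnk; };
//     static inline Link(work) * next( work * w ) { return &w->lnk; }
//
//     StackLF(work) s;                            // constructed empty
//     work w;  w.id = 1;
//     push( s, w );                               // lock-free, multi-thread safe
//     work * p = pop( s );                        // returns 0p when the stack is empty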