source: libcfa/src/containers/lockfree.hfa @ 0b1ca47

ADTast-experimental
Last change on this file since 0b1ca47 was 88ac843e, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Moved lockfree containers to containers/lockfree.hfa.
Added poison_list, which is a lock-free bag with push and poison as only operations.

  • Property mode set to 100644
File size: 7.7 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5#include <stdint.h>
6#include <bits/defs.hfa>
7
8forall( T &) {
9        //------------------------------------------------------------
10        // Queue based on the MCS lock
11        // It is a Multi-Producer/Single-Consumer queue threads pushing
12        // elements must hold on to the elements they push
13        // Not appropriate for an async message queue for example,
14        struct mcs_queue {
15                T * volatile tail;
16        };
17
18        static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
19        static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
20
21        static inline forall(| { T * volatile & ?`next ( T * ); })
22        {
23                // Adds an element to the list
24                // Multi-Thread Safe, Lock-Free
25                T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
26                T * push(mcs_queue(T) & this, T * elem) {
27                        /* paranoid */ verify(!(elem`next));
28                        // Race to add to the tail
29                        T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
30                        // If we aren't the first, we need to tell the person before us
31                        // No need to
32                        if (prev) prev`next = elem;
33                        return prev;
34                }
35
36                // Advances the head of the list, dropping the element given.
37                // Passing an element that is not the head is undefined behavior
38                // NOT Multi-Thread Safe, concurrent pushes are safe
39                T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
40                T * advance(mcs_queue(T) & this, T * elem) {
41                        T * expected = elem;
42                        // Check if this is already the last item
43                        if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
44
45                        // If not wait for next item to show-up, filled by push
46                        while (!(elem`next)) Pause();
47
48                        // we need to return if the next link was empty
49                        T * ret = elem`next;
50
51                        // invalidate link to reset to initial state
52                        elem`next = 0p;
53                        return ret;
54                }
55        }
56
57        //------------------------------------------------------------
58        // Queue based on the MCS lock
59        // Extension of the above lock which supports 'blind' pops.
60        // i.e., popping a value from the head without knowing what the head is
61        // has no extra guarantees beyond the mcs_queue
62        struct mpsc_queue {
63                inline mcs_queue(T);
64                T * volatile head;
65        };
66
67        static inline void ?{}(mpsc_queue(T) & this) {
68                ((mcs_queue(T)&)this){};
69                this.head = 0p;
70        }
71
72        static inline forall(| { T * volatile & ?`next ( T * ); })
73        {
74                // Added a new element to the queue
75                // Multi-Thread Safe, Lock-Free
76                T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
77                T * push(mpsc_queue(T) & this, T * elem) {
78                        T * prev = push((mcs_queue(T)&)this, elem);
79                        if (!prev) this.head = elem;
80                        return prev;
81                }
82
83                // Pop an element from the queue
84                // return the element that was removed
85                // next is set to the new head of the queue
86                // NOT Multi-Thread Safe
87                T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
88                T * pop(mpsc_queue(T) & this, T *& next) {
89                        T * elem = this.head;
90                        // If head is empty just return
91                        if (!elem) return 0p;
92
93                        // If there is already someone in the list, then it's easy
94                        if (elem`next) {
95                                this.head = next = elem`next;
96                                // force memory sync
97                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
98
99                                // invalidate link to reset to initial state
100                                elem`next = 0p;
101                        }
102                        // Otherwise, there might be a race where it only looks but someone is enqueuing
103                        else {
104                                // null out head here, because we linearize with push
105                                // at the CAS in advance and therefore can write to head
106                                // after that point, it could overwrite the write in push
107                                this.head = 0p;
108                                next = advance((mcs_queue(T)&)this, elem);
109
110                                // Only write to the head if there is a next element
111                                // it is the only way we can guarantee we are not overwriting
112                                // a write made in push
113                                if (next) this.head = next;
114                        }
115
116                        // return removed element
117                        return elem;
118                }
119
120                // Same as previous function
121                T * pop(mpsc_queue(T) & this) {
122                        T * _ = 0p;
123                        return pop(this, _);
124                }
125        }
126
127        //------------------------------------------------------------
128        // Queue based on the MCS lock with poisoning
129        // It is a Multi-Producer/Single-Consumer queue threads pushing
130        // elements must hold on to the elements they push
131        // Not appropriate for an async message queue for example
132        // poisoning the queue prevents any new elements from being push
133        // enum(void*) poison_state {
134        //      EMPTY = 0p,
135        //      POISON = 1p,
136        //      IN_PROGRESS = 1p
137        // };
138
139        struct poison_list {
140                T * volatile head;
141        };
142
143        static inline void ?{}(poison_list(T) & this) { this.head = 0p; }
144
145        static inline forall(| { T * volatile & ?`next ( T * ); })
146        {
147                // Adds an element to the list
148                // Multi-Thread Safe, Lock-Free
149                T * push(poison_list(T) & this, T * elem) __attribute__((artificial));
150                T * push(poison_list(T) & this, T * elem) {
151                        /* paranoid */ verify(0p == (elem`next));
152                        __atomic_store_n( &elem`next, (T*)1p, __ATOMIC_RELAXED );
153
154                        // read the head up-front
155                        T * expected = this.head;
156                        for() {
157                                // check if it's poisoned
158                                if(expected == 1p) return 0p;
159
160                                // try to CAS the elem in
161                                if(__atomic_compare_exchange_n(&this.head, &expected, elem, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
162                                        // We managed to exchange in, we are done
163
164                                        // We should never succeed the CAS if it's poisonned.
165                                        /* paranoid */ verify( expected != 1p );
166
167                                        // If we aren't the first, we need to tell the person before us
168                                        // No need to
169                                        elem`next = expected;
170                                        return expected;
171                                }
172                        }
173                }
174
175                // Advances the head of the list, dropping the element given.
176                // Passing an element that is not the head is undefined behavior
177                // NOT Multi-Thread Safe, concurrent pushes are safe
178                T * advance(T * elem) __attribute__((artificial));
179                T * advance(T * elem) {
180                        T * ret;
181
182                        // Wait for next item to show-up, filled by push
183                        while (1p == (ret = __atomic_load_n(&elem`next, __ATOMIC_RELAXED))) Pause();
184
185                        return ret;
186                }
187
188                // Poison the queue, preveting new pushes and returning the head
189                T * poison(poison_list(T) & this) __attribute__((artificial));
190                T * poison(poison_list(T) & this) {
191                        T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
192                        /* paranoid */ verify( ret != (T*)1p );
193                        return ret;
194                }
195        }
196}
197
198forall( T & )
199union Link {
200        struct {                                                                                        // 32/64-bit x 2
201                T * volatile top;                                                               // pointer to stack top
202                uintptr_t count;                                                                // count each push
203        };
204        #if __SIZEOF_INT128__ == 16
205        __int128                                                                                        // gcc, 128-bit integer
206        #else
207        uint64_t                                                                                        // 64-bit integer
208        #endif // __SIZEOF_INT128__ == 16
209        atom;
210}; // Link
211
212forall( T | sized(T) | { Link(T) * ?`next( T * ); } ) {
213        struct StackLF {
214                Link(T) stack;
215        }; // StackLF
216
217        static inline {
218                void ?{}( StackLF(T) & this ) with(this) { stack.atom = 0; }
219
220                T * top( StackLF(T) & this ) with(this) { return stack.top; }
221
222                void push( StackLF(T) & this, T & n ) with(this) {
223                        *( &n )`next = stack;                                           // atomic assignment unnecessary, or use CAA
224                        for () {                                                                        // busy wait
225                          if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ {&n, ( &n )`next->count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
226                        } // for
227                } // push
228
229                T * pop( StackLF(T) & this ) with(this) {
230                        Link(T) t @= stack;                                                     // atomic assignment unnecessary, or use CAA
231                        for () {                                                                        // busy wait
232                          if ( t.top == 0p ) return 0p;                         // empty stack ?
233                          if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ {( t.top )`next->top, t.count} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.top; // attempt to update top node
234                        } // for
235                } // pop
236
237                bool unsafe_remove( StackLF(T) & this, T * node ) with(this) {
238                        Link(T) * link = &stack;
239                        for() {
240                                T * next = link->top;
241                                if( next == node ) {
242                                        link->top = ( node )`next->top;
243                                        return true;
244                                }
245                                if( next == 0p ) return false;
246                                link = ( next )`next;
247                        }
248                }
249        } // distribution
250} // distribution
Note: See TracBrowser for help on using the repository browser.