source: libcfa/src/collections/lockfree.hfa @ 5aeb1a9

Last change on this file since 5aeb1a9 was 55b060d, checked in by Peter A. Buhr <pabuhr@…>, 15 months ago

rename directories containers to collections

  • Property mode set to 100644
File size: 8.2 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5#include <stdint.h>
6#include <bits/defs.hfa>
7
8forall( T &) {
9        //------------------------------------------------------------
10        // Queue based on the MCS lock
11        // It is a Multi-Producer/Single-Consumer queue threads pushing
12        // elements must hold on to the elements they push
13        // Not appropriate for an async message queue for example,
14        struct mcs_queue {
15                T * volatile tail;
16        };
17
18        static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
19        static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
20
21        static inline forall(| { T * volatile & ?`next ( T * ); })
22        {
23                // Adds an element to the list
24                // Multi-Thread Safe, Lock-Free
25                T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
26                T * push(mcs_queue(T) & this, T * elem) {
27                        /* paranoid */ verify(!(elem`next));
28                        // Race to add to the tail
29                        T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
30                        // If we aren't the first, we need to tell the person before us
31                        // No need to
32                        if (prev) prev`next = elem;
33                        return prev;
34                }
35
36                // Advances the head of the list, dropping the element given.
37                // Passing an element that is not the head is undefined behavior
38                // NOT Multi-Thread Safe, concurrent pushes are safe
39                T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
40                T * advance(mcs_queue(T) & this, T * elem) {
41                        T * expected = elem;
42                        // Check if this is already the last item
43                        if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
44
45                        // If not wait for next item to show-up, filled by push
46                        while (!(elem`next)) Pause();
47
48                        // we need to return if the next link was empty
49                        T * ret = elem`next;
50
51                        // invalidate link to reset to initial state
52                        elem`next = 0p;
53                        return ret;
54                }
55        }
56
57        //------------------------------------------------------------
58        // Queue based on the MCS lock
59        // Extension of the above lock which supports 'blind' pops.
60        // i.e., popping a value from the head without knowing what the head is
61        // has no extra guarantees beyond the mcs_queue
62        struct mpsc_queue {
63                inline mcs_queue(T);
64                T * volatile head;
65        };
66
67        static inline void ?{}(mpsc_queue(T) & this) {
68                ((mcs_queue(T)&)this){};
69                this.head = 0p;
70        }
71
72        static inline forall(| { T * volatile & ?`next ( T * ); })
73        {
74                // Added a new element to the queue
75                // Multi-Thread Safe, Lock-Free
76                T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
77                T * push(mpsc_queue(T) & this, T * elem) {
78                        T * prev = push((mcs_queue(T)&)this, elem);
79                        if (!prev) this.head = elem;
80                        return prev;
81                }
82
83                // Pop an element from the queue
84                // return the element that was removed
85                // next is set to the new head of the queue
86                // NOT Multi-Thread Safe
87                T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
88                T * pop(mpsc_queue(T) & this, T *& next) {
89                        T * elem = this.head;
90                        // If head is empty just return
91                        if (!elem) return 0p;
92
93                        // If there is already someone in the list, then it's easy
94                        if (elem`next) {
95                                this.head = next = elem`next;
96                                // force memory sync
97                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
98
99                                // invalidate link to reset to initial state
100                                elem`next = 0p;
101                        }
102                        // Otherwise, there might be a race where it only looks but someone is enqueuing
103                        else {
104                                // null out head here, because we linearize with push
105                                // at the CAS in advance and therefore can write to head
106                                // after that point, it could overwrite the write in push
107                                this.head = 0p;
108                                next = advance((mcs_queue(T)&)this, elem);
109
110                                // Only write to the head if there is a next element
111                                // it is the only way we can guarantee we are not overwriting
112                                // a write made in push
113                                if (next) this.head = next;
114                        }
115
116                        // return removed element
117                        return elem;
118                }
119
120                // Same as previous function
121                T * pop(mpsc_queue(T) & this) {
122                        T * _ = 0p;
123                        return pop(this, _);
124                }
125        }
126
127        //------------------------------------------------------------
128        // Queue based on the MCS lock with poisoning
129        // It is a Multi-Producer/Single-Consumer queue threads pushing
130        // elements must hold on to the elements they push
131        // Not appropriate for an async message queue for example
132        // poisoning the queue prevents any new elements from being push
133        // enum(void*) poison_state {
134        //      EMPTY = 0p,
135        //      POISON = 1p,
136        //      IN_PROGRESS = 1p
137        // };
138
139        struct poison_list {
140                T * volatile head;
141        };
142
143        static inline void ?{}(poison_list(T) & this) { this.head = 0p; }
144        static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; }
145
146        static inline forall(| { T * volatile & ?`next ( T * ); })
147        {
148                // Adds an element to the list
149                // Multi-Thread Safe, Lock-Free
150                bool push(poison_list(T) & this, T * elem) __attribute__((artificial));
151                bool push(poison_list(T) & this, T * elem) {
152                        /* paranoid */ verify(0p == (elem`next));
153                        __atomic_store_n( &elem`next, (T*)1p, __ATOMIC_RELAXED );
154
155                        // read the head up-front
156                        T * expected = this.head;
157                        for() {
158                                // check if it's poisoned
159                                if(expected == 1p) return false;
160
161                                // try to CAS the elem in
162                                if(__atomic_compare_exchange_n(&this.head, &expected, elem, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
163                                        // We managed to exchange in, we are done
164
165                                        // We should never succeed the CAS if it's poisonned and the elem should be 1p.
166                                        /* paranoid */ verify( expected  != 1p );
167                                        /* paranoid */ verify( elem`next == 1p );
168
169                                        // If we aren't the first, we need to tell the person before us
170                                        // No need to
171                                        elem`next = expected;
172                                        return true;
173                                }
174                        }
175                }
176
177                // Advances the head of the list, dropping the element given.
178                // Passing an element that is not the head is undefined behavior
179                // NOT Multi-Thread Safe, concurrent pushes are safe
180                T * advance(T * elem) __attribute__((artificial));
181                T * advance(T * elem) {
182                        T * ret;
183
184                        // Wait for next item to show-up, filled by push
185                        while (1p == (ret = __atomic_load_n(&elem`next, __ATOMIC_RELAXED))) Pause();
186
187                        return ret;
188                }
189
190                // Poison the queue, preveting new pushes and returning the head
191                T * poison(poison_list(T) & this) __attribute__((artificial));
192                T * poison(poison_list(T) & this) {
193                        T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
194                        /* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this );
195                        return ret;
196                }
197        }
198}
199
200forall( T & )
201struct LinkData {
202        T * volatile top;                                                               // pointer to stack top
203        uintptr_t count;                                                                // count each push
204};
205
206forall( T & )
207union Link {
208        LinkData(T) data;
209        #if __SIZEOF_INT128__ == 16
210        __int128                                                                                        // gcc, 128-bit integer
211        #else
212        uint64_t                                                                                        // 64-bit integer
213        #endif // __SIZEOF_INT128__ == 16
214        atom;
215}; // Link
216
217forall( T | sized(T) | { Link(T) * ?`next( T * ); } ) {
218        struct StackLF {
219                Link(T) stack;
220        }; // StackLF
221
222        static inline {
223                void ?{}( StackLF(T) & this ) with(this) { stack.atom = 0; }
224
225                T * top( StackLF(T) & this ) with(this) { return stack.data.top; }
226
227                void push( StackLF(T) & this, T & n ) with(this) {
228                        *( &n )`next = stack;                                           // atomic assignment unnecessary, or use CAA
229                        for () {                                                                        // busy wait
230                                if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ (LinkData(T))@{ &n, ( &n )`next->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
231                        } // for
232                } // push
233
234                T * pop( StackLF(T) & this ) with(this) {
235                        Link(T) t @= stack;                                                     // atomic assignment unnecessary, or use CAA
236                        for () {                                                                        // busy wait
237                                if ( t.data.top == 0p ) return 0p;                              // empty stack ?
238                                Link(T) * next = ( t.data.top )`next;
239                                if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node
240                        } // for
241                } // pop
242
243                bool unsafe_remove( StackLF(T) & this, T * node ) with(this) {
244                        Link(T) * link = &stack;
245                        for () {
246                                // TODO: Avoiding some problems with double fields access.
247                                LinkData(T) * data = &link->data;
248                                T * next = (T *)&(*data).top;
249                                if ( next == node ) {
250                                        data->top = ( node )`next->data.top;
251                                        return true;
252                                }
253                                if ( next == 0p ) return false;
254                                link = ( next )`next;
255                        }
256                }
257        } // distribution
258} // distribution
Note: See TracBrowser for help on using the repository browser.