source: libcfa/src/collections/lockfree.hfa@ 21f4dff

Last change on this file since 21f4dff was 55b060d, checked in by Peter A. Buhr <pabuhr@…>, 2 years ago

rename directories containers to collections

  • Property mode set to 100644
File size: 8.2 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5#include <stdint.h>
6#include <bits/defs.hfa>
7
8forall( T &) {
9 //------------------------------------------------------------
10 // Queue based on the MCS lock.
11 // It is a Multi-Producer/Single-Consumer queue: threads pushing
12 // elements must hold on to the elements they push.
13 // Not appropriate for an async message queue, for example.
// Only the tail is stored here; elements are chained through the `next
// link that the push/advance routines require of T.
14 struct mcs_queue {
15 T * volatile tail; // last element pushed, 0p while the queue is empty
16 };
17
// Constructor: the queue starts empty (null tail).
18 static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
// Returns true when the tail is null, i.e. no element is currently linked in.
// This is a plain read of the volatile tail; no atomic builtin or fence is used here.
19 static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
20
21 static inline forall(| { T * volatile & ?`next ( T * ); })
22 {
23 // Adds an element to the list and returns the previous tail:
//   0p   -> the queue was empty, elem is now the first element
//   else -> the element that now precedes elem
// elem`next is expected to be null on entry (see the verify below).
24 // Multi-Thread Safe, Lock-Free
25 T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
26 T * push(mcs_queue(T) & this, T * elem) {
27 /* paranoid */ verify(!(elem`next));
28 // Race to add to the tail
29 T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
30 // If we aren't the first, we need to tell the person before us.
31 // The link is written after the exchange, so it can be observed late; advance() spins until it appears.
32 if (prev) prev`next = elem;
33 return prev;
34 }
35
36 // Advances the head of the list, dropping the element given.
37 // Passing an element that is not the head is undefined behavior
38 // NOT Multi-Thread Safe, concurrent pushes are safe
// Returns 0p if elem was the last element (the tail is then reset to null),
// otherwise the element linked after elem; in that case elem`next is reset to null.
39 T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
40 T * advance(mcs_queue(T) & this, T * elem) {
41 T * expected = elem;
42 // Check if this is already the last item: if the tail is still elem, swap in null and stop
43 if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
44
45 // If not, wait for the next item to show up; the link is filled in by push
46 while (!(elem`next)) Pause();
47
48 // the successor is now linked in, read it so it can be returned
49 T * ret = elem`next;
50
51 // invalidate link to reset to initial state
52 elem`next = 0p;
53 return ret;
54 }
55 }
56
57 //------------------------------------------------------------
58 // Queue based on the MCS lock
59 // Extension of the above queue which supports 'blind' pops,
60 // i.e., popping a value from the head without knowing what the head is.
61 // Has no extra guarantees beyond the mcs_queue.
62 struct mpsc_queue {
63 inline mcs_queue(T); // embedded mcs_queue, provides the tail
64 T * volatile head; // element the next pop removes, 0p if none; written by push (when the queue was empty) and by pop
65 };
66
// Constructor: construct the embedded mcs_queue, then clear the head.
67 static inline void ?{}(mpsc_queue(T) & this) {
68 ((mcs_queue(T)&)this){};
69 this.head = 0p;
70 }
71
72 static inline forall(| { T * volatile & ?`next ( T * ); })
73 {
74 // Adds a new element to the queue
75 // Multi-Thread Safe, Lock-Free
// Returns the value returned by the mcs_queue push: the previous tail, 0p if the queue was empty.
76 T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
77 T * push(mpsc_queue(T) & this, T * elem) {
78 T * prev = push((mcs_queue(T)&)this, elem);
79 if (!prev) this.head = elem; // queue was empty: elem becomes the head
80 return prev;
81 }
82
83 // Pop an element from the queue
84 // return the element that was removed, or 0p if the head is null
85 // next is set to the new head of the queue (possibly 0p);
// when the head is null on entry, next is left untouched
86 // NOT Multi-Thread Safe
87 T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
88 T * pop(mpsc_queue(T) & this, T *& next) {
89 T * elem = this.head;
90 // If head is empty just return
91 if (!elem) return 0p;
92
93 // If there is already someone in the list, then it's easy
94 if (elem`next) {
95 this.head = next = elem`next;
96 // force memory sync
97 __atomic_thread_fence(__ATOMIC_SEQ_CST);
98
99 // invalidate link to reset to initial state
100 elem`next = 0p;
101 }
102 // Otherwise elem looks like the last element, but a push may be in flight
// (push swaps the tail first and writes the link afterwards)
103 else {
104 // null out head here, before calling advance: we linearize with push
105 // at the CAS in advance, so a write to head
106 // after that point could overwrite the write made in push
107 this.head = 0p;
108 next = advance((mcs_queue(T)&)this, elem);
109
110 // Only write to the head if there is a next element
111 // it is the only way we can guarantee we are not overwriting
112 // a write made in push
113 if (next) this.head = next;
114 }
115
116 // return removed element
117 return elem;
118 }
119
120 // Same as the previous function, for callers that do not need the new head:
// the new head is received in a local and discarded.
121 T * pop(mpsc_queue(T) & this) {
122 T * _ = 0p;
123 return pop(this, _);
124 }
125 }
126
127 //------------------------------------------------------------
128 // Queue based on the MCS lock with poisoning
129 // It is a Multi-Producer/Single-Consumer queue: threads pushing
130 // elements must hold on to the elements they push.
131 // Not appropriate for an async message queue, for example.
132 // Poisoning the queue prevents any new elements from being pushed.
// The commented-out enum below records the sentinel pointer values used by this list
// (0p for empty, 1p for both "poisoned" and "link not yet filled in").
133 // enum(void*) poison_state {
134 // EMPTY = 0p,
135 // POISON = 1p,
136 // IN_PROGRESS = 1p
137 // };
138
139 struct poison_list {
140 T * volatile head; // 0p = empty, 1p = poisoned, otherwise the most recently pushed element
141 };
142
// Constructor: the list starts empty (null head) and not poisoned.
143 static inline void ?{}(poison_list(T) & this) { this.head = 0p; }
// Returns true when the head holds the 1p sentinel stored by poison(). Plain read of the volatile head.
144 static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; }
145
146 static inline forall(| { T * volatile & ?`next ( T * ); })
147 {
148 // Adds an element to the list
149 // Multi-Thread Safe, Lock-Free
// Returns false, without inserting, if the list is poisoned; returns true once elem is the new head.
// NOTE: on the false path elem`next is left at the 1p marker written below.
150 bool push(poison_list(T) & this, T * elem) __attribute__((artificial));
151 bool push(poison_list(T) & this, T * elem) {
152 /* paranoid */ verify(0p == (elem`next));
// mark the link as "not yet filled in"; advance() spins while it reads 1p
153 __atomic_store_n( &elem`next, (T*)1p, __ATOMIC_RELAXED );
154
155 // read the head up-front
156 T * expected = this.head;
157 for() {
158 // check if it's poisoned
159 if(expected == 1p) return false;
160
161 // try to CAS the elem in (weak CAS: on failure expected receives the current head and the loop retries)
162 if(__atomic_compare_exchange_n(&this.head, &expected, elem, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
163 // We managed to exchange in, we are done
164
165 // We should never succeed the CAS if it's poisoned, and the elem's link should still be 1p.
166 /* paranoid */ verify( expected != 1p );
167 /* paranoid */ verify( elem`next == 1p );
168
169 // Link elem to the previous head (possibly 0p).
170 // This overwrites the 1p marker that advance() waits on.
171 elem`next = expected;
172 return true;
173 }
174 }
175 }
176
177 // Returns the element linked after elem, i.e. the value push stored in elem`next
// (the head that was in place when elem was pushed, possibly 0p).
178 // Spins while the link still holds the 1p "in progress" marker; writes nothing.
179 // NOT Multi-Thread Safe, concurrent pushes are safe
180 T * advance(T * elem) __attribute__((artificial));
181 T * advance(T * elem) {
182 T * ret;
183
184 // Wait for next item to show-up, filled by push
185 while (1p == (ret = __atomic_load_n(&elem`next, __ATOMIC_RELAXED))) Pause();
186
187 return ret;
188 }
189
190 // Poison the queue, preventing new pushes and returning the previous head
// (0p if nothing was pushed). Poisoning an already poisoned list trips the verifyf below.
191 T * poison(poison_list(T) & this) __attribute__((artificial));
192 T * poison(poison_list(T) & this) {
// swap the 1p sentinel into head; push() refuses to insert once it reads 1p
193 T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
194 /* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this );
195 return ret;
196 }
197 }
198}
199
// Pointer/counter pair used by StackLF: the top pointer plus a count that push increments.
200forall( T & )
201struct LinkData {
202 T * volatile top; // pointer to stack top
203 uintptr_t count; // incremented by each StackLF push; pop carries it over unchanged
204};
205
// Overlays the (top, count) pair with a single integer `atom`, so StackLF can pass
// both fields to __atomic_compare_exchange_n as one value.
// The integer is __int128 when __SIZEOF_INT128__ is 16, otherwise uint64_t.
206forall( T & )
207union Link {
208 LinkData(T) data;
209 #if __SIZEOF_INT128__ == 16
210 __int128 // gcc, 128-bit integer
211 #else
212 uint64_t // 64-bit integer
213 #endif // __SIZEOF_INT128__ == 16
214 atom;
215}; // Link
216
217forall( T | sized(T) | { Link(T) * ?`next( T * ); } ) {
// Lock-free stack. Nodes of type T supply their own Link through the ?`next routine
// required by the enclosing forall; the stack itself holds only the top link.
218 struct StackLF {
219 Link(T) stack; // current top pointer and push counter
220 }; // StackLF
221
222 static inline {
// Constructor: zero the integer view of the link.
223 void ?{}( StackLF(T) & this ) with(this) { stack.atom = 0; }
224
// Returns the current top node without removing it (0p when the stack is empty). Plain read.
225 T * top( StackLF(T) & this ) with(this) { return stack.data.top; }
226
// Pushes node n on the stack.
// n's own link holds the expected (old) stack value; the CAS replaces the stack link
// with { &n, old count + 1 }. On failure the builtin stores the current stack value
// into n's link, so the next iteration retries against that value.
227 void push( StackLF(T) & this, T & n ) with(this) {
228 *( &n )`next = stack; // atomic assignment unnecessary, or use CAA
229 for () { // busy wait
230 if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ (LinkData(T))@{ &n, ( &n )`next->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
231 } // for
232 } // push
233
// Pops and returns the top node, or 0p if the stack is empty.
// t is a local copy of the stack link; the CAS replaces the stack link with
// { successor of t's top, same count }. On failure the builtin stores the current
// stack value into t and the loop re-checks for empty before retrying.
234 T * pop( StackLF(T) & this ) with(this) {
235 Link(T) t @= stack; // atomic assignment unnecessary, or use CAA
236 for () { // busy wait
237 if ( t.data.top == 0p ) return 0p; // empty stack ?
238 Link(T) * next = ( t.data.top )`next; // link stored inside the current top node
239 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node
240 } // for
241 } // pop
242
// Unlinks node from the stack by walking the links starting at the stack top.
// Returns true if node was found and unlinked, false if the end of the chain is reached.
// Uses only plain reads and writes (no atomic builtins), hence "unsafe".
243 bool unsafe_remove( StackLF(T) & this, T * node ) with(this) {
244 Link(T) * link = &stack;
245 for () {
246 // TODO: Avoiding some problems with double fields access.
247 LinkData(T) * data = &link->data;
// NOTE(review): as written this casts the address of the top field to T *;
// presumably the intent is the pointer stored in top - confirm against Cforall semantics.
248 T * next = (T *)&(*data).top;
249 if ( next == node ) {
// bypass node: the link that pointed to node now points to node's successor
250 data->top = ( node )`next->data.top;
251 return true;
252 }
253 if ( next == 0p ) return false;
254 link = ( next )`next;
255 }
256 }
257 } // distribution
258} // distribution
Note: See TracBrowser for help on using the repository browser.