source: libcfa/src/collections/lockfree.hfa@ 6b33e89

Last change on this file since 6b33e89 was 6b33e89, checked in by Peter A. Buhr <pabuhr@…>, 5 months ago

change backquote call to regular call

  • Property mode set to 100644
File size: 8.3 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5#include <stdint.h>
6#include <bits/defs.hfa>
7
8forall( T & ) {
9 //------------------------------------------------------------
10 // Queue based on the MCS lock
11 // It is a Multi-Producer/Single-Consumer queue threads pushing
12 // elements must hold on to the elements they push
13 // Not appropriate for an async message queue for example,
14 struct mcs_queue {
15 T * volatile tail;
16 };
17
18 static inline void ?{}( mcs_queue(T) & this ) { this.tail = 0p; }
19 static inline bool empty( const mcs_queue(T) & this ) { return ! this.tail; }
20
21 static inline forall( | { T * volatile & next ( T * ); }) {
22 // Adds an element to the list
23 // Multi-Thread Safe, Lock-Free
24 T * push( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
25 T * push( mcs_queue(T) & this, T * elem ) {
26 /* paranoid */ verify( ! next( elem ) );
27 // Race to add to the tail
28 T * prev_val = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
29 // If we aren't the first, we need to tell the person before us
30 // No need to
31 if ( prev_val ) next( prev_val ) = elem;
32 return prev_val;
33 }
34
35 // Advances the head of the list, dropping the element given.
36 // Passing an element that is not the head is undefined behavior
37 // NOT Multi-Thread Safe, concurrent pushes are safe
38 T * advance( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
39 T * advance( mcs_queue(T) & this, T * elem ) {
40 T * expected = elem;
41 // Check if this is already the last item
42 if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
43
44 // If not wait for next item to show-up, filled by push
45 while ( ! next( elem ) ) Pause();
46
47 // we need to return if the next link was empty
48 T * ret = next( elem );
49
50 // invalidate link to reset to initial state
51 next( elem ) = 0p;
52 return ret;
53 }
54 }
55
56 //------------------------------------------------------------
57 // Queue based on the MCS lock
58 // Extension of the above lock which supports 'blind' pops.
59 // i.e., popping a value from the head without knowing what the head is
60 // has no extra guarantees beyond the mcs_queue
61 struct mpsc_queue {
62 inline mcs_queue(T);
63 T * volatile head;
64 };
65
66 static inline void ?{}( mpsc_queue(T) & this ) {
67 ((mcs_queue(T)&)this){};
68 this.head = 0p;
69 }
70
71 static inline forall( | { T * volatile & next ( T * ); }) {
72 // Added a new element to the queue
73 // Multi-Thread Safe, Lock-Free
74 T * push( mpsc_queue(T) & this, T * elem ) __attribute__((artificial));
75 T * push( mpsc_queue(T) & this, T * elem ) {
76 T * prev_val = push( (mcs_queue(T)&)this, elem );
77 if ( ! prev_val ) this.head = elem;
78 return prev_val;
79 }
80
81 // Pop an element from the queue
82 // return the element that was removed
83 // head is set to the new head of the queue
84 // NOT Multi-Thread Safe
85 T * pop( mpsc_queue(T) & this, T *& head ) __attribute__((artificial));
86 T * pop( mpsc_queue(T) & this, T *& head ) {
87 T * elem = this.head;
88 // If head is empty just return
89 if ( ! elem ) return 0p;
90
91 // If there is already someone in the list, then it's easy
92 if ( next( elem ) ) {
93 this.head = head = next( elem );
94 // force memory sync
95 __atomic_thread_fence(__ATOMIC_SEQ_CST);
96
97 // invalidate link to reset to initial state
98 next( elem ) = 0p;
99 }
100 // Otherwise, there might be a race where it only looks but someone is enqueuing
101 else {
102 // null out head here, because we linearize with push
103 // at the CAS in advance and therefore can write to head
104 // after that point, it could overwrite the write in push
105 this.head = 0p;
106 head = advance( (mcs_queue(T)&)this, elem );
107
108 // Only write to the head if there is a next element
109 // it is the only way we can guarantee we are not overwriting
110 // a write made in push
111 if ( head ) this.head = head;
112 }
113 // return removed element
114 return elem;
115 }
116
117 // Same as previous function
118 T * pop( mpsc_queue(T) & this ) {
119 T * _ = 0p;
120 return pop(this, _);
121 }
122 }
123
124 //------------------------------------------------------------
125 // Queue based on the MCS lock with poisoning
126 // It is a Multi-Producer/Single-Consumer queue threads pushing
127 // elements must hold on to the elements they push
128 // Not appropriate for an async message queue for example
129 // poisoning the queue prevents any new elements from being push
130 // enum(void*) poison_state {
131 // EMPTY = 0p,
132 // POISON = 1p,
133 // IN_PROGRESS = 1p
134 // };
135
136 struct poison_list {
137 T * volatile head;
138 };
139
140 static inline void ?{}(poison_list(T) & this) { this.head = 0p; }
141 static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; }
142
143 static inline forall( | { T * volatile & next( T * ); })
144 {
145 // Adds an element to the list
146 // Multi-Thread Safe, Lock-Free
147 bool push( poison_list(T) & this, T * elem ) __attribute__((artificial));
148 bool push( poison_list(T) & this, T * elem ) {
149 /* paranoid */ verify( 0p == next( elem ) );
150 __atomic_store_n( &next( elem ), (T *)1p, __ATOMIC_RELAXED );
151
152 // read the head up-front
153 T * expected = this.head;
154 for() {
155 // check if it's poisoned
156 if(expected == 1p) return false;
157
158 // try to CAS the elem in
159 if(__atomic_compare_exchange_n(&this.head, &expected, elem, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
160 // We managed to exchange in, we are done
161
162 // We should never succeed the CAS if it's poisonned and the elem should be 1p.
163 /* paranoid */ verify( expected != 1p );
164 /* paranoid */ verify( next( elem ) == 1p );
165
166 // If we aren't the first, we need to tell the person before us
167 // No need to
168 next( elem ) = expected;
169 return true;
170 }
171 }
172 }
173
174 // Advances the head of the list, dropping the element given.
175 // Passing an element that is not the head is undefined behavior
176 // NOT Multi-Thread Safe, concurrent pushes are safe
177 T * advance( T * elem ) __attribute__((artificial));
178 T * advance( T * elem ) {
179 T * ret;
180
181 // Wait for next item to show-up, filled by push
182 while (1p == (ret = __atomic_load_n( &next( elem ), __ATOMIC_RELAXED ) ) ) Pause();
183
184 return ret;
185 }
186
187 // Poison the queue, preveting new pushes and returning the head
188 T * poison( poison_list(T) & this ) __attribute__((artificial));
189 T * poison( poison_list(T) & this ) {
190 T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
191 /* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this );
192 return ret;
193 }
194 }
195}
196
197forall( T & )
198struct LinkData {
199 T * volatile top; // pointer to stack top
200 uintptr_t count; // count each push
201};
202
203forall( T & )
204union Link {
205 LinkData(T) data;
206 #if __SIZEOF_INT128__ == 16
207 __int128 // gcc, 128-bit integer
208 #else
209 uint64_t // 64-bit integer
210 #endif // __SIZEOF_INT128__ == 16
211 atom;
212}; // Link
213
214forall( T /*| sized(T)*/ | { Link(T) * next( T * ); } ) {
215 struct StackLF {
216 Link(T) stack;
217 }; // StackLF
218
219 static inline {
220 void ?{}( StackLF(T) & this ) with(this) { stack.atom = 0; }
221
222 T * top( StackLF(T) & this ) with(this) { return stack.data.top; }
223
224 void push( StackLF(T) & this, T & n ) with(this) {
225 *next( &n ) = stack; // atomic assignment unnecessary, or use CAA
226 for () { // busy wait
227 if ( __atomic_compare_exchange_n( &stack.atom, &next( &n )->atom, (Link(T))@{ (LinkData(T))@{ &n, next( &n )->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
228 } // for
229 } // push
230
231 T * pop( StackLF(T) & this ) with(this) {
232 Link(T) t @= stack; // atomic assignment unnecessary, or use CAA
233 for () { // busy wait
234 if ( t.data.top == 0p ) return 0p; // empty stack ?
235 Link(T) * next = next( t.data.top );
236 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node
237 } // for
238 } // pop
239
240 bool unsafe_remove( StackLF(T) & this, T * node ) with(this) {
241 Link(T) * link = &stack;
242 for () {
243 // TODO: Avoiding some problems with double fields access.
244 LinkData(T) * data = &link->data;
245 T * ntop = (T *)&(*data).top;
246 if ( ntop == node ) {
247 data->top = next( node )->data.top;
248 return true;
249 }
250 if ( ntop == 0p ) return false;
251 link = next( ntop );
252 }
253 }
254 } // distribution
255} // distribution
Note: See TracBrowser for help on using the repository browser.