source: doc/theses/thierry_delisle_PhD/code/relaxed_list.hpp@ 09f357ec

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since 09f357ec was c921712, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Fixed support for setting number of starting nodes

  • Property mode set to 100644
File size: 6.2 KB
Line 
1#pragma once
2
#ifndef NO_STATS
#include <iostream>
#endif

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <type_traits>

#include "assert.hpp"
#include "utils.hpp"
13
14using namespace std;
15
// Minimal test-and-test-and-set spinlock.
struct spinlock_t {
	std::atomic_bool ll = { false };

	// Acquire the lock, spinning until it is free.  After a failed
	// exchange, waits on a relaxed load (test-and-test-and-set) so the
	// cache line is not hammered with atomic writes while contended.
	inline void lock() {
		for(;;) {
			const bool was_held = __builtin_expect(ll.exchange(true), false);
			if(!was_held) return;
			while(ll.load(std::memory_order_relaxed)) {
				asm volatile("pause"); // x86 spin-wait hint
			}
		}
	}

	// Single acquisition attempt; returns true iff the lock was taken.
	inline bool try_lock() {
		return !ll.exchange(true);
	}

	// Release the lock; release ordering publishes writes made under it.
	inline void unlock() {
		ll.store(false, std::memory_order_release);
	}

	// Unsynchronized peek at the held flag (intended for asserts only).
	inline explicit operator bool() {
		return ll.load(std::memory_order_relaxed);
	}
};
38
39
40extern bool enable_stats;
41
// Per-thread counters for how often push/pop had to retry:
// `attempt` counts loop iterations (list picks), `success` counts
// operations that completed.  Collected in relaxed_list::tls.
struct pick_stat {
	struct {
		size_t attempt = 0; // lists tried for insertion
		size_t success = 0; // pushes that completed
	} push;
	struct {
		size_t attempt = 0; // list pairs examined
		size_t success = 0; // pops that returned a node
	} pop;
};
52
// Intrusive doubly-linked-list hooks plus an insertion timestamp.
// Any node_t stored in relaxed_list must expose exactly this type as a
// member named `_links` (enforced by relaxed_list's static_assert).
template<typename node_t>
struct _LinksFields_t {
	node_t * prev = nullptr;
	node_t * next = nullptr;
	unsigned long long ts = 0; // set from rdtscl() at push; 0 means "not queued"
};
59
// A "relaxed" concurrent list: numLists independent intrusive FIFO
// sub-queues, each guarded by its own spinlock.  push inserts into a
// randomly chosen sub-queue; pop samples two sub-queues and takes from
// the one whose head is older (power-of-two-choices), giving approximate
// global FIFO ordering with low lock contention.
template<typename node_t>
class __attribute__((aligned(128))) relaxed_list {
	static_assert(std::is_same<decltype(node_t::_links), _LinksFields_t<node_t>>::value, "Node must have a links field");


public:
	// numLists: number of sub-queues.  More lists reduce contention but
	// weaken the ordering approximation.
	relaxed_list(unsigned numLists)
		: numNonEmpty{0}
		, lists(new intrusive_queue_t[numLists])
		, numLists(numLists)
	{}

	~relaxed_list() {
		// Destroy the queues first so their destructors fold per-queue
		// stats into the global accumulator before it is printed.
		lists.reset();
		#ifndef NO_STATS
			std::cout << "Difference : "
				<< ssize_t(double(intrusive_queue_t::stat::dif.value) / intrusive_queue_t::stat::dif.num ) << " avg\t"
				<< intrusive_queue_t::stat::dif.max << "max" << std::endl;
		#endif
	}

	// Insert node into a random sub-queue, retrying (with a fresh random
	// pick) whenever the chosen queue's lock is already held.
	__attribute__((noinline, hot)) void push(node_t * node) {
		// Timestamp the node; pop uses it to prefer older heads.
		// (rdtscl comes from utils.hpp — presumably reads the TSC.)
		node->_links.ts = rdtscl();

		while(true) {
			// Pick a random list
			int i = tls.rng.next() % numLists;

			#ifndef NO_STATS
				tls.pick.push.attempt++;
			#endif

			// If we can't lock it retry
			if( !lists[i].lock.try_lock() ) continue;

			// Actually push it
			lists[i].push(node, numNonEmpty);
			assert(numNonEmpty <= (int)numLists);

			// Unlock and return
			lists[i].lock.unlock();

			#ifndef NO_STATS
				tls.pick.push.success++;
			#endif
			return;
		}
	}

	// Remove and return the node at the head of the older of two randomly
	// sampled sub-queues; returns nullptr once every sub-queue is empty.
	// The unlocked ts() reads are hints — emptiness is re-checked under
	// the lock before actually popping.
	__attribute__((noinline, hot)) node_t * pop() {
		while(numNonEmpty != 0) {
			// Pick two lists at random
			int i = tls.rng.next() % numLists;
			int j = tls.rng.next() % numLists;

			#ifndef NO_STATS
				tls.pick.pop.attempt++;
			#endif

			// Pick the best list (older head timestamp wins; ts()==0
			// marks an empty list, so j only competes when non-empty)
			int w = i;
			if( __builtin_expect(lists[j].ts() != 0, true) ) {
				w = (lists[i].ts() < lists[j].ts()) ? i : j;
			}

			auto & list = lists[w];
			// If list looks empty retry
			if( list.ts() == 0 ) continue;

			// If we can't get the lock retry
			if( !list.lock.try_lock() ) continue;

			// If list is empty, unlock and retry (it may have been
			// drained between the unlocked check and the lock)
			if( list.ts() == 0 ) {
				list.lock.unlock();
				continue;
			}

			// Actually pop the list
			auto node = list.pop(numNonEmpty);
			assert(node);

			// Unlock and return
			list.lock.unlock();
			assert(numNonEmpty >= 0);
			#ifndef NO_STATS
				tls.pick.pop.success++;
			#endif
			return node;
		}

		return nullptr;
	}

private:

	// One sub-queue: an intrusive doubly-linked FIFO delimited by two
	// sentinels.  before._links.ts mirrors the head node's timestamp
	// (0 when empty) so ts() can be read without taking the lock.
	class __attribute__((aligned(128))) intrusive_queue_t {
	public:
		typedef spinlock_t lock_t;

		friend class relaxed_list<node_t>;

		// Per-queue push/pop imbalance, folded into the global `dif`
		// accumulator when the queue is destroyed.
		struct stat {
			ssize_t diff = 0; // pushes minus pops on this queue

			static struct Dif {
				ssize_t value = 0; // sum of per-queue diffs
				size_t num = 0;    // number of queues accumulated
				ssize_t max = 0;   // diff with the largest magnitude
			} dif;
		};

	private:
		// Fake list node: carries only the _links field; head()/tail()
		// cast the sentinels to node_t* via the field offset.
		struct sentinel_t {
			_LinksFields_t<node_t> _links;
		};

		lock_t lock;
		sentinel_t before; // head sentinel; its ts caches the head node's ts
		sentinel_t after;  // tail sentinel
		stat s;

		// Byte offset of _links inside node_t, used to fake node pointers
		// for the sentinels.
		static constexpr auto fields_offset = offsetof( node_t, _links );
	public:
		intrusive_queue_t()
			: before{{ nullptr, tail() }}
			, after {{ head(), nullptr }}
		{
			// Sanity-check the sentinel/offset arithmetic and the
			// alignment promised by the class attribute.
			assert((reinterpret_cast<uintptr_t>( head() ) + fields_offset) == reinterpret_cast<uintptr_t>(&before));
			assert((reinterpret_cast<uintptr_t>( tail() ) + fields_offset) == reinterpret_cast<uintptr_t>(&after ));
			assert(head()->_links.prev == nullptr);
			assert(head()->_links.next == tail() );
			assert(tail()->_links.next == nullptr);
			assert(tail()->_links.prev == head() );
			assert(sizeof(*this) == 128);
			assert((intptr_t(this) % 128) == 0);
		}

		~intrusive_queue_t() {
			#ifndef NO_STATS
				// Fold this queue's imbalance into the global accumulator.
				stat::dif.value+= s.diff;
				stat::dif.num ++;
				stat::dif.max = std::abs(stat::dif.max) > std::abs(s.diff) ? stat::dif.max : s.diff;
			#endif
		}

		// node_t* alias of the head sentinel — only its _links may be used.
		inline node_t * head() const {
			node_t * rhead = reinterpret_cast<node_t *>(
				reinterpret_cast<uintptr_t>( &before ) - fields_offset
			);
			assert(rhead);
			return rhead;
		}

		// node_t* alias of the tail sentinel — only its _links may be used.
		inline node_t * tail() const {
			node_t * rtail = reinterpret_cast<node_t *>(
				reinterpret_cast<uintptr_t>( &after ) - fields_offset
			);
			assert(rtail);
			return rtail;
		}

		// Append node before the tail sentinel.  Caller must hold `lock`.
		// Bumps nonEmpty and publishes the head timestamp if the queue
		// was empty.
		inline void push(node_t * node, std::atomic_int & nonEmpty) {
			assert(lock);
			assert(node->_links.ts != 0);
			node_t * tail = this->tail();

			node_t * prev = tail->_links.prev;
			// assertf(node->_links.ts >= prev->_links.ts,
			// "New node has smaller timestamp: %llu < %llu", node->_links.ts, prev->_links.ts);
			node->_links.next = tail;
			node->_links.prev = prev;
			prev->_links.next = node;
			tail->_links.prev = node;
			if(before._links.ts == 0l) {
				nonEmpty += 1;
				before._links.ts = node->_links.ts;
			}
			#ifndef NO_STATS
				if(enable_stats) s.diff++;
			#endif
		}

		// Unlink and return the node after the head sentinel, or nullptr
		// if the queue is empty.  Caller must hold `lock`.  Keeps the
		// cached head timestamp and nonEmpty count in sync.
		inline node_t * pop(std::atomic_int & nonEmpty) {
			assert(lock);
			node_t * head = this->head();
			node_t * tail = this->tail();

			node_t * node = head->_links.next;
			node_t * next = node->_links.next;
			if(node == tail) return nullptr;

			head->_links.next = next;
			next->_links.prev = head;

			if(next == tail) {
				// Queue drained: clear the cached timestamp first, then
				// decrement the global non-empty count.
				before._links.ts = 0l;
				nonEmpty -= 1;
			}
			else {
				assert(next->_links.ts != 0);
				before._links.ts = next->_links.ts;
				assert(before._links.ts != 0);
			}
			#ifndef NO_STATS
				if(enable_stats) s.diff--;
			#endif
			return node;
		}

		// Head node's timestamp (0 = empty).  Read without the lock by
		// relaxed_list::pop as a heuristic; may be stale.
		long long ts() const {
			return before._links.ts;
		}
	};


public:

	// Per-thread state: the RNG used for list selection and the
	// retry/success counters.
	static __attribute__((aligned(128))) thread_local struct TLS {
		Random rng = { int(rdtscl()) };
		pick_stat pick;
	} tls;

private:
	std::atomic_int numNonEmpty; // number of non-empty lists
	__attribute__((aligned(64))) std::unique_ptr<intrusive_queue_t []> lists;
	const unsigned numLists;

public:
	static const constexpr size_t sizeof_queue = sizeof(intrusive_queue_t);
};
Note: See TracBrowser for help on using the repository browser.