source: doc/theses/thierry_delisle_PhD/code/readyQ_proto/work_stealing.hpp@ a1b9bc3

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since a1b9bc3 was a1b9bc3, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Many small changes to prototype code

  • Property mode set to 100644
File size: 10.1 KB
Line 
1#pragma once
2#define LIST_VARIANT work_stealing
3
4#include <cmath>
5#include <iomanip>
6#include <memory>
7#include <mutex>
8#include <thread>
9#include <type_traits>
10
11#include "assert.hpp"
12#include "utils.hpp"
13#include "links.hpp"
14#include "links2.hpp"
15#include "snzi.hpp"
16
17#include <x86intrin.h>
18
19using namespace std;
20
21static const long long lim = 2000;
22static const unsigned nqueues = 2;
23
24struct __attribute__((aligned(128))) timestamp_t {
25 volatile unsigned long long val = 0;
26};
27
28template<typename node_t>
29struct __attribute__((aligned(128))) localQ_t {
30 mpsc_queue<node_t> queue = {};
31 spinlock_t lock = {};
32 bool needs_help = true;
33};
34
35template<typename node_t>
36class __attribute__((aligned(128))) work_stealing {
37 static_assert(std::is_same<decltype(node_t::_links), _LinksFields_t<node_t>>::value, "Node must have a links field");
38
39public:
40 static const char * name() {
41 return "Work Stealing";
42 }
43
44 work_stealing(unsigned _numThreads, unsigned)
45 : numThreads(_numThreads * nqueues)
46 , lists(new intrusive_queue_t<node_t>[numThreads])
47 , times(new timestamp_t[numThreads])
48 // , snzi( std::log2( numThreads / 2 ), 2 )
49
50 {
51 std::cout << "Constructing Work Stealer with " << numThreads << std::endl;
52 }
53
54 ~work_stealing() {
55 std::cout << "Destroying Work Stealer" << std::endl;
56 lists.reset();
57 }
58
59 __attribute__((noinline, hot)) void push(node_t * node) {
60 // node->_links.ts = rdtscl();
61 node->_links.ts = 1;
62
63 auto & list = *({
64 unsigned i;
65 do {
66 tls.stats.push.attempt++;
67 // unsigned r = tls.rng1.next();
68 unsigned r = tls.it++;
69 if(tls.my_queue == outside) {
70 i = r % numThreads;
71 } else {
72 i = tls.my_queue + (r % nqueues);
73 }
74 } while(!lists[i].lock.try_lock());
75 &lists[i];
76 });
77
78 list.push( node );
79 list.lock.unlock();
80 // tls.rng2.set_raw_state( tls.rng1.get_raw_state());
81 // count++;
82 tls.stats.push.success++;
83 }
84
85 __attribute__((noinline, hot)) node_t * pop() {
86 if( tls.myfriend == outside ) {
87 auto r = tls.rng1.next();
88 tls.myfriend = r % numThreads;
89 times[tls.myfriend].val = 0;
90 }
91 else if(times[tls.myfriend].val == 0) {
92 node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
93 tls.stats.help++;
94 tls.myfriend = outside;
95 if(n) return n;
96 }
97
98 if(tls.my_queue != outside) {
99 node_t * n = local();
100 if(n) return n;
101 }
102
103 // try steal
104 for(int i = 0; i < 25; i++) {
105 node_t * n = steal();
106 if(n) return n;
107 }
108
109 return search();
110 }
111
112private:
113 inline node_t * local() {
114 // unsigned i = (tls.rng2.prev() % 4) + tls.my_queue;
115 unsigned i = (--tls.it % nqueues) + tls.my_queue;
116 return try_pop(i, tls.stats.pop.local);
117 }
118
119 inline node_t * steal() {
120 unsigned i = tls.rng2.prev() % numThreads;
121 return try_pop(i, tls.stats.pop.steal);
122 }
123
124 inline node_t * search() {
125 unsigned offset = tls.rng2.prev();
126 for(unsigned i = 0; i < numThreads; i++) {
127 unsigned idx = (offset + i) % numThreads;
128 node_t * thrd = try_pop(idx, tls.stats.pop.search);
129 if(thrd) {
130 return thrd;
131 }
132 }
133
134 return nullptr;
135 }
136
137private:
138 struct attempt_stat_t {
139 std::size_t attempt = { 0 };
140 std::size_t elock = { 0 };
141 std::size_t eempty = { 0 };
142 std::size_t espec = { 0 };
143 std::size_t success = { 0 };
144 };
145
146 node_t * try_pop(unsigned i, attempt_stat_t & stat) {
147 assert(i < numThreads);
148 auto & list = lists[i];
149 stat.attempt++;
150
151 // If the list is empty, don't try
152 if(list.ts() == 0) { stat.espec++; return nullptr; }
153
154 // If we can't get the lock, move on
155 if( !list.lock.try_lock() ) { stat.elock++; return nullptr; }
156
157
158 // If list is empty, unlock and retry
159 if( list.ts() == 0 ) {
160 list.lock.unlock();
161 stat.eempty++;
162 return nullptr;
163 }
164
165 auto node = list.pop();
166 list.lock.unlock();
167 stat.success++;
168 times[i].val = 1; //node.first->_links.ts;
169 // count--;
170 // _mm_stream_si64((long long int*)&times[i].val, node.first->_links.ts);
171 return node.first;
172 }
173
174
175public:
176
177 static std::atomic_uint32_t ticket;
178 static const unsigned outside = 0xFFFFFFFF;
179
180 static inline unsigned calc_preferred() {
181 unsigned t = ticket++;
182 if(t == 0) return outside;
183 unsigned i = (t - 1) * nqueues;
184 return i;
185 }
186
187 static __attribute__((aligned(128))) thread_local struct TLS {
188 Random rng1 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
189 Random rng2 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
190 unsigned it = 0;
191 unsigned my_queue = calc_preferred();
192 unsigned myfriend = outside;
193 #if defined(READ)
194 unsigned it = 0;
195 #endif
196 struct {
197 struct {
198 std::size_t attempt = { 0 };
199 std::size_t success = { 0 };
200 } push;
201 struct {
202 attempt_stat_t help;
203 attempt_stat_t local;
204 attempt_stat_t steal;
205 attempt_stat_t search;
206 } pop;
207 std::size_t help = { 0 };
208 } stats;
209 } tls;
210
211private:
212 const unsigned numThreads;
213 std::unique_ptr<intrusive_queue_t<node_t> []> lists;
214 std::unique_ptr<timestamp_t []> times;
215 __attribute__((aligned(128))) std::atomic_size_t count;
216
217#ifndef NO_STATS
218private:
219 static struct GlobalStats {
220 struct {
221 std::atomic_size_t attempt = { 0 };
222 std::atomic_size_t success = { 0 };
223 } push;
224 struct {
225 struct {
226 std::atomic_size_t attempt = { 0 };
227 std::atomic_size_t elock = { 0 };
228 std::atomic_size_t eempty = { 0 };
229 std::atomic_size_t espec = { 0 };
230 std::atomic_size_t success = { 0 };
231 } help;
232 struct {
233 std::atomic_size_t attempt = { 0 };
234 std::atomic_size_t elock = { 0 };
235 std::atomic_size_t eempty = { 0 };
236 std::atomic_size_t espec = { 0 };
237 std::atomic_size_t success = { 0 };
238 } local;
239 struct {
240 std::atomic_size_t attempt = { 0 };
241 std::atomic_size_t elock = { 0 };
242 std::atomic_size_t eempty = { 0 };
243 std::atomic_size_t espec = { 0 };
244 std::atomic_size_t success = { 0 };
245 } steal;
246 struct {
247 std::atomic_size_t attempt = { 0 };
248 std::atomic_size_t elock = { 0 };
249 std::atomic_size_t eempty = { 0 };
250 std::atomic_size_t espec = { 0 };
251 std::atomic_size_t success = { 0 };
252 } search;
253 } pop;
254 std::atomic_size_t help = { 0 };
255 } global_stats;
256
257public:
258 static void stats_tls_tally() {
259 global_stats.push.attempt += tls.stats.push.attempt;
260 global_stats.push.success += tls.stats.push.success;
261 global_stats.pop.help .attempt += tls.stats.pop.help .attempt;
262 global_stats.pop.help .elock += tls.stats.pop.help .elock ;
263 global_stats.pop.help .eempty += tls.stats.pop.help .eempty ;
264 global_stats.pop.help .espec += tls.stats.pop.help .espec ;
265 global_stats.pop.help .success += tls.stats.pop.help .success;
266 global_stats.pop.local .attempt += tls.stats.pop.local .attempt;
267 global_stats.pop.local .elock += tls.stats.pop.local .elock ;
268 global_stats.pop.local .eempty += tls.stats.pop.local .eempty ;
269 global_stats.pop.local .espec += tls.stats.pop.local .espec ;
270 global_stats.pop.local .success += tls.stats.pop.local .success;
271 global_stats.pop.steal .attempt += tls.stats.pop.steal .attempt;
272 global_stats.pop.steal .elock += tls.stats.pop.steal .elock ;
273 global_stats.pop.steal .eempty += tls.stats.pop.steal .eempty ;
274 global_stats.pop.steal .espec += tls.stats.pop.steal .espec ;
275 global_stats.pop.steal .success += tls.stats.pop.steal .success;
276 global_stats.pop.search.attempt += tls.stats.pop.search.attempt;
277 global_stats.pop.search.elock += tls.stats.pop.search.elock ;
278 global_stats.pop.search.eempty += tls.stats.pop.search.eempty ;
279 global_stats.pop.search.espec += tls.stats.pop.search.espec ;
280 global_stats.pop.search.success += tls.stats.pop.search.success;
281 global_stats.help += tls.stats.help;
282 }
283
284 static void stats_print(std::ostream & os, double duration ) {
285 std::cout << "----- Work Stealing Stats -----" << std::endl;
286
287 double push_suc = (100.0 * double(global_stats.push.success) / global_stats.push.attempt);
288 double push_len = double(global_stats.push.attempt ) / global_stats.push.success;
289 os << "Push Pick : " << push_suc << " %, len " << push_len << " (" << global_stats.push.attempt << " / " << global_stats.push.success << ")\n";
290
291 double hlp_suc = (100.0 * double(global_stats.pop.help.success) / global_stats.pop.help.attempt);
292 double hlp_len = double(global_stats.pop.help.attempt ) / global_stats.pop.help.success;
293 os << "Help : " << hlp_suc << " %, len " << hlp_len << " (" << global_stats.pop.help.attempt << " / " << global_stats.pop.help.success << ")\n";
294 os << "Help Fail : " << global_stats.pop.help.espec << "s, " << global_stats.pop.help.eempty << "e, " << global_stats.pop.help.elock << "l\n";
295
296 double pop_suc = (100.0 * double(global_stats.pop.local.success) / global_stats.pop.local.attempt);
297 double pop_len = double(global_stats.pop.local.attempt ) / global_stats.pop.local.success;
298 os << "Local : " << pop_suc << " %, len " << pop_len << " (" << global_stats.pop.local.attempt << " / " << global_stats.pop.local.success << ")\n";
299 os << "Local Fail : " << global_stats.pop.local.espec << "s, " << global_stats.pop.local.eempty << "e, " << global_stats.pop.local.elock << "l\n";
300
301 double stl_suc = (100.0 * double(global_stats.pop.steal.success) / global_stats.pop.steal.attempt);
302 double stl_len = double(global_stats.pop.steal.attempt ) / global_stats.pop.steal.success;
303 os << "Steal : " << stl_suc << " %, len " << stl_len << " (" << global_stats.pop.steal.attempt << " / " << global_stats.pop.steal.success << ")\n";
304 os << "Steal Fail : " << global_stats.pop.steal.espec << "s, " << global_stats.pop.steal.eempty << "e, " << global_stats.pop.steal.elock << "l\n";
305
306 double srh_suc = (100.0 * double(global_stats.pop.search.success) / global_stats.pop.search.attempt);
307 double srh_len = double(global_stats.pop.search.attempt ) / global_stats.pop.search.success;
308 os << "Search : " << srh_suc << " %, len " << srh_len << " (" << global_stats.pop.search.attempt << " / " << global_stats.pop.search.success << ")\n";
309 os << "Search Fail : " << global_stats.pop.search.espec << "s, " << global_stats.pop.search.eempty << "e, " << global_stats.pop.search.elock << "l\n";
310 os << "Helps : " << std::setw(15) << std::scientific << global_stats.help / duration << "/sec (" << global_stats.help << ")\n";
311 }
312private:
313#endif
314};
Note: See TracBrowser for help on using the repository browser.