source: doc/theses/thierry_delisle_PhD/code/readyQ_proto/work_stealing.hpp @ 8590328

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 8590328 was 634a5c2, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

New changes to the prototype with Andrew's comments

  • Property mode set to 100644
File size: 11.2 KB
Line 
1#pragma once
2#define LIST_VARIANT work_stealing
3
4#include <cmath>
5#include <iomanip>
6#include <memory>
7#include <mutex>
8#include <thread>
9#include <type_traits>
10
11#include "assert.hpp"
12#include "utils.hpp"
13#include "links.hpp"
14#include "links2.hpp"
15#include "snzi.hpp"
16
17// #include <x86intrin.h>
18
19using namespace std;
20
21static const long long lim = 2000;
22static const unsigned nqueues = 2;
23
24struct __attribute__((aligned(128))) timestamp_t {
25        volatile unsigned long long val = 0;
26};
27
28template<typename node_t>
29struct __attribute__((aligned(128))) localQ_t {
30        // volatile unsigned long long val = 0;
31        // mpsc_queue<node_t> queue = {};
32        // spinlock_t lock = {};
33        intrusive_queue_t<node_t> list;
34
35        inline auto ts() { return list.ts(); }
36        inline auto lock() { return list.lock.lock(); }
37        inline auto try_lock() { return list.lock.try_lock(); }
38        inline auto unlock() { return list.lock.unlock(); }
39
40        inline auto push( node_t * node ) { return list.push( node ); }
41        inline auto pop() { return list.pop(); }
42};
43
44template<typename node_t>
45class __attribute__((aligned(128))) work_stealing {
46        static_assert(std::is_same<decltype(node_t::_links), _LinksFields_t<node_t>>::value, "Node must have a links field");
47
48public:
49        static const char * name() {
50                return "Work Stealing";
51        }
52
53        work_stealing(unsigned _numThreads, unsigned)
54                : numThreads(_numThreads * nqueues)
55                , lists(new localQ_t<node_t>[numThreads])
56                // , lists(new intrusive_queue_t<node_t>[numThreads])
57                , times(new timestamp_t[numThreads])
58                // , snzi( std::log2( numThreads / 2 ), 2 )
59
60        {
61                std::cout << "Constructing Work Stealer with " << numThreads << std::endl;
62        }
63
64        ~work_stealing() {
65                std::cout << "Destroying Work Stealer" << std::endl;
66                lists.reset();
67        }
68
69        __attribute__((noinline, hot)) void push(node_t * node) {
70                node->_links.ts = rdtscl();
71                // node->_links.ts = 1;
72
73                auto & list = *({
74                        unsigned i;
75                        do {
76                                tls.stats.push.attempt++;
77                                // unsigned r = tls.rng1.next();
78                                unsigned r = tls.it++;
79                                if(tls.my_queue == outside) {
80                                        i = r % numThreads;
81                                } else {
82                                        i = tls.my_queue + (r % nqueues);
83                                }
84                        } while(!lists[i].try_lock());
85                        &lists[i];
86                });
87
88                list.push( node );
89                list.unlock();
90                // tls.rng2.set_raw_state( tls.rng1.get_raw_state());
91                // count++;
92                tls.stats.push.success++;
93        }
94
95        __attribute__((noinline, hot)) node_t * pop() {
96                if(tls.my_queue != outside) {
97                        if( tls.myfriend == outside ) {
98                                auto r  = tls.rng1.next();
99                                tls.myfriend = r % numThreads;
100                                tls.mytime = lists[(tls.it % nqueues) + tls.my_queue].ts();
101                                // times[tls.myfriend].val = 0;
102                                // lists[tls.myfriend].val = 0;
103                        }
104                        // else if(times[tls.myfriend].val == 0) {
105                        // else if(lists[tls.myfriend].val == 0) {
106                        else if(times[tls.myfriend].val < tls.mytime) {
107                        // else if(times[tls.myfriend].val < lists[(tls.it % nqueues) + tls.my_queue].ts()) {
108                                node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
109                                tls.stats.help++;
110                                tls.myfriend = outside;
111                                if(n) return n;
112                        }
113                        // if( tls.myfriend == outside ) {
114                        //      auto r  = tls.rng1.next();
115                        //      tls.myfriend = r % numThreads;
116                        //      tls.mytime = lists[((tls.it + 1) % nqueues) + tls.my_queue].ts();
117                        // }
118                        // else {
119                        //      if(times[tls.myfriend].val + 1000 < tls.mytime) {
120                        //              node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
121                        //              tls.stats.help++;
122                        //              if(n) return n;
123                        //      }
124                        //      tls.myfriend = outside;
125                        // }
126
127                        node_t * n = local();
128                        if(n) return n;
129                }
130
131                // try steal
132                for(int i = 0; i < 25; i++) {
133                        node_t * n = steal();
134                        if(n) return n;
135                }
136
137                return search();
138        }
139
140private:
141        inline node_t * local() {
142                unsigned i = (--tls.it % nqueues) + tls.my_queue;
143                return try_pop(i, tls.stats.pop.local);
144        }
145
146        inline node_t * steal() {
147                unsigned i = tls.rng2.prev() % numThreads;
148                return try_pop(i, tls.stats.pop.steal);
149        }
150
151        inline node_t * search() {
152                unsigned offset = tls.rng2.prev();
153                for(unsigned i = 0; i < numThreads; i++) {
154                        unsigned idx = (offset + i) % numThreads;
155                        node_t * thrd = try_pop(idx, tls.stats.pop.search);
156                        if(thrd) {
157                                return thrd;
158                        }
159                }
160
161                return nullptr;
162        }
163
164private:
165        struct attempt_stat_t {
166                std::size_t attempt = { 0 };
167                std::size_t elock   = { 0 };
168                std::size_t eempty  = { 0 };
169                std::size_t espec   = { 0 };
170                std::size_t success = { 0 };
171        };
172
173        node_t * try_pop(unsigned i, attempt_stat_t & stat) {
174                assert(i < numThreads);
175                auto & list = lists[i];
176                stat.attempt++;
177
178                // If the list is empty, don't try
179                if(list.ts() == 0) { stat.espec++; return nullptr; }
180
181                // If we can't get the lock, move on
182                if( !list.try_lock() ) { stat.elock++; return nullptr; }
183
184                // If list is empty, unlock and retry
185                if( list.ts() == 0 ) {
186                        list.unlock();
187                        stat.eempty++;
188                        return nullptr;
189                }
190
191                auto node = list.pop();
192                list.unlock();
193                stat.success++;
194                // times[i].val = 1;
195                times[i].val = node.first->_links.ts;
196                // lists[i].val = node.first->_links.ts;
197                return node.first;
198        }
199
200
201public:
202
203        static std::atomic_uint32_t ticket;
204        static const unsigned outside = 0xFFFFFFFF;
205
206        static inline unsigned calc_preferred() {
207                unsigned t = ticket++;
208                if(t == 0) return outside;
209                unsigned i = (t - 1) * nqueues;
210                return i;
211        }
212
213        static __attribute__((aligned(128))) thread_local struct TLS {
214                Random     rng1 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
215                Random     rng2 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
216                unsigned   it   = 0;
217                unsigned   my_queue = calc_preferred();
218                unsigned   myfriend = outside;
219                unsigned long long int mytime = 0;
220                #if defined(READ)
221                        unsigned it = 0;
222                #endif
223                struct {
224                        struct {
225                                std::size_t attempt = { 0 };
226                                std::size_t success = { 0 };
227                        } push;
228                        struct {
229                                attempt_stat_t help;
230                                attempt_stat_t local;
231                                attempt_stat_t steal;
232                                attempt_stat_t search;
233                        } pop;
234                        std::size_t help = { 0 };
235                } stats;
236        } tls;
237
238private:
239        const unsigned numThreads;
240        std::unique_ptr<localQ_t<node_t> []> lists;
241        // std::unique_ptr<intrusive_queue_t<node_t> []> lists;
242        std::unique_ptr<timestamp_t []> times;
243        __attribute__((aligned(128))) std::atomic_size_t count;
244
245#ifndef NO_STATS
246private:
247        static struct GlobalStats {
248                struct {
249                        std::atomic_size_t attempt = { 0 };
250                        std::atomic_size_t success = { 0 };
251                } push;
252                struct {
253                        struct {
254                                std::atomic_size_t attempt = { 0 };
255                                std::atomic_size_t elock   = { 0 };
256                                std::atomic_size_t eempty  = { 0 };
257                                std::atomic_size_t espec   = { 0 };
258                                std::atomic_size_t success = { 0 };
259                        } help;
260                        struct {
261                                std::atomic_size_t attempt = { 0 };
262                                std::atomic_size_t elock   = { 0 };
263                                std::atomic_size_t eempty  = { 0 };
264                                std::atomic_size_t espec   = { 0 };
265                                std::atomic_size_t success = { 0 };
266                        } local;
267                        struct {
268                                std::atomic_size_t attempt = { 0 };
269                                std::atomic_size_t elock   = { 0 };
270                                std::atomic_size_t eempty  = { 0 };
271                                std::atomic_size_t espec   = { 0 };
272                                std::atomic_size_t success = { 0 };
273                        } steal;
274                        struct {
275                                std::atomic_size_t attempt = { 0 };
276                                std::atomic_size_t elock   = { 0 };
277                                std::atomic_size_t eempty  = { 0 };
278                                std::atomic_size_t espec   = { 0 };
279                                std::atomic_size_t success = { 0 };
280                        } search;
281                } pop;
282                std::atomic_size_t help = { 0 };
283        } global_stats;
284
285public:
286        static void stats_tls_tally() {
287                global_stats.push.attempt += tls.stats.push.attempt;
288                global_stats.push.success += tls.stats.push.success;
289                global_stats.pop.help  .attempt += tls.stats.pop.help  .attempt;
290                global_stats.pop.help  .elock   += tls.stats.pop.help  .elock  ;
291                global_stats.pop.help  .eempty  += tls.stats.pop.help  .eempty ;
292                global_stats.pop.help  .espec   += tls.stats.pop.help  .espec  ;
293                global_stats.pop.help  .success += tls.stats.pop.help  .success;
294                global_stats.pop.local .attempt += tls.stats.pop.local .attempt;
295                global_stats.pop.local .elock   += tls.stats.pop.local .elock  ;
296                global_stats.pop.local .eempty  += tls.stats.pop.local .eempty ;
297                global_stats.pop.local .espec   += tls.stats.pop.local .espec  ;
298                global_stats.pop.local .success += tls.stats.pop.local .success;
299                global_stats.pop.steal .attempt += tls.stats.pop.steal .attempt;
300                global_stats.pop.steal .elock   += tls.stats.pop.steal .elock  ;
301                global_stats.pop.steal .eempty  += tls.stats.pop.steal .eempty ;
302                global_stats.pop.steal .espec   += tls.stats.pop.steal .espec  ;
303                global_stats.pop.steal .success += tls.stats.pop.steal .success;
304                global_stats.pop.search.attempt += tls.stats.pop.search.attempt;
305                global_stats.pop.search.elock   += tls.stats.pop.search.elock  ;
306                global_stats.pop.search.eempty  += tls.stats.pop.search.eempty ;
307                global_stats.pop.search.espec   += tls.stats.pop.search.espec  ;
308                global_stats.pop.search.success += tls.stats.pop.search.success;
309                global_stats.help += tls.stats.help;
310        }
311
312        static void stats_print(std::ostream & os, double duration ) {
313                std::cout << "----- Work Stealing Stats -----" << std::endl;
314
315                double push_suc = (100.0 * double(global_stats.push.success) / global_stats.push.attempt);
316                double push_len = double(global_stats.push.attempt     ) / global_stats.push.success;
317                os << "Push   Pick : " << push_suc << " %, len " << push_len << " (" << global_stats.push.attempt      << " / " << global_stats.push.success << ")\n";
318
319                double hlp_suc = (100.0 * double(global_stats.pop.help.success) / global_stats.pop.help.attempt);
320                double hlp_len = double(global_stats.pop.help.attempt     ) / global_stats.pop.help.success;
321                os << "Help        : " << hlp_suc << " %, len " << hlp_len << " (" << global_stats.pop.help.attempt      << " / " << global_stats.pop.help.success << ")\n";
322                os << "Help Fail   : " << global_stats.pop.help.espec << "s, " << global_stats.pop.help.eempty << "e, " << global_stats.pop.help.elock << "l\n";
323
324                double pop_suc = (100.0 * double(global_stats.pop.local.success) / global_stats.pop.local.attempt);
325                double pop_len = double(global_stats.pop.local.attempt     ) / global_stats.pop.local.success;
326                os << "Local       : " << pop_suc << " %, len " << pop_len << " (" << global_stats.pop.local.attempt      << " / " << global_stats.pop.local.success << ")\n";
327                os << "Local Fail  : " << global_stats.pop.local.espec << "s, " << global_stats.pop.local.eempty << "e, " << global_stats.pop.local.elock << "l\n";
328
329                double stl_suc = (100.0 * double(global_stats.pop.steal.success) / global_stats.pop.steal.attempt);
330                double stl_len = double(global_stats.pop.steal.attempt     ) / global_stats.pop.steal.success;
331                os << "Steal       : " << stl_suc << " %, len " << stl_len << " (" << global_stats.pop.steal.attempt      << " / " << global_stats.pop.steal.success << ")\n";
332                os << "Steal Fail  : " << global_stats.pop.steal.espec << "s, " << global_stats.pop.steal.eempty << "e, " << global_stats.pop.steal.elock << "l\n";
333
334                double srh_suc = (100.0 * double(global_stats.pop.search.success) / global_stats.pop.search.attempt);
335                double srh_len = double(global_stats.pop.search.attempt     ) / global_stats.pop.search.success;
336                os << "Search      : " << srh_suc << " %, len " << srh_len << " (" << global_stats.pop.search.attempt      << " / " << global_stats.pop.search.success << ")\n";
337                os << "Search Fail : " << global_stats.pop.search.espec << "s, " << global_stats.pop.search.eempty << "e, " << global_stats.pop.search.elock << "l\n";
338                os << "Helps       : " << std::setw(15) << std::scientific << global_stats.help / duration << "/sec (" << global_stats.help  << ")\n";
339        }
340private:
341#endif
342};
Note: See TracBrowser for help on using the repository browser.