source: doc/theses/thierry_delisle_PhD/code/readyQ_proto/work_stealing.hpp @ 6a8208cb

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 6a8208cb was a1b9bc3, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Many small changes to prototype code

  • Property mode set to 100644
File size: 10.1 KB
Line 
1#pragma once
2#define LIST_VARIANT work_stealing
3
4#include <cmath>
5#include <iomanip>
6#include <memory>
7#include <mutex>
8#include <thread>
9#include <type_traits>
10
11#include "assert.hpp"
12#include "utils.hpp"
13#include "links.hpp"
14#include "links2.hpp"
15#include "snzi.hpp"
16
17#include <x86intrin.h>
18
19using namespace std;
20
21static const long long lim = 2000;
22static const unsigned nqueues = 2;
23
24struct __attribute__((aligned(128))) timestamp_t {
25        volatile unsigned long long val = 0;
26};
27
28template<typename node_t>
29struct __attribute__((aligned(128))) localQ_t {
30        mpsc_queue<node_t> queue = {};
31        spinlock_t lock = {};
32        bool needs_help = true;
33};
34
35template<typename node_t>
36class __attribute__((aligned(128))) work_stealing {
37        static_assert(std::is_same<decltype(node_t::_links), _LinksFields_t<node_t>>::value, "Node must have a links field");
38
39public:
40        static const char * name() {
41                return "Work Stealing";
42        }
43
44        work_stealing(unsigned _numThreads, unsigned)
45                : numThreads(_numThreads * nqueues)
46                , lists(new intrusive_queue_t<node_t>[numThreads])
47                , times(new timestamp_t[numThreads])
48                // , snzi( std::log2( numThreads / 2 ), 2 )
49
50        {
51                std::cout << "Constructing Work Stealer with " << numThreads << std::endl;
52        }
53
54        ~work_stealing() {
55                std::cout << "Destroying Work Stealer" << std::endl;
56                lists.reset();
57        }
58
59        __attribute__((noinline, hot)) void push(node_t * node) {
60                // node->_links.ts = rdtscl();
61                node->_links.ts = 1;
62
63                auto & list = *({
64                        unsigned i;
65                        do {
66                                tls.stats.push.attempt++;
67                                // unsigned r = tls.rng1.next();
68                                unsigned r = tls.it++;
69                                if(tls.my_queue == outside) {
70                                        i = r % numThreads;
71                                } else {
72                                        i = tls.my_queue + (r % nqueues);
73                                }
74                        } while(!lists[i].lock.try_lock());
75                        &lists[i];
76                });
77
78                list.push( node );
79                list.lock.unlock();
80                // tls.rng2.set_raw_state( tls.rng1.get_raw_state());
81                // count++;
82                tls.stats.push.success++;
83        }
84
85        __attribute__((noinline, hot)) node_t * pop() {
86                if( tls.myfriend == outside ) {
87                        auto r  = tls.rng1.next();
88                        tls.myfriend = r % numThreads;
89                        times[tls.myfriend].val = 0;
90                }
91                else if(times[tls.myfriend].val == 0) {
92                        node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
93                        tls.stats.help++;
94                        tls.myfriend = outside;
95                        if(n) return n;
96                }
97
98                if(tls.my_queue != outside) {
99                        node_t * n = local();
100                        if(n) return n;
101                }
102
103                // try steal
104                for(int i = 0; i < 25; i++) {
105                        node_t * n = steal();
106                        if(n) return n;
107                }
108
109                return search();
110        }
111
112private:
113        inline node_t * local() {
114                // unsigned i = (tls.rng2.prev() % 4) + tls.my_queue;
115                unsigned i = (--tls.it % nqueues) + tls.my_queue;
116                return try_pop(i, tls.stats.pop.local);
117        }
118
119        inline node_t * steal() {
120                unsigned i = tls.rng2.prev() % numThreads;
121                return try_pop(i, tls.stats.pop.steal);
122        }
123
124        inline node_t * search() {
125                unsigned offset = tls.rng2.prev();
126                for(unsigned i = 0; i < numThreads; i++) {
127                        unsigned idx = (offset + i) % numThreads;
128                        node_t * thrd = try_pop(idx, tls.stats.pop.search);
129                        if(thrd) {
130                                return thrd;
131                        }
132                }
133
134                return nullptr;
135        }
136
137private:
138        struct attempt_stat_t {
139                std::size_t attempt = { 0 };
140                std::size_t elock   = { 0 };
141                std::size_t eempty  = { 0 };
142                std::size_t espec   = { 0 };
143                std::size_t success = { 0 };
144        };
145
146        node_t * try_pop(unsigned i, attempt_stat_t & stat) {
147                assert(i < numThreads);
148                auto & list = lists[i];
149                stat.attempt++;
150
151                // If the list is empty, don't try
152                if(list.ts() == 0) { stat.espec++; return nullptr; }
153
154                // If we can't get the lock, move on
155                if( !list.lock.try_lock() ) { stat.elock++; return nullptr; }
156
157
158                // If list is empty, unlock and retry
159                if( list.ts() == 0 ) {
160                        list.lock.unlock();
161                        stat.eempty++;
162                        return nullptr;
163                }
164
165                auto node = list.pop();
166                list.lock.unlock();
167                stat.success++;
168                times[i].val = 1; //node.first->_links.ts;
169                // count--;
170                // _mm_stream_si64((long long int*)&times[i].val, node.first->_links.ts);
171                return node.first;
172        }
173
174
175public:
176
177        static std::atomic_uint32_t ticket;
178        static const unsigned outside = 0xFFFFFFFF;
179
180        static inline unsigned calc_preferred() {
181                unsigned t = ticket++;
182                if(t == 0) return outside;
183                unsigned i = (t - 1) * nqueues;
184                return i;
185        }
186
187        static __attribute__((aligned(128))) thread_local struct TLS {
188                Random     rng1 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
189                Random     rng2 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
190                unsigned   it   = 0;
191                unsigned   my_queue = calc_preferred();
192                unsigned   myfriend = outside;
193                #if defined(READ)
194                        unsigned it = 0;
195                #endif
196                struct {
197                        struct {
198                                std::size_t attempt = { 0 };
199                                std::size_t success = { 0 };
200                        } push;
201                        struct {
202                                attempt_stat_t help;
203                                attempt_stat_t local;
204                                attempt_stat_t steal;
205                                attempt_stat_t search;
206                        } pop;
207                        std::size_t help = { 0 };
208                } stats;
209        } tls;
210
211private:
212        const unsigned numThreads;
213        std::unique_ptr<intrusive_queue_t<node_t> []> lists;
214        std::unique_ptr<timestamp_t []> times;
215        __attribute__((aligned(128))) std::atomic_size_t count;
216
217#ifndef NO_STATS
218private:
219        static struct GlobalStats {
220                struct {
221                        std::atomic_size_t attempt = { 0 };
222                        std::atomic_size_t success = { 0 };
223                } push;
224                struct {
225                        struct {
226                                std::atomic_size_t attempt = { 0 };
227                                std::atomic_size_t elock   = { 0 };
228                                std::atomic_size_t eempty  = { 0 };
229                                std::atomic_size_t espec   = { 0 };
230                                std::atomic_size_t success = { 0 };
231                        } help;
232                        struct {
233                                std::atomic_size_t attempt = { 0 };
234                                std::atomic_size_t elock   = { 0 };
235                                std::atomic_size_t eempty  = { 0 };
236                                std::atomic_size_t espec   = { 0 };
237                                std::atomic_size_t success = { 0 };
238                        } local;
239                        struct {
240                                std::atomic_size_t attempt = { 0 };
241                                std::atomic_size_t elock   = { 0 };
242                                std::atomic_size_t eempty  = { 0 };
243                                std::atomic_size_t espec   = { 0 };
244                                std::atomic_size_t success = { 0 };
245                        } steal;
246                        struct {
247                                std::atomic_size_t attempt = { 0 };
248                                std::atomic_size_t elock   = { 0 };
249                                std::atomic_size_t eempty  = { 0 };
250                                std::atomic_size_t espec   = { 0 };
251                                std::atomic_size_t success = { 0 };
252                        } search;
253                } pop;
254                std::atomic_size_t help = { 0 };
255        } global_stats;
256
257public:
258        static void stats_tls_tally() {
259                global_stats.push.attempt += tls.stats.push.attempt;
260                global_stats.push.success += tls.stats.push.success;
261                global_stats.pop.help  .attempt += tls.stats.pop.help  .attempt;
262                global_stats.pop.help  .elock   += tls.stats.pop.help  .elock  ;
263                global_stats.pop.help  .eempty  += tls.stats.pop.help  .eempty ;
264                global_stats.pop.help  .espec   += tls.stats.pop.help  .espec  ;
265                global_stats.pop.help  .success += tls.stats.pop.help  .success;
266                global_stats.pop.local .attempt += tls.stats.pop.local .attempt;
267                global_stats.pop.local .elock   += tls.stats.pop.local .elock  ;
268                global_stats.pop.local .eempty  += tls.stats.pop.local .eempty ;
269                global_stats.pop.local .espec   += tls.stats.pop.local .espec  ;
270                global_stats.pop.local .success += tls.stats.pop.local .success;
271                global_stats.pop.steal .attempt += tls.stats.pop.steal .attempt;
272                global_stats.pop.steal .elock   += tls.stats.pop.steal .elock  ;
273                global_stats.pop.steal .eempty  += tls.stats.pop.steal .eempty ;
274                global_stats.pop.steal .espec   += tls.stats.pop.steal .espec  ;
275                global_stats.pop.steal .success += tls.stats.pop.steal .success;
276                global_stats.pop.search.attempt += tls.stats.pop.search.attempt;
277                global_stats.pop.search.elock   += tls.stats.pop.search.elock  ;
278                global_stats.pop.search.eempty  += tls.stats.pop.search.eempty ;
279                global_stats.pop.search.espec   += tls.stats.pop.search.espec  ;
280                global_stats.pop.search.success += tls.stats.pop.search.success;
281                global_stats.help += tls.stats.help;
282        }
283
284        static void stats_print(std::ostream & os, double duration ) {
285                std::cout << "----- Work Stealing Stats -----" << std::endl;
286
287                double push_suc = (100.0 * double(global_stats.push.success) / global_stats.push.attempt);
288                double push_len = double(global_stats.push.attempt     ) / global_stats.push.success;
289                os << "Push   Pick : " << push_suc << " %, len " << push_len << " (" << global_stats.push.attempt      << " / " << global_stats.push.success << ")\n";
290
291                double hlp_suc = (100.0 * double(global_stats.pop.help.success) / global_stats.pop.help.attempt);
292                double hlp_len = double(global_stats.pop.help.attempt     ) / global_stats.pop.help.success;
293                os << "Help        : " << hlp_suc << " %, len " << hlp_len << " (" << global_stats.pop.help.attempt      << " / " << global_stats.pop.help.success << ")\n";
294                os << "Help Fail   : " << global_stats.pop.help.espec << "s, " << global_stats.pop.help.eempty << "e, " << global_stats.pop.help.elock << "l\n";
295
296                double pop_suc = (100.0 * double(global_stats.pop.local.success) / global_stats.pop.local.attempt);
297                double pop_len = double(global_stats.pop.local.attempt     ) / global_stats.pop.local.success;
298                os << "Local       : " << pop_suc << " %, len " << pop_len << " (" << global_stats.pop.local.attempt      << " / " << global_stats.pop.local.success << ")\n";
299                os << "Local Fail  : " << global_stats.pop.local.espec << "s, " << global_stats.pop.local.eempty << "e, " << global_stats.pop.local.elock << "l\n";
300
301                double stl_suc = (100.0 * double(global_stats.pop.steal.success) / global_stats.pop.steal.attempt);
302                double stl_len = double(global_stats.pop.steal.attempt     ) / global_stats.pop.steal.success;
303                os << "Steal       : " << stl_suc << " %, len " << stl_len << " (" << global_stats.pop.steal.attempt      << " / " << global_stats.pop.steal.success << ")\n";
304                os << "Steal Fail  : " << global_stats.pop.steal.espec << "s, " << global_stats.pop.steal.eempty << "e, " << global_stats.pop.steal.elock << "l\n";
305
306                double srh_suc = (100.0 * double(global_stats.pop.search.success) / global_stats.pop.search.attempt);
307                double srh_len = double(global_stats.pop.search.attempt     ) / global_stats.pop.search.success;
308                os << "Search      : " << srh_suc << " %, len " << srh_len << " (" << global_stats.pop.search.attempt      << " / " << global_stats.pop.search.success << ")\n";
309                os << "Search Fail : " << global_stats.pop.search.espec << "s, " << global_stats.pop.search.eempty << "e, " << global_stats.pop.search.elock << "l\n";
310                os << "Helps       : " << std::setw(15) << std::scientific << global_stats.help / duration << "/sec (" << global_stats.help  << ")\n";
311        }
312private:
313#endif
314};
Note: See TracBrowser for help on using the repository browser.