source: doc/theses/thierry_delisle_PhD/code/readyQ_proto/work_stealing.hpp @ 451d958

ADTast-experimentalenumforall-pointer-decaypthread-emulationqualifiedEnum
Last change on this file since 451d958 was 780a614, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Added comparison of the mpsc queue to the protoptype.

  • Property mode set to 100644
File size: 12.0 KB
Line 
1#pragma once
2#define LIST_VARIANT work_stealing
3
4#include <cmath>
5#include <iomanip>
6#include <memory>
7#include <mutex>
8#include <thread>
9#include <type_traits>
10
11#include "assert.hpp"
12#include "utils.hpp"
13#include "links.hpp"
14#include "links2.hpp"
15#include "snzi.hpp"
16
17// #include <x86intrin.h>
18
19using namespace std;
20
21static const long long lim = 2000;
22static const unsigned nqueues = 2;
23
24struct __attribute__((aligned(128))) timestamp_t {
25        volatile unsigned long long val = 0;
26};
27
28template<typename node_t>
29struct __attribute__((aligned(128))) localQ_t {
30        #ifdef NO_MPSC
31                intrusive_queue_t<node_t> list;
32
33                inline auto ts() { return list.ts(); }
34                inline auto lock() { return list.lock.lock(); }
35                inline auto try_lock() { return list.lock.try_lock(); }
36                inline auto unlock() { return list.lock.unlock(); }
37
38                inline auto push( node_t * node ) { return list.push( node ); }
39                inline auto pop() { return list.pop(); }
40        #else
41                mpsc_queue<node_t> queue = {};
42                spinlock_t _lock = {};
43
44                inline auto ts() { auto h = queue.head(); return h ? h->_links.ts : 0ull; }
45                inline auto lock() { return _lock.lock(); }
46                inline auto try_lock() { return _lock.try_lock(); }
47                inline auto unlock() { return _lock.unlock(); }
48
49                inline auto push( node_t * node ) { return queue.push( node ); }
50                inline auto pop() { return queue.pop(); }
51        #endif
52
53
54};
55
56template<typename node_t>
57class __attribute__((aligned(128))) work_stealing {
58        static_assert(std::is_same<decltype(node_t::_links), _LinksFields_t<node_t>>::value, "Node must have a links field");
59
60public:
61        static const char * name() {
62                return "Work Stealing";
63        }
64
65        work_stealing(unsigned _numThreads, unsigned)
66                : numThreads(_numThreads * nqueues)
67                , lists(new localQ_t<node_t>[numThreads])
68                // , lists(new intrusive_queue_t<node_t>[numThreads])
69                , times(new timestamp_t[numThreads])
70                // , snzi( std::log2( numThreads / 2 ), 2 )
71
72        {
73                std::cout << "Constructing Work Stealer with " << numThreads << std::endl;
74        }
75
76        ~work_stealing() {
77                std::cout << "Destroying Work Stealer" << std::endl;
78                lists.reset();
79        }
80
81        __attribute__((noinline, hot)) void push(node_t * node) {
82                node->_links.ts = rdtscl();
83                // node->_links.ts = 1;
84
85                auto & list = *({
86                        unsigned i;
87                        #ifdef NO_MPSC
88                                do {
89                        #endif
90                                tls.stats.push.attempt++;
91                                // unsigned r = tls.rng1.next();
92                                unsigned r = tls.it++;
93                                if(tls.my_queue == outside) {
94                                        i = r % numThreads;
95                                } else {
96                                        i = tls.my_queue + (r % nqueues);
97                                }
98                        #ifdef NO_MPSC
99                                } while(!lists[i].try_lock());
100                        #endif
101                        &lists[i];
102                });
103
104                list.push( node );
105                #ifdef NO_MPSC
106                        list.unlock();
107                #endif
108                // tls.rng2.set_raw_state( tls.rng1.get_raw_state());
109                // count++;
110                tls.stats.push.success++;
111        }
112
113        __attribute__((noinline, hot)) node_t * pop() {
114                if(tls.my_queue != outside) {
115                        // if( tls.myfriend == outside ) {
116                        //      auto r  = tls.rng1.next();
117                        //      tls.myfriend = r % numThreads;
118                        //      // assert(lists[(tls.it % nqueues) + tls.my_queue].ts() >= lists[((tls.it + 1) % nqueues) + tls.my_queue].ts());
119                        //      tls.mytime = std::min(lists[(tls.it % nqueues) + tls.my_queue].ts(), lists[((tls.it + 1) % nqueues) + tls.my_queue].ts());
120                        //      // times[tls.myfriend].val = 0;
121                        //      // lists[tls.myfriend].val = 0;
122                        // }
123                        // // else if(times[tls.myfriend].val == 0) {
124                        // // else if(lists[tls.myfriend].val == 0) {
125                        // else if(times[tls.myfriend].val < tls.mytime) {
126                        // // else if(times[tls.myfriend].val < lists[(tls.it % nqueues) + tls.my_queue].ts()) {
127                        //      node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
128                        //      tls.stats.help++;
129                        //      tls.myfriend = outside;
130                        //      if(n) return n;
131                        // }
132                        // if( tls.myfriend == outside ) {
133                        //      auto r  = tls.rng1.next();
134                        //      tls.myfriend = r % numThreads;
135                        //      tls.mytime = lists[((tls.it + 1) % nqueues) + tls.my_queue].ts();
136                        // }
137                        // else {
138                        //      if(times[tls.myfriend].val + 1000 < tls.mytime) {
139                        //              node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
140                        //              tls.stats.help++;
141                        //              if(n) return n;
142                        //      }
143                        //      tls.myfriend = outside;
144                        // }
145
146                        node_t * n = local();
147                        if(n) return n;
148                }
149
150                // try steal
151                for(int i = 0; i < 25; i++) {
152                        node_t * n = steal();
153                        if(n) return n;
154                }
155
156                return search();
157        }
158
159private:
160        inline node_t * local() {
161                unsigned i = (--tls.it % nqueues) + tls.my_queue;
162                node_t * n = try_pop(i, tls.stats.pop.local);
163                if(n) return n;
164                i = (--tls.it % nqueues) + tls.my_queue;
165                return try_pop(i, tls.stats.pop.local);
166        }
167
168        inline node_t * steal() {
169                unsigned i = tls.rng2.prev() % numThreads;
170                return try_pop(i, tls.stats.pop.steal);
171        }
172
173        inline node_t * search() {
174                unsigned offset = tls.rng2.prev();
175                for(unsigned i = 0; i < numThreads; i++) {
176                        unsigned idx = (offset + i) % numThreads;
177                        node_t * thrd = try_pop(idx, tls.stats.pop.search);
178                        if(thrd) {
179                                return thrd;
180                        }
181                }
182
183                return nullptr;
184        }
185
186private:
187        struct attempt_stat_t {
188                std::size_t attempt = { 0 };
189                std::size_t elock   = { 0 };
190                std::size_t eempty  = { 0 };
191                std::size_t espec   = { 0 };
192                std::size_t success = { 0 };
193        };
194
195        node_t * try_pop(unsigned i, attempt_stat_t & stat) {
196                assert(i < numThreads);
197                auto & list = lists[i];
198                stat.attempt++;
199
200                // If the list is empty, don't try
201                if(list.ts() == 0) { stat.espec++; return nullptr; }
202
203                // If we can't get the lock, move on
204                if( !list.try_lock() ) { stat.elock++; return nullptr; }
205
206                // If list is empty, unlock and retry
207                if( list.ts() == 0 ) {
208                        list.unlock();
209                        stat.eempty++;
210                        return nullptr;
211                }
212
213                auto node = list.pop();
214                list.unlock();
215                stat.success++;
216                #ifdef NO_MPSC
217                        // times[i].val = 1;
218                        times[i].val = node.first->_links.ts;
219                        // lists[i].val = node.first->_links.ts;
220                        return node.first;
221                #else
222                        times[i].val = node->_links.ts;
223                        return node;
224                #endif
225        }
226
227
228public:
229
230        static std::atomic_uint32_t ticket;
231        static const unsigned outside = 0xFFFFFFFF;
232
233        static inline unsigned calc_preferred() {
234                unsigned t = ticket++;
235                if(t == 0) return outside;
236                unsigned i = (t - 1) * nqueues;
237                return i;
238        }
239
240        static __attribute__((aligned(128))) thread_local struct TLS {
241                Random     rng1 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
242                Random     rng2 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
243                unsigned   it   = 0;
244                unsigned   my_queue = calc_preferred();
245                unsigned   myfriend = outside;
246                unsigned long long int mytime = 0;
247                #if defined(READ)
248                        unsigned it = 0;
249                #endif
250                struct {
251                        struct {
252                                std::size_t attempt = { 0 };
253                                std::size_t success = { 0 };
254                        } push;
255                        struct {
256                                attempt_stat_t help;
257                                attempt_stat_t local;
258                                attempt_stat_t steal;
259                                attempt_stat_t search;
260                        } pop;
261                        std::size_t help = { 0 };
262                } stats;
263        } tls;
264
265private:
266        const unsigned numThreads;
267        std::unique_ptr<localQ_t<node_t> []> lists;
268        // std::unique_ptr<intrusive_queue_t<node_t> []> lists;
269        std::unique_ptr<timestamp_t []> times;
270        __attribute__((aligned(128))) std::atomic_size_t count;
271
272#ifndef NO_STATS
273private:
274        static struct GlobalStats {
275                struct {
276                        std::atomic_size_t attempt = { 0 };
277                        std::atomic_size_t success = { 0 };
278                } push;
279                struct {
280                        struct {
281                                std::atomic_size_t attempt = { 0 };
282                                std::atomic_size_t elock   = { 0 };
283                                std::atomic_size_t eempty  = { 0 };
284                                std::atomic_size_t espec   = { 0 };
285                                std::atomic_size_t success = { 0 };
286                        } help;
287                        struct {
288                                std::atomic_size_t attempt = { 0 };
289                                std::atomic_size_t elock   = { 0 };
290                                std::atomic_size_t eempty  = { 0 };
291                                std::atomic_size_t espec   = { 0 };
292                                std::atomic_size_t success = { 0 };
293                        } local;
294                        struct {
295                                std::atomic_size_t attempt = { 0 };
296                                std::atomic_size_t elock   = { 0 };
297                                std::atomic_size_t eempty  = { 0 };
298                                std::atomic_size_t espec   = { 0 };
299                                std::atomic_size_t success = { 0 };
300                        } steal;
301                        struct {
302                                std::atomic_size_t attempt = { 0 };
303                                std::atomic_size_t elock   = { 0 };
304                                std::atomic_size_t eempty  = { 0 };
305                                std::atomic_size_t espec   = { 0 };
306                                std::atomic_size_t success = { 0 };
307                        } search;
308                } pop;
309                std::atomic_size_t help = { 0 };
310        } global_stats;
311
312public:
313        static void stats_tls_tally() {
314                global_stats.push.attempt += tls.stats.push.attempt;
315                global_stats.push.success += tls.stats.push.success;
316                global_stats.pop.help  .attempt += tls.stats.pop.help  .attempt;
317                global_stats.pop.help  .elock   += tls.stats.pop.help  .elock  ;
318                global_stats.pop.help  .eempty  += tls.stats.pop.help  .eempty ;
319                global_stats.pop.help  .espec   += tls.stats.pop.help  .espec  ;
320                global_stats.pop.help  .success += tls.stats.pop.help  .success;
321                global_stats.pop.local .attempt += tls.stats.pop.local .attempt;
322                global_stats.pop.local .elock   += tls.stats.pop.local .elock  ;
323                global_stats.pop.local .eempty  += tls.stats.pop.local .eempty ;
324                global_stats.pop.local .espec   += tls.stats.pop.local .espec  ;
325                global_stats.pop.local .success += tls.stats.pop.local .success;
326                global_stats.pop.steal .attempt += tls.stats.pop.steal .attempt;
327                global_stats.pop.steal .elock   += tls.stats.pop.steal .elock  ;
328                global_stats.pop.steal .eempty  += tls.stats.pop.steal .eempty ;
329                global_stats.pop.steal .espec   += tls.stats.pop.steal .espec  ;
330                global_stats.pop.steal .success += tls.stats.pop.steal .success;
331                global_stats.pop.search.attempt += tls.stats.pop.search.attempt;
332                global_stats.pop.search.elock   += tls.stats.pop.search.elock  ;
333                global_stats.pop.search.eempty  += tls.stats.pop.search.eempty ;
334                global_stats.pop.search.espec   += tls.stats.pop.search.espec  ;
335                global_stats.pop.search.success += tls.stats.pop.search.success;
336                global_stats.help += tls.stats.help;
337        }
338
339        static void stats_print(std::ostream & os, double duration ) {
340                std::cout << "----- Work Stealing Stats -----" << std::endl;
341
342                double push_suc = (100.0 * double(global_stats.push.success) / global_stats.push.attempt);
343                double push_len = double(global_stats.push.attempt     ) / global_stats.push.success;
344                os << "Push   Pick : " << push_suc << " %, len " << push_len << " (" << global_stats.push.attempt      << " / " << global_stats.push.success << ")\n";
345
346                double hlp_suc = (100.0 * double(global_stats.pop.help.success) / global_stats.pop.help.attempt);
347                double hlp_len = double(global_stats.pop.help.attempt     ) / global_stats.pop.help.success;
348                os << "Help        : " << hlp_suc << " %, len " << hlp_len << " (" << global_stats.pop.help.attempt      << " / " << global_stats.pop.help.success << ")\n";
349                os << "Help Fail   : " << global_stats.pop.help.espec << "s, " << global_stats.pop.help.eempty << "e, " << global_stats.pop.help.elock << "l\n";
350
351                double pop_suc = (100.0 * double(global_stats.pop.local.success) / global_stats.pop.local.attempt);
352                double pop_len = double(global_stats.pop.local.attempt     ) / global_stats.pop.local.success;
353                os << "Local       : " << pop_suc << " %, len " << pop_len << " (" << global_stats.pop.local.attempt      << " / " << global_stats.pop.local.success << ")\n";
354                os << "Local Fail  : " << global_stats.pop.local.espec << "s, " << global_stats.pop.local.eempty << "e, " << global_stats.pop.local.elock << "l\n";
355
356                double stl_suc = (100.0 * double(global_stats.pop.steal.success) / global_stats.pop.steal.attempt);
357                double stl_len = double(global_stats.pop.steal.attempt     ) / global_stats.pop.steal.success;
358                os << "Steal       : " << stl_suc << " %, len " << stl_len << " (" << global_stats.pop.steal.attempt      << " / " << global_stats.pop.steal.success << ")\n";
359                os << "Steal Fail  : " << global_stats.pop.steal.espec << "s, " << global_stats.pop.steal.eempty << "e, " << global_stats.pop.steal.elock << "l\n";
360
361                double srh_suc = (100.0 * double(global_stats.pop.search.success) / global_stats.pop.search.attempt);
362                double srh_len = double(global_stats.pop.search.attempt     ) / global_stats.pop.search.success;
363                os << "Search      : " << srh_suc << " %, len " << srh_len << " (" << global_stats.pop.search.attempt      << " / " << global_stats.pop.search.success << ")\n";
364                os << "Search Fail : " << global_stats.pop.search.espec << "s, " << global_stats.pop.search.eempty << "e, " << global_stats.pop.search.elock << "l\n";
365                os << "Helps       : " << std::setw(15) << std::scientific << global_stats.help / duration << "/sec (" << global_stats.help  << ")\n";
366        }
367private:
368#endif
369};
Note: See TracBrowser for help on using the repository browser.