source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp @ 8633485b

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationjenkins-sandboxnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 8633485b was c921712, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Fixed support for setting number of starting nodes

  • Property mode set to 100644
File size: 6.7 KB
Line 
1#include "relaxed_list.hpp"
2
3#include <array>
4#include <iomanip>
5#include <iostream>
6#include <locale>
7#include <string>
8#include <thread>
9#include <vector>
10
11#include <unistd.h>
12#include <sys/sysinfo.h>
13
14#include "utils.hpp"
15
16struct __attribute__((aligned(64))) Node {
17        static std::atomic_size_t creates;
18        static std::atomic_size_t destroys;
19
20        _LinksFields_t<Node> _links;
21
22        int value;
23        Node(int value): value(value) {
24                creates++;
25        }
26
27        ~Node() {
28                destroys++;
29        }
30};
31
32std::atomic_size_t Node::creates  = { 0 };
33std::atomic_size_t Node::destroys = { 0 };
34
35static const constexpr int nodes_per_threads = 128;
36struct NodeArray {
37        __attribute__((aligned(64))) Node * array[nodes_per_threads];
38        __attribute__((aligned(64))) char pad;
39};
40
41bool enable_stats = false;
42
43struct local_stat_t {
44        size_t in  = 0;
45        size_t out = 0;
46        size_t empty = 0;
47        size_t crc_in  = 0;
48        size_t crc_out = 0;
49};
50
51__attribute__((noinline)) void run_body(
52        std::atomic<bool>& done,
53        Random & rand,
54        Node * (&my_nodes)[128],
55        local_stat_t & local,
56        relaxed_list<Node> & list
57) {
58        while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
59                int idx = rand.next() % nodes_per_threads;
60                if (auto node = my_nodes[idx]) {
61                        local.crc_in += node->value;
62                        list.push(node);
63                        my_nodes[idx] = nullptr;
64                        local.in++;
65                }
66                else if(auto node = list.pop()) {
67                        local.crc_out += node->value;
68                        my_nodes[idx] = node;
69                        local.out++;
70                }
71                else {
72                        local.empty++;
73                }
74        }
75}
76
77void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
78        // List being tested
79        relaxed_list<Node> list = { nthread * nqueues };
80
81        // Barrier for synchronization
82        barrier_t barrier(nthread + 1);
83
84        // Data to check everything is OK
85        struct {
86                std::atomic_size_t in  = { 0 };
87                std::atomic_size_t out = { 0 };
88                std::atomic_size_t empty = { 0 };
89                std::atomic_size_t crc_in  = { 0 };
90                std::atomic_size_t crc_out = { 0 };
91                struct {
92                        struct {
93                                std::atomic_size_t attempt = { 0 };
94                                std::atomic_size_t success = { 0 };
95                        } push;
96                        struct {
97                                std::atomic_size_t attempt = { 0 };
98                                std::atomic_size_t success = { 0 };
99                        } pop;
100                } pick;
101        } global;
102
103        // Flag to signal termination
104        std::atomic_bool done  = { false };
105
106        // Prep nodes
107        std::cout << "Initializing ";
108        size_t nnodes  = 0;
109        size_t npushed = 0;
110        NodeArray all_nodes[nthread];
111        for(auto & nodes : all_nodes) {
112                Random rand(rdtscl());
113                for(auto & node : nodes.array) {
114                        auto r = rand.next() % 100;
115                        if(r < fill) {
116                                node = new Node(rand.next() % 100);
117                                nnodes++;
118                        } else {
119                                node = nullptr;
120                        }
121                }
122
123                for(int i = 0; i < 10; i++) {
124                        int idx = rand.next() % nodes_per_threads;
125                        if (auto node = nodes.array[idx]) {
126                                global.crc_in += node->value;
127                                list.push(node);
128                                npushed++;
129                                nodes.array[idx] = nullptr;
130                        }
131                }
132        }
133
134        std::cout << nnodes << " nodes " << fill << "% (" << npushed << " pushed)" << std::endl;
135
136        enable_stats = true;
137
138        std::thread * threads[nthread];
139        unsigned i = 1;
140        for(auto & t : threads) {
141                auto & my_nodes = all_nodes[i - 1].array;
142                t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
143                        Random rand(tid + rdtscl());
144
145                        local_stat_t local;
146
147                        // affinity(tid);
148
149                        barrier.wait(tid);
150
151                        // EXPERIMENT START
152
153                        run_body(done, rand, my_nodes, local, list);
154
155                        // EXPERIMENT END
156
157                        barrier.wait(tid);
158
159                        global.in    += local.in;
160                        global.out   += local.out;
161                        global.empty += local.empty;
162
163                        for(auto node : my_nodes) {
164                                delete node;
165                        }
166
167                        global.crc_in  += local.crc_in;
168                        global.crc_out += local.crc_out;
169
170                        global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
171                        global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
172                        global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
173                        global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
174                }, i++);
175        }
176
177        std::cout << "Starting" << std::endl;
178        auto before = Clock::now();
179        barrier.wait(0);
180
181        while(true) {
182                usleep(100000);
183                auto now = Clock::now();
184                duration_t durr = now - before;
185                if( durr.count() > duration ) {
186                        done = true;
187                        break;
188                }
189                std::cout << "\r" << std::setprecision(4) << durr.count();
190                std::cout.flush();
191        }
192
193        barrier.wait(0);
194        auto after = Clock::now();
195        duration_t durr = after - before;
196        duration = durr.count();
197        std::cout << "\rClosing down" << std::endl;
198
199        for(auto t : threads) {
200                t->join();
201                delete t;
202        }
203
204        enable_stats = false;
205
206        while(auto node = list.pop()) {
207                global.crc_out += node->value;
208                delete node;
209        }
210
211        assert(Node::creates == Node::destroys);
212        assert(global.crc_in == global.crc_out);
213
214        std::cout << "Done" << std::endl;
215
216        size_t ops = global.in + global.out;
217        size_t ops_sec = size_t(double(ops) / duration);
218        size_t ops_thread = ops_sec / nthread;
219        auto dur_nano = duration_cast<std::nano>(1.0);
220
221        std::cout << "Duration      : " << duration << "s\n";
222        std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
223        std::cout << "Ops/sec/thread: " << ops_thread << "\n";
224        std::cout << "Ops/sec       : " << ops_sec << "\n";
225        std::cout << "Total ops     : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
226        #ifndef NO_STATS
227                double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
228                double pop_sur  = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
229                std::cout << "Push Pick %   : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
230                std::cout << "Pop  Pick %   : " << pop_sur  << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
231        #endif
232}
233
234void usage(char * argv[]) {
235        std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;;
236        std::exit(1);
237}
238
239int main(int argc, char * argv[]) {
240
241        double duration   = 5.0;
242        unsigned nthreads = 2;
243        unsigned nqueues  = 2;
244        unsigned fill     = 100;
245
246        std::cout.imbue(std::locale(""));
247
248        switch (argc)
249        {
250        case 5:
251                fill = std::stoul(argv[4]);
252                [[fallthrough]];
253        case 4:
254                nqueues = std::stoul(argv[3]);
255                [[fallthrough]];
256        case 3:
257                nthreads = std::stoul(argv[2]);
258                [[fallthrough]];
259        case 2:
260                duration = std::stod(argv[1]);
261                if( duration <= 0.0 ) {
262                        std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl;
263                        usage(argv);
264                }
265                [[fallthrough]];
266        case 1:
267                break;
268        default:
269                usage(argv);
270                break;
271        }
272
273        check_cache_line_size();
274
275        std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
276        run(nthreads, nqueues, fill, duration);
277
278        return 0;
279}
280
281template<>
282thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
283
284template<>
285relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
286
287const char * __my_progname = "Relaxed List";
Note: See TracBrowser for help on using the repository browser.