source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp @ 50aeb6f

arm-ehjacob/cs343-translationjenkins-sandboxnew-astnew-ast-unique-expr
Last change on this file since 50aeb6f was 50aeb6f, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Small tweaks to the memory layout

  • Property mode set to 100644
File size: 6.5 KB
Line 
1#include "relaxed_list.hpp"
2
3#include <array>
4#include <iostream>
5#include <locale>
6#include <string>
7#include <thread>
8#include <vector>
9
10#include <unistd.h>
11#include <sys/sysinfo.h>
12
13#include "utils.hpp"
14
15struct __attribute__((aligned(64))) Node {
16        static std::atomic_size_t creates;
17        static std::atomic_size_t destroys;
18
19        _LinksFields_t<Node> _links;
20
21        int value;
22        Node(int value): value(value) {
23                creates++;
24        }
25
26        ~Node() {
27                destroys++;
28        }
29};
30
31std::atomic_size_t Node::creates  = { 0 };
32std::atomic_size_t Node::destroys = { 0 };
33
34static const constexpr int nodes_per_threads = 128;
35struct NodeArray {
36        __attribute__((aligned(64))) Node * array[nodes_per_threads];
37        __attribute__((aligned(64))) char pad;
38};
39
40bool enable_stats = false;
41
42struct local_stat_t {
43        size_t in  = 0;
44        size_t out = 0;
45        size_t empty = 0;
46        size_t crc_in  = 0;
47        size_t crc_out = 0;
48};
49
50__attribute__((noinline)) void run_body(
51        std::atomic<bool>& done,
52        Random & rand,
53        Node * (&my_nodes)[128],
54        local_stat_t & local,
55        relaxed_list<Node> & list
56) {
57        while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
58                int idx = rand.next() % nodes_per_threads;
59                if (auto node = my_nodes[idx]) {
60                        local.crc_in += node->value;
61                        list.push(node);
62                        my_nodes[idx] = nullptr;
63                        local.in++;
64                }
65                else if(auto node = list.pop()) {
66                        local.crc_out += node->value;
67                        my_nodes[idx] = node;
68                        local.out++;
69                }
70                else {
71                        local.empty++;
72                }
73        }
74}
75
76void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
77        // List being tested
78        relaxed_list<Node> list = { nthread * nqueues };
79
80        // Barrier for synchronization
81        barrier_t barrier(nthread + 1);
82
83        // Data to check everything is OK
84        struct {
85                std::atomic_size_t in  = { 0 };
86                std::atomic_size_t out = { 0 };
87                std::atomic_size_t empty = { 0 };
88                std::atomic_size_t crc_in  = { 0 };
89                std::atomic_size_t crc_out = { 0 };
90                struct {
91                        struct {
92                                std::atomic_size_t attempt = { 0 };
93                                std::atomic_size_t success = { 0 };
94                        } push;
95                        struct {
96                                std::atomic_size_t attempt = { 0 };
97                                std::atomic_size_t success = { 0 };
98                        } pop;
99                } pick;
100        } global;
101
102        // Flag to signal termination
103        std::atomic_bool done  = { false };
104
105        // Prep nodes
106        std::cout << "Initializing" << std::endl;
107        NodeArray all_nodes[nthread];
108        for(auto & nodes : all_nodes) {
109                Random rand(rdtscl());
110                for(auto & node : nodes.array) {
111                        auto r = rand.next() % 100;
112                        if(r < fill)
113                                node = new Node(rand.next() % 100);
114                }
115
116                for(int i = 0; i < 10; i++) {
117                        int idx = rand.next() % nodes_per_threads;
118                        if (auto node = nodes.array[idx]) {
119                                global.crc_in += node->value;
120                                list.push(node);
121                                nodes.array[idx] = nullptr;
122                        }
123                }
124        }
125
126        enable_stats = true;
127
128        std::thread * threads[nthread];
129        unsigned i = 1;
130        for(auto & t : threads) {
131                auto & my_nodes = all_nodes[i - 1].array;
132                t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
133                        Random rand(tid + rdtscl());
134
135                        local_stat_t local;
136
137                        // affinity(tid);
138
139                        barrier.wait(tid);
140
141                        // EXPERIMENT START
142
143                        run_body(done, rand, my_nodes, local, list);
144
145                        // EXPERIMENT END
146
147                        barrier.wait(tid);
148
149                        global.in    += local.in;
150                        global.out   += local.out;
151                        global.empty += local.empty;
152
153                        for(auto node : my_nodes) {
154                                delete node;
155                        }
156
157                        global.crc_in  += local.crc_in;
158                        global.crc_out += local.crc_out;
159
160                        global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
161                        global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
162                        global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
163                        global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
164                }, i++);
165        }
166
167        std::cout << "Starting" << std::endl;
168        auto before = Clock::now();
169        barrier.wait(0);
170
171        while(true) {
172                usleep(100000);
173                auto now = Clock::now();
174                duration_t durr = now - before;
175                if( durr.count() > duration ) {
176                        done = true;
177                        break;
178                }
179                std::cout << "\r" << durr.count();
180                std::cout.flush();
181        }
182
183        barrier.wait(0);
184        auto after = Clock::now();
185        duration_t durr = after - before;
186        duration = durr.count();
187        std::cout << "\nClosing down" << std::endl;
188
189        for(auto t : threads) {
190                t->join();
191                delete t;
192        }
193
194        enable_stats = false;
195
196        while(auto node = list.pop()) {
197                global.crc_out += node->value;
198                delete node;
199        }
200
201        assert(Node::creates == Node::destroys);
202        assert(global.crc_in == global.crc_out);
203
204        std::cout << "Done" << std::endl;
205
206        size_t ops = global.in + global.out;
207        size_t ops_sec = size_t(double(ops) / duration);
208        size_t ops_thread = ops_sec / nthread;
209        auto dur_nano = duration_cast<std::nano>(1.0);
210
211        std::cout << "Duration      : " << duration << "s\n";
212        std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
213        std::cout << "Ops/sec/thread: " << ops_thread << "\n";
214        std::cout << "Ops/sec       : " << ops_sec << "\n";
215        std::cout << "Total ops     : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
216        #ifndef NO_STATS
217                double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
218                double pop_sur  = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
219                std::cout << "Push Pick %   : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
220                std::cout << "Pop  Pick %   : " << pop_sur  << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
221        #endif
222}
223
224void usage(char * argv[]) {
225        std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;;
226        std::exit(1);
227}
228
229int main(int argc, char * argv[]) {
230
231        double duration   = 5.0;
232        unsigned nthreads = 2;
233        unsigned nqueues  = 2;
234        unsigned fill     = 100;
235
236        std::cout.imbue(std::locale(""));
237
238        switch (argc)
239        {
240        case 5:
241                nqueues = std::stoul(argv[4]);
242                [[fallthrough]];
243        case 4:
244                nqueues = std::stoul(argv[3]);
245                [[fallthrough]];
246        case 3:
247                nthreads = std::stoul(argv[2]);
248                [[fallthrough]];
249        case 2:
250                duration = std::stod(argv[1]);
251                if( duration <= 0.0 ) {
252                        std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl;
253                        usage(argv);
254                }
255                [[fallthrough]];
256        case 1:
257                break;
258        default:
259                usage(argv);
260                break;
261        }
262
263        check_cache_line_size();
264
265        std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
266        run(nthreads, nqueues, duration, fill);
267
268        return 0;
269}
270
271template<>
272thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
273
274template<>
275relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
276
277const char * __my_progname = "Relaxed List";
Note: See TracBrowser for help on using the repository browser.