source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp @ 1e24d13

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationjenkins-sandboxnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 1e24d13 was 1e24d13, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Fixed stupid typo

  • Property mode set to 100644
File size: 6.5 KB
Line 
1#include "relaxed_list.hpp"
2
3#include <array>
4#include <iostream>
5#include <locale>
6#include <string>
7#include <thread>
8#include <vector>
9
10#include <unistd.h>
11#include <sys/sysinfo.h>
12
13#include "utils.hpp"
14
15struct __attribute__((aligned(64))) Node {
16        static std::atomic_size_t creates;
17        static std::atomic_size_t destroys;
18
19        _LinksFields_t<Node> _links;
20
21        int value;
22        Node(int value): value(value) {
23                creates++;
24        }
25
26        ~Node() {
27                destroys++;
28        }
29};
30
31std::atomic_size_t Node::creates  = { 0 };
32std::atomic_size_t Node::destroys = { 0 };
33
34static const constexpr int nodes_per_threads = 128;
35struct NodeArray {
36        __attribute__((aligned(64))) Node * array[nodes_per_threads];
37        __attribute__((aligned(64))) char pad;
38};
39
40bool enable_stats = false;
41
42struct local_stat_t {
43        size_t in  = 0;
44        size_t out = 0;
45        size_t empty = 0;
46        size_t crc_in  = 0;
47        size_t crc_out = 0;
48};
49
50__attribute__((noinline)) void run_body(
51        std::atomic<bool>& done,
52        Random & rand,
53        Node * (&my_nodes)[128],
54        local_stat_t & local,
55        relaxed_list<Node> & list
56) {
57        while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
58                int idx = rand.next() % nodes_per_threads;
59                if (auto node = my_nodes[idx]) {
60                        local.crc_in += node->value;
61                        list.push(node);
62                        my_nodes[idx] = nullptr;
63                        local.in++;
64                }
65                else if(auto node = list.pop()) {
66                        local.crc_out += node->value;
67                        my_nodes[idx] = node;
68                        local.out++;
69                }
70                else {
71                        local.empty++;
72                }
73        }
74}
75
76void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
77        // List being tested
78        relaxed_list<Node> list = { nthread * nqueues };
79
80        // Barrier for synchronization
81        barrier_t barrier(nthread + 1);
82
83        // Data to check everything is OK
84        struct {
85                std::atomic_size_t in  = { 0 };
86                std::atomic_size_t out = { 0 };
87                std::atomic_size_t empty = { 0 };
88                std::atomic_size_t crc_in  = { 0 };
89                std::atomic_size_t crc_out = { 0 };
90                struct {
91                        struct {
92                                std::atomic_size_t attempt = { 0 };
93                                std::atomic_size_t success = { 0 };
94                        } push;
95                        struct {
96                                std::atomic_size_t attempt = { 0 };
97                                std::atomic_size_t success = { 0 };
98                        } pop;
99                } pick;
100        } global;
101
102        // Flag to signal termination
103        std::atomic_bool done  = { false };
104
105        // Prep nodes
106        std::cout << "Initializing" << std::endl;
107        NodeArray all_nodes[nthread];
108        for(auto & nodes : all_nodes) {
109                Random rand(rdtscl());
110                for(auto & node : nodes.array) {
111                        auto r = rand.next() % 100;
112                        if(r < fill)
113                                node = new Node(rand.next() % 100);
114                        else
115                                node = nullptr;
116                }
117
118                for(int i = 0; i < 10; i++) {
119                        int idx = rand.next() % nodes_per_threads;
120                        if (auto node = nodes.array[idx]) {
121                                global.crc_in += node->value;
122                                list.push(node);
123                                nodes.array[idx] = nullptr;
124                        }
125                }
126        }
127
128        enable_stats = true;
129
130        std::thread * threads[nthread];
131        unsigned i = 1;
132        for(auto & t : threads) {
133                auto & my_nodes = all_nodes[i - 1].array;
134                t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
135                        Random rand(tid + rdtscl());
136
137                        local_stat_t local;
138
139                        // affinity(tid);
140
141                        barrier.wait(tid);
142
143                        // EXPERIMENT START
144
145                        run_body(done, rand, my_nodes, local, list);
146
147                        // EXPERIMENT END
148
149                        barrier.wait(tid);
150
151                        global.in    += local.in;
152                        global.out   += local.out;
153                        global.empty += local.empty;
154
155                        for(auto node : my_nodes) {
156                                delete node;
157                        }
158
159                        global.crc_in  += local.crc_in;
160                        global.crc_out += local.crc_out;
161
162                        global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
163                        global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
164                        global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
165                        global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
166                }, i++);
167        }
168
169        std::cout << "Starting" << std::endl;
170        auto before = Clock::now();
171        barrier.wait(0);
172
173        while(true) {
174                usleep(100000);
175                auto now = Clock::now();
176                duration_t durr = now - before;
177                if( durr.count() > duration ) {
178                        done = true;
179                        break;
180                }
181                std::cout << "\r" << durr.count() << "/" << duration;
182                std::cout.flush();
183        }
184
185        barrier.wait(0);
186        auto after = Clock::now();
187        duration_t durr = after - before;
188        duration = durr.count();
189        std::cout << "\nClosing down" << std::endl;
190
191        for(auto t : threads) {
192                t->join();
193                delete t;
194        }
195
196        enable_stats = false;
197
198        while(auto node = list.pop()) {
199                global.crc_out += node->value;
200                delete node;
201        }
202
203        assert(Node::creates == Node::destroys);
204        assert(global.crc_in == global.crc_out);
205
206        std::cout << "Done" << std::endl;
207
208        size_t ops = global.in + global.out;
209        size_t ops_sec = size_t(double(ops) / duration);
210        size_t ops_thread = ops_sec / nthread;
211        auto dur_nano = duration_cast<std::nano>(1.0);
212
213        std::cout << "Duration      : " << duration << "s\n";
214        std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
215        std::cout << "Ops/sec/thread: " << ops_thread << "\n";
216        std::cout << "Ops/sec       : " << ops_sec << "\n";
217        std::cout << "Total ops     : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
218        #ifndef NO_STATS
219                double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
220                double pop_sur  = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
221                std::cout << "Push Pick %   : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
222                std::cout << "Pop  Pick %   : " << pop_sur  << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
223        #endif
224}
225
226void usage(char * argv[]) {
227        std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;;
228        std::exit(1);
229}
230
231int main(int argc, char * argv[]) {
232
233        double duration   = 5.0;
234        unsigned nthreads = 2;
235        unsigned nqueues  = 2;
236        unsigned fill     = 100;
237
238        std::cout.imbue(std::locale(""));
239
240        switch (argc)
241        {
242        case 5:
243                nqueues = std::stoul(argv[4]);
244                [[fallthrough]];
245        case 4:
246                nqueues = std::stoul(argv[3]);
247                [[fallthrough]];
248        case 3:
249                nthreads = std::stoul(argv[2]);
250                [[fallthrough]];
251        case 2:
252                duration = std::stod(argv[1]);
253                if( duration <= 0.0 ) {
254                        std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl;
255                        usage(argv);
256                }
257                [[fallthrough]];
258        case 1:
259                break;
260        default:
261                usage(argv);
262                break;
263        }
264
265        check_cache_line_size();
266
267        std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
268        run(nthreads, nqueues, fill, duration);
269
270        return 0;
271}
272
273template<>
274thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
275
276template<>
277relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
278
279const char * __my_progname = "Relaxed List";
Note: See TracBrowser for help on using the repository browser.