source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp@ 50aeb6f

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 50aeb6f was 50aeb6f, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Small tweaks to the memory layout

  • Property mode set to 100644
File size: 6.5 KB
Line 
1#include "relaxed_list.hpp"
2
3#include <array>
4#include <iostream>
5#include <locale>
6#include <string>
7#include <thread>
8#include <vector>
9
10#include <unistd.h>
11#include <sys/sysinfo.h>
12
13#include "utils.hpp"
14
15struct __attribute__((aligned(64))) Node {
16 static std::atomic_size_t creates;
17 static std::atomic_size_t destroys;
18
19 _LinksFields_t<Node> _links;
20
21 int value;
22 Node(int value): value(value) {
23 creates++;
24 }
25
26 ~Node() {
27 destroys++;
28 }
29};
30
31std::atomic_size_t Node::creates = { 0 };
32std::atomic_size_t Node::destroys = { 0 };
33
34static const constexpr int nodes_per_threads = 128;
35struct NodeArray {
36 __attribute__((aligned(64))) Node * array[nodes_per_threads];
37 __attribute__((aligned(64))) char pad;
38};
39
// Global switch: run() sets it to true once the nodes are prepared and back to
// false after the worker threads have been joined.
bool enable_stats{ false };
41
// Per-thread tallies accumulated inside the measurement loop and merged into
// the global totals once the experiment is over.
struct local_stat_t {
	size_t in{ 0 };       // nodes pushed onto the list
	size_t out{ 0 };      // nodes popped from the list
	size_t empty{ 0 };    // iterations where the slot was empty and pop returned nothing
	size_t crc_in{ 0 };   // sum of the values of pushed nodes
	size_t crc_out{ 0 };  // sum of the values of popped nodes
};
49
50__attribute__((noinline)) void run_body(
51 std::atomic<bool>& done,
52 Random & rand,
53 Node * (&my_nodes)[128],
54 local_stat_t & local,
55 relaxed_list<Node> & list
56) {
57 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
58 int idx = rand.next() % nodes_per_threads;
59 if (auto node = my_nodes[idx]) {
60 local.crc_in += node->value;
61 list.push(node);
62 my_nodes[idx] = nullptr;
63 local.in++;
64 }
65 else if(auto node = list.pop()) {
66 local.crc_out += node->value;
67 my_nodes[idx] = node;
68 local.out++;
69 }
70 else {
71 local.empty++;
72 }
73 }
74}
75
76void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
77 // List being tested
78 relaxed_list<Node> list = { nthread * nqueues };
79
80 // Barrier for synchronization
81 barrier_t barrier(nthread + 1);
82
83 // Data to check everything is OK
84 struct {
85 std::atomic_size_t in = { 0 };
86 std::atomic_size_t out = { 0 };
87 std::atomic_size_t empty = { 0 };
88 std::atomic_size_t crc_in = { 0 };
89 std::atomic_size_t crc_out = { 0 };
90 struct {
91 struct {
92 std::atomic_size_t attempt = { 0 };
93 std::atomic_size_t success = { 0 };
94 } push;
95 struct {
96 std::atomic_size_t attempt = { 0 };
97 std::atomic_size_t success = { 0 };
98 } pop;
99 } pick;
100 } global;
101
102 // Flag to signal termination
103 std::atomic_bool done = { false };
104
105 // Prep nodes
106 std::cout << "Initializing" << std::endl;
107 NodeArray all_nodes[nthread];
108 for(auto & nodes : all_nodes) {
109 Random rand(rdtscl());
110 for(auto & node : nodes.array) {
111 auto r = rand.next() % 100;
112 if(r < fill)
113 node = new Node(rand.next() % 100);
114 }
115
116 for(int i = 0; i < 10; i++) {
117 int idx = rand.next() % nodes_per_threads;
118 if (auto node = nodes.array[idx]) {
119 global.crc_in += node->value;
120 list.push(node);
121 nodes.array[idx] = nullptr;
122 }
123 }
124 }
125
126 enable_stats = true;
127
128 std::thread * threads[nthread];
129 unsigned i = 1;
130 for(auto & t : threads) {
131 auto & my_nodes = all_nodes[i - 1].array;
132 t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
133 Random rand(tid + rdtscl());
134
135 local_stat_t local;
136
137 // affinity(tid);
138
139 barrier.wait(tid);
140
141 // EXPERIMENT START
142
143 run_body(done, rand, my_nodes, local, list);
144
145 // EXPERIMENT END
146
147 barrier.wait(tid);
148
149 global.in += local.in;
150 global.out += local.out;
151 global.empty += local.empty;
152
153 for(auto node : my_nodes) {
154 delete node;
155 }
156
157 global.crc_in += local.crc_in;
158 global.crc_out += local.crc_out;
159
160 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
161 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
162 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
163 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
164 }, i++);
165 }
166
167 std::cout << "Starting" << std::endl;
168 auto before = Clock::now();
169 barrier.wait(0);
170
171 while(true) {
172 usleep(100000);
173 auto now = Clock::now();
174 duration_t durr = now - before;
175 if( durr.count() > duration ) {
176 done = true;
177 break;
178 }
179 std::cout << "\r" << durr.count();
180 std::cout.flush();
181 }
182
183 barrier.wait(0);
184 auto after = Clock::now();
185 duration_t durr = after - before;
186 duration = durr.count();
187 std::cout << "\nClosing down" << std::endl;
188
189 for(auto t : threads) {
190 t->join();
191 delete t;
192 }
193
194 enable_stats = false;
195
196 while(auto node = list.pop()) {
197 global.crc_out += node->value;
198 delete node;
199 }
200
201 assert(Node::creates == Node::destroys);
202 assert(global.crc_in == global.crc_out);
203
204 std::cout << "Done" << std::endl;
205
206 size_t ops = global.in + global.out;
207 size_t ops_sec = size_t(double(ops) / duration);
208 size_t ops_thread = ops_sec / nthread;
209 auto dur_nano = duration_cast<std::nano>(1.0);
210
211 std::cout << "Duration : " << duration << "s\n";
212 std::cout << "ns/Op : " << ( dur_nano / ops_thread )<< "\n";
213 std::cout << "Ops/sec/thread: " << ops_thread << "\n";
214 std::cout << "Ops/sec : " << ops_sec << "\n";
215 std::cout << "Total ops : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
216 #ifndef NO_STATS
217 double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
218 double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
219 std::cout << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
220 std::cout << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
221 #endif
222}
223
// Writes the command-line synopsis, prefixed with the program name taken from
// argv[0], to stderr and terminates the process with exit status 1.
void usage(char * argv[]) {
	const char * const program = argv[0];
	std::cerr << program << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;
	std::exit(1);
}
228
229int main(int argc, char * argv[]) {
230
231 double duration = 5.0;
232 unsigned nthreads = 2;
233 unsigned nqueues = 2;
234 unsigned fill = 100;
235
236 std::cout.imbue(std::locale(""));
237
238 switch (argc)
239 {
240 case 5:
241 nqueues = std::stoul(argv[4]);
242 [[fallthrough]];
243 case 4:
244 nqueues = std::stoul(argv[3]);
245 [[fallthrough]];
246 case 3:
247 nthreads = std::stoul(argv[2]);
248 [[fallthrough]];
249 case 2:
250 duration = std::stod(argv[1]);
251 if( duration <= 0.0 ) {
252 std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl;
253 usage(argv);
254 }
255 [[fallthrough]];
256 case 1:
257 break;
258 default:
259 usage(argv);
260 break;
261 }
262
263 check_cache_line_size();
264
265 std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
266 run(nthreads, nqueues, duration, fill);
267
268 return 0;
269}
270
271template<>
272thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
273
274template<>
275relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
276
// Human-readable name of this program (presumably consumed by the shared utilities - confirm in utils.hpp).
const char * __my_progname{ "Relaxed List" };
Note: See TracBrowser for help on using the repository browser.