source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp@ 1e24d13

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 1e24d13 was 1e24d13, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Fixed stupid typo

  • Property mode set to 100644
File size: 6.5 KB
Line 
1#include "relaxed_list.hpp"
2
3#include <array>
4#include <iostream>
5#include <locale>
6#include <string>
7#include <thread>
8#include <vector>
9
10#include <unistd.h>
11#include <sys/sysinfo.h>
12
13#include "utils.hpp"
14
15struct __attribute__((aligned(64))) Node {
16 static std::atomic_size_t creates;
17 static std::atomic_size_t destroys;
18
19 _LinksFields_t<Node> _links;
20
21 int value;
22 Node(int value): value(value) {
23 creates++;
24 }
25
26 ~Node() {
27 destroys++;
28 }
29};
30
31std::atomic_size_t Node::creates = { 0 };
32std::atomic_size_t Node::destroys = { 0 };
33
34static const constexpr int nodes_per_threads = 128;
35struct NodeArray {
36 __attribute__((aligned(64))) Node * array[nodes_per_threads];
37 __attribute__((aligned(64))) char pad;
38};
39
/// Global statistics switch: run() sets it once setup is finished and clears
/// it before the final drain. Presumably read by relaxed_list.hpp to decide
/// whether to record statistics (TODO confirm).
bool enable_stats{false};

/// Counters a worker accumulates privately during the run and merges into
/// the shared totals afterwards.
struct local_stat_t {
	size_t in{0};       // nodes pushed onto the list
	size_t out{0};      // nodes popped from the list
	size_t empty{0};    // iterations where the slot was empty and pop() returned nothing
	size_t crc_in{0};   // sum of the values pushed
	size_t crc_out{0};  // sum of the values popped
};
49
50__attribute__((noinline)) void run_body(
51 std::atomic<bool>& done,
52 Random & rand,
53 Node * (&my_nodes)[128],
54 local_stat_t & local,
55 relaxed_list<Node> & list
56) {
57 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
58 int idx = rand.next() % nodes_per_threads;
59 if (auto node = my_nodes[idx]) {
60 local.crc_in += node->value;
61 list.push(node);
62 my_nodes[idx] = nullptr;
63 local.in++;
64 }
65 else if(auto node = list.pop()) {
66 local.crc_out += node->value;
67 my_nodes[idx] = node;
68 local.out++;
69 }
70 else {
71 local.empty++;
72 }
73 }
74}
75
76void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
77 // List being tested
78 relaxed_list<Node> list = { nthread * nqueues };
79
80 // Barrier for synchronization
81 barrier_t barrier(nthread + 1);
82
83 // Data to check everything is OK
84 struct {
85 std::atomic_size_t in = { 0 };
86 std::atomic_size_t out = { 0 };
87 std::atomic_size_t empty = { 0 };
88 std::atomic_size_t crc_in = { 0 };
89 std::atomic_size_t crc_out = { 0 };
90 struct {
91 struct {
92 std::atomic_size_t attempt = { 0 };
93 std::atomic_size_t success = { 0 };
94 } push;
95 struct {
96 std::atomic_size_t attempt = { 0 };
97 std::atomic_size_t success = { 0 };
98 } pop;
99 } pick;
100 } global;
101
102 // Flag to signal termination
103 std::atomic_bool done = { false };
104
105 // Prep nodes
106 std::cout << "Initializing" << std::endl;
107 NodeArray all_nodes[nthread];
108 for(auto & nodes : all_nodes) {
109 Random rand(rdtscl());
110 for(auto & node : nodes.array) {
111 auto r = rand.next() % 100;
112 if(r < fill)
113 node = new Node(rand.next() % 100);
114 else
115 node = nullptr;
116 }
117
118 for(int i = 0; i < 10; i++) {
119 int idx = rand.next() % nodes_per_threads;
120 if (auto node = nodes.array[idx]) {
121 global.crc_in += node->value;
122 list.push(node);
123 nodes.array[idx] = nullptr;
124 }
125 }
126 }
127
128 enable_stats = true;
129
130 std::thread * threads[nthread];
131 unsigned i = 1;
132 for(auto & t : threads) {
133 auto & my_nodes = all_nodes[i - 1].array;
134 t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
135 Random rand(tid + rdtscl());
136
137 local_stat_t local;
138
139 // affinity(tid);
140
141 barrier.wait(tid);
142
143 // EXPERIMENT START
144
145 run_body(done, rand, my_nodes, local, list);
146
147 // EXPERIMENT END
148
149 barrier.wait(tid);
150
151 global.in += local.in;
152 global.out += local.out;
153 global.empty += local.empty;
154
155 for(auto node : my_nodes) {
156 delete node;
157 }
158
159 global.crc_in += local.crc_in;
160 global.crc_out += local.crc_out;
161
162 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
163 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
164 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
165 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
166 }, i++);
167 }
168
169 std::cout << "Starting" << std::endl;
170 auto before = Clock::now();
171 barrier.wait(0);
172
173 while(true) {
174 usleep(100000);
175 auto now = Clock::now();
176 duration_t durr = now - before;
177 if( durr.count() > duration ) {
178 done = true;
179 break;
180 }
181 std::cout << "\r" << durr.count() << "/" << duration;
182 std::cout.flush();
183 }
184
185 barrier.wait(0);
186 auto after = Clock::now();
187 duration_t durr = after - before;
188 duration = durr.count();
189 std::cout << "\nClosing down" << std::endl;
190
191 for(auto t : threads) {
192 t->join();
193 delete t;
194 }
195
196 enable_stats = false;
197
198 while(auto node = list.pop()) {
199 global.crc_out += node->value;
200 delete node;
201 }
202
203 assert(Node::creates == Node::destroys);
204 assert(global.crc_in == global.crc_out);
205
206 std::cout << "Done" << std::endl;
207
208 size_t ops = global.in + global.out;
209 size_t ops_sec = size_t(double(ops) / duration);
210 size_t ops_thread = ops_sec / nthread;
211 auto dur_nano = duration_cast<std::nano>(1.0);
212
213 std::cout << "Duration : " << duration << "s\n";
214 std::cout << "ns/Op : " << ( dur_nano / ops_thread )<< "\n";
215 std::cout << "Ops/sec/thread: " << ops_thread << "\n";
216 std::cout << "Ops/sec : " << ops_sec << "\n";
217 std::cout << "Total ops : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
218 #ifndef NO_STATS
219 double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
220 double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
221 std::cout << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
222 std::cout << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
223 #endif
224}
225
/// Prints the command-line synopsis to stderr and terminates with status 1.
///
/// Marked [[noreturn]] because it always ends in std::exit. Callers such as
/// main() rely on control never coming back after an invalid argument.
///
/// @param argv the program's argument vector; argv[0] is printed as the name.
[[noreturn]] void usage(char * argv[]) {
	std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;
	std::exit(1);
}
230
231int main(int argc, char * argv[]) {
232
233 double duration = 5.0;
234 unsigned nthreads = 2;
235 unsigned nqueues = 2;
236 unsigned fill = 100;
237
238 std::cout.imbue(std::locale(""));
239
240 switch (argc)
241 {
242 case 5:
243 nqueues = std::stoul(argv[4]);
244 [[fallthrough]];
245 case 4:
246 nqueues = std::stoul(argv[3]);
247 [[fallthrough]];
248 case 3:
249 nthreads = std::stoul(argv[2]);
250 [[fallthrough]];
251 case 2:
252 duration = std::stod(argv[1]);
253 if( duration <= 0.0 ) {
254 std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl;
255 usage(argv);
256 }
257 [[fallthrough]];
258 case 1:
259 break;
260 default:
261 usage(argv);
262 break;
263 }
264
265 check_cache_line_size();
266
267 std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
268 run(nthreads, nqueues, fill, duration);
269
270 return 0;
271}
272
273template<>
274thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
275
276template<>
277relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
278
279const char * __my_progname = "Relaxed List";
Note: See TracBrowser for help on using the repository browser.