source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp@ 5ee7d36

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 5ee7d36 was c921712, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Fixed support for setting number of starting nodes

  • Property mode set to 100644
File size: 6.7 KB
Line 
#include "relaxed_list.hpp"

#include <array>
#include <cstddef>
#include <iomanip>
#include <iostream>
#include <limits>
#include <locale>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <unistd.h>
#include <sys/sysinfo.h>

#include "utils.hpp"
15
16struct __attribute__((aligned(64))) Node {
17 static std::atomic_size_t creates;
18 static std::atomic_size_t destroys;
19
20 _LinksFields_t<Node> _links;
21
22 int value;
23 Node(int value): value(value) {
24 creates++;
25 }
26
27 ~Node() {
28 destroys++;
29 }
30};
31
32std::atomic_size_t Node::creates = { 0 };
33std::atomic_size_t Node::destroys = { 0 };
34
35static const constexpr int nodes_per_threads = 128;
36struct NodeArray {
37 __attribute__((aligned(64))) Node * array[nodes_per_threads];
38 __attribute__((aligned(64))) char pad;
39};
40
// Global switch toggled by run(): set to true once the initial nodes have been
// pushed and back to false after all workers have been joined.
// NOTE(review): presumably read by the list implementation to decide whether
// to record statistics — confirm against relaxed_list.hpp.
bool enable_stats{ false };
42
// Per-thread counters, accumulated without synchronization during the
// experiment and merged into the global totals once the experiment is over.
struct local_stat_t {
	size_t in{ 0 };       // successful pushes
	size_t out{ 0 };      // successful pops
	size_t empty{ 0 };    // iterations where the slot was empty and pop returned nothing
	size_t crc_in{ 0 };   // sum of the values of all pushed nodes
	size_t crc_out{ 0 };  // sum of the values of all popped nodes
};
50
51__attribute__((noinline)) void run_body(
52 std::atomic<bool>& done,
53 Random & rand,
54 Node * (&my_nodes)[128],
55 local_stat_t & local,
56 relaxed_list<Node> & list
57) {
58 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
59 int idx = rand.next() % nodes_per_threads;
60 if (auto node = my_nodes[idx]) {
61 local.crc_in += node->value;
62 list.push(node);
63 my_nodes[idx] = nullptr;
64 local.in++;
65 }
66 else if(auto node = list.pop()) {
67 local.crc_out += node->value;
68 my_nodes[idx] = node;
69 local.out++;
70 }
71 else {
72 local.empty++;
73 }
74 }
75}
76
77void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
78 // List being tested
79 relaxed_list<Node> list = { nthread * nqueues };
80
81 // Barrier for synchronization
82 barrier_t barrier(nthread + 1);
83
84 // Data to check everything is OK
85 struct {
86 std::atomic_size_t in = { 0 };
87 std::atomic_size_t out = { 0 };
88 std::atomic_size_t empty = { 0 };
89 std::atomic_size_t crc_in = { 0 };
90 std::atomic_size_t crc_out = { 0 };
91 struct {
92 struct {
93 std::atomic_size_t attempt = { 0 };
94 std::atomic_size_t success = { 0 };
95 } push;
96 struct {
97 std::atomic_size_t attempt = { 0 };
98 std::atomic_size_t success = { 0 };
99 } pop;
100 } pick;
101 } global;
102
103 // Flag to signal termination
104 std::atomic_bool done = { false };
105
106 // Prep nodes
107 std::cout << "Initializing ";
108 size_t nnodes = 0;
109 size_t npushed = 0;
110 NodeArray all_nodes[nthread];
111 for(auto & nodes : all_nodes) {
112 Random rand(rdtscl());
113 for(auto & node : nodes.array) {
114 auto r = rand.next() % 100;
115 if(r < fill) {
116 node = new Node(rand.next() % 100);
117 nnodes++;
118 } else {
119 node = nullptr;
120 }
121 }
122
123 for(int i = 0; i < 10; i++) {
124 int idx = rand.next() % nodes_per_threads;
125 if (auto node = nodes.array[idx]) {
126 global.crc_in += node->value;
127 list.push(node);
128 npushed++;
129 nodes.array[idx] = nullptr;
130 }
131 }
132 }
133
134 std::cout << nnodes << " nodes " << fill << "% (" << npushed << " pushed)" << std::endl;
135
136 enable_stats = true;
137
138 std::thread * threads[nthread];
139 unsigned i = 1;
140 for(auto & t : threads) {
141 auto & my_nodes = all_nodes[i - 1].array;
142 t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
143 Random rand(tid + rdtscl());
144
145 local_stat_t local;
146
147 // affinity(tid);
148
149 barrier.wait(tid);
150
151 // EXPERIMENT START
152
153 run_body(done, rand, my_nodes, local, list);
154
155 // EXPERIMENT END
156
157 barrier.wait(tid);
158
159 global.in += local.in;
160 global.out += local.out;
161 global.empty += local.empty;
162
163 for(auto node : my_nodes) {
164 delete node;
165 }
166
167 global.crc_in += local.crc_in;
168 global.crc_out += local.crc_out;
169
170 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
171 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
172 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
173 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
174 }, i++);
175 }
176
177 std::cout << "Starting" << std::endl;
178 auto before = Clock::now();
179 barrier.wait(0);
180
181 while(true) {
182 usleep(100000);
183 auto now = Clock::now();
184 duration_t durr = now - before;
185 if( durr.count() > duration ) {
186 done = true;
187 break;
188 }
189 std::cout << "\r" << std::setprecision(4) << durr.count();
190 std::cout.flush();
191 }
192
193 barrier.wait(0);
194 auto after = Clock::now();
195 duration_t durr = after - before;
196 duration = durr.count();
197 std::cout << "\rClosing down" << std::endl;
198
199 for(auto t : threads) {
200 t->join();
201 delete t;
202 }
203
204 enable_stats = false;
205
206 while(auto node = list.pop()) {
207 global.crc_out += node->value;
208 delete node;
209 }
210
211 assert(Node::creates == Node::destroys);
212 assert(global.crc_in == global.crc_out);
213
214 std::cout << "Done" << std::endl;
215
216 size_t ops = global.in + global.out;
217 size_t ops_sec = size_t(double(ops) / duration);
218 size_t ops_thread = ops_sec / nthread;
219 auto dur_nano = duration_cast<std::nano>(1.0);
220
221 std::cout << "Duration : " << duration << "s\n";
222 std::cout << "ns/Op : " << ( dur_nano / ops_thread )<< "\n";
223 std::cout << "Ops/sec/thread: " << ops_thread << "\n";
224 std::cout << "Ops/sec : " << ops_sec << "\n";
225 std::cout << "Total ops : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
226 #ifndef NO_STATS
227 double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
228 double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
229 std::cout << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
230 std::cout << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
231 #endif
232}
233
// Prints the command-line synopsis to standard error and terminates the
// process with exit status 1. Does not return.
//
// argv : the program's argument vector; only argv[0] is used.
void usage(char * argv[]) {
	const char * const program = argv[0];
	std::cerr << program
	          << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]"
	          << std::endl;
	std::exit(1);
}
238
239int main(int argc, char * argv[]) {
240
241 double duration = 5.0;
242 unsigned nthreads = 2;
243 unsigned nqueues = 2;
244 unsigned fill = 100;
245
246 std::cout.imbue(std::locale(""));
247
248 switch (argc)
249 {
250 case 5:
251 fill = std::stoul(argv[4]);
252 [[fallthrough]];
253 case 4:
254 nqueues = std::stoul(argv[3]);
255 [[fallthrough]];
256 case 3:
257 nthreads = std::stoul(argv[2]);
258 [[fallthrough]];
259 case 2:
260 duration = std::stod(argv[1]);
261 if( duration <= 0.0 ) {
262 std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl;
263 usage(argv);
264 }
265 [[fallthrough]];
266 case 1:
267 break;
268 default:
269 usage(argv);
270 break;
271 }
272
273 check_cache_line_size();
274
275 std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
276 run(nthreads, nqueues, fill, duration);
277
278 return 0;
279}
280
281template<>
282thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
283
284template<>
285relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
286
287const char * __my_progname = "Relaxed List";
Note: See TracBrowser for help on using the repository browser.