source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp@ 9421f3d8

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 9421f3d8 was 9421f3d8, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Adding some of the implemented code. Current state: relaxed list achieves at least 6M ops/sec total

  • Property mode set to 100644
File size: 14.9 KB
Line 
#include "relaxed_list.hpp"

#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <cctype>
#include <iomanip>
#include <iostream>
#include <locale>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <getopt.h>
#include <unistd.h>
#include <sys/sysinfo.h>

#include "utils.hpp"
16
17struct __attribute__((aligned(64))) Node {
18 static std::atomic_size_t creates;
19 static std::atomic_size_t destroys;
20
21 _LinksFields_t<Node> _links;
22
23 int value;
24
25 Node() { creates++; }
26 Node(int value): value(value) { creates++; }
27 ~Node() { destroys++; }
28};
29
30std::atomic_size_t Node::creates = { 0 };
31std::atomic_size_t Node::destroys = { 0 };
32
33bool enable_stats = false;
34
35template<>
36thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
37
38template<>
39relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
40
41// ================================================================================================
42// UTILS
43// ================================================================================================
44
// Counters owned by a single worker thread (no synchronization needed);
// merged into global_stat_t by tally_stats once the run is over.
struct local_stat_t {
	size_t in      {0}; // nodes pushed onto the list
	size_t out     {0}; // nodes popped from the list
	size_t empty   {0}; // pop attempts that returned no node
	size_t crc_in  {0}; // running sum of pushed node values
	size_t crc_out {0}; // running sum of popped node values
};
52
// Totals shared by all worker threads. Every field is atomic because each
// worker adds its own counters concurrently in tally_stats.
struct global_stat_t {
	// Benchmark-level counters, mirrors of local_stat_t
	std::atomic_size_t in      {0};
	std::atomic_size_t out     {0};
	std::atomic_size_t empty   {0};
	std::atomic_size_t crc_in  {0};
	std::atomic_size_t crc_out {0};

	// Pick statistics copied from relaxed_list<Node>::tls.pick
	struct {
		struct {
			std::atomic_size_t attempt {0};
			std::atomic_size_t success {0};
		} push;
		struct {
			std::atomic_size_t attempt      {0};
			std::atomic_size_t success      {0};
			std::atomic_size_t mask_attempt {0};
		} pop;
	} pick;

	// Queue statistics copied from relaxed_list<Node>::tls.empty
	struct {
		struct {
			std::atomic_size_t value {0};
			std::atomic_size_t count {0};
		} push;
		struct {
			std::atomic_size_t value {0};
			std::atomic_size_t count {0};
		} pop;
	} qstat;
};
81
82void tally_stats(global_stat_t & global, local_stat_t & local) {
83 global.in += local.in;
84 global.out += local.out;
85 global.empty += local.empty;
86
87 global.crc_in += local.crc_in;
88 global.crc_out += local.crc_out;
89
90 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
91 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
92 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
93 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
94 global.pick.pop .mask_attempt += relaxed_list<Node>::tls.pick.pop.mask_attempt;
95
96 global.qstat.push.value += relaxed_list<Node>::tls.empty.push.value;
97 global.qstat.push.count += relaxed_list<Node>::tls.empty.push.count;
98 global.qstat.pop .value += relaxed_list<Node>::tls.empty.pop .value;
99 global.qstat.pop .count += relaxed_list<Node>::tls.empty.pop .count;
100}
101
102void waitfor(double & duration, barrier_t & barrier, std::atomic_bool & done) {
103 std::cout << "Starting" << std::endl;
104 auto before = Clock::now();
105 barrier.wait(0);
106
107 while(true) {
108 usleep(100000);
109 auto now = Clock::now();
110 duration_t durr = now - before;
111 if( durr.count() > duration ) {
112 done = true;
113 break;
114 }
115 std::cout << "\r" << std::setprecision(4) << durr.count();
116 std::cout.flush();
117 }
118
119 barrier.wait(0);
120 auto after = Clock::now();
121 duration_t durr = after - before;
122 duration = durr.count();
123 std::cout << "\rClosing down" << std::endl;
124}
125
126void print_stats(double duration, unsigned nthread, global_stat_t & global) {
127 assert(Node::creates == Node::destroys);
128 assert(global.crc_in == global.crc_out);
129
130 std::cout << "Done" << std::endl;
131
132 size_t ops = global.in + global.out;
133 size_t ops_sec = size_t(double(ops) / duration);
134 size_t ops_thread = ops_sec / nthread;
135 auto dur_nano = duration_cast<std::nano>(1.0);
136
137 std::cout << "Duration : " << duration << "s\n";
138 std::cout << "ns/Op : " << ( dur_nano / ops_thread )<< "\n";
139 std::cout << "Ops/sec/thread: " << ops_thread << "\n";
140 std::cout << "Ops/sec : " << ops_sec << "\n";
141 std::cout << "Total ops : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
142 #ifndef NO_STATS
143 double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
144 double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
145
146 std::cout << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
147 std::cout << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
148 std::cout << "Pop mask trys : " << global.pick.pop.mask_attempt << std::endl;
149
150 double avgQ_push = double(global.qstat.push.value) / global.qstat.push.count;
151 double avgQ_pop = double(global.qstat.pop .value) / global.qstat.pop .count;
152 double avgQ = double(global.qstat.push.value + global.qstat.pop .value) / (global.qstat.push.count + global.qstat.pop .count);
153 std::cout << "Push Avg Qs : " << avgQ_push << " (" << global.qstat.push.count << "ops)\n";
154 std::cout << "Pop Avg Qs : " << avgQ_pop << " (" << global.qstat.pop .count << "ops)\n";
155 std::cout << "Global Avg Qs : " << avgQ << " (" << (global.qstat.push.count + global.qstat.pop .count) << "ops)\n";
156 #endif
157}
158
159// ================================================================================================
160// EXPERIMENTS
161// ================================================================================================
162
163// ================================================================================================
164__attribute__((noinline)) void runChurn_body(
165 std::atomic<bool>& done,
166 Random & rand,
167 Node * my_nodes[],
168 unsigned nslots,
169 local_stat_t & local,
170 relaxed_list<Node> & list
171) {
172 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
173 int idx = rand.next() % nslots;
174 if (auto node = my_nodes[idx]) {
175 local.crc_in += node->value;
176 list.push(node);
177 my_nodes[idx] = nullptr;
178 local.in++;
179 }
180 else if(auto node = list.pop()) {
181 local.crc_out += node->value;
182 my_nodes[idx] = node;
183 local.out++;
184 }
185 else {
186 local.empty++;
187 }
188 }
189}
190
191void runChurn(unsigned nthread, unsigned nqueues, double duration, unsigned nnodes, const unsigned nslots) {
192 std::cout << "Churn Benchmark" << std::endl;
193 assert(nnodes <= nslots);
194 // List being tested
195 relaxed_list<Node> list = { nthread * nqueues };
196
197 // Barrier for synchronization
198 barrier_t barrier(nthread + 1);
199
200 // Data to check everything is OK
201 global_stat_t global;
202
203 // Flag to signal termination
204 std::atomic_bool done = { false };
205
206 // Prep nodes
207 std::cout << "Initializing ";
208 size_t npushed = 0;
209
210 Node** all_nodes[nthread];
211 for(auto & nodes : all_nodes) {
212 nodes = new __attribute__((aligned(64))) Node*[nslots + 8];
213 Random rand(rdtscl());
214 for(unsigned i = 0; i < nnodes; i++) {
215 nodes[i] = new Node(rand.next() % 100);
216 }
217
218 for(unsigned i = nnodes; i < nslots; i++) {
219 nodes[i] = nullptr;
220 }
221
222 for(int i = 0; i < 10 && i < (int)nslots; i++) {
223 int idx = rand.next() % nslots;
224 if (auto node = nodes[idx]) {
225 global.crc_in += node->value;
226 list.push(node);
227 npushed++;
228 nodes[idx] = nullptr;
229 }
230 }
231 }
232
233 std::cout << nnodes << " nodes (" << nslots << " slots)" << std::endl;
234
235 enable_stats = true;
236
237 std::thread * threads[nthread];
238 unsigned i = 1;
239 for(auto & t : threads) {
240 auto & my_nodes = all_nodes[i - 1];
241 t = new std::thread([&done, &list, &barrier, &global, &my_nodes, nslots](unsigned tid) {
242 Random rand(tid + rdtscl());
243
244 local_stat_t local;
245
246 // affinity(tid);
247
248 barrier.wait(tid);
249
250 // EXPERIMENT START
251
252 runChurn_body(done, rand, my_nodes, nslots, local, list);
253
254 // EXPERIMENT END
255
256 barrier.wait(tid);
257
258 tally_stats(global, local);
259
260 for(unsigned i = 0; i < nslots; i++) {
261 delete my_nodes[i];
262 }
263 }, i++);
264 }
265
266 waitfor(duration, barrier, done);
267
268 for(auto t : threads) {
269 t->join();
270 delete t;
271 }
272
273 enable_stats = false;
274
275 while(auto node = list.pop()) {
276 global.crc_out += node->value;
277 delete node;
278 }
279
280 for(auto nodes : all_nodes) {
281 delete[] nodes;
282 }
283
284 print_stats(duration, nthread, global);
285}
286
287// ================================================================================================
288__attribute__((noinline)) void runPingPong_body(
289 std::atomic<bool>& done,
290 Node initial_nodes[],
291 unsigned nnodes,
292 local_stat_t & local,
293 relaxed_list<Node> & list
294) {
295 Node * nodes[nnodes];
296 {
297 unsigned i = 0;
298 for(auto & n : nodes) {
299 n = &initial_nodes[i++];
300 }
301 }
302
303 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
304
305 for(Node * & node : nodes) {
306 local.crc_in += node->value;
307 list.push(node);
308 local.in++;
309 }
310
311 // -----
312
313 for(Node * & node : nodes) {
314 node = list.pop();
315 assert(node);
316 local.crc_out += node->value;
317 local.out++;
318 }
319 }
320}
321
322void runPingPong(unsigned nthread, unsigned nqueues, double duration, unsigned nnodes) {
323 std::cout << "PingPong Benchmark" << std::endl;
324
325 // List being tested
326 relaxed_list<Node> list = { nthread * nqueues };
327
328 // Barrier for synchronization
329 barrier_t barrier(nthread + 1);
330
331 // Data to check everything is OK
332 global_stat_t global;
333
334 // Flag to signal termination
335 std::atomic_bool done = { false };
336
337 std::cout << "Initializing ";
338 enable_stats = true;
339
340 std::thread * threads[nthread];
341 unsigned i = 1;
342 for(auto & t : threads) {
343 t = new std::thread([&done, &list, &barrier, &global, nnodes](unsigned tid) {
344 Random rand(tid + rdtscl());
345
346 Node nodes[nnodes];
347 for(auto & n : nodes) {
348 n.value = (int)rand.next() % 100;
349 }
350
351 local_stat_t local;
352
353 // affinity(tid);
354
355 barrier.wait(tid);
356
357 // EXPERIMENT START
358
359 runPingPong_body(done, nodes, nnodes, local, list);
360
361 // EXPERIMENT END
362
363 barrier.wait(tid);
364
365 tally_stats(global, local);
366 }, i++);
367 }
368
369 waitfor(duration, barrier, done);
370
371 for(auto t : threads) {
372 t->join();
373 delete t;
374 }
375
376 enable_stats = false;
377
378 print_stats(duration, nthread, global);
379}
380
// Case-insensitive equality of two strings, compared character by character
// with std::tolower (current C locale). Strings of different lengths are
// never equal.
bool iequals(const std::string& a, const std::string& b)
{
	return std::equal(a.begin(), a.end(),
	                  b.begin(), b.end(),
	                  [](char lhs, char rhs) {
		// std::tolower is undefined for negative arguments other than EOF,
		// so widen through unsigned char first.
		return std::tolower(static_cast<unsigned char>(lhs))
		    == std::tolower(static_cast<unsigned char>(rhs));
	});
}
389
390int main(int argc, char * argv[]) {
391
392 double duration = 5.0;
393 unsigned nthreads = 2;
394 unsigned nqueues = 4;
395 unsigned nnodes = 100;
396 unsigned nslots = 100;
397
398 enum {
399 Churn,
400 PingPong,
401 NONE
402 } benchmark = NONE;
403
404 std::cout.imbue(std::locale(""));
405
406 for(;;) {
407 static struct option options[] = {
408 {"duration", required_argument, 0, 'd'},
409 {"nthreads", required_argument, 0, 't'},
410 {"nqueues", required_argument, 0, 'q'},
411 {"benchmark", required_argument, 0, 'b'},
412 {0, 0, 0, 0}
413 };
414
415 int idx = 0;
416 int opt = getopt_long(argc, argv, "d:t:q:b:", options, &idx);
417
418 std::string arg = optarg ? optarg : "";
419 size_t len = 0;
420 switch(opt) {
421 // Exit Case
422 case -1:
423 /* paranoid */ assert(optind <= argc);
424 switch(benchmark) {
425 case NONE:
426 std::cerr << "Must specify a benchmark" << std::endl;
427 goto usage;
428 case PingPong:
429 nnodes = 1;
430 nslots = 1;
431 switch(argc - optind) {
432 case 0: break;
433 case 1:
434 try {
435 arg = optarg = argv[optind];
436 nnodes = stoul(optarg, &len);
437 if(len != arg.size()) { throw std::invalid_argument(""); }
438 } catch(std::invalid_argument &) {
439 std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl;
440 goto usage;
441 }
442 break;
443 default:
444 std::cerr << "'PingPong' benchmark doesn't accept more than 2 extra arguments" << std::endl;
445 goto usage;
446 }
447 break;
448 case Churn:
449 nnodes = 100;
450 nslots = 100;
451 switch(argc - optind) {
452 case 0: break;
453 case 1:
454 try {
455 arg = optarg = argv[optind];
456 nnodes = stoul(optarg, &len);
457 if(len != arg.size()) { throw std::invalid_argument(""); }
458 nslots = nnodes;
459 } catch(std::invalid_argument &) {
460 std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl;
461 goto usage;
462 }
463 break;
464 case 2:
465 try {
466 arg = optarg = argv[optind];
467 nnodes = stoul(optarg, &len);
468 if(len != arg.size()) { throw std::invalid_argument(""); }
469 } catch(std::invalid_argument &) {
470 std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl;
471 goto usage;
472 }
473 try {
474 arg = optarg = argv[optind + 1];
475 nslots = stoul(optarg, &len);
476 if(len != arg.size()) { throw std::invalid_argument(""); }
477 } catch(std::invalid_argument &) {
478 std::cerr << "Number of slots must be a positive integer, was " << arg << std::endl;
479 goto usage;
480 }
481 break;
482 default:
483 std::cerr << "'Churn' benchmark doesn't accept more than 2 extra arguments" << std::endl;
484 goto usage;
485 }
486 break;
487 }
488 goto run;
489 // Benchmarks
490 case 'b':
491 if(benchmark != NONE) {
492 std::cerr << "Only when benchmark can be run" << std::endl;
493 goto usage;
494 }
495 if(iequals(arg, "churn")) {
496 benchmark = Churn;
497 break;
498 }
499 if(iequals(arg, "pingpong")) {
500 benchmark = PingPong;
501 break;
502 }
503 std::cerr << "Unkown benchmark " << arg << std::endl;
504 goto usage;
505 // Numeric Arguments
506 case 'd':
507 try {
508 duration = stod(optarg, &len);
509 if(len != arg.size()) { throw std::invalid_argument(""); }
510 } catch(std::invalid_argument &) {
511 std::cerr << "Duration must be a valid double, was " << arg << std::endl;
512 goto usage;
513 }
514 break;
515 case 't':
516 try {
517 nthreads = stoul(optarg, &len);
518 if(len != arg.size()) { throw std::invalid_argument(""); }
519 } catch(std::invalid_argument &) {
520 std::cerr << "Number of threads must be a positive integer, was " << arg << std::endl;
521 goto usage;
522 }
523 break;
524 case 'q':
525 try {
526 nqueues = stoul(optarg, &len);
527 if(len != arg.size()) { throw std::invalid_argument(""); }
528 } catch(std::invalid_argument &) {
529 std::cerr << "Number of queues must be a positive integer, was " << arg << std::endl;
530 goto usage;
531 }
532 break;
533 // Other cases
534 default: /* ? */
535 std::cerr << opt << std::endl;
536 usage:
537 std::cerr << "Usage: " << argv[0] << ": [options] -b churn [NNODES] [NSLOTS = NNODES]" << std::endl;
538 std::cerr << " or: " << argv[0] << ": [options] -b pingpong [NNODES]" << std::endl;
539 std::cerr << std::endl;
540 std::cerr << " -d, --duration=DURATION Duration of the experiment, in seconds" << std::endl;
541 std::cerr << " -t, --nthreads=NTHREADS Number of kernel threads" << std::endl;
542 std::cerr << " -q, --nqueues=NQUEUES Number of queues per threads" << std::endl;
543 std::exit(1);
544 }
545 }
546 run:
547
548 check_cache_line_size();
549
550 std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
551 switch(benchmark) {
552 case Churn:
553 runChurn(nthreads, nqueues, duration, nnodes, nslots);
554 break;
555 case PingPong:
556 runPingPong(nthreads, nqueues, duration, nnodes);
557 break;
558 default:
559 abort();
560 }
561 return 0;
562}
563
// Human-readable program name. Not referenced in this file; presumably
// declared extern by utils.hpp (TODO confirm).
const char * __my_progname{"Relaxed List"};
Note: See TracBrowser for help on using the repository browser.