Changeset 9421f3d8 for doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
- Timestamp:
- Oct 30, 2019, 11:26:07 AM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- df75fe97
- Parents:
- b067d9b
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
rb067d9b r9421f3d8 9 9 #include <vector> 10 10 11 #include <getopt.h> 11 12 #include <unistd.h> 12 13 #include <sys/sysinfo.h> … … 21 22 22 23 int value; 23 Node(int value): value(value) { 24 creates++; 25 } 26 27 ~Node() { 28 destroys++; 29 } 24 25 Node() { creates++; } 26 Node(int value): value(value) { creates++; } 27 ~Node() { destroys++; } 30 28 }; 31 29 … … 33 31 std::atomic_size_t Node::destroys = { 0 }; 34 32 35 static const constexpr int nodes_per_threads = 128;36 struct NodeArray {37 __attribute__((aligned(64))) Node * array[nodes_per_threads];38 __attribute__((aligned(64))) char pad;39 };40 41 33 bool enable_stats = false; 34 35 template<> 36 thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {}; 37 38 template<> 39 relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {}; 40 41 // ================================================================================================ 42 // UTILS 43 // ================================================================================================ 42 44 43 45 struct local_stat_t { … … 49 51 }; 50 52 51 __attribute__((noinline)) void run_body( 52 std::atomic<bool>& done, 53 Random & rand, 54 Node * (&my_nodes)[128], 55 local_stat_t & local, 56 relaxed_list<Node> & list 57 ) { 58 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) { 59 int idx = rand.next() % nodes_per_threads; 60 if (auto node = my_nodes[idx]) { 61 local.crc_in += node->value; 62 list.push(node); 63 my_nodes[idx] = nullptr; 64 local.in++; 65 } 66 else if(auto node = list.pop()) { 67 local.crc_out += node->value; 68 my_nodes[idx] = node; 69 local.out++; 70 } 71 else { 72 local.empty++; 73 } 74 } 75 } 76 77 void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) { 78 // List being tested 79 relaxed_list<Node> list = { nthread * nqueues }; 80 81 // Barrier for synchronization 82 barrier_t barrier(nthread + 1); 83 84 // Data to check everything is OK 53 struct global_stat_t { 54 std::atomic_size_t in = { 0 }; 55 std::atomic_size_t out = { 0 }; 56 std::atomic_size_t empty = { 0 }; 57 std::atomic_size_t crc_in = { 0 }; 58 std::atomic_size_t crc_out = { 0 }; 85 59 struct { 86 std::atomic_size_t in = { 0 };87 std::atomic_size_t out = { 0 };88 std::atomic_size_t empty = { 0 };89 std::atomic_size_t crc_in = { 0 };90 std::atomic_size_t crc_out = { 0 };91 60 struct { 92 struct { 93 std::atomic_size_t attempt = { 0 }; 94 std::atomic_size_t success = { 0 }; 95 } push; 96 struct { 97 std::atomic_size_t attempt = { 0 }; 98 std::atomic_size_t success = { 0 }; 99 } pop; 100 } pick; 101 } global; 102 103 // Flag to signal termination 104 std::atomic_bool done = { false }; 105 106 // Prep nodes 107 std::cout << "Initializing "; 108 size_t nnodes = 0; 109 size_t npushed = 0; 110 NodeArray all_nodes[nthread]; 111 for(auto & nodes : all_nodes) { 112 Random rand(rdtscl()); 113 for(auto & node : nodes.array) { 114 auto r = rand.next() % 100; 115 if(r < fill) { 116 node = new Node(rand.next() % 100); 117 nnodes++; 118 } else { 119 node = nullptr; 120 } 121 } 122 123 for(int i = 0; i < 10; i++) { 124 int idx = rand.next() % nodes_per_threads; 125 if (auto node = nodes.array[idx]) { 126 global.crc_in += node->value; 127 list.push(node); 128 npushed++; 129 nodes.array[idx] = nullptr; 130 } 131 } 132 } 133 134 std::cout << nnodes << " nodes " << fill << "% (" << npushed << " pushed)" << std::endl; 135 136 enable_stats = true; 137 138 std::thread * threads[nthread]; 139 unsigned i = 1; 140 for(auto & t : threads) { 141 auto & my_nodes = all_nodes[i - 1].array; 142 t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) { 143 Random rand(tid + rdtscl()); 144 145 local_stat_t local; 146 147 // affinity(tid); 148 149 barrier.wait(tid); 150 151 // EXPERIMENT START 152 153 run_body(done, rand, my_nodes, local, list); 154 155 // EXPERIMENT END 156 157 barrier.wait(tid); 158 159 global.in += local.in; 160 global.out += local.out; 161 global.empty += local.empty; 162 163 for(auto node : my_nodes) { 164 delete node; 165 } 166 167 global.crc_in += local.crc_in; 168 global.crc_out += local.crc_out; 169 170 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt; 171 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success; 172 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt; 173 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success; 174 }, i++); 175 } 176 61 std::atomic_size_t attempt = { 0 }; 62 std::atomic_size_t success = { 0 }; 63 } push; 64 struct { 65 std::atomic_size_t attempt = { 0 }; 66 std::atomic_size_t success = { 0 }; 67 std::atomic_size_t mask_attempt = { 0 }; 68 } pop; 69 } pick; 70 struct { 71 struct { 72 std::atomic_size_t value = { 0 }; 73 std::atomic_size_t count = { 0 }; 74 } push; 75 struct { 76 std::atomic_size_t value = { 0 }; 77 std::atomic_size_t count = { 0 }; 78 } pop; 79 } qstat; 80 }; 81 82 void tally_stats(global_stat_t & global, local_stat_t & local) { 83 global.in += local.in; 84 global.out += local.out; 85 global.empty += local.empty; 86 87 global.crc_in += local.crc_in; 88 global.crc_out += local.crc_out; 89 90 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt; 91 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success; 92 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt; 93 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success; 94 global.pick.pop .mask_attempt += relaxed_list<Node>::tls.pick.pop.mask_attempt; 95 96 global.qstat.push.value += relaxed_list<Node>::tls.empty.push.value; 97 global.qstat.push.count += relaxed_list<Node>::tls.empty.push.count; 98 global.qstat.pop .value += relaxed_list<Node>::tls.empty.pop .value; 99 global.qstat.pop .count += relaxed_list<Node>::tls.empty.pop .count; 100 } 101 102 void waitfor(double & duration, barrier_t & barrier, std::atomic_bool & done) { 177 103 std::cout << "Starting" << std::endl; 178 104 auto before = Clock::now(); … … 196 122 duration = durr.count(); 197 123 std::cout << "\rClosing down" << std::endl; 198 199 for(auto t : threads) { 200 t->join(); 201 delete t; 202 } 203 204 enable_stats = false; 205 206 while(auto node = list.pop()) { 207 global.crc_out += node->value; 208 delete node; 209 } 210 124 } 125 126 void print_stats(double duration, unsigned nthread, global_stat_t & global) { 211 127 assert(Node::creates == Node::destroys); 212 128 assert(global.crc_in == global.crc_out); … … 227 143 double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt); 228 144 double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt); 145 229 146 std::cout << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n"; 230 147 std::cout << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n"; 148 std::cout << "Pop mask trys : " << global.pick.pop.mask_attempt << std::endl; 149 150 double avgQ_push = double(global.qstat.push.value) / global.qstat.push.count; 151 double avgQ_pop = double(global.qstat.pop .value) / global.qstat.pop .count; 152 double avgQ = double(global.qstat.push.value + global.qstat.pop .value) / (global.qstat.push.count + global.qstat.pop .count); 153 std::cout << "Push Avg Qs : " << avgQ_push << " (" << global.qstat.push.count << "ops)\n"; 154 std::cout << "Pop Avg Qs : " << avgQ_pop << " (" << global.qstat.pop .count << "ops)\n"; 155 std::cout << "Global Avg Qs : " << avgQ << " (" << (global.qstat.push.count + global.qstat.pop .count) << "ops)\n"; 231 156 #endif 232 157 } 233 158 234 void usage(char * argv[]) { 235 std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;; 236 std::exit(1); 159 // ================================================================================================ 160 // EXPERIMENTS 161 // ================================================================================================ 162 163 // ================================================================================================ 164 __attribute__((noinline)) void runChurn_body( 165 std::atomic<bool>& done, 166 Random & rand, 167 Node * my_nodes[], 168 unsigned nslots, 169 local_stat_t & local, 170 relaxed_list<Node> & list 171 ) { 172 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) { 173 int idx = rand.next() % nslots; 174 if (auto node = my_nodes[idx]) { 175 local.crc_in += node->value; 176 list.push(node); 177 my_nodes[idx] = nullptr; 178 local.in++; 179 } 180 else if(auto node = list.pop()) { 181 local.crc_out += node->value; 182 my_nodes[idx] = node; 183 local.out++; 184 } 185 else { 186 local.empty++; 187 } 188 } 189 } 190 191 void runChurn(unsigned nthread, unsigned nqueues, double duration, unsigned nnodes, const unsigned nslots) { 192 std::cout << "Churn Benchmark" << std::endl; 193 assert(nnodes <= nslots); 194 // List being tested 195 relaxed_list<Node> list = { nthread * nqueues }; 196 197 // Barrier for synchronization 198 barrier_t barrier(nthread + 1); 199 200 // Data to check everything is OK 201 global_stat_t global; 202 203 // Flag to signal termination 204 std::atomic_bool done = { false }; 205 206 // Prep nodes 207 std::cout << "Initializing "; 208 size_t npushed = 0; 209 210 Node** all_nodes[nthread]; 211 for(auto & nodes : all_nodes) { 212 nodes = new __attribute__((aligned(64))) Node*[nslots + 8]; 213 Random rand(rdtscl()); 214 for(unsigned i = 0; i < nnodes; i++) { 215 nodes[i] = new Node(rand.next() % 100); 216 } 217 218 for(unsigned i = nnodes; i < nslots; i++) { 219 nodes[i] = nullptr; 220 } 221 222 for(int i = 0; i < 10 && i < (int)nslots; i++) { 223 int idx = rand.next() % nslots; 224 if (auto node = nodes[idx]) { 225 global.crc_in += node->value; 226 list.push(node); 227 npushed++; 228 nodes[idx] = nullptr; 229 } 230 } 231 } 232 233 std::cout << nnodes << " nodes (" << nslots << " slots)" << std::endl; 234 235 enable_stats = true; 236 237 std::thread * threads[nthread]; 238 unsigned i = 1; 239 for(auto & t : threads) { 240 auto & my_nodes = all_nodes[i - 1]; 241 t = new std::thread([&done, &list, &barrier, &global, &my_nodes, nslots](unsigned tid) { 242 Random rand(tid + rdtscl()); 243 244 local_stat_t local; 245 246 // affinity(tid); 247 248 barrier.wait(tid); 249 250 // EXPERIMENT START 251 252 runChurn_body(done, rand, my_nodes, nslots, local, list); 253 254 // EXPERIMENT END 255 256 barrier.wait(tid); 257 258 tally_stats(global, local); 259 260 for(unsigned i = 0; i < nslots; i++) { 261 delete my_nodes[i]; 262 } 263 }, i++); 264 } 265 266 waitfor(duration, barrier, done); 267 268 for(auto t : threads) { 269 t->join(); 270 delete t; 271 } 272 273 enable_stats = false; 274 275 while(auto node = list.pop()) { 276 global.crc_out += node->value; 277 delete node; 278 } 279 280 for(auto nodes : all_nodes) { 281 delete[] nodes; 282 } 283 284 print_stats(duration, nthread, global); 285 } 286 287 // ================================================================================================ 288 __attribute__((noinline)) void runPingPong_body( 289 std::atomic<bool>& done, 290 Node initial_nodes[], 291 unsigned nnodes, 292 local_stat_t & local, 293 relaxed_list<Node> & list 294 ) { 295 Node * nodes[nnodes]; 296 { 297 unsigned i = 0; 298 for(auto & n : nodes) { 299 n = &initial_nodes[i++]; 300 } 301 } 302 303 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) { 304 305 for(Node * & node : nodes) { 306 local.crc_in += node->value; 307 list.push(node); 308 local.in++; 309 } 310 311 // ----- 312 313 for(Node * & node : nodes) { 314 node = list.pop(); 315 assert(node); 316 local.crc_out += node->value; 317 local.out++; 318 } 319 } 320 } 321 322 void runPingPong(unsigned nthread, unsigned nqueues, double duration, unsigned nnodes) { 323 std::cout << "PingPong Benchmark" << std::endl; 324 325 // List being tested 326 relaxed_list<Node> list = { nthread * nqueues }; 327 328 // Barrier for synchronization 329 barrier_t barrier(nthread + 1); 330 331 // Data to check everything is OK 332 global_stat_t global; 333 334 // Flag to signal termination 335 std::atomic_bool done = { false }; 336 337 std::cout << "Initializing "; 338 enable_stats = true; 339 340 std::thread * threads[nthread]; 341 unsigned i = 1; 342 for(auto & t : threads) { 343 t = new std::thread([&done, &list, &barrier, &global, nnodes](unsigned tid) { 344 Random rand(tid + rdtscl()); 345 346 Node nodes[nnodes]; 347 for(auto & n : nodes) { 348 n.value = (int)rand.next() % 100; 349 } 350 351 local_stat_t local; 352 353 // affinity(tid); 354 355 barrier.wait(tid); 356 357 // EXPERIMENT START 358 359 runPingPong_body(done, nodes, nnodes, local, list); 360 361 // EXPERIMENT END 362 363 barrier.wait(tid); 364 365 tally_stats(global, local); 366 }, i++); 367 } 368 369 waitfor(duration, barrier, done); 370 371 for(auto t : threads) { 372 t->join(); 373 delete t; 374 } 375 376 enable_stats = false; 377 378 print_stats(duration, nthread, global); 379 } 380 381 bool iequals(const std::string& a, const std::string& b) 382 { 383 return std::equal(a.begin(), a.end(), 384 b.begin(), b.end(), 385 [](char a, char b) { 386 return std::tolower(a) == std::tolower(b); 387 }); 237 388 } 238 389 … … 241 392 double duration = 5.0; 242 393 unsigned nthreads = 2; 243 unsigned nqueues = 2; 244 unsigned fill = 100; 394 unsigned nqueues = 4; 395 unsigned nnodes = 100; 396 unsigned nslots = 100; 397 398 enum { 399 Churn, 400 PingPong, 401 NONE 402 } benchmark = NONE; 245 403 246 404 std::cout.imbue(std::locale("")); 247 405 248 switch (argc) 249 { 250 case 5: 251 fill = std::stoul(argv[4]); 252 [[fallthrough]]; 253 case 4: 254 nqueues = std::stoul(argv[3]); 255 [[fallthrough]]; 256 case 3: 257 nthreads = std::stoul(argv[2]); 258 [[fallthrough]]; 259 case 2: 260 duration = std::stod(argv[1]); 261 if( duration <= 0.0 ) { 262 std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl; 263 usage(argv); 264 } 265 [[fallthrough]]; 266 case 1: 267 break; 268 default: 269 usage(argv); 270 break; 271 } 406 for(;;) { 407 static struct option options[] = { 408 {"duration", required_argument, 0, 'd'}, 409 {"nthreads", required_argument, 0, 't'}, 410 {"nqueues", required_argument, 0, 'q'}, 411 {"benchmark", required_argument, 0, 'b'}, 412 {0, 0, 0, 0} 413 }; 414 415 int idx = 0; 416 int opt = getopt_long(argc, argv, "d:t:q:b:", options, &idx); 417 418 std::string arg = optarg ? optarg : ""; 419 size_t len = 0; 420 switch(opt) { 421 // Exit Case 422 case -1: 423 /* paranoid */ assert(optind <= argc); 424 switch(benchmark) { 425 case NONE: 426 std::cerr << "Must specify a benchmark" << std::endl; 427 goto usage; 428 case PingPong: 429 nnodes = 1; 430 nslots = 1; 431 switch(argc - optind) { 432 case 0: break; 433 case 1: 434 try { 435 arg = optarg = argv[optind]; 436 nnodes = stoul(optarg, &len); 437 if(len != arg.size()) { throw std::invalid_argument(""); } 438 } catch(std::invalid_argument &) { 439 std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl; 440 goto usage; 441 } 442 break; 443 default: 444 std::cerr << "'PingPong' benchmark doesn't accept more than 2 extra arguments" << std::endl; 445 goto usage; 446 } 447 break; 448 case Churn: 449 nnodes = 100; 450 nslots = 100; 451 switch(argc - optind) { 452 case 0: break; 453 case 1: 454 try { 455 arg = optarg = argv[optind]; 456 nnodes = stoul(optarg, &len); 457 if(len != arg.size()) { throw std::invalid_argument(""); } 458 nslots = nnodes; 459 } catch(std::invalid_argument &) { 460 std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl; 461 goto usage; 462 } 463 break; 464 case 2: 465 try { 466 arg = optarg = argv[optind]; 467 nnodes = stoul(optarg, &len); 468 if(len != arg.size()) { throw std::invalid_argument(""); } 469 } catch(std::invalid_argument &) { 470 std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl; 471 goto usage; 472 } 473 try { 474 arg = optarg = argv[optind + 1]; 475 nslots = stoul(optarg, &len); 476 if(len != arg.size()) { throw std::invalid_argument(""); } 477 } catch(std::invalid_argument &) { 478 std::cerr << "Number of slots must be a positive integer, was " << arg << std::endl; 479 goto usage; 480 } 481 break; 482 default: 483 std::cerr << "'Churn' benchmark doesn't accept more than 2 extra arguments" << std::endl; 484 goto usage; 485 } 486 break; 487 } 488 goto run; 489 // Benchmarks 490 case 'b': 491 if(benchmark != NONE) { 492 std::cerr << "Only when benchmark can be run" << std::endl; 493 goto usage; 494 } 495 if(iequals(arg, "churn")) { 496 benchmark = Churn; 497 break; 498 } 499 if(iequals(arg, "pingpong")) { 500 benchmark = PingPong; 501 break; 502 } 503 std::cerr << "Unkown benchmark " << arg << std::endl; 504 goto usage; 505 // Numeric Arguments 506 case 'd': 507 try { 508 duration = stod(optarg, &len); 509 if(len != arg.size()) { throw std::invalid_argument(""); } 510 } catch(std::invalid_argument &) { 511 std::cerr << "Duration must be a valid double, was " << arg << std::endl; 512 goto usage; 513 } 514 break; 515 case 't': 516 try { 517 nthreads = stoul(optarg, &len); 518 if(len != arg.size()) { throw std::invalid_argument(""); } 519 } catch(std::invalid_argument &) { 520 std::cerr << "Number of threads must be a positive integer, was " << arg << std::endl; 521 goto usage; 522 } 523 break; 524 case 'q': 525 try { 526 nqueues = stoul(optarg, &len); 527 if(len != arg.size()) { throw std::invalid_argument(""); } 528 } catch(std::invalid_argument &) { 529 std::cerr << "Number of queues must be a positive integer, was " << arg << std::endl; 530 goto usage; 531 } 532 break; 533 // Other cases 534 default: /* ? */ 535 std::cerr << opt << std::endl; 536 usage: 537 std::cerr << "Usage: " << argv[0] << ": [options] -b churn [NNODES] [NSLOTS = NNODES]" << std::endl; 538 std::cerr << " or: " << argv[0] << ": [options] -b pingpong [NNODES]" << std::endl; 539 std::cerr << std::endl; 540 std::cerr << " -d, --duration=DURATION Duration of the experiment, in seconds" << std::endl; 541 std::cerr << " -t, --nthreads=NTHREADS Number of kernel threads" << std::endl; 542 std::cerr << " -q, --nqueues=NQUEUES Number of queues per threads" << std::endl; 543 std::exit(1); 544 } 545 } 546 run: 272 547 273 548 check_cache_line_size(); 274 549 275 550 std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl; 276 run(nthreads, nqueues, fill, duration); 277 551 switch(benchmark) { 552 case Churn: 553 runChurn(nthreads, nqueues, duration, nnodes, nslots); 554 break; 555 case PingPong: 556 runPingPong(nthreads, nqueues, duration, nnodes); 557 break; 558 default: 559 abort(); 560 } 278 561 return 0; 279 562 } 280 563 281 template<>282 thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};283 284 template<>285 relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};286 287 564 const char * __my_progname = "Relaxed List";
Note: See TracChangeset
for help on using the changeset viewer.