Changeset 807a632 for doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
- Timestamp:
- Feb 10, 2020, 11:17:31 AM (3 years ago)
- Branches:
- arm-eh, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 3b56166
- Parents:
- 487198c
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
r487198c r807a632 22 22 23 23 int value; 24 int id; 24 25 25 26 Node() { creates++; } … … 37 38 38 39 template<> 39 relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {}; 40 relaxed_list<Node> * relaxed_list<Node>::head = nullptr; 41 42 #ifndef NO_STATS 43 template<> 44 relaxed_list<Node>::GlobalStats relaxed_list<Node>::global_stats = {}; 45 #endif 40 46 41 47 // ================================================================================================ … … 49 55 size_t crc_in = 0; 50 56 size_t crc_out = 0; 57 size_t valmax = 0; 58 size_t valmin = 100000000ul; 51 59 }; 52 60 … … 57 65 std::atomic_size_t crc_in = { 0 }; 58 66 std::atomic_size_t crc_out = { 0 }; 59 struct { 60 struct { 61 std::atomic_size_t attempt = { 0 }; 62 std::atomic_size_t success = { 0 }; 63 } push; 64 struct { 65 std::atomic_size_t attempt = { 0 }; 66 std::atomic_size_t success = { 0 }; 67 std::atomic_size_t mask_attempt = { 0 }; 68 } pop; 69 } pick; 70 struct { 71 struct { 72 std::atomic_size_t value = { 0 }; 73 std::atomic_size_t count = { 0 }; 74 } push; 75 struct { 76 std::atomic_size_t value = { 0 }; 77 std::atomic_size_t count = { 0 }; 78 } pop; 79 } qstat; 67 std::atomic_size_t valmax = { 0 }; 68 std::atomic_size_t valmin = { 100000000ul }; 80 69 }; 81 70 71 void atomic_max(std::atomic_size_t & target, size_t value) { 72 for(;;) { 73 size_t expect = target.load(std::memory_order_relaxed); 74 if(value <= expect) return; 75 bool success = target.compare_exchange_strong(expect, value); 76 if(success) return; 77 } 78 } 79 80 void atomic_min(std::atomic_size_t & target, size_t value) { 81 for(;;) { 82 size_t expect = target.load(std::memory_order_relaxed); 83 if(value >= expect) return; 84 bool success = target.compare_exchange_strong(expect, value); 85 if(success) return; 86 } 87 } 88 82 89 void tally_stats(global_stat_t & global, local_stat_t & local) { 90 83 91 global.in += local.in; 84 92 global.out += local.out; … … 88 96 global.crc_out += local.crc_out; 89 97 90 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt; 91 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success; 92 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt; 93 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success; 94 global.pick.pop .mask_attempt += relaxed_list<Node>::tls.pick.pop.mask_attempt; 95 96 global.qstat.push.value += relaxed_list<Node>::tls.empty.push.value; 97 global.qstat.push.count += relaxed_list<Node>::tls.empty.push.count; 98 global.qstat.pop .value += relaxed_list<Node>::tls.empty.pop .value; 99 global.qstat.pop .count += relaxed_list<Node>::tls.empty.pop .count; 98 atomic_max(global.valmax, local.valmax); 99 atomic_min(global.valmin, local.valmin); 100 101 relaxed_list<Node>::stats_tls_tally(); 100 102 } 101 103 … … 124 126 } 125 127 128 void waitfor(double & duration, barrier_t & barrier, const std::atomic_size_t & count) { 129 std::cout << "Starting" << std::endl; 130 auto before = Clock::now(); 131 barrier.wait(0); 132 133 while(true) { 134 usleep(100000); 135 size_t c = count.load(); 136 if( c == 0 ) { 137 break; 138 } 139 std::cout << "\r" << c; 140 std::cout.flush(); 141 } 142 143 barrier.wait(0); 144 auto after = Clock::now(); 145 duration_t durr = after - before; 146 duration = durr.count(); 147 std::cout << "\rClosing down" << std::endl; 148 } 149 126 150 void print_stats(double duration, unsigned nthread, global_stat_t & global) { 127 151 assert(Node::creates == Node::destroys); … … 140 164 std::cout << "Ops/sec : " << ops_sec << "\n"; 141 165 std::cout << "Total ops : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n"; 166 if(global.valmax != 0) { 167 std::cout << "Max runs : " << global.valmax << "\n"; 168 std::cout << "Min runs : " << global.valmin << "\n"; 169 } 142 170 #ifndef NO_STATS 143 double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt); 144 double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt); 145 146 std::cout << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n"; 147 std::cout << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n"; 148 std::cout << "Pop mask trys : " << global.pick.pop.mask_attempt << std::endl; 149 150 double avgQ_push = double(global.qstat.push.value) / global.qstat.push.count; 151 double avgQ_pop = double(global.qstat.pop .value) / global.qstat.pop .count; 152 double avgQ = double(global.qstat.push.value + global.qstat.pop .value) / (global.qstat.push.count + global.qstat.pop .count); 153 std::cout << "Push Avg Qs : " << avgQ_push << " (" << global.qstat.push.count << "ops)\n"; 154 std::cout << "Pop Avg Qs : " << avgQ_pop << " (" << global.qstat.pop .count << "ops)\n"; 155 std::cout << "Global Avg Qs : " << avgQ << " (" << (global.qstat.push.count + global.qstat.pop .count) << "ops)\n"; 171 relaxed_list<Node>::stats_print(std::cout); 156 172 #endif 157 173 } 174 175 void save_fairness(const int data[], int factor, unsigned nthreads, size_t columns, size_t rows, const std::string & output); 158 176 159 177 // ================================================================================================ … … 193 211 assert(nnodes <= nslots); 194 212 // List being tested 195 relaxed_list<Node> list = { nthread * nqueues };196 213 197 214 // Barrier for synchronization … … 207 224 std::cout << "Initializing "; 208 225 size_t npushed = 0; 209 210 Node** all_nodes[nthread]; 211 for(auto & nodes : all_nodes) { 212 nodes = new __attribute__((aligned(64))) Node*[nslots + 8]; 213 Random rand(rdtscl()); 214 for(unsigned i = 0; i < nnodes; i++) { 215 nodes[i] = new Node(rand.next() % 100); 216 } 217 218 for(unsigned i = nnodes; i < nslots; i++) { 219 nodes[i] = nullptr; 220 } 221 222 for(int i = 0; i < 10 && i < (int)nslots; i++) { 223 int idx = rand.next() % nslots; 224 if (auto node = nodes[idx]) { 225 global.crc_in += node->value; 226 list.push(node); 227 npushed++; 228 nodes[idx] = nullptr; 226 relaxed_list<Node> list = { nthread * nqueues }; 227 { 228 Node** all_nodes[nthread]; 229 for(auto & nodes : all_nodes) { 230 nodes = new __attribute__((aligned(64))) Node*[nslots + 8]; 231 Random rand(rdtscl()); 232 for(unsigned i = 0; i < nnodes; i++) { 233 nodes[i] = new Node(rand.next() % 100); 229 234 } 230 } 231 } 232 233 std::cout << nnodes << " nodes (" << nslots << " slots)" << std::endl; 234 235 enable_stats = true; 236 237 std::thread * threads[nthread]; 238 unsigned i = 1; 239 for(auto & t : threads) { 240 auto & my_nodes = all_nodes[i - 1]; 241 t = new std::thread([&done, &list, &barrier, &global, &my_nodes, nslots](unsigned tid) { 242 Random rand(tid + rdtscl()); 243 244 local_stat_t local; 245 246 // affinity(tid); 247 248 barrier.wait(tid); 249 250 // EXPERIMENT START 251 252 runChurn_body(done, rand, my_nodes, nslots, local, list); 253 254 // EXPERIMENT END 255 256 barrier.wait(tid); 257 258 tally_stats(global, local); 259 260 for(unsigned i = 0; i < nslots; i++) { 261 delete my_nodes[i]; 235 236 for(unsigned i = nnodes; i < nslots; i++) { 237 nodes[i] = nullptr; 262 238 } 263 }, i++); 264 } 265 266 waitfor(duration, barrier, done); 267 268 for(auto t : threads) { 269 t->join(); 270 delete t; 271 } 272 273 enable_stats = false; 274 275 while(auto node = list.pop()) { 276 global.crc_out += node->value; 277 delete node; 278 } 279 280 for(auto nodes : all_nodes) { 281 delete[] nodes; 239 240 for(int i = 0; i < 10 && i < (int)nslots; i++) { 241 int idx = rand.next() % nslots; 242 if (auto node = nodes[idx]) { 243 global.crc_in += node->value; 244 list.push(node); 245 npushed++; 246 nodes[idx] = nullptr; 247 } 248 } 249 } 250 251 std::cout << nnodes << " nodes (" << nslots << " slots)" << std::endl; 252 253 enable_stats = true; 254 255 std::thread * threads[nthread]; 256 unsigned i = 1; 257 for(auto & t : threads) { 258 auto & my_nodes = all_nodes[i - 1]; 259 t = new std::thread([&done, &list, &barrier, &global, &my_nodes, nslots](unsigned tid) { 260 Random rand(tid + rdtscl()); 261 262 local_stat_t local; 263 264 // affinity(tid); 265 266 barrier.wait(tid); 267 268 // EXPERIMENT START 269 270 runChurn_body(done, rand, my_nodes, nslots, local, list); 271 272 // EXPERIMENT END 273 274 barrier.wait(tid); 275 276 tally_stats(global, local); 277 278 for(unsigned i = 0; i < nslots; i++) { 279 delete my_nodes[i]; 280 } 281 }, i++); 282 } 283 284 waitfor(duration, barrier, done); 285 286 for(auto t : threads) { 287 t->join(); 288 delete t; 289 } 290 291 enable_stats = false; 292 293 while(auto node = list.pop()) { 294 global.crc_out += node->value; 295 delete node; 296 } 297 298 for(auto nodes : all_nodes) { 299 delete[] nodes; 300 } 282 301 } 283 302 … … 323 342 std::cout << "PingPong Benchmark" << std::endl; 324 343 344 345 // Barrier for synchronization 346 barrier_t barrier(nthread + 1); 347 348 // Data to check everything is OK 349 global_stat_t global; 350 351 // Flag to signal termination 352 std::atomic_bool done = { false }; 353 354 std::cout << "Initializing "; 325 355 // List being tested 326 356 relaxed_list<Node> list = { nthread * nqueues }; 357 { 358 enable_stats = true; 359 360 std::thread * threads[nthread]; 361 unsigned i = 1; 362 for(auto & t : threads) { 363 t = new std::thread([&done, &list, &barrier, &global, nnodes](unsigned tid) { 364 Random rand(tid + rdtscl()); 365 366 Node nodes[nnodes]; 367 for(auto & n : nodes) { 368 n.value = (int)rand.next() % 100; 369 } 370 371 local_stat_t local; 372 373 // affinity(tid); 374 375 barrier.wait(tid); 376 377 // EXPERIMENT START 378 379 runPingPong_body(done, nodes, nnodes, local, list); 380 381 // EXPERIMENT END 382 383 barrier.wait(tid); 384 385 tally_stats(global, local); 386 }, i++); 387 } 388 389 waitfor(duration, barrier, done); 390 391 for(auto t : threads) { 392 t->join(); 393 delete t; 394 } 395 396 enable_stats = false; 397 } 398 399 print_stats(duration, nthread, global); 400 } 401 402 // ================================================================================================ 403 __attribute__((noinline)) void runFairness_body( 404 unsigned tid, 405 size_t width, 406 size_t length, 407 int output[], 408 std::atomic_size_t & count, 409 Node initial_nodes[], 410 unsigned nnodes, 411 local_stat_t & local, 412 relaxed_list<Node> & list 413 ) { 414 Node * nodes[nnodes]; 415 { 416 unsigned i = 0; 417 for(auto & n : nodes) { 418 n = &initial_nodes[i++]; 419 } 420 } 421 422 while(__builtin_expect(0 != count.load(std::memory_order_relaxed), true)) { 423 424 for(Node * & node : nodes) { 425 local.crc_in += node->id; 426 list.push(node); 427 local.in++; 428 } 429 430 // ----- 431 432 for(Node * & node : nodes) { 433 node = list.pop(); 434 assert(node); 435 436 if (unsigned(node->value) < length) { 437 size_t idx = (node->value * width) + node->id; 438 assert(idx < (width * length)); 439 output[idx] = tid; 440 } 441 442 node->value++; 443 if(unsigned(node->value) == length) count--; 444 445 local.crc_out += node->id; 446 local.out++; 447 } 448 } 449 } 450 451 void runFairness(unsigned nthread, unsigned nqueues, double duration, unsigned nnodes, const std::string & output) { 452 std::cout << "Fairness Benchmark, outputing to : " << output << std::endl; 327 453 328 454 // Barrier for synchronization … … 332 458 global_stat_t global; 333 459 460 std::cout << "Initializing "; 461 462 // Check fairness by creating a png of where the threads ran 463 size_t width = nthread * nnodes; 464 size_t length = 100000; 465 466 std::unique_ptr<int[]> data_out { new int[width * length] }; 467 334 468 // Flag to signal termination 335 std::atomic_bool done = { false }; 336 337 std::cout << "Initializing "; 338 enable_stats = true; 339 340 std::thread * threads[nthread]; 341 unsigned i = 1; 342 for(auto & t : threads) { 343 t = new std::thread([&done, &list, &barrier, &global, nnodes](unsigned tid) { 344 Random rand(tid + rdtscl()); 345 346 Node nodes[nnodes]; 347 for(auto & n : nodes) { 348 n.value = (int)rand.next() % 100; 349 } 350 351 local_stat_t local; 352 353 // affinity(tid); 354 355 barrier.wait(tid); 356 357 // EXPERIMENT START 358 359 runPingPong_body(done, nodes, nnodes, local, list); 360 361 // EXPERIMENT END 362 363 barrier.wait(tid); 364 365 tally_stats(global, local); 366 }, i++); 367 } 368 369 waitfor(duration, barrier, done); 370 371 for(auto t : threads) { 372 t->join(); 373 delete t; 374 } 375 376 enable_stats = false; 469 std::atomic_size_t count = width; 470 471 // List being tested 472 relaxed_list<Node> list = { nthread * nqueues }; 473 { 474 enable_stats = true; 475 476 std::thread * threads[nthread]; 477 unsigned i = 1; 478 for(auto & t : threads) { 479 t = new std::thread([&count, &list, &barrier, &global, nnodes, width, length, data_out = data_out.get()](unsigned tid) { 480 unsigned int start = (tid - 1) * nnodes; 481 Node nodes[nnodes]; 482 for(auto & n : nodes) { 483 n.id = start; 484 n.value = 0; 485 start++; 486 } 487 488 local_stat_t local; 489 490 // affinity(tid); 491 492 barrier.wait(tid); 493 494 // EXPERIMENT START 495 496 runFairness_body(tid, width, length, data_out, count, nodes, nnodes, local, list); 497 498 // EXPERIMENT END 499 500 barrier.wait(tid); 501 502 for(const auto & n : nodes) { 503 local.valmax = max(local.valmax, size_t(n.value)); 504 local.valmin = min(local.valmin, size_t(n.value)); 505 } 506 507 tally_stats(global, local); 508 }, i++); 509 } 510 511 waitfor(duration, barrier, count); 512 513 for(auto t : threads) { 514 t->join(); 515 delete t; 516 } 517 518 enable_stats = false; 519 } 377 520 378 521 print_stats(duration, nthread, global); 379 } 522 523 save_fairness(data_out.get(), 100, nthread, width, length, output); 524 } 525 526 // ================================================================================================ 380 527 381 528 bool iequals(const std::string& a, const std::string& b) … … 395 542 unsigned nnodes = 100; 396 543 unsigned nslots = 100; 544 std::string out = "fairness.png"; 397 545 398 546 enum { 399 547 Churn, 400 548 PingPong, 549 Fairness, 401 550 NONE 402 551 } benchmark = NONE; … … 485 634 } 486 635 break; 636 case Fairness: 637 nnodes = 1; 638 switch(argc - optind) { 639 case 0: break; 640 case 1: 641 arg = optarg = argv[optind]; 642 out = arg; 643 break; 644 default: 645 std::cerr << "'Churn' benchmark doesn't accept more than 2 extra arguments" << std::endl; 646 goto usage; 647 } 487 648 } 488 649 goto run; … … 499 660 if(iequals(arg, "pingpong")) { 500 661 benchmark = PingPong; 662 break; 663 } 664 if(iequals(arg, "fairness")) { 665 benchmark = Fairness; 501 666 break; 502 667 } … … 556 721 runPingPong(nthreads, nqueues, duration, nnodes); 557 722 break; 723 case Fairness: 724 runFairness(nthreads, nqueues, duration, nnodes, out); 725 break; 558 726 default: 559 727 abort(); … … 563 731 564 732 const char * __my_progname = "Relaxed List"; 733 734 struct rgb_t { 735 double r; // a fraction between 0 and 1 736 double g; // a fraction between 0 and 1 737 double b; // a fraction between 0 and 1 738 }; 739 740 struct hsv_t { 741 double h; // angle in degrees 742 double s; // a fraction between 0 and 1 743 double v; // a fraction between 0 and 1 744 }; 745 746 rgb_t hsv2rgb(hsv_t in) { 747 double hh, p, q, t, ff; 748 long i; 749 rgb_t out; 750 751 if(in.s <= 0.0) { // < is bogus, just shuts up warnings 752 out.r = in.v; 753 out.g = in.v; 754 out.b = in.v; 755 return out; 756 } 757 hh = in.h; 758 if(hh >= 360.0) hh = 0.0; 759 hh /= 60.0; 760 i = (long)hh; 761 ff = hh - i; 762 p = in.v * (1.0 - in.s); 763 q = in.v * (1.0 - (in.s * ff)); 764 t = in.v * (1.0 - (in.s * (1.0 - ff))); 765 766 switch(i) { 767 case 0: 768 out.r = in.v; 769 out.g = t; 770 out.b = p; 771 break; 772 case 1: 773 out.r = q; 774 out.g = in.v; 775 out.b = p; 776 break; 777 case 2: 778 out.r = p; 779 out.g = in.v; 780 out.b = t; 781 break; 782 783 case 3: 784 out.r = p; 785 out.g = q; 786 out.b = in.v; 787 break; 788 case 4: 789 out.r = t; 790 out.g = p; 791 out.b = in.v; 792 break; 793 case 5: 794 default: 795 out.r = in.v; 796 out.g = p; 797 out.b = q; 798 break; 799 } 800 return out; 801 } 802 803 void save_fairness(const int data[], int factor, unsigned nthreads, size_t columns, size_t rows, const std::string & output) { 804 std::ofstream os(output); 805 os << "<html>\n"; 806 os << "<head>\n"; 807 os << "<style>\n"; 808 os << "</style>\n"; 809 os << "</head>\n"; 810 os << "<body>\n"; 811 os << "<table style=\"width=100%\">\n"; 812 813 size_t idx = 0; 814 for(size_t r = 0ul; r < rows; r++) { 815 os << "<tr>\n"; 816 for(size_t c = 0ul; c < columns; c++) { 817 os << "<td class=\"custom custom" << data[idx] << "\"></td>\n"; 818 idx++; 819 } 820 os << "</tr>\n"; 821 } 822 823 os << "</table>\n"; 824 os << "</body>\n"; 825 os << "</html>\n"; 826 os << std::endl; 827 } 828 829 #include <png.h> 830 #include <setjmp.h> 831 832 /* 833 void save_fairness(const int data[], int factor, unsigned nthreads, size_t columns, size_t rows, const std::string & output) { 834 int width = columns * factor; 835 int height = rows / factor; 836 837 int code = 0; 838 int idx = 0; 839 FILE *fp = NULL; 840 png_structp png_ptr = NULL; 841 png_infop info_ptr = NULL; 842 png_bytep row = NULL; 843 844 // Open file for writing (binary mode) 845 fp = fopen(output.c_str(), "wb"); 846 if (fp == NULL) { 847 fprintf(stderr, "Could not open file %s for writing\n", output.c_str()); 848 code = 1; 849 goto finalise; 850 } 851 852 // Initialize write structure 853 png_ptr = png_create_write_struct(PNG_LIBPNG_VER_STRING, NULL, NULL, NULL); 854 if (png_ptr == NULL) { 855 fprintf(stderr, "Could not allocate write struct\n"); 856 code = 1; 857 goto finalise; 858 } 859 860 // Initialize info structure 861 info_ptr = png_create_info_struct(png_ptr); 862 if (info_ptr == NULL) { 863 fprintf(stderr, "Could not allocate info struct\n"); 864 code = 1; 865 goto finalise; 866 } 867 868 // Setup Exception handling 869 if (setjmp(png_jmpbuf(png_ptr))) { 870 fprintf(stderr, "Error during png creation\n"); 871 code = 1; 872 goto finalise; 873 } 874 875 png_init_io(png_ptr, fp); 876 877 // Write header (8 bit colour depth) 878 png_set_IHDR(png_ptr, info_ptr, width, height, 879 8, PNG_COLOR_TYPE_RGB, PNG_INTERLACE_NONE, 880 PNG_COMPRESSION_TYPE_BASE, PNG_FILTER_TYPE_BASE); 881 882 png_write_info(png_ptr, info_ptr); 883 884 // Allocate memory for one row (3 bytes per pixel - RGB) 885 row = (png_bytep) malloc(3 * width * sizeof(png_byte)); 886 887 // Write image data 888 int x, y; 889 for (y=0 ; y<height ; y++) { 890 for (x=0 ; x<width ; x++) { 891 auto & r = row[(x * 3) + 0]; 892 auto & g = row[(x * 3) + 1]; 893 auto & b = row[(x * 3) + 2]; 894 assert(idx < (rows * columns)); 895 int color = data[idx] - 1; 896 assert(color < nthreads); 897 assert(color >= 0); 898 idx++; 899 900 double angle = double(color) / double(nthreads); 901 902 auto c = hsv2rgb({ 360.0 * angle, 0.8, 0.8 }); 903 904 r = char(c.r * 255.0); 905 g = char(c.g * 255.0); 906 b = char(c.b * 255.0); 907 908 } 909 png_write_row(png_ptr, row); 910 } 911 912 assert(idx == (rows * columns)); 913 914 // End write 915 png_write_end(png_ptr, NULL); 916 917 finalise: 918 if (fp != NULL) fclose(fp); 919 if (info_ptr != NULL) png_free_data(png_ptr, info_ptr, PNG_FREE_ALL, -1); 920 if (png_ptr != NULL) png_destroy_write_struct(&png_ptr, (png_infopp)NULL); 921 if (row != NULL) free(row); 922 } 923 */
Note: See TracChangeset
for help on using the changeset viewer.