Changeset 50aeb6f for doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
- Timestamp:
- Sep 26, 2019, 4:25:04 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 1e24d13
- Parents:
- b2a37b0
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp
rb2a37b0 r50aeb6f 13 13 #include "utils.hpp" 14 14 15 struct Node {15 struct __attribute__((aligned(64))) Node { 16 16 static std::atomic_size_t creates; 17 17 static std::atomic_size_t destroys; … … 33 33 34 34 static const constexpr int nodes_per_threads = 128; 35 struct NodeArray { 36 __attribute__((aligned(64))) Node * array[nodes_per_threads]; 37 __attribute__((aligned(64))) char pad; 38 }; 35 39 36 40 bool enable_stats = false; 37 41 38 __attribute__((aligned(64))) thread_local pick_stat local_pick; 39 40 void run(unsigned nthread, double duration) { 42 struct local_stat_t { 43 size_t in = 0; 44 size_t out = 0; 45 size_t empty = 0; 46 size_t crc_in = 0; 47 size_t crc_out = 0; 48 }; 49 50 __attribute__((noinline)) void run_body( 51 std::atomic<bool>& done, 52 Random & rand, 53 Node * (&my_nodes)[128], 54 local_stat_t & local, 55 relaxed_list<Node> & list 56 ) { 57 while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) { 58 int idx = rand.next() % nodes_per_threads; 59 if (auto node = my_nodes[idx]) { 60 local.crc_in += node->value; 61 list.push(node); 62 my_nodes[idx] = nullptr; 63 local.in++; 64 } 65 else if(auto node = list.pop()) { 66 local.crc_out += node->value; 67 my_nodes[idx] = node; 68 local.out++; 69 } 70 else { 71 local.empty++; 72 } 73 } 74 } 75 76 void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) { 41 77 // List being tested 42 relaxed_list<Node> list = { nthread * 2};78 relaxed_list<Node> list = { nthread * nqueues }; 43 79 44 80 // Barrier for synchronization … … 52 88 std::atomic_size_t crc_in = { 0 }; 53 89 std::atomic_size_t crc_out = { 0 }; 54 std::atomic_size_t pick_at = { 0 }; 55 std::atomic_size_t pick_su = { 0 }; 90 struct { 91 struct { 92 std::atomic_size_t attempt = { 0 }; 93 std::atomic_size_t success = { 0 }; 94 } push; 95 struct { 96 std::atomic_size_t attempt = { 0 }; 97 std::atomic_size_t success = { 0 }; 98 } pop; 99 } pick; 56 100 } global; 57 101 … … 61 105 // Prep nodes 62 106 std::cout << "Initializing" << std::endl; 63 std::vector<Node *>all_nodes[nthread];107 NodeArray all_nodes[nthread]; 64 108 for(auto & nodes : all_nodes) { 65 109 Random rand(rdtscl()); 66 nodes.resize(nodes_per_threads); 67 for(auto & node : nodes) { 68 node = new Node(rand.next() % 100); 110 for(auto & node : nodes.array) { 111 auto r = rand.next() % 100; 112 if(r < fill) 113 node = new Node(rand.next() % 100); 69 114 } 70 115 71 116 for(int i = 0; i < 10; i++) { 72 117 int idx = rand.next() % nodes_per_threads; 73 if (auto node = nodes [idx]) {118 if (auto node = nodes.array[idx]) { 74 119 global.crc_in += node->value; 75 120 list.push(node); 76 nodes [idx] = nullptr;121 nodes.array[idx] = nullptr; 77 122 } 78 123 } … … 84 129 unsigned i = 1; 85 130 for(auto & t : threads) { 86 auto & my_nodes = all_nodes[i - 1] ;131 auto & my_nodes = all_nodes[i - 1].array; 87 132 t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) { 88 133 Random rand(tid + rdtscl()); 89 134 90 size_t local_in = 0; 91 size_t local_out = 0; 92 size_t local_empty = 0; 93 size_t local_crc_in = 0; 94 size_t local_crc_out = 0; 95 96 affinity(tid); 135 local_stat_t local; 136 137 // affinity(tid); 97 138 98 139 barrier.wait(tid); … … 100 141 // EXPERIMENT START 101 142 102 while(__builtin_expect(!done, true)) { 103 int idx = rand.next() % nodes_per_threads; 104 if (auto node = my_nodes[idx]) { 105 local_crc_in += node->value; 106 list.push(node); 107 my_nodes[idx] = nullptr; 108 local_in++; 109 } 110 else if(auto node = list.pop2()) { 111 local_crc_out += node->value; 112 my_nodes[idx] = node; 113 local_out++; 114 } 115 else { 116 local_empty++; 117 } 118 } 143 run_body(done, rand, my_nodes, local, list); 119 144 120 145 // EXPERIMENT END … … 122 147 barrier.wait(tid); 123 148 124 global.in += local _in;125 global.out += local _out;126 global.empty += local _empty;149 global.in += local.in; 150 global.out += local.out; 151 global.empty += local.empty; 127 152 128 153 for(auto node : my_nodes) { … … 130 155 } 131 156 132 global.crc_in += local_crc_in; 133 global.crc_out += local_crc_out; 134 135 global.pick_at += local_pick.attempt; 136 global.pick_su += local_pick.success; 157 global.crc_in += local.crc_in; 158 global.crc_out += local.crc_out; 159 160 global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt; 161 global.pick.push.success += relaxed_list<Node>::tls.pick.push.success; 162 global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt; 163 global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success; 137 164 }, i++); 138 165 } … … 143 170 144 171 while(true) { 145 usleep(1000 );172 usleep(100000); 146 173 auto now = Clock::now(); 147 174 duration_t durr = now - before; … … 150 177 break; 151 178 } 179 std::cout << "\r" << durr.count(); 180 std::cout.flush(); 152 181 } 153 182 … … 156 185 duration_t durr = after - before; 157 186 duration = durr.count(); 158 std::cout << " Closing down" << std::endl;187 std::cout << "\nClosing down" << std::endl; 159 188 160 189 for(auto t : threads) { … … 181 210 182 211 std::cout << "Duration : " << duration << "s\n"; 212 std::cout << "ns/Op : " << ( dur_nano / ops_thread )<< "\n"; 213 std::cout << "Ops/sec/thread: " << ops_thread << "\n"; 214 std::cout << "Ops/sec : " << ops_sec << "\n"; 183 215 std::cout << "Total ops : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n"; 184 std::cout << "Ops/sec : " << ops_sec << "\n"; 185 std::cout << "Ops/sec/thread: " << ops_thread << "\n"; 186 std::cout << "ns/Op : " << ( dur_nano / ops_thread )<< "\n"; 187 std::cout << "Pick % : " << (100.0 * double(global.pick_su) / global.pick_at) << "(" << global.pick_su << " / " << global.pick_at << ")\n"; 216 #ifndef NO_STATS 217 double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt); 218 double pop_sur = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt); 219 std::cout << "Push Pick % : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n"; 220 std::cout << "Pop Pick % : " << pop_sur << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n"; 221 #endif 188 222 } 189 223 190 224 void usage(char * argv[]) { 191 std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] " << std::endl;;225 std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;; 192 226 std::exit(1); 193 227 } … … 197 231 double duration = 5.0; 198 232 unsigned nthreads = 2; 233 unsigned nqueues = 2; 234 unsigned fill = 100; 199 235 200 236 std::cout.imbue(std::locale("")); … … 202 238 switch (argc) 203 239 { 240 case 5: 241 nqueues = std::stoul(argv[4]); 242 [[fallthrough]]; 243 case 4: 244 nqueues = std::stoul(argv[3]); 245 [[fallthrough]]; 204 246 case 3: 205 247 nthreads = std::stoul(argv[2]); … … 221 263 check_cache_line_size(); 222 264 223 std::cout << "Running " << nthreads << " threads for " << duration << " seconds" << std::endl;224 run(nthreads, duration);265 std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl; 266 run(nthreads, nqueues, duration, fill); 225 267 226 268 return 0; … … 228 270 229 271 template<> 230 thread_local Random relaxed_list<Node>::rng_g = { int(rdtscl()) }; 272 thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {}; 273 274 template<> 275 relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {}; 276 277 const char * __my_progname = "Relaxed List";
Note: See TracChangeset
for help on using the changeset viewer.