source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp@ b2a37b0

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since b2a37b0 was b2a37b0, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Initial drafts in C++ of the CFA scheduler

  • Property mode set to 100644
File size: 5.0 KB
Line 
#include "relaxed_list.hpp"

#include <array>
#include <iostream>
#include <limits>
#include <locale>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <unistd.h>
#include <sys/sysinfo.h>

#include "utils.hpp"
14
15struct Node {
16 static std::atomic_size_t creates;
17 static std::atomic_size_t destroys;
18
19 _LinksFields_t<Node> _links;
20
21 int value;
22 Node(int value): value(value) {
23 creates++;
24 }
25
26 ~Node() {
27 destroys++;
28 }
29};
30
31std::atomic_size_t Node::creates = { 0 };
32std::atomic_size_t Node::destroys = { 0 };
33
34static const constexpr int nodes_per_threads = 128;
35
36bool enable_stats = false;
37
38__attribute__((aligned(64))) thread_local pick_stat local_pick;
39
40void run(unsigned nthread, double duration) {
41 // List being tested
42 relaxed_list<Node> list = { nthread * 2 };
43
44 // Barrier for synchronization
45 barrier_t barrier(nthread + 1);
46
47 // Data to check everything is OK
48 struct {
49 std::atomic_size_t in = { 0 };
50 std::atomic_size_t out = { 0 };
51 std::atomic_size_t empty = { 0 };
52 std::atomic_size_t crc_in = { 0 };
53 std::atomic_size_t crc_out = { 0 };
54 std::atomic_size_t pick_at = { 0 };
55 std::atomic_size_t pick_su = { 0 };
56 } global;
57
58 // Flag to signal termination
59 std::atomic_bool done = { false };
60
61 // Prep nodes
62 std::cout << "Initializing" << std::endl;
63 std::vector<Node *> all_nodes[nthread];
64 for(auto & nodes : all_nodes) {
65 Random rand(rdtscl());
66 nodes.resize(nodes_per_threads);
67 for(auto & node : nodes) {
68 node = new Node(rand.next() % 100);
69 }
70
71 for(int i = 0; i < 10; i++) {
72 int idx = rand.next() % nodes_per_threads;
73 if (auto node = nodes[idx]) {
74 global.crc_in += node->value;
75 list.push(node);
76 nodes[idx] = nullptr;
77 }
78 }
79 }
80
81 enable_stats = true;
82
83 std::thread * threads[nthread];
84 unsigned i = 1;
85 for(auto & t : threads) {
86 auto & my_nodes = all_nodes[i - 1];
87 t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
88 Random rand(tid + rdtscl());
89
90 size_t local_in = 0;
91 size_t local_out = 0;
92 size_t local_empty = 0;
93 size_t local_crc_in = 0;
94 size_t local_crc_out = 0;
95
96 affinity(tid);
97
98 barrier.wait(tid);
99
100 // EXPERIMENT START
101
102 while(__builtin_expect(!done, true)) {
103 int idx = rand.next() % nodes_per_threads;
104 if (auto node = my_nodes[idx]) {
105 local_crc_in += node->value;
106 list.push(node);
107 my_nodes[idx] = nullptr;
108 local_in++;
109 }
110 else if(auto node = list.pop2()) {
111 local_crc_out += node->value;
112 my_nodes[idx] = node;
113 local_out++;
114 }
115 else {
116 local_empty++;
117 }
118 }
119
120 // EXPERIMENT END
121
122 barrier.wait(tid);
123
124 global.in += local_in;
125 global.out += local_out;
126 global.empty += local_empty;
127
128 for(auto node : my_nodes) {
129 delete node;
130 }
131
132 global.crc_in += local_crc_in;
133 global.crc_out += local_crc_out;
134
135 global.pick_at += local_pick.attempt;
136 global.pick_su += local_pick.success;
137 }, i++);
138 }
139
140 std::cout << "Starting" << std::endl;
141 auto before = Clock::now();
142 barrier.wait(0);
143
144 while(true) {
145 usleep(1000);
146 auto now = Clock::now();
147 duration_t durr = now - before;
148 if( durr.count() > duration ) {
149 done = true;
150 break;
151 }
152 }
153
154 barrier.wait(0);
155 auto after = Clock::now();
156 duration_t durr = after - before;
157 duration = durr.count();
158 std::cout << "Closing down" << std::endl;
159
160 for(auto t : threads) {
161 t->join();
162 delete t;
163 }
164
165 enable_stats = false;
166
167 while(auto node = list.pop()) {
168 global.crc_out += node->value;
169 delete node;
170 }
171
172 assert(Node::creates == Node::destroys);
173 assert(global.crc_in == global.crc_out);
174
175 std::cout << "Done" << std::endl;
176
177 size_t ops = global.in + global.out;
178 size_t ops_sec = size_t(double(ops) / duration);
179 size_t ops_thread = ops_sec / nthread;
180 auto dur_nano = duration_cast<std::nano>(1.0);
181
182 std::cout << "Duration : " << duration << "s\n";
183 std::cout << "Total ops : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
184 std::cout << "Ops/sec : " << ops_sec << "\n";
185 std::cout << "Ops/sec/thread: " << ops_thread << "\n";
186 std::cout << "ns/Op : " << ( dur_nano / ops_thread )<< "\n";
187 std::cout << "Pick % : " << (100.0 * double(global.pick_su) / global.pick_at) << "(" << global.pick_su << " / " << global.pick_at << ")\n";
188}
189
// Prints the command-line synopsis to stderr, prefixed with argv[0], and
// terminates the process with exit status 1. Never returns.
[[noreturn]] void usage(char * argv[]) {
	std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS]" << std::endl;
	std::exit(1);
}
194
195int main(int argc, char * argv[]) {
196
197 double duration = 5.0;
198 unsigned nthreads = 2;
199
200 std::cout.imbue(std::locale(""));
201
202 switch (argc)
203 {
204 case 3:
205 nthreads = std::stoul(argv[2]);
206 [[fallthrough]];
207 case 2:
208 duration = std::stod(argv[1]);
209 if( duration <= 0.0 ) {
210 std::cerr << "Duration must be positive, was " << argv[1] << "(" << duration << ")" << std::endl;
211 usage(argv);
212 }
213 [[fallthrough]];
214 case 1:
215 break;
216 default:
217 usage(argv);
218 break;
219 }
220
221 check_cache_line_size();
222
223 std::cout << "Running " << nthreads << " threads for " << duration << " seconds" << std::endl;
224 run(nthreads, duration);
225
226 return 0;
227}
228
229template<>
230thread_local Random relaxed_list<Node>::rng_g = { int(rdtscl()) };
Note: See TracBrowser for help on using the repository browser.