source: benchmark/readyQ/locality.cpp@ 857a1c6

Last change on this file since 857a1c6 was f03209d3, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Locality benchmark now supports explicit number of spots instead of using nthreads - nprocs

#include "rq_bench.hpp"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <libfibre/fibre.h>
#pragma GCC diagnostic pop

// Per-thread counters reported at the end of the run
struct Result {
	uint64_t count = 0; // handshakes completed
	uint64_t dmigs = 0; // data migrations: the work array changed processor
	uint64_t gmigs = 0; // green-thread migrations: the fibre changed processor
};

// Binary semaphore built from a single atomic pointer:
// nullptr = empty, 'ready' = posted with no waiter, otherwise = the parked fibre
class __attribute__((aligned(128))) bench_sem {
	Fibre * volatile ptr = nullptr;
public:
	// Returns true if the fibre actually blocked
	inline bool wait() {
		static Fibre * const ready = reinterpret_cast<Fibre * const>(1ull);
		for(;;) {
			Fibre * expected = this->ptr;
			if(expected == ready) { // a post already happened, consume it
				if(__atomic_compare_exchange_n(&this->ptr, &expected, nullptr, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
					return false;
				}
			}
			else {
				/* paranoid */ assert( expected == nullptr );
				if(__atomic_compare_exchange_n(&this->ptr, &expected, fibre_self(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
					fibre_park();
					return true;
				}
			}
		}
	}

	// Returns true if a parked fibre was woken
	inline bool post() {
		static Fibre * const ready = reinterpret_cast<Fibre * const>(1ull);
		for(;;) {
			Fibre * expected = this->ptr;
			if(expected == ready) return false; // token already pending
			if(expected == nullptr) {
				if(__atomic_compare_exchange_n(&this->ptr, &expected, ready, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
					return false;
				}
			}
			else {
				if(__atomic_compare_exchange_n(&this->ptr, &expected, nullptr, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
					fibre_unpark( expected );
					return true;
				}
			}
		}
	}
};
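
// Below is a minimal, portable analogue of the protocol above, kept for
// illustration only: C++20 std::atomic wait/notify stand in for
// fibre_park/fibre_unpark, and a plain void* tag stands in for the Fibre.
// It is a sketch of the same three-state machine, not libfibre code.
#if 0
#include <atomic>
#include <cassert>

struct toy_sem {
	inline static void * const READY = reinterpret_cast<void *>(1);
	std::atomic<void *> ptr{ nullptr }; // nullptr / READY / waiter

	bool wait(void * self) { // returns true if we actually blocked
		for(;;) {
			void * expected = ptr.load();
			if(expected == READY) { // consume the pending post
				if(ptr.compare_exchange_strong(expected, nullptr)) return false;
			} else {
				assert(expected == nullptr);
				if(ptr.compare_exchange_strong(expected, self)) {
					ptr.wait(self); // "park" until post() swaps us out
					return true;
				}
			}
		}
	}

	bool post() { // returns true if a waiter was woken
		for(;;) {
			void * expected = ptr.load();
			if(expected == READY) return false; // token already pending
			void * desired = expected ? nullptr : READY;
			if(ptr.compare_exchange_strong(expected, desired)) {
				if(!expected) return false; // left a token behind
				ptr.notify_one(); // "unpark" the waiter
				return true;
			}
		}
	}
};
#endif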

// ==================================================
// Work array owned by one thread at a time; remembers which processor
// last touched it so data migrations can be counted
struct __attribute__((aligned(128))) MyData {
	uint64_t _p1[16]; // padding
	uint64_t * data;
	size_t len;
	BaseProcessor * ttid;
	size_t id;
	uint64_t _p2[16]; // padding

	MyData(size_t id, size_t size)
		: data( (uint64_t *)aligned_alloc(128, size * sizeof(uint64_t)) )
		, len( size )
		, ttid( &Context::CurrProcessor() )
		, id( id )
	{
		for(size_t i = 0; i < this->len; i++) {
			this->data[i] = 0;
		}
	}

	// Returns 1 if the array changed processor since the last check
	uint64_t moved(BaseProcessor * ttid) {
		if(this->ttid == ttid) {
			return 0;
		}
		this->ttid = ttid;
		return 1;
	}

	__attribute__((noinline)) void access(size_t idx) {
		size_t l = this->len;
		this->data[idx % l] += 1;
	}
};
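
// Sketch: compile-time checks for the anti-false-sharing layout above.
// These assume 64-byte cache lines (128 bytes also defeats adjacent-line
// prefetching); they are illustrative, not part of the benchmark.
#if 0
#include <cstddef> // offsetof
static_assert(alignof(MyData) == 128, "each MyData starts its own 128B block");
static_assert(sizeof(MyData) % 128 == 0, "array elements never share a block");
static_assert(offsetof(MyData, data) == 128, "hot fields sit past the first pad");
#endif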

// ==================================================
struct __attribute__((aligned(128))) MyCtx {
	struct MyData * volatile data;

	struct {
		struct MySpot ** ptr;
		size_t len;
	} spots;

	bench_sem sem;

	Result result;

	bool share;
	size_t cnt;
	BaseProcessor * ttid;
	size_t id;

	MyCtx(MyData * d, MySpot ** spots, size_t len, size_t cnt, bool share, size_t id)
		: data( d )
		, spots{ .ptr = spots, .len = len }
		, share( share )
		, cnt( cnt )
		, ttid( &Context::CurrProcessor() )
		, id( id )
	{}

	uint64_t moved(BaseProcessor * ttid) {
		if(this->ttid == ttid) {
			return 0;
		}
		this->ttid = ttid;
		return 1;
	}
};

// ==================================================
// Atomic object where a single thread can wait;
// may exchange data in the process
struct __attribute__((aligned(128))) MySpot {
	MyCtx * volatile ptr;
	size_t id;
	uint64_t _p1[16]; // padding

	MySpot(size_t id) : ptr( nullptr ), id( id ) {}

	// Sentinel value marking a closed seat
	static inline MyCtx * one() {
		return reinterpret_cast<MyCtx *>(1);
	}

	// Main handshake of the code:
	// single seat, the first thread to arrive waits;
	// the next thread unblocks the current one and blocks in its place;
	// if share == true, exchange data in the process
	bool put( MyCtx & ctx, MyData * data, bool share) {
		// Attempt to CAS our context into the seat
		for(;;) {
			MyCtx * expected = this->ptr;
			if (expected == one()) { // Seat is closed, return
				return true;
			}

			if (__atomic_compare_exchange_n(&this->ptr, &expected, &ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
				if(expected) {
					if(share) {
						expected->data = data;
					}
					expected->sem.post();
				}
				break; // We got the seat
			}
		}

		// Block once on the seat
		ctx.sem.wait();

		// Someone woke us up, get the new data
		return false;
	}

	// Shut down the spot:
	// wake the current thread and mark the seat as closed
	void release() {
		struct MyCtx * val = __atomic_exchange_n(&this->ptr, one(), __ATOMIC_SEQ_CST);
		if (!val) {
			return;
		}

		// Someone was there, release them
		val->sem.post();
	}
};
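
// Sketch of a put/put/release timeline on one spot (hypothetical two-fibre
// schedule; not executable as-is since it needs a running libfibre context):
//
//   MySpot spot{ 0 };
//   // fibre A: seat empty, CAS installs &ctxA, A blocks in ctxA.sem.wait()
//   spot.put(ctxA, dataA, true);
//   // fibre B: CAS swaps &ctxA -> &ctxB; with share == true, B stores dataB
//   // into ctxA.data, posts ctxA.sem (waking A), then blocks in its place
//   spot.put(ctxB, dataB, true);
//   // A resumes holding dataB: the array stays with the processor that
//   // last ran, which is exactly the locality effect being measured
//   spot.release(); // wakes B; later put() calls see one() and return true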

// ==================================================
// Random number generator; Go's native one is too slow and global.
// Note: 0 is a fixed point, so the state must be seeded nonzero.
uint64_t __xorshift64( uint64_t & state ) {
	uint64_t x = state;
	x ^= x << 13;
	x ^= x >> 7;
	x ^= x << 17;
	return state = x;
}
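
// Quick check of the note above (illustrative only; assumes assert is
// available, e.g. via <cassert>): a zero state stays zero because every
// shift-xor of 0 is 0, while a nonzero seed never reaches 0.
#if 0
void xorshift64_sanity() {
	uint64_t z = 0;
	assert(__xorshift64(z) == 0); // stuck at the fixed point forever
	uint64_t s = 1;
	for(int i = 0; i < 1000; i++) assert(__xorshift64(s) != 0);
}
#endif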

// ==================================================
// Do some work by accessing 'cnt' cells in the array
__attribute__((noinline)) void work(MyData & data, size_t cnt, uint64_t & state) {
	for (size_t i = 0; i < cnt; i++) {
		data.access(__xorshift64(state));
	}
}

void thread_main( MyCtx & ctx ) {
	uint64_t state = ctx.id + 1; // xorshift state must be nonzero, see above

	// Wait for start
	ctx.sem.wait();

	// Main loop
	for(;;) {
		// Touch our current data, write to invalidate remote cache lines
		work( *ctx.data, ctx.cnt, state );

		// Wait on a random spot
		uint64_t idx = __xorshift64(state) % ctx.spots.len;
		bool closed = ctx.spots.ptr[idx]->put(ctx, ctx.data, ctx.share);

		// Check if the experiment is over
		if (closed) break;
		if ( clock_mode && stop) break;
		if (!clock_mode && ctx.result.count >= stop_count) break;

		// Check everything is consistent
		assert( ctx.data );

		// Write down progress and check migrations
		BaseProcessor * ttid = &Context::CurrProcessor();
		ctx.result.count += 1;
		ctx.result.gmigs += ctx.moved(ttid);
		ctx.result.dmigs += ctx.data->moved(ttid);
	}

	__atomic_fetch_add(&threads_left, -1, __ATOMIC_SEQ_CST);
}

// ==================================================
int main(int argc, char * argv[]) {
	unsigned wsize = 2;
	unsigned wcnt = 2;
	unsigned nspots = 0;
	bool share = false;
	option_t opt[] = {
		BENCH_OPT,
		{ 'n', "nspots", "Number of spots where threads sleep (nthreads - nspots are active at the same time)", nspots},
		{ 'w', "worksize", "Size of the array for each thread, in words (64bit)", wsize},
		{ 'c', "workcnt" , "Number of words to touch when working (random pick, cells can be picked more than once)", wcnt },
		{ 's', "share" , "Pass the work data to the next thread when blocking", share, parse_truefalse }
	};
	BENCH_OPT_PARSE("libfibre locality benchmark");

	std::cout.imbue(std::locale(""));
	setlocale(LC_ALL, "");

	unsigned long long global_count = 0;
	unsigned long long global_gmigs = 0;
	unsigned long long global_dmigs = 0;

	// Default: enough spots that exactly nprocs threads stay runnable
	if( nspots == 0 ) { nspots = nthreads - nprocs; }

	uint64_t start, end;
	{
		FibreInit(1, nprocs);
		MyData * data_arrays[nthreads];
		for(size_t i = 0; i < nthreads; i++) {
			data_arrays[i] = new MyData( i, wsize );
		}

		MySpot * spots[nspots];
		for(unsigned i = 0; i < nspots; i++) {
			spots[i] = new MySpot{ i };
		}

		threads_left = nthreads - nspots;
		Fibre * threads[nthreads];
		MyCtx * thddata[nthreads];
		{
			for(size_t i = 0; i < nthreads; i++) {
				thddata[i] = new MyCtx(
					data_arrays[i],
					spots,
					nspots,
					wcnt,
					share,
					i
				);
				threads[i] = new Fibre( reinterpret_cast<void (*)(void *)>(thread_main), thddata[i] );
			}

			bool is_tty = isatty(STDOUT_FILENO);
			start = getTimeNsec();

			for(size_t i = 0; i < nthreads; i++) {
				thddata[i]->sem.post();
			}
			wait<Fibre>(start, is_tty);

			stop = true;
			end = getTimeNsec();
			printf("\nDone\n");

			for(size_t i = 0; i < nthreads; i++) {
				thddata[i]->sem.post();
				fibre_join( threads[i], nullptr );
				global_count += thddata[i]->result.count;
				global_gmigs += thddata[i]->result.gmigs;
				global_dmigs += thddata[i]->result.dmigs;
			}
		}

		for(size_t i = 0; i < nthreads; i++) {
			delete( data_arrays[i] );
		}

		for(size_t i = 0; i < nspots; i++) {
			delete( spots[i] );
		}
	}

	printf("Duration (ms)          : %'ld\n", to_miliseconds(end - start));
	printf("Number of processors   : %'d\n", nprocs);
	printf("Number of threads      : %'d\n", nthreads);
	printf("Number of spots        : %'d\n", nspots);
	printf("Work size (64bit words): %'15u\n", wsize);
	printf("Total Operations(ops)  : %'15llu\n", global_count);
	printf("Total G Migrations     : %'15llu\n", global_gmigs);
	printf("Total D Migrations     : %'15llu\n", global_dmigs);
	printf("Ops per second         : %'18.2lf\n", ((double)global_count) / to_fseconds(end - start));
	printf("ns per ops             : %'18.2lf\n", ((double)(end - start)) / global_count);
	printf("Ops per threads        : %'15llu\n", global_count / nthreads);
	printf("Ops per procs          : %'15llu\n", global_count / nprocs);
	printf("Ops/sec/procs          : %'18.2lf\n", (((double)global_count) / nprocs) / to_fseconds(end - start));
	printf("ns per ops/procs       : %'18.2lf\n", ((double)(end - start)) / (global_count / nprocs));
	fflush(stdout);
}
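
// Hypothetical build/run example. The compiler flags, include path, and the
// -p/-t options (which come from BENCH_OPT in rq_bench.hpp) are assumptions,
// not taken from this repository's build system:
//
//   g++ -O2 -std=c++17 locality.cpp -I<libfibre>/include -lfibre -o locality
//   ./locality -p 4 -t 32 -n 28 -w 2 -c 2 -s true
//
// With 32 threads and 28 spots, 4 threads run at any instant (one per
// processor); '-s true' hands a blocking thread's array to whoever takes its
// seat, which is why the G and D migration counts can diverge.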