source: benchmark/readyQ/locality.cc@ 8fd1b7c

ADT ast-experimental
Last change on this file since 8fd1b7c was e54d0c3, checked in by Peter A. Buhr <pabuhr@…>, 4 years ago

Fixed benchmarks after another change to getTimeNsec()

  • Property mode set to 100644
File size: 8.1 KB
Line 
#include "rq_bench.hpp"

#include <pthread.h>
#include <semaphore.h>
#include <sched.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <clocale>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <locale>
#include <vector>
9
// Per-thread counters filled in by a worker (thread_main) and summed by main.
struct Result {
	uint64_t count{0}; // loop iterations completed after being woken from a spot
	uint64_t dmigs{0}; // times the data array was seen on a different CPU than last recorded
	uint64_t gmigs{0}; // times the thread was seen on a different CPU than last recorded
};
15
// Sleep policy handed to wait<Pthread>() in main: forwards to the POSIX usleep.
struct Pthread {
	static int usleep(useconds_t usec) {
		const int rc = ::usleep(usec);
		return rc;
	}
};
21
22// ==================================================
// Work array owned by one MyData object: `len` 64-bit words in a 128-byte aligned
// buffer, plus the CPU that last touched it (`ttid`) so data migrations can be counted.
// The padding arrays presumably keep neighbouring objects off the hot fields' cache
// lines (false sharing) — NOTE(review): confirm intent.
struct __attribute__((aligned(128))) MyData {
	uint64_t _p1[16]; // padding
	uint64_t * data;
	size_t len;
	int ttid;
	size_t id;
	uint64_t _p2[16]; // padding

	// Allocate and zero `size` words. `ttid` starts as the CPU of the constructing
	// thread. Aborts if the allocation fails.
	MyData(size_t id, size_t size)
		: data( alloc_words( size ) )
		, len( size )
		, ttid( sched_getcpu() )
		, id( id )
	{
		for(size_t i = 0; i < this->len; i++) {
			this->data[i] = 0;
		}
	}

	// The buffer comes from aligned_alloc, so it is released with free
	~MyData() {
		free( this->data );
	}

	// Owns `data`: a copy would free the same buffer twice
	MyData(const MyData &) = delete;
	MyData & operator=(const MyData &) = delete;

	// Record that CPU `ttid` touched the data.
	// Returns 1 if it differs from the last recorded CPU (a migration), else 0.
	uint64_t moved(int ttid) {
		if(this->ttid == ttid) {
			return 0;
		}
		this->ttid = ttid;
		return 1;
	}

	// Increment word `idx % len` (len must be non-zero).
	__attribute__((noinline)) void access(size_t idx) {
		size_t l = this->len;
		this->data[idx % l] += 1;
	}

	// aligned_alloc requires the byte count to be a multiple of the alignment
	// (C11/C17 7.22.3.1), so round the request up; never ask for 0 bytes.
	static uint64_t * alloc_words(size_t size) {
		const size_t align = 128;
		if( size > (SIZE_MAX - align) / sizeof(uint64_t) ) std::abort(); // byte count would overflow
		size_t bytes = size * sizeof(uint64_t);
		bytes = ((bytes + align - 1) / align) * align;
		if( bytes == 0 ) bytes = align;
		void * mem = aligned_alloc( align, bytes );
		if( !mem ) std::abort();
		return static_cast<uint64_t *>( mem );
	}
};
55
56// ==================================================
57struct __attribute__((aligned(128))) MyCtx {
58 struct MyData * volatile data;
59
60 struct {
61 struct MySpot ** ptr;
62 size_t len;
63 } spots;
64
65 sem_t sem;
66
67 Result result;
68
69 bool share;
70 size_t cnt;
71 int ttid;
72 size_t id;
73
74 MyCtx(MyData * d, MySpot ** spots, size_t len, size_t cnt, bool share, size_t id)
75 : data( d )
76 , spots{ .ptr = spots, .len = len }
77 , share( share )
78 , cnt( cnt )
79 , ttid( sched_getcpu() )
80 , id( id )
81 {
82 int ret = sem_init( &sem, false, 0 );
83 if(ret != 0) std::abort();
84 }
85
86 ~MyCtx() {
87 int ret = sem_destroy( &sem );
88 if(ret != 0) std::abort();
89 }
90
91 uint64_t moved(int ttid) {
92 if(this->ttid == ttid) {
93 return 0;
94 }
95 this->ttid = ttid;
96 return 1;
97 }
98};
99
100// ==================================================
101// Atomic object where a single thread can wait
102// May exchanges data
103struct __attribute__((aligned(128))) MySpot {
104 MyCtx * volatile ptr;
105 size_t id;
106 uint64_t _p1[16]; // padding
107
108 MySpot(size_t id) : ptr( nullptr ), id( id ) {}
109
110
111 static inline MyCtx * one() {
112 return reinterpret_cast<MyCtx *>(1);
113 }
114
115 // Main handshake of the code
116 // Single seat, first thread arriving waits
117 // Next threads unblocks current one and blocks in its place
118 // if share == true, exchange data in the process
119 bool put( MyCtx & ctx, MyData * data, bool share) {
120 // Attempt to CAS our context into the seat
121 for(;;) {
122 MyCtx * expected = this->ptr;
123 if (expected == one()) { // Seat is closed, return
124 return true;
125 }
126
127 if (__atomic_compare_exchange_n(&this->ptr, &expected, &ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
128 if(expected) {
129 if(share) {
130 expected->data = data;
131 }
132 sem_post(&expected->sem);
133 }
134 break; // We got the seat
135 }
136 }
137
138 // Block once on the seat
139 sem_wait(&ctx.sem);
140
141 // Someone woke us up, get the new data
142 return false;
143 }
144
145 // Shutdown the spot
146 // Wake current thread and mark seat as closed
147 void release() {
148 struct MyCtx * val = __atomic_exchange_n(&this->ptr, one(), __ATOMIC_SEQ_CST);
149 if (!val) {
150 return;
151 }
152
153 // Someone was there, release them
154 sem_post(&val->sem);
155 }
156};
157
158// ==================================================
159// Random number generator, Go's native one is to slow and global
160uint64_t __xorshift64( uint64_t & state ) {
161 uint64_t x = state;
162 x ^= x << 13;
163 x ^= x >> 7;
164 x ^= x << 17;
165 return state = x;
166}
167
168// ==================================================
169// Do some work by accessing 'cnt' cells in the array
170__attribute__((noinline)) void work(MyData & data, size_t cnt, uint64_t & state) {
171 for (size_t i = 0; i < cnt; i++) {
172 data.access(__xorshift64(state));
173 }
174}
175
176void thread_main( MyCtx & ctx ) {
177 uint64_t state = ctx.id;
178
179 // Wait for start
180 sem_wait(&ctx.sem);
181
182 // Main loop
183 for(;;) {
184 // Touch our current data, write to invalidate remote cache lines
185 work( *ctx.data, ctx.cnt, state );
186
187 // Wait on a random spot
188 uint64_t idx = __xorshift64(state) % ctx.spots.len;
189 bool closed = ctx.spots.ptr[idx]->put(ctx, ctx.data, ctx.share);
190
191 // Check if the experiment is over
192 if (closed) break;
193 if ( clock_mode && stop) break;
194 if (!clock_mode && ctx.result.count >= stop_count) break;
195
196 // Check everything is consistent
197 assert( ctx.data );
198
199 // write down progress and check migrations
200 int ttid = sched_getcpu();
201 ctx.result.count += 1;
202 ctx.result.gmigs += ctx.moved(ttid);
203 ctx.result.dmigs += ctx.data->moved(ttid);
204 }
205
206 __atomic_fetch_add(&threads_left, -1, __ATOMIC_SEQ_CST);
207}
208
209// ==================================================
210int main(int argc, char * argv[]) {
211 unsigned wsize = 2;
212 unsigned wcnt = 2;
213 unsigned nspots = 0;
214 bool share = false;
215 option_t opt[] = {
216 BENCH_OPT,
217 { 'n', "nspots", "Number of spots where threads sleep (nthreads - nspots are active at the same time)", nspots},
218 { 'w', "worksize", "Size of the array for each threads, in words (64bit)", wsize},
219 { 'c', "workcnt" , "Number of words to touch when working (random pick, cells can be picked more than once)", wcnt },
220 { 's', "share" , "Pass the work data to the next thread when blocking", share, parse_truefalse }
221 };
222 BENCH_OPT_PARSE("libfibre cycle benchmark");
223
224 std::cout.imbue(std::locale(""));
225 setlocale(LC_ALL, "");
226
227 unsigned long long global_count = 0;
228 unsigned long long global_gmigs = 0;
229 unsigned long long global_dmigs = 0;
230
231 if( nspots == 0 ) { nspots = nthreads - nprocs; }
232
233 uint64_t start, end;
234 {
235 cpu_set_t cpuset;
236 int ret = pthread_getaffinity_np( pthread_self(), sizeof(cpuset), &cpuset );
237 if(ret != 0) std::abort();
238
239 unsigned cnt = CPU_COUNT_S(sizeof(cpuset), &cpuset);
240 if(cnt > nprocs) {
241 unsigned extras = cnt - nprocs;
242 for(int i = 0; i < CPU_SETSIZE && extras > 0; i++) {
243 if(CPU_ISSET_S(i, sizeof(cpuset), &cpuset)) {
244 CPU_CLR_S(i, sizeof(cpuset), &cpuset);
245 extras--;
246 }
247 }
248
249 ret = pthread_setaffinity_np( pthread_self(), sizeof(cpuset), &cpuset );
250 if(ret != 0) std::abort();
251 }
252 }
253
254 {
255 MyData * data_arrays[nthreads];
256 for(size_t i = 0; i < nthreads; i++) {
257 data_arrays[i] = new MyData( i, wsize );
258 }
259
260 MySpot * spots[nspots];
261 for(unsigned i = 0; i < nspots; i++) {
262 spots[i] = new MySpot{ i };
263 }
264
265 threads_left = nthreads - nspots;
266 pthread_t threads[nthreads];
267 MyCtx * thddata[nthreads];
268 {
269 for(size_t i = 0; i < nthreads; i++) {
270 thddata[i] = new MyCtx(
271 data_arrays[i],
272 spots,
273 nspots,
274 wcnt,
275 share,
276 i
277 );
278 int ret = pthread_create( &threads[i], nullptr, reinterpret_cast<void * (*)(void *)>(thread_main), thddata[i] );
279 if(ret != 0) std::abort();
280 }
281
282 bool is_tty = isatty(STDOUT_FILENO);
283 start = timeHiRes();
284
285 for(size_t i = 0; i < nthreads; i++) {
286 sem_post(&thddata[i]->sem);
287 }
288 wait<Pthread>(start, is_tty);
289
290 stop = true;
291 end = timeHiRes();
292 printf("\nDone\n");
293
294 for(size_t i = 0; i < nthreads; i++) {
295 sem_post(&thddata[i]->sem);
296 int ret = pthread_join( threads[i], nullptr );
297 if(ret != 0) std::abort();
298 global_count += thddata[i]->result.count;
299 global_gmigs += thddata[i]->result.gmigs;
300 global_dmigs += thddata[i]->result.dmigs;
301 }
302 }
303
304 for(size_t i = 0; i < nthreads; i++) {
305 delete( data_arrays[i] );
306 }
307
308 for(size_t i = 0; i < nspots; i++) {
309 delete( spots[i] );
310 }
311 }
312
313 printf("Duration (ms) : %'ld\n", to_miliseconds(end - start));
314 printf("Number of processors : %'d\n", nprocs);
315 printf("Number of threads : %'d\n", nthreads);
316 printf("Number of spots : %'d\n", nspots);
317 printf("Work size (64bit words): %'15u\n", wsize);
318 printf("Total Operations(ops) : %'15llu\n", global_count);
319 printf("Total G Migrations : %'15llu\n", global_gmigs);
320 printf("Total D Migrations : %'15llu\n", global_dmigs);
321 printf("Ops per second : %'18.2lf\n", ((double)global_count) / to_fseconds(end - start));
322 printf("ns per ops : %'18.2lf\n", ((double)(end - start)) / global_count);
323 printf("Ops per threads : %'15llu\n", global_count / nthreads);
324 printf("Ops per procs : %'15llu\n", global_count / nprocs);
325 printf("Ops/sec/procs : %'18.2lf\n", (((double)global_count) / nprocs) / to_fseconds(end - start));
326 printf("ns per ops/procs : %'18.2lf\n", ((double)(end - start)) / (global_count / nprocs));
327 fflush(stdout);
328}
Note: See TracBrowser for help on using the repository browser.