source: benchmark/readyQ/locality.cfa@ 753fb978

ADT ast-experimental enum forall-pointer-decay pthread-emulation qualifiedEnum
Last change on this file since 753fb978 was 75965a6, checked in by Peter A. Buhr <pabuhr@…>, 4 years ago

replace thread_rand with prng, replace xorshift64 with xorshift_13_7_17

  • Property mode set to 100644
File size: 7.0 KB
Line 
1#include "rq_bench.hfa"
2
// Counters accumulated by one benchmark thread over a run.
struct Result {
	uint64_t count;		// iterations of the thread's main loop that ran to completion
	uint64_t dmigs;		// times the thread's data was last recorded on another processor
	uint64_t gmigs;		// times the thread itself was last recorded on another processor
};
8
9// ==================================================
// Benchmark worker: repeatedly touches its data, then parks on a randomly chosen spot.
thread __attribute__((aligned(128))) MyThread {
	// Data this thread works on.
	// put() may overwrite this pointer from another thread when sharing is enabled.
	struct MyData * volatile data;

	// Table of spots the thread can park on.
	struct {
		struct MySpot ** ptr;
		size_t len;
	} spots;

	// Semaphore the thread blocks on (start-up and while seated on a spot).
	bench_sem sem;

	// Progress and migration counters, read by main() after join.
	Result result;

	bool share;			// hand our data to the thread we wake up in put()
	size_t cnt;			// number of cells touched per call to work()
	processor * ttid;	// processor recorded by the last call to moved()
	size_t id;			// identifier given at construction
};
27
// Record the processor the thread is currently observed on.
// Returns 1 when 'ttid' differs from the previously recorded processor, 0 otherwise.
// The field is only written when the processor actually changed.
uint64_t moved(MyThread & this, processor * ttid) {
	if(this.ttid != ttid) {
		this.ttid = ttid;
		return 1;
	}
	return 0;
}
35
36// ==================================================
// Block of 64-bit words a thread works on, padded on both sides.
struct __attribute__((aligned(128))) MyData {
	uint64_t _p1[16]; // padding

	uint64_t * data;	// word array, allocated by the constructor
	size_t len;			// number of words in 'data'
	processor * ttid;	// processor recorded by the last call to moved()
	size_t id;			// identifier given at construction

	uint64_t _p2[16]; // padding
};
45
// Build a data block of 'size' zeroed 64-bit words.
//   id   : identifier stored in the block
//   size : number of words to allocate (128-byte aligned)
// The constructing processor is recorded as the block's initial location.
void ?{}(MyData & this, size_t id, size_t size) {
	this.len = size;
	this.data = alloc(this.len, 128`align);
	this.ttid = active_processor();
	this.id = id;

	for(i; this.len) {
		this.data[i] = 0;
	}
}

// Release the word array owned by the block.
// Without this destructor the array allocated by the constructor is never freed.
void ^?{}(MyData & this) {
	free(this.data);
	this.data = 0p;
}
56
// Record the processor the data is currently being used on.
// Returns 1 when 'ttid' differs from the previously recorded processor, 0 otherwise.
// The field is only written when the processor actually changed.
uint64_t moved(MyData & this, processor * ttid) {
	if(this.ttid != ttid) {
		this.ttid = ttid;
		return 1;
	}
	return 0;
}
64
// Increment one cell of the block; 'idx' is wrapped into range with a modulo.
// Precondition: this.len != 0 (the modulo would otherwise divide by zero).
__attribute__((noinline)) void access(MyData & this, size_t idx) {
	this.data[idx % this.len] += 1;
}
69
70// ==================================================
// Atomic object on which a single thread can wait.
// May exchange data during the hand-off (see put).
struct __attribute__((aligned(128))) MySpot {
	// 0p: seat is free, 1p: seat is closed, anything else: the thread currently seated
	MyThread * volatile ptr;
	size_t id;			// identifier given at construction
	uint64_t _p1[16]; // padding
};
78
// Create an open, empty spot tagged with 'id'.
void ?{}(MySpot & this, size_t id) {
	this.id = id;
	this.ptr = 0p;	// nobody seated yet
}
83
// Main handshake of the benchmark.
// The spot has a single seat: the first thread to arrive waits on it, each
// following thread wakes the seated one and blocks in its place.
//   this  : spot to sit on
//   ctx   : calling thread, installed in the seat
//   data  : data handed to the thread being woken when 'share' is true
//   share : whether to hand 'data' over
// Returns true, without blocking, if the spot was found closed;
// returns false once the caller has been woken up again.
bool put( MySpot & this, MyThread & ctx, MyData * data, bool share) {
	for() {
		MyThread * seated = this.ptr;

		// Closed seat: nothing to do
		if (seated == 1p) return true;

		// Try to swap ourselves in; on failure the seat changed under us, start over
		if (!__atomic_compare_exchange_n(&this.ptr, &seated, &ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) continue;

		// We own the seat, wake the previous occupant if there was one
		if (seated) {
			if (share) {
				seated->data = data;
			}
			post( seated->sem );
		}
		break;
	}

	// Block once on the seat
	wait(ctx.sem);

	// Someone woke us up; ctx.data may have been replaced by the waker
	return false;
}
113
// Shut down the spot.
// Marks the seat as closed (1p) and wakes the thread seated on it, if any.
// Safe to call more than once: a spot that is already closed is left alone.
void release( MySpot & this ) {
	MyThread * val = __atomic_exchange_n(&this.ptr, 1p, __ATOMIC_SEQ_CST);

	// Empty seat: nobody to wake
	if (!val) {
		return;
	}

	// Already closed: 1p is a marker, not a thread, and must not be dereferenced
	if (val == 1p) {
		return;
	}

	// Someone was there, release them
	post( val->sem );
}
125
126// ==================================================
// Do some work: touch 'cnt_' pseudo-randomly chosen cells of 'data'.
// 'state' is the caller's generator state, advanced once per access;
// the same cell can be picked more than once.
__attribute__((noinline)) void work(MyData & data, size_t cnt_, uint64_t & state) {
	for (size_t done = 0; done < cnt_; done += 1) {
		access(data, xorshift_13_7_17(state));
	}
}
133
// Thread body: work on the data, park on a random spot, repeat until the run is over.
void main(MyThread & this) {
	// Private generator state, seeded once
	uint64_t state = prng();

	// Wait for start
	wait(this.sem);

	// Main loop
	for() {
		// Touch our current data, write to invalidate remote cache lines
		work(*this.data, this.cnt, state);

		// Sit on a random spot until another thread takes our place
		uint64_t idx = xorshift_13_7_17(state) % this.spots.len;
		bool closed = put(*this.spots.ptr[idx], this, this.data, this.share);

		// Check if the experiment is over:
		// spot closed, stop flag raised (clock mode) or iteration target reached (count mode)
		if ( closed
		  || ( clock_mode && stop)
		  || (!clock_mode && this.result.count >= stop_count) ) break;

		// Check everything is consistent
		verify(this.data);

		// Write down progress and check migrations of the thread and of its data
		processor * ttid = active_processor();
		this.result.count += 1;
		this.result.gmigs += moved(this, ttid);
		this.result.dmigs += moved(*this.data, ttid);
	}

	// Signal that this thread is done
	__atomic_fetch_add(&threads_left, -1, __ATOMIC_SEQ_CST);
}
166
// Create a worker thread on the benchmark cluster.
//   data     : data block the thread starts with
//   spots    : table of spots the thread may park on
//   spot_len : number of entries in 'spots'
//   cnt      : number of cells touched per round of work
//   share    : hand the data over to the thread woken in put()
//   id       : identifier stored in the thread
void ?{}( MyThread & this, MyData * data, MySpot ** spots, size_t spot_len, size_t cnt, bool share, size_t id) {
	((thread&)this){ bench_cluster };

	// Work data and parking spots
	this.data = data;
	this.spots.ptr = spots;
	this.spots.len = spot_len;

	(this.sem){};

	// Counters start from zero
	this.result.count = 0;
	this.result.gmigs = 0;
	this.result.dmigs = 0;

	// Run parameters
	this.share = share;
	this.cnt = cnt;

	// Initial location is the processor running the constructor
	this.ttid = active_processor();
	this.id = id;
}
181
182// ==================================================
// Benchmark entry point: parse the options, build the data blocks, spots and
// threads, run the experiment, then print the aggregated statistics.
int main(int argc, char * argv[]) {
	unsigned wsize = 2;
	unsigned wcnt = 2;
	unsigned nspots = 0;
	bool share = false;
	cfa_option opt[] = {
		BENCH_OPT,
		{ 'n', "nspots", "Number of spots where threads sleep (nthreads - nspots are active at the same time)", nspots},
		{ 'w', "worksize", "Size of the array for each threads, in words (64bit)", wsize},
		{ 'c', "workcnt" , "Number of words to touch when working (random pick, cells can be picked more than once)", wcnt },
		{ 's', "share" , "Pass the work data to the next thread when blocking", share, parse_truefalse }
	};
	BENCH_OPT_PARSE("cforall cycle benchmark");

	unsigned long long global_count = 0;
	unsigned long long global_gmigs = 0;
	unsigned long long global_dmigs = 0;

	// Default number of spots: one per thread in excess of the processor count.
	// Only subtract when the result is positive: with nthreads < nprocs the value
	// stored in the unsigned 'nspots' would wrap to a huge count and slip past the
	// check below, then be used to size the 'spots' array.
	if( nspots == 0 && nthreads > nprocs ) { nspots = nthreads - nprocs; }
	if( nspots == 0 ) {
		fprintf(stderr, "--nspots must be set or --nthreads set to something bigger than --nprocs\n");
		exit(EXIT_FAILURE);
	}

	// access() indexes with 'idx % len': an empty array divides by zero on the first access.
	if( wsize == 0 && wcnt != 0 ) {
		fprintf(stderr, "--worksize must be at least 1 when --workcnt is not 0\n");
		exit(EXIT_FAILURE);
	}

	Time start, end;
	{
		// One data block per thread
		MyData * data_arrays[nthreads];
		for(i; nthreads) {
			data_arrays[i] = malloc();
			(*data_arrays[i]){ i, wsize };
		}

		// Parking spots shared by all the threads
		MySpot * spots[nspots];
		for(i; nspots) {
			spots[i] = malloc();
			(*spots[i]){ i };
		}

		BenchCluster bc = { nprocs };
		// NOTE(review): every MyThread decrements threads_left once when its main returns,
		// i.e. nthreads decrements in total — confirm that starting from nprocs is intended.
		threads_left = nprocs;
		{
			MyThread * threads[nthreads];
			for(i; nthreads) {
				threads[i] = malloc();
				(*threads[i]){
					data_arrays[i],
					spots,
					nspots,
					wcnt,
					share,
					i
				};
			}

			bool is_tty = isatty(STDOUT_FILENO);
			start = timeHiRes();

			// Release the threads from their start-up wait
			for(i; nthreads) {
				post( threads[i]->sem );
			}
			wait(start, is_tty);

			stop = true;
			end = timeHiRes();
			printf("\nDone\n");

			// Wake each thread in case it is seated on a spot, then collect its counters
			for(i; nthreads) {
				post( threads[i]->sem );
				MyThread & thrd = join( *threads[i] );
				global_count += thrd.result.count;
				global_gmigs += thrd.result.gmigs;
				global_dmigs += thrd.result.dmigs;
			}

			for(i; nthreads) {
				^( *threads[i] ){};
				free( threads[i] );
			}
		}

		for(i; nthreads) {
			^( *data_arrays[i] ){};
			free( data_arrays[i] );
		}

		for(i; nspots) {
			^( *spots[i] ){};
			free( spots[i] );
		}
	}

	// NOTE(review): "Total Operations(ops)" is printed twice below; kept as-is so the output does not change.
	printf("Duration (ms) : %'lf\n", (end - start)`dms);
	printf("Number of processors : %'d\n", nprocs);
	printf("Number of threads : %'d\n", nthreads);
	printf("Total Operations(ops) : %'15llu\n", global_count);
	printf("Work size (64bit words): %'15u\n", wsize);
	printf("Total Operations(ops) : %'15llu\n", global_count);
	printf("Total G Migrations : %'15llu\n", global_gmigs);
	printf("Total D Migrations : %'15llu\n", global_dmigs);
	printf("Ops per second : %'18.2lf\n", ((double)global_count) / (end - start)`ds);
	printf("ns per ops : %'18.2lf\n", (end - start)`dns / global_count);
	printf("Ops per threads : %'15llu\n", global_count / nthreads);
	printf("Ops per procs : %'15llu\n", global_count / nprocs);
	printf("Ops/sec/procs : %'18.2lf\n", (((double)global_count) / nprocs) / (end - start)`ds);
	printf("ns per ops/procs : %'18.2lf\n", (end - start)`dns / (global_count / nprocs));
	fflush(stdout);
}
Note: See TracBrowser for help on using the repository browser.