source: benchmark/readyQ/locality.cfa @ 5c231c1

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since 5c231c1 was 75965a6, checked in by Peter A. Buhr <pabuhr@…>, 3 years ago

replace thread_rand with prng, replace xorshift64 with xorshift_13_7_17

  • Property mode set to 100644
File size: 7.0 KB
Line 
1#include "rq_bench.hfa"
2
3struct Result {
4        uint64_t count;
5        uint64_t dmigs;
6        uint64_t gmigs;
7};
8
9// ==================================================
10thread __attribute__((aligned(128))) MyThread {
11        struct MyData * volatile data;
12
13        struct {
14                struct MySpot ** ptr;
15                size_t len;
16        } spots;
17
18        bench_sem sem;
19
20        Result result;
21
22        bool share;
23        size_t cnt;
24        processor * ttid;
25        size_t id;
26};
27
28uint64_t moved(MyThread & this, processor * ttid) {
29        if(this.ttid == ttid) {
30                return 0;
31        }
32        this.ttid = ttid;
33        return 1;
34}
35
36// ==================================================
37struct __attribute__((aligned(128))) MyData {
38        uint64_t _p1[16];  // padding
39        uint64_t * data;
40        size_t len;
41        processor * ttid;
42        size_t id;
43        uint64_t _p2[16];  // padding
44};
45
46void ?{}(MyData & this, size_t id, size_t size) {
47        this.len = size;
48        this.data = alloc(this.len, 128`align);
49        this.ttid = active_processor();
50        this.id = id;
51
52        for(i; this.len) {
53                this.data[i] = 0;
54        }
55}
56
57uint64_t moved(MyData & this, processor * ttid) {
58        if(this.ttid == ttid) {
59                return 0;
60        }
61        this.ttid = ttid;
62        return 1;
63}
64
65__attribute__((noinline)) void access(MyData & this, size_t idx) {
66        size_t l = this.len;
67        this.data[idx % l] += 1;
68}
69
70// ==================================================
71// Atomic object where a single thread can wait
72// May exchanges data
73struct __attribute__((aligned(128))) MySpot {
74        MyThread * volatile ptr;
75        size_t id;
76        uint64_t _p1[16];  // padding
77};
78
79void ?{}(MySpot & this, size_t id) {
80        this.ptr = 0p;
81        this.id  = id;
82}
83
84// Main handshake of the code
85// Single seat, first thread arriving waits
86// Next threads unblocks current one and blocks in its place
87// if share == true, exchange data in the process
88bool put( MySpot & this, MyThread & ctx, MyData * data, bool share) {
89        // Attempt to CAS our context into the seat
90        for() {
91                MyThread * expected = this.ptr;
92                if (expected == 1p) { // Seat is closed, return
93                        return true;
94                }
95
96                if (__atomic_compare_exchange_n(&this.ptr, &expected, &ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
97                        if(expected) {
98                                if(share) {
99                                        expected->data = data;
100                                }
101                                post( expected->sem );
102                        }
103                        break; // We got the seat
104                }
105        }
106
107        // Block once on the seat
108        wait(ctx.sem);
109
110        // Someone woke us up, get the new data
111        return false;
112}
113
114// Shutdown the spot
115// Wake current thread and mark seat as closed
116void release( MySpot & this ) {
117        MyThread * val = __atomic_exchange_n(&this.ptr, 1p, __ATOMIC_SEQ_CST);
118        if (!val) {
119                return;
120        }
121
122        // Someone was there, release them
123        post( val->sem );
124}
125
126// ==================================================
127// Do some work by accessing 'cnt' cells in the array
128__attribute__((noinline)) void work(MyData & data, size_t cnt_, uint64_t & state) {
129        for (cnt_) {
130                access(data, xorshift_13_7_17(state));
131        }
132}
133
134void main(MyThread & this) {
135        uint64_t state = prng();
136
137        // Wait for start
138        wait(this.sem);
139
140        // Main loop
141        for() {
142                // Touch our current data, write to invalidate remote cache lines
143                work(*this.data, this.cnt, state);
144
145                // Wait on a random spot
146                uint64_t idx = xorshift_13_7_17(state) % this.spots.len;
147                bool closed = put(*this.spots.ptr[idx], this, this.data, this.share);
148
149                // Check if the experiment is over
150                if (closed) break;
151                if ( clock_mode && stop) break;
152                if (!clock_mode && this.result.count >= stop_count) break;
153
154                // Check everything is consistent
155                verify(this.data);
156
157                // write down progress and check migrations
158                processor * ttid = active_processor();
159                this.result.count += 1;
160                this.result.gmigs += moved(this, ttid);
161                this.result.dmigs += moved(*this.data, ttid);
162        }
163
164        __atomic_fetch_add(&threads_left, -1, __ATOMIC_SEQ_CST);
165}
166
167void ?{}( MyThread & this, MyData * data, MySpot ** spots, size_t spot_len, size_t cnt, bool share, size_t id) {
168        ((thread&)this){ bench_cluster };
169        this.data = data;
170        this.spots.ptr = spots;
171        this.spots.len = spot_len;
172        (this.sem){};
173        this.result.count = 0;
174        this.result.gmigs = 0;
175        this.result.dmigs = 0;
176        this.share = share;
177        this.cnt = cnt;
178        this.ttid = active_processor();
179        this.id = id;
180}
181
182// ==================================================
183int main(int argc, char * argv[]) {
184        unsigned wsize = 2;
185        unsigned wcnt  = 2;
186        unsigned nspots = 0;
187        bool share = false;
188        cfa_option opt[] = {
189                BENCH_OPT,
190                { 'n', "nspots", "Number of spots where threads sleep (nthreads - nspots are active at the same time)", nspots},
191                { 'w', "worksize", "Size of the array for each threads, in words (64bit)", wsize},
192                { 'c', "workcnt" , "Number of words to touch when working (random pick, cells can be picked more than once)", wcnt },
193                { 's', "share"   , "Pass the work data to the next thread when blocking", share, parse_truefalse }
194        };
195        BENCH_OPT_PARSE("cforall cycle benchmark");
196
197        unsigned long long global_count = 0;
198        unsigned long long global_gmigs = 0;
199        unsigned long long global_dmigs = 0;
200
201        if( nspots == 0 ) { nspots = nthreads - nprocs; }
202        if( nspots == 0 ) {
203                fprintf(stderr, "--nspots must be set or --nthreads set to something bigger than --nprocs\n");
204                exit(EXIT_FAILURE);
205        }
206
207        Time start, end;
208        {
209                MyData * data_arrays[nthreads];
210                for(i; nthreads) {
211                        data_arrays[i] = malloc();
212                        (*data_arrays[i]){ i, wsize };
213                }
214
215                MySpot * spots[nspots];
216                for(i; nspots) {
217                        spots[i] = malloc();
218                        (*spots[i]){ i };
219                }
220
221                BenchCluster bc = { nprocs };
222                threads_left = nprocs;
223                {
224                        MyThread * threads[nthreads];
225                        for(i; nthreads) {
226                                threads[i] = malloc();
227                                (*threads[i]){
228                                        data_arrays[i],
229                                        spots,
230                                        nspots,
231                                        wcnt,
232                                        share,
233                                        i
234                                };
235                        }
236
237                        bool is_tty = isatty(STDOUT_FILENO);
238                        start = timeHiRes();
239
240                        for(i; nthreads) {
241                                post( threads[i]->sem );
242                        }
243                        wait(start, is_tty);
244
245                        stop = true;
246                        end = timeHiRes();
247                        printf("\nDone\n");
248
249                        for(i; nthreads) {
250                                post( threads[i]->sem );
251                                MyThread & thrd = join( *threads[i] );
252                                global_count += thrd.result.count;
253                                global_gmigs += thrd.result.gmigs;
254                                global_dmigs += thrd.result.dmigs;
255                        }
256
257                        for(i; nthreads) {
258                                ^( *threads[i] ){};
259                                free( threads[i] );
260                        }
261                }
262
263                for(i; nthreads) {
264                        ^( *data_arrays[i] ){};
265                        free( data_arrays[i] );
266                }
267
268                for(i; nspots) {
269                        ^( *spots[i] ){};
270                        free( spots[i] );
271                }
272        }
273
274        printf("Duration (ms)          : %'lf\n", (end - start)`dms);
275        printf("Number of processors   : %'d\n", nprocs);
276        printf("Number of threads      : %'d\n", nthreads);
277        printf("Total Operations(ops)  : %'15llu\n", global_count);
278        printf("Work size (64bit words): %'15u\n", wsize);
279        printf("Total Operations(ops)  : %'15llu\n", global_count);
280        printf("Total G Migrations     : %'15llu\n", global_gmigs);
281        printf("Total D Migrations     : %'15llu\n", global_dmigs);
282        printf("Ops per second         : %'18.2lf\n", ((double)global_count) / (end - start)`ds);
283        printf("ns per ops             : %'18.2lf\n", (end - start)`dns / global_count);
284        printf("Ops per threads        : %'15llu\n", global_count / nthreads);
285        printf("Ops per procs          : %'15llu\n", global_count / nprocs);
286        printf("Ops/sec/procs          : %'18.2lf\n", (((double)global_count) / nprocs) / (end - start)`ds);
287        printf("ns per ops/procs       : %'18.2lf\n", (end - start)`dns / (global_count / nprocs));
288        fflush(stdout);
289}
Note: See TracBrowser for help on using the repository browser.