source: benchmark/readyQ/locality.cpp @ 09ee131

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 09ee131 was f03209d3, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Locality benchmark now supports explicit number of spots instead of using nthreads - nprocs

  • Property mode set to 100644
File size: 8.5 KB
RevLine 
[c4241b6]1#include "rq_bench.hpp"
2#pragma GCC diagnostic push
3#pragma GCC diagnostic ignored "-Wunused-parameter"
4        #include <libfibre/fibre.h>
5#pragma GCC diagnostic pop
6
7struct Result {
8        uint64_t count = 0;
9        uint64_t dmigs = 0;
10        uint64_t gmigs = 0;
11};
12
13class __attribute__((aligned(128))) bench_sem {
14        Fibre * volatile ptr = nullptr;
15public:
16        inline bool wait() {
17                static Fibre * const ready  = reinterpret_cast<Fibre * const>(1ull);
18                for(;;) {
19                        Fibre * expected = this->ptr;
20                        if(expected == ready) {
21                                if(__atomic_compare_exchange_n(&this->ptr, &expected, nullptr, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
22                                        return false;
23                                }
24                        }
25                        else {
26                                /* paranoid */ assert( expected == nullptr );
27                                if(__atomic_compare_exchange_n(&this->ptr, &expected, fibre_self(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
28                                        fibre_park();
29                                        return true;
30                                }
31                        }
32
33                }
34        }
35
36        inline bool post() {
37                static Fibre * const ready  = reinterpret_cast<Fibre * const>(1ull);
38                for(;;) {
39                        Fibre * expected = this->ptr;
40                        if(expected == ready) return false;
41                        if(expected == nullptr) {
42                                if(__atomic_compare_exchange_n(&this->ptr, &expected, ready, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
43                                        return false;
44                                }
45                        }
46                        else {
47                                if(__atomic_compare_exchange_n(&this->ptr, &expected, nullptr, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
48                                        fibre_unpark( expected );
49                                        return true;
50                                }
51                        }
52                }
53        }
54};
55
56// ==================================================
57struct __attribute__((aligned(128))) MyData {
58        uint64_t _p1[16];  // padding
59        uint64_t * data;
60        size_t len;
61        BaseProcessor * ttid;
62        size_t id;
63        uint64_t _p2[16];  // padding
64
65        MyData(size_t id, size_t size)
66                : data( (uintptr_t *)aligned_alloc(128, size * sizeof(uint64_t)) )
67                , len( size )
68                , ttid( &Context::CurrProcessor() )
69                , id( id )
70        {
71                for(size_t i = 0; i < this->len; i++) {
72                        this->data[i] = 0;
73                }
74        }
75
76        uint64_t moved(BaseProcessor * ttid) {
77                if(this->ttid == ttid) {
78                        return 0;
79                }
80                this->ttid = ttid;
81                return 1;
82        }
83
84        __attribute__((noinline)) void access(size_t idx) {
85                size_t l = this->len;
86                this->data[idx % l] += 1;
87        }
88};
89
90// ==================================================
91struct __attribute__((aligned(128))) MyCtx {
92        struct MyData * volatile data;
93
94        struct {
95                struct MySpot ** ptr;
96                size_t len;
97        } spots;
98
99        bench_sem sem;
100
101        Result result;
102
103        bool share;
104        size_t cnt;
105        BaseProcessor * ttid;
106        size_t id;
107
108        MyCtx(MyData * d, MySpot ** spots, size_t len, size_t cnt, bool share, size_t id)
109                : data( d )
110                , spots{ .ptr = spots, .len = len }
111                , share( share )
112                , cnt( cnt )
113                , ttid( &Context::CurrProcessor() )
114                , id( id )
115        {}
116
117        uint64_t moved(BaseProcessor * ttid) {
118                if(this->ttid == ttid) {
119                        return 0;
120                }
121                this->ttid = ttid;
122                return 1;
123        }
124};
125
126// ==================================================
127// Atomic object where a single thread can wait
128// May exchanges data
129struct __attribute__((aligned(128))) MySpot {
130        MyCtx * volatile ptr;
131        size_t id;
132        uint64_t _p1[16];  // padding
133
134        MySpot(size_t id) : ptr( nullptr ), id( id ) {}
135
136
137        static inline MyCtx * one() {
138                return reinterpret_cast<MyCtx *>(1);
139        }
140
141        // Main handshake of the code
142        // Single seat, first thread arriving waits
143        // Next threads unblocks current one and blocks in its place
144        // if share == true, exchange data in the process
145        bool put( MyCtx & ctx, MyData * data, bool share) {
146                // Attempt to CAS our context into the seat
147                for(;;) {
148                        MyCtx * expected = this->ptr;
149                        if (expected == one()) { // Seat is closed, return
150                                return true;
151                        }
152
153                        if (__atomic_compare_exchange_n(&this->ptr, &expected, &ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
154                                if(expected) {
155                                        if(share) {
156                                                expected->data = data;
157                                        }
158                                        expected->sem.post();
159                                }
160                                break; // We got the seat
161                        }
162                }
163
164                // Block once on the seat
165                ctx.sem.wait();
166
167                // Someone woke us up, get the new data
168                return false;
169        }
170
171        // Shutdown the spot
172        // Wake current thread and mark seat as closed
173        void release() {
174                struct MyCtx * val = __atomic_exchange_n(&this->ptr, one(), __ATOMIC_SEQ_CST);
175                if (!val) {
176                        return;
177                }
178
179                // Someone was there, release them
180                val->sem.post();
181        }
182};
183
184// ==================================================
185// Random number generator, Go's native one is to slow and global
186uint64_t __xorshift64( uint64_t & state ) {
187        uint64_t x = state;
188        x ^= x << 13;
189        x ^= x >> 7;
190        x ^= x << 17;
191        return state = x;
192}
193
194// ==================================================
195// Do some work by accessing 'cnt' cells in the array
196__attribute__((noinline)) void work(MyData & data, size_t cnt, uint64_t & state) {
197        for (size_t i = 0; i < cnt; i++) {
198                data.access(__xorshift64(state));
199        }
200}
201
202void thread_main( MyCtx & ctx ) {
[f03209d3]203        uint64_t state = ctx.id;
[c4241b6]204
205        // Wait for start
206        ctx.sem.wait();
207
208        // Main loop
209        for(;;) {
210                // Touch our current data, write to invalidate remote cache lines
211                work( *ctx.data, ctx.cnt, state );
212
213                // Wait on a random spot
214                uint64_t idx = __xorshift64(state) % ctx.spots.len;
215                bool closed = ctx.spots.ptr[idx]->put(ctx, ctx.data, ctx.share);
216
217                // Check if the experiment is over
218                if (closed) break;
219                if ( clock_mode && stop) break;
220                if (!clock_mode && ctx.result.count >= stop_count) break;
221
222                // Check everything is consistent
223                assert( ctx.data );
224
225                // write down progress and check migrations
226                BaseProcessor * ttid = &Context::CurrProcessor();
227                ctx.result.count += 1;
228                ctx.result.gmigs += ctx.moved(ttid);
229                ctx.result.dmigs += ctx.data->moved(ttid);
230        }
231
232        __atomic_fetch_add(&threads_left, -1, __ATOMIC_SEQ_CST);
233}
234
235// ==================================================
236int main(int argc, char * argv[]) {
237        unsigned wsize = 2;
238        unsigned wcnt  = 2;
[f03209d3]239        unsigned nspots = 0;
[c4241b6]240        bool share = false;
241        option_t opt[] = {
242                BENCH_OPT,
[f03209d3]243                { 'n', "nspots", "Number of spots where threads sleep (nthreads - nspots are active at the same time)", nspots},
[c4241b6]244                { 'w', "worksize", "Size of the array for each threads, in words (64bit)", wsize},
245                { 'c', "workcnt" , "Number of words to touch when working (random pick, cells can be picked more than once)", wcnt },
246                { 's', "share"   , "Pass the work data to the next thread when blocking", share, parse_truefalse }
247        };
248        BENCH_OPT_PARSE("libfibre cycle benchmark");
249
250        std::cout.imbue(std::locale(""));
251        setlocale(LC_ALL, "");
252
253        unsigned long long global_count = 0;
254        unsigned long long global_gmigs = 0;
255        unsigned long long global_dmigs = 0;
256
[f03209d3]257        if( nspots == 0 ) { nspots = nthreads - nprocs; }
258
[c4241b6]259        uint64_t start, end;
260        {
261                FibreInit(1, nprocs);
262                MyData * data_arrays[nthreads];
263                for(size_t i = 0; i < nthreads; i++) {
264                        data_arrays[i] = new MyData( i, wsize );
265                }
266
[f03209d3]267                MySpot * spots[nspots];
268                for(unsigned i = 0; i < nspots; i++) {
[c4241b6]269                        spots[i] = new MySpot{ i };
270                }
271
[f03209d3]272                threads_left = nthreads - nspots;
[c4241b6]273                Fibre * threads[nthreads];
274                MyCtx * thddata[nthreads];
275                {
276                        for(size_t i = 0; i < nthreads; i++) {
277                                thddata[i] = new MyCtx(
278                                        data_arrays[i],
279                                        spots,
[f03209d3]280                                        nspots,
[c4241b6]281                                        wcnt,
282                                        share,
283                                        i
284                                );
285                                threads[i] = new Fibre( reinterpret_cast<void (*)(void *)>(thread_main), thddata[i] );
286                        }
287
288                        bool is_tty = isatty(STDOUT_FILENO);
289                        start = getTimeNsec();
290
291                        for(size_t i = 0; i < nthreads; i++) {
292                                thddata[i]->sem.post();
293                        }
294                        wait<Fibre>(start, is_tty);
295
296                        stop = true;
297                        end = getTimeNsec();
298                        printf("\nDone\n");
299
300                        for(size_t i = 0; i < nthreads; i++) {
301                                thddata[i]->sem.post();
302                                fibre_join( threads[i], nullptr );
303                                global_count += thddata[i]->result.count;
304                                global_gmigs += thddata[i]->result.gmigs;
305                                global_dmigs += thddata[i]->result.dmigs;
306                        }
307                }
308
309                for(size_t i = 0; i < nthreads; i++) {
310                        delete( data_arrays[i] );
311                }
312
[f03209d3]313                for(size_t i = 0; i < nspots; i++) {
[c4241b6]314                        delete( spots[i] );
315                }
316        }
317
318        printf("Duration (ms)          : %'ld\n", to_miliseconds(end - start));
319        printf("Number of processors   : %'d\n", nprocs);
320        printf("Number of threads      : %'d\n", nthreads);
[f03209d3]321        printf("Number of spots        : %'d\n", nspots);
[c4241b6]322        printf("Work size (64bit words): %'15u\n", wsize);
323        printf("Total Operations(ops)  : %'15llu\n", global_count);
324        printf("Total G Migrations     : %'15llu\n", global_gmigs);
325        printf("Total D Migrations     : %'15llu\n", global_dmigs);
[f03209d3]326        printf("Ops per second         : %'18.2lf\n", ((double)global_count) / to_fseconds(end - start));
327        printf("ns per ops             : %'18.2lf\n", ((double)(end - start)) / global_count);
328        printf("Ops per threads        : %'15llu\n", global_count / nthreads);
329        printf("Ops per procs          : %'15llu\n", global_count / nprocs);
330        printf("Ops/sec/procs          : %'18.2lf\n", (((double)global_count) / nprocs) / to_fseconds(end - start));
331        printf("ns per ops/procs       : %'18.2lf\n", ((double)(end - start)) / (global_count / nprocs));
[c4241b6]332        fflush(stdout);
333}
Note: See TracBrowser for help on using the repository browser.