source: benchmark/readyQ/locality.cc @ fefd77a

ADTast-experimentalenumpthread-emulationqualifiedEnum
Last change on this file since fefd77a was e54d0c3, checked in by Peter A. Buhr <pabuhr@…>, 4 years ago

Fixed benchmarks after another change to getTimeNsec()

  • Property mode set to 100644
File size: 8.1 KB
RevLine 
[4468a70]1#include "rq_bench.hpp"
2
3#include <pthread.h>
4#include <semaphore.h>
5#include <sched.h>
6#include <unistd.h>
7
8#include <iostream>
9
10struct Result {
11        uint64_t count = 0;
12        uint64_t dmigs = 0;
13        uint64_t gmigs = 0;
14};
15
16struct Pthread {
17        static int usleep(useconds_t usec) {
18                return ::usleep(usec);
19        }
20};
21
22// ==================================================
23struct __attribute__((aligned(128))) MyData {
24        uint64_t _p1[16];  // padding
25        uint64_t * data;
26        size_t len;
27        int ttid;
28        size_t id;
29        uint64_t _p2[16];  // padding
30
31        MyData(size_t id, size_t size)
32                : data( (uintptr_t *)aligned_alloc(128, size * sizeof(uint64_t)) )
33                , len( size )
34                , ttid( sched_getcpu() )
35                , id( id )
36        {
37                for(size_t i = 0; i < this->len; i++) {
38                        this->data[i] = 0;
39                }
40        }
41
42        uint64_t moved(int ttid) {
43                if(this->ttid == ttid) {
44                        return 0;
45                }
46                this->ttid = ttid;
47                return 1;
48        }
49
50        __attribute__((noinline)) void access(size_t idx) {
51                size_t l = this->len;
52                this->data[idx % l] += 1;
53        }
54};
55
56// ==================================================
57struct __attribute__((aligned(128))) MyCtx {
58        struct MyData * volatile data;
59
60        struct {
61                struct MySpot ** ptr;
62                size_t len;
63        } spots;
64
65        sem_t sem;
66
67        Result result;
68
69        bool share;
70        size_t cnt;
71        int ttid;
72        size_t id;
73
74        MyCtx(MyData * d, MySpot ** spots, size_t len, size_t cnt, bool share, size_t id)
75                : data( d )
76                , spots{ .ptr = spots, .len = len }
77                , share( share )
78                , cnt( cnt )
79                , ttid( sched_getcpu() )
80                , id( id )
81        {
82                int ret = sem_init( &sem, false, 0 );
83                if(ret != 0) std::abort();
84        }
85
86        ~MyCtx() {
87                int ret = sem_destroy( &sem );
88                if(ret != 0) std::abort();
89        }
90
91        uint64_t moved(int ttid) {
92                if(this->ttid == ttid) {
93                        return 0;
94                }
95                this->ttid = ttid;
96                return 1;
97        }
98};
99
100// ==================================================
101// Atomic object where a single thread can wait
102// May exchanges data
103struct __attribute__((aligned(128))) MySpot {
104        MyCtx * volatile ptr;
105        size_t id;
106        uint64_t _p1[16];  // padding
107
108        MySpot(size_t id) : ptr( nullptr ), id( id ) {}
109
110
111        static inline MyCtx * one() {
112                return reinterpret_cast<MyCtx *>(1);
113        }
114
115        // Main handshake of the code
116        // Single seat, first thread arriving waits
117        // Next threads unblocks current one and blocks in its place
118        // if share == true, exchange data in the process
119        bool put( MyCtx & ctx, MyData * data, bool share) {
120                // Attempt to CAS our context into the seat
121                for(;;) {
122                        MyCtx * expected = this->ptr;
123                        if (expected == one()) { // Seat is closed, return
124                                return true;
125                        }
126
127                        if (__atomic_compare_exchange_n(&this->ptr, &expected, &ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
128                                if(expected) {
129                                        if(share) {
130                                                expected->data = data;
131                                        }
132                                        sem_post(&expected->sem);
133                                }
134                                break; // We got the seat
135                        }
136                }
137
138                // Block once on the seat
139                sem_wait(&ctx.sem);
140
141                // Someone woke us up, get the new data
142                return false;
143        }
144
145        // Shutdown the spot
146        // Wake current thread and mark seat as closed
147        void release() {
148                struct MyCtx * val = __atomic_exchange_n(&this->ptr, one(), __ATOMIC_SEQ_CST);
149                if (!val) {
150                        return;
151                }
152
153                // Someone was there, release them
154                sem_post(&val->sem);
155        }
156};
157
158// ==================================================
159// Random number generator, Go's native one is to slow and global
160uint64_t __xorshift64( uint64_t & state ) {
161        uint64_t x = state;
162        x ^= x << 13;
163        x ^= x >> 7;
164        x ^= x << 17;
165        return state = x;
166}
167
168// ==================================================
169// Do some work by accessing 'cnt' cells in the array
170__attribute__((noinline)) void work(MyData & data, size_t cnt, uint64_t & state) {
171        for (size_t i = 0; i < cnt; i++) {
172                data.access(__xorshift64(state));
173        }
174}
175
176void thread_main( MyCtx & ctx ) {
177        uint64_t state = ctx.id;
178
179        // Wait for start
180        sem_wait(&ctx.sem);
181
182        // Main loop
183        for(;;) {
184                // Touch our current data, write to invalidate remote cache lines
185                work( *ctx.data, ctx.cnt, state );
186
187                // Wait on a random spot
188                uint64_t idx = __xorshift64(state) % ctx.spots.len;
189                bool closed = ctx.spots.ptr[idx]->put(ctx, ctx.data, ctx.share);
190
191                // Check if the experiment is over
192                if (closed) break;
193                if ( clock_mode && stop) break;
194                if (!clock_mode && ctx.result.count >= stop_count) break;
195
196                // Check everything is consistent
197                assert( ctx.data );
198
199                // write down progress and check migrations
200                int ttid = sched_getcpu();
201                ctx.result.count += 1;
202                ctx.result.gmigs += ctx.moved(ttid);
203                ctx.result.dmigs += ctx.data->moved(ttid);
204        }
205
206        __atomic_fetch_add(&threads_left, -1, __ATOMIC_SEQ_CST);
207}
208
209// ==================================================
210int main(int argc, char * argv[]) {
211        unsigned wsize = 2;
212        unsigned wcnt  = 2;
213        unsigned nspots = 0;
214        bool share = false;
215        option_t opt[] = {
216                BENCH_OPT,
217                { 'n', "nspots", "Number of spots where threads sleep (nthreads - nspots are active at the same time)", nspots},
218                { 'w', "worksize", "Size of the array for each threads, in words (64bit)", wsize},
219                { 'c', "workcnt" , "Number of words to touch when working (random pick, cells can be picked more than once)", wcnt },
220                { 's', "share"   , "Pass the work data to the next thread when blocking", share, parse_truefalse }
221        };
222        BENCH_OPT_PARSE("libfibre cycle benchmark");
223
224        std::cout.imbue(std::locale(""));
225        setlocale(LC_ALL, "");
226
227        unsigned long long global_count = 0;
228        unsigned long long global_gmigs = 0;
229        unsigned long long global_dmigs = 0;
230
231        if( nspots == 0 ) { nspots = nthreads - nprocs; }
232
233        uint64_t start, end;
234        {
235                cpu_set_t cpuset;
236                int ret = pthread_getaffinity_np( pthread_self(), sizeof(cpuset), &cpuset );
237                if(ret != 0) std::abort();
238
239                unsigned cnt = CPU_COUNT_S(sizeof(cpuset), &cpuset);
240                if(cnt > nprocs) {
241                        unsigned extras = cnt - nprocs;
242                        for(int i = 0; i < CPU_SETSIZE && extras > 0; i++) {
243                                if(CPU_ISSET_S(i, sizeof(cpuset), &cpuset)) {
244                                        CPU_CLR_S(i, sizeof(cpuset), &cpuset);
245                                        extras--;
246                                }
247                        }
248
249                        ret = pthread_setaffinity_np( pthread_self(), sizeof(cpuset), &cpuset );
250                        if(ret != 0) std::abort();
251                }
252        }
253
254        {
255                MyData * data_arrays[nthreads];
256                for(size_t i = 0; i < nthreads; i++) {
257                        data_arrays[i] = new MyData( i, wsize );
258                }
259
260                MySpot * spots[nspots];
261                for(unsigned i = 0; i < nspots; i++) {
262                        spots[i] = new MySpot{ i };
263                }
264
265                threads_left = nthreads - nspots;
266                pthread_t threads[nthreads];
267                MyCtx * thddata[nthreads];
268                {
269                        for(size_t i = 0; i < nthreads; i++) {
270                                thddata[i] = new MyCtx(
271                                        data_arrays[i],
272                                        spots,
273                                        nspots,
274                                        wcnt,
275                                        share,
276                                        i
277                                );
278                                int ret = pthread_create( &threads[i], nullptr, reinterpret_cast<void * (*)(void *)>(thread_main), thddata[i] );
279                                if(ret != 0) std::abort();
280                        }
281
282                        bool is_tty = isatty(STDOUT_FILENO);
[e54d0c3]283                        start = timeHiRes();
[4468a70]284
285                        for(size_t i = 0; i < nthreads; i++) {
286                                sem_post(&thddata[i]->sem);
287                        }
288                        wait<Pthread>(start, is_tty);
289
290                        stop = true;
[e54d0c3]291                        end = timeHiRes();
[4468a70]292                        printf("\nDone\n");
293
294                        for(size_t i = 0; i < nthreads; i++) {
295                                sem_post(&thddata[i]->sem);
296                                int ret = pthread_join( threads[i], nullptr );
297                                if(ret != 0) std::abort();
298                                global_count += thddata[i]->result.count;
299                                global_gmigs += thddata[i]->result.gmigs;
300                                global_dmigs += thddata[i]->result.dmigs;
301                        }
302                }
303
304                for(size_t i = 0; i < nthreads; i++) {
305                        delete( data_arrays[i] );
306                }
307
308                for(size_t i = 0; i < nspots; i++) {
309                        delete( spots[i] );
310                }
311        }
312
313        printf("Duration (ms)          : %'ld\n", to_miliseconds(end - start));
314        printf("Number of processors   : %'d\n", nprocs);
315        printf("Number of threads      : %'d\n", nthreads);
316        printf("Number of spots        : %'d\n", nspots);
317        printf("Work size (64bit words): %'15u\n", wsize);
318        printf("Total Operations(ops)  : %'15llu\n", global_count);
319        printf("Total G Migrations     : %'15llu\n", global_gmigs);
320        printf("Total D Migrations     : %'15llu\n", global_dmigs);
321        printf("Ops per second         : %'18.2lf\n", ((double)global_count) / to_fseconds(end - start));
322        printf("ns per ops             : %'18.2lf\n", ((double)(end - start)) / global_count);
323        printf("Ops per threads        : %'15llu\n", global_count / nthreads);
324        printf("Ops per procs          : %'15llu\n", global_count / nprocs);
325        printf("Ops/sec/procs          : %'18.2lf\n", (((double)global_count) / nprocs) / to_fseconds(end - start));
326        printf("ns per ops/procs       : %'18.2lf\n", ((double)(end - start)) / (global_count / nprocs));
327        fflush(stdout);
[e54d0c3]328}
Note: See TracBrowser for help on using the repository browser.