source: benchmark/readyQ/transfer.cpp @ ab9c1b3

Last change on this file since ab9c1b3 was 65c9208, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Changed transfer benchmark to be more consistent with other rmit benchmarks

  • Property mode set to 100644
File size: 4.3 KB
Line 
1#include "rq_bench.hpp"
2#pragma GCC diagnostic push
3#pragma GCC diagnostic ignored "-Wunused-parameter"
4        #include <libfibre/fibre.h>
5#pragma GCC diagnostic pop
6
7#define PRINT(...)
8
9__lehmer64_state_t lead_seed;
10volatile unsigned leader;
11volatile size_t lead_idx;
12
13bool exhaust = false;
14volatile bool estop = false;
15
16bench_sem the_main;
17
18class __attribute__((aligned(128))) MyThread;
19
20MyThread ** threads;
21
22class __attribute__((aligned(128))) MyThread {
23        unsigned id;
24        volatile size_t idx;
25        bench_sem sem;
26
27public:
28        size_t rechecks;
29
30        MyThread(unsigned _id)
31                : id(_id), idx(0), rechecks(0)
32        {}
33
34        void unpark() { sem.post(); }
35        void park  () { sem.wait(); }
36
37        void waitgroup() {
38                uint64_t start = timeHiRes();
39                for(size_t i = 0; i < nthreads; i++) {
40                        PRINT( std::cout << "Waiting for : " << i << " (" << threads[i]->idx << ")" << std::endl; )
41                        while( threads[i]->idx != lead_idx ) {
42                                Pause();
43                                if( to_miliseconds(timeHiRes() - start) > 5'000 ) {
44                                        std::cerr << "Programs has been blocked for more than 5 secs" << std::endl;
45                                        estop = true;
46                                        the_main.post();
47                                        goto END;
48                                }
49                        }
50                }
51                END:;
52                PRINT( std::cout | "Waiting done"; )
53        }
54
55        void wakegroup(unsigned me) {
56                if(!exhaust) return;
57
58                for(size_t i = 0; i < nthreads; i++) {
59                        if(i!= me) threads[i]->sem.post();
60                }
61        }
62
63        void lead() {
64                this->idx = ++lead_idx;
65                if(lead_idx > stop_count || estop) {
66                        PRINT( std::cout << "Leader " << this->id << " done" << std::endl; )
67                        the_main.post();
68                        return;
69                }
70
71                PRINT( sout << "Leader no " << this->idx << ": " << this->id << std::endl; )
72
73                waitgroup();
74
75                unsigned nleader = __lehmer64( lead_seed ) % nthreads;
76                __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
77
78                wakegroup(this->id);
79        }
80
81        void wait() {
82                fibre_yield();
83                if(lead_idx == this->idx) {
84                        this->rechecks++;
85                        return;
86                }
87
88                assert( (lead_idx - 1) == this->idx );
89                __atomic_add_fetch( &this->idx, 1, __ATOMIC_SEQ_CST );
90                if(exhaust) this->sem.wait();
91                else fibre_yield();
92        }
93
94        static void main(MyThread * arg) {
95                MyThread & self = *arg;
96                self.park();
97
98                unsigned me = self.id;
99
100                for(;;) {
101                        if(leader == me) {
102                                self.lead();
103                        }
104                        else {
105                                self.wait();
106                        }
107                        if(lead_idx > stop_count || estop) break;
108                }
109        }
110};
111
112// ==================================================
113int main(int argc, char * argv[]) {
114        __lehmer64_state_t lead_seed = getpid();
115        for(int i = 0; i < 10; i++) __lehmer64( lead_seed );
116        unsigned nprocs = 2;
117
118        option_t opt[] = {
119                BENCH_OPT,
120                { 'e', "exhaust", "Whether or not threads that have seen the new epoch should yield or park.", exhaust, parse_yesno}
121        };
122        BENCH_OPT_PARSE("cforall transition benchmark");
123
124        std::cout.imbue(std::locale(""));
125        setlocale(LC_ALL, "");
126
127        if(clock_mode) {
128                std::cerr << "This benchmark doesn't support duration mode" << std::endl;
129                return 1;
130        }
131
132        if(nprocs < 2) {
133                std::cerr << "Must have at least 2 processors" << std::endl;
134                return 1;
135        }
136
137        lead_idx = 0;
138        leader = __lehmer64( lead_seed ) % nthreads;
139
140        size_t rechecks = 0;
141
142        uint64_t start, end;
143        {
144                FibreInit(1, nprocs);
145                {
146                        Fibre ** handles = new Fibre*[nthreads];
147                        threads = new MyThread*[nthreads];
148                        for(size_t i = 0; i < nthreads; i++) {
149                                threads[i] = new MyThread( i );
150                                handles[i] = new Fibre();
151                                handles[i]->run( MyThread::main, threads[i] );
152                        }
153
154                        start = timeHiRes();
155                        for(size_t i = 0; i < nthreads; i++) {
156                                threads[i]->unpark();
157                        }
158
159                        the_main.wait();
160                        end = timeHiRes();
161
162                        for(size_t i = 0; i < nthreads; i++) {
163                                threads[i]->unpark();
164                        }
165
166                        for(size_t i = 0; i < nthreads; i++) {
167                                MyThread & thrd = *threads[i];
168                                fibre_join( handles[i], nullptr );
169                                PRINT( std::cout << i << " joined" << std::endl; )
170                                rechecks += thrd.rechecks;
171                                delete( threads[i] );
172                        }
173
174                        delete[] (threads);
175                        delete[] (handles);
176                }
177        }
178
179        std::cout << "Duration (ms)           : " << to_miliseconds(end - start) << std::endl;
180        std::cout << "Number of processors    : " << nprocs << std::endl;
181        std::cout << "Number of threads       : " << nthreads << std::endl;
182        std::cout << "Total Operations(ops)   : " << (lead_idx - 1) << std::endl;
183        std::cout << "Threads parking on wait : " << (exhaust ? "yes" : "no") << std::endl;
184        std::cout << "Rechecking              : " << rechecks << std::endl;
185        std::cout << "ns per transfer         : " << std::fixed << (((double)(end - start)) / (lead_idx)) << std::endl;
186
187
188}
Note: See TracBrowser for help on using the repository browser.