source: benchmark/readyQ/transfer.cpp @ 4e28d2e9

ADTast-experimentalenumforall-pointer-decaypthread-emulationqualifiedEnum
Last change on this file since 4e28d2e9 was 7711064, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added the transfer benchmark for libfibre

  • Property mode set to 100644
File size: 4.1 KB
Line 
1#include "rq_bench.hpp"
2#pragma GCC diagnostic push
3#pragma GCC diagnostic ignored "-Wunused-parameter"
4        #include <libfibre/fibre.h>
5#pragma GCC diagnostic pop
6
7#define PRINT(...)
8
9__lehmer64_state_t lead_seed;
10volatile unsigned leader;
11volatile size_t lead_idx;
12
13bool exhaust = false;
14
15bench_sem the_main;
16
17class __attribute__((aligned(128))) MyThread;
18
19MyThread ** threads;
20
21class __attribute__((aligned(128))) MyThread {
22        unsigned id;
23        volatile size_t idx;
24        bench_sem sem;
25
26public:
27        size_t rechecks;
28
29        MyThread(unsigned _id)
30                : id(_id), idx(0), rechecks(0)
31        {}
32
33        void unpark() { sem.post(); }
34        void park  () { sem.wait(); }
35
36        void waitgroup() {
37                uint64_t start = timeHiRes();
38                for(size_t i = 0; i < nthreads; i++) {
39                        PRINT( std::cout << "Waiting for : " << i << " (" << threads[i]->idx << ")" << std::endl; )
40                        while( threads[i]->idx != lead_idx ) {
41                                Pause();
42                                if( to_miliseconds(timeHiRes() - start) > 5'000 ) {
43                                        std::cerr << "Programs has been blocked for more than 5 secs" << std::endl;
44                                        std::exit(1);
45                                }
46                        }
47                }
48                PRINT( std::cout | "Waiting done"; )
49        }
50
51        void wakegroup(unsigned me) {
52                if(!exhaust) return;
53
54                for(size_t i = 0; i < nthreads; i++) {
55                        if(i!= me) threads[i]->sem.post();
56                }
57        }
58
59        void lead() {
60                this->idx = ++lead_idx;
61                if(lead_idx > stop_count) {
62                        PRINT( std::cout << "Leader " << this->id << " done" << std::endl; )
63                        the_main.post();
64                        return;
65                }
66
67                PRINT( sout << "Leader no " << this->idx << ": " << this->id << std::endl; )
68
69                waitgroup();
70
71                unsigned nleader = __lehmer64( lead_seed ) % nthreads;
72                __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
73
74                wakegroup(this->id);
75        }
76
77        void wait() {
78                fibre_yield();
79                if(lead_idx == this->idx) {
80                        this->rechecks++;
81                        return;
82                }
83
84                assert( (lead_idx - 1) == this->idx );
85                __atomic_add_fetch( &this->idx, 1, __ATOMIC_SEQ_CST );
86                if(exhaust) this->sem.wait();
87                else fibre_yield();
88        }
89
90        static void main(void * arg) {
91                MyThread & self = *reinterpret_cast<MyThread*>(arg);
92                self.park();
93
94                unsigned me = self.id;
95
96                for(;;) {
97                        if(leader == me) {
98                                self.lead();
99                        }
100                        else {
101                                self.wait();
102                        }
103                        if(lead_idx > stop_count) break;
104                }
105        }
106};
107
108// ==================================================
109int main(int argc, char * argv[]) {
110        __lehmer64_state_t lead_seed = getpid();
111        for(int i = 0; i < 10; i++) __lehmer64( lead_seed );
112        unsigned nprocs = 2;
113
114        option_t opt[] = {
115                BENCH_OPT,
116                { 'e', "exhaust", "Whether or not threads that have seen the new epoch should yield or park.", exhaust, parse_yesno}
117        };
118        BENCH_OPT_PARSE("cforall transition benchmark");
119
120        std::cout.imbue(std::locale(""));
121        setlocale(LC_ALL, "");
122
123        if(clock_mode) {
124                std::cerr << "This benchmark doesn't support duration mode" << std::endl;
125                return 1;
126        }
127
128        if(nprocs < 2) {
129                std::cerr << "Must have at least 2 processors" << std::endl;
130                return 1;
131        }
132
133        lead_idx = 0;
134        leader = __lehmer64( lead_seed ) % nthreads;
135
136        size_t rechecks = 0;
137
138        uint64_t start, end;
139        {
140                FibreInit(1, nprocs);
141                {
142                        Fibre ** handles = new Fibre*[nthreads];
143                        threads = new MyThread*[nthreads];
144                        for(size_t i = 0; i < nthreads; i++) {
145                                threads[i] = new MyThread( i );
146                                handles[i] = new Fibre( MyThread::main, threads[i] );
147                        }
148
149                        start = timeHiRes();
150                        for(size_t i = 0; i < nthreads; i++) {
151                                threads[i]->unpark();
152                        }
153
154                        the_main.wait();
155                        end = timeHiRes();
156
157                        for(size_t i = 0; i < nthreads; i++) {
158                                threads[i]->unpark();
159                        }
160
161                        for(size_t i = 0; i < nthreads; i++) {
162                                MyThread & thrd = *threads[i];
163                                fibre_join( handles[i], nullptr );
164                                PRINT( std::cout << i << " joined" << std::endl; )
165                                rechecks += thrd.rechecks;
166                                // delete( handles[i] );
167                                delete( threads[i] );
168                        }
169
170                        delete[] (threads);
171                        delete[] (handles);
172                }
173        }
174
175        std::cout << "Duration                : " << to_miliseconds(end - start) << "ms" << std::endl;
176        std::cout << "Number of processors    : " << nprocs << std::endl;
177        std::cout << "Number of threads       : " << nthreads << std::endl;
178        std::cout << "Total Operations(ops)   : " << stop_count << std::endl;
179        std::cout << "Threads parking on wait : " << (exhaust ? "yes" : "no") << std::endl;
180        std::cout << "Rechecking              : " << rechecks << std::endl;
181
182
183}
Note: See TracBrowser for help on using the repository browser.