source: benchmark/readyQ/transfer.cpp@ 1e538fb

Last change on this file since 1e538fb was 65c9208, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Changed transfer benchmark to be more consistent with other rmit benchmarks

  • Property mode set to 100644
File size: 4.3 KB
Line 
1#include "rq_bench.hpp"
2#pragma GCC diagnostic push
3#pragma GCC diagnostic ignored "-Wunused-parameter"
4 #include <libfibre/fibre.h>
5#pragma GCC diagnostic pop
6
7#define PRINT(...)
8
9__lehmer64_state_t lead_seed;
10volatile unsigned leader;
11volatile size_t lead_idx;
12
13bool exhaust = false;
14volatile bool estop = false;
15
16bench_sem the_main;
17
18class __attribute__((aligned(128))) MyThread;
19
20MyThread ** threads;
21
22class __attribute__((aligned(128))) MyThread {
23 unsigned id;
24 volatile size_t idx;
25 bench_sem sem;
26
27public:
28 size_t rechecks;
29
30 MyThread(unsigned _id)
31 : id(_id), idx(0), rechecks(0)
32 {}
33
34 void unpark() { sem.post(); }
35 void park () { sem.wait(); }
36
37 void waitgroup() {
38 uint64_t start = timeHiRes();
39 for(size_t i = 0; i < nthreads; i++) {
40 PRINT( std::cout << "Waiting for : " << i << " (" << threads[i]->idx << ")" << std::endl; )
41 while( threads[i]->idx != lead_idx ) {
42 Pause();
43 if( to_miliseconds(timeHiRes() - start) > 5'000 ) {
44 std::cerr << "Programs has been blocked for more than 5 secs" << std::endl;
45 estop = true;
46 the_main.post();
47 goto END;
48 }
49 }
50 }
51 END:;
52 PRINT( std::cout | "Waiting done"; )
53 }
54
55 void wakegroup(unsigned me) {
56 if(!exhaust) return;
57
58 for(size_t i = 0; i < nthreads; i++) {
59 if(i!= me) threads[i]->sem.post();
60 }
61 }
62
63 void lead() {
64 this->idx = ++lead_idx;
65 if(lead_idx > stop_count || estop) {
66 PRINT( std::cout << "Leader " << this->id << " done" << std::endl; )
67 the_main.post();
68 return;
69 }
70
71 PRINT( sout << "Leader no " << this->idx << ": " << this->id << std::endl; )
72
73 waitgroup();
74
75 unsigned nleader = __lehmer64( lead_seed ) % nthreads;
76 __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
77
78 wakegroup(this->id);
79 }
80
81 void wait() {
82 fibre_yield();
83 if(lead_idx == this->idx) {
84 this->rechecks++;
85 return;
86 }
87
88 assert( (lead_idx - 1) == this->idx );
89 __atomic_add_fetch( &this->idx, 1, __ATOMIC_SEQ_CST );
90 if(exhaust) this->sem.wait();
91 else fibre_yield();
92 }
93
94 static void main(MyThread * arg) {
95 MyThread & self = *arg;
96 self.park();
97
98 unsigned me = self.id;
99
100 for(;;) {
101 if(leader == me) {
102 self.lead();
103 }
104 else {
105 self.wait();
106 }
107 if(lead_idx > stop_count || estop) break;
108 }
109 }
110};
111
112// ==================================================
113int main(int argc, char * argv[]) {
114 __lehmer64_state_t lead_seed = getpid();
115 for(int i = 0; i < 10; i++) __lehmer64( lead_seed );
116 unsigned nprocs = 2;
117
118 option_t opt[] = {
119 BENCH_OPT,
120 { 'e', "exhaust", "Whether or not threads that have seen the new epoch should yield or park.", exhaust, parse_yesno}
121 };
122 BENCH_OPT_PARSE("cforall transition benchmark");
123
124 std::cout.imbue(std::locale(""));
125 setlocale(LC_ALL, "");
126
127 if(clock_mode) {
128 std::cerr << "This benchmark doesn't support duration mode" << std::endl;
129 return 1;
130 }
131
132 if(nprocs < 2) {
133 std::cerr << "Must have at least 2 processors" << std::endl;
134 return 1;
135 }
136
137 lead_idx = 0;
138 leader = __lehmer64( lead_seed ) % nthreads;
139
140 size_t rechecks = 0;
141
142 uint64_t start, end;
143 {
144 FibreInit(1, nprocs);
145 {
146 Fibre ** handles = new Fibre*[nthreads];
147 threads = new MyThread*[nthreads];
148 for(size_t i = 0; i < nthreads; i++) {
149 threads[i] = new MyThread( i );
150 handles[i] = new Fibre();
151 handles[i]->run( MyThread::main, threads[i] );
152 }
153
154 start = timeHiRes();
155 for(size_t i = 0; i < nthreads; i++) {
156 threads[i]->unpark();
157 }
158
159 the_main.wait();
160 end = timeHiRes();
161
162 for(size_t i = 0; i < nthreads; i++) {
163 threads[i]->unpark();
164 }
165
166 for(size_t i = 0; i < nthreads; i++) {
167 MyThread & thrd = *threads[i];
168 fibre_join( handles[i], nullptr );
169 PRINT( std::cout << i << " joined" << std::endl; )
170 rechecks += thrd.rechecks;
171 delete( threads[i] );
172 }
173
174 delete[] (threads);
175 delete[] (handles);
176 }
177 }
178
179 std::cout << "Duration (ms) : " << to_miliseconds(end - start) << std::endl;
180 std::cout << "Number of processors : " << nprocs << std::endl;
181 std::cout << "Number of threads : " << nthreads << std::endl;
182 std::cout << "Total Operations(ops) : " << (lead_idx - 1) << std::endl;
183 std::cout << "Threads parking on wait : " << (exhaust ? "yes" : "no") << std::endl;
184 std::cout << "Rechecking : " << rechecks << std::endl;
185 std::cout << "ns per transfer : " << std::fixed << (((double)(end - start)) / (lead_idx)) << std::endl;
186
187
188}
Note: See TracBrowser for help on using the repository browser.