source: benchmark/readyQ/transfer.cpp@ 753fb978

ADT ast-experimental enum forall-pointer-decay pthread-emulation qualifiedEnum
Last change on this file since 753fb978 was 6dc2db9, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Change benchmarks to consistently print duration in ms.

  • Property mode set to 100644
File size: 4.1 KB
Line 
1#include "rq_bench.hpp"
2#pragma GCC diagnostic push
3#pragma GCC diagnostic ignored "-Wunused-parameter"
4 #include <libfibre/fibre.h>
5#pragma GCC diagnostic pop
6
7#define PRINT(...)
8
9__lehmer64_state_t lead_seed;
10volatile unsigned leader;
11volatile size_t lead_idx;
12
13bool exhaust = false;
14
15bench_sem the_main;
16
17class __attribute__((aligned(128))) MyThread;
18
19MyThread ** threads;
20
21class __attribute__((aligned(128))) MyThread {
22 unsigned id;
23 volatile size_t idx;
24 bench_sem sem;
25
26public:
27 size_t rechecks;
28
29 MyThread(unsigned _id)
30 : id(_id), idx(0), rechecks(0)
31 {}
32
33 void unpark() { sem.post(); }
34 void park () { sem.wait(); }
35
36 void waitgroup() {
37 uint64_t start = timeHiRes();
38 for(size_t i = 0; i < nthreads; i++) {
39 PRINT( std::cout << "Waiting for : " << i << " (" << threads[i]->idx << ")" << std::endl; )
40 while( threads[i]->idx != lead_idx ) {
41 Pause();
42 if( to_miliseconds(timeHiRes() - start) > 5'000 ) {
43 std::cerr << "Programs has been blocked for more than 5 secs" << std::endl;
44 std::exit(1);
45 }
46 }
47 }
48 PRINT( std::cout | "Waiting done"; )
49 }
50
51 void wakegroup(unsigned me) {
52 if(!exhaust) return;
53
54 for(size_t i = 0; i < nthreads; i++) {
55 if(i!= me) threads[i]->sem.post();
56 }
57 }
58
59 void lead() {
60 this->idx = ++lead_idx;
61 if(lead_idx > stop_count) {
62 PRINT( std::cout << "Leader " << this->id << " done" << std::endl; )
63 the_main.post();
64 return;
65 }
66
67 PRINT( sout << "Leader no " << this->idx << ": " << this->id << std::endl; )
68
69 waitgroup();
70
71 unsigned nleader = __lehmer64( lead_seed ) % nthreads;
72 __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
73
74 wakegroup(this->id);
75 }
76
77 void wait() {
78 fibre_yield();
79 if(lead_idx == this->idx) {
80 this->rechecks++;
81 return;
82 }
83
84 assert( (lead_idx - 1) == this->idx );
85 __atomic_add_fetch( &this->idx, 1, __ATOMIC_SEQ_CST );
86 if(exhaust) this->sem.wait();
87 else fibre_yield();
88 }
89
90 static void main(void * arg) {
91 MyThread & self = *reinterpret_cast<MyThread*>(arg);
92 self.park();
93
94 unsigned me = self.id;
95
96 for(;;) {
97 if(leader == me) {
98 self.lead();
99 }
100 else {
101 self.wait();
102 }
103 if(lead_idx > stop_count) break;
104 }
105 }
106};
107
108// ==================================================
109int main(int argc, char * argv[]) {
110 __lehmer64_state_t lead_seed = getpid();
111 for(int i = 0; i < 10; i++) __lehmer64( lead_seed );
112 unsigned nprocs = 2;
113
114 option_t opt[] = {
115 BENCH_OPT,
116 { 'e', "exhaust", "Whether or not threads that have seen the new epoch should yield or park.", exhaust, parse_yesno}
117 };
118 BENCH_OPT_PARSE("cforall transition benchmark");
119
120 std::cout.imbue(std::locale(""));
121 setlocale(LC_ALL, "");
122
123 if(clock_mode) {
124 std::cerr << "This benchmark doesn't support duration mode" << std::endl;
125 return 1;
126 }
127
128 if(nprocs < 2) {
129 std::cerr << "Must have at least 2 processors" << std::endl;
130 return 1;
131 }
132
133 lead_idx = 0;
134 leader = __lehmer64( lead_seed ) % nthreads;
135
136 size_t rechecks = 0;
137
138 uint64_t start, end;
139 {
140 FibreInit(1, nprocs);
141 {
142 Fibre ** handles = new Fibre*[nthreads];
143 threads = new MyThread*[nthreads];
144 for(size_t i = 0; i < nthreads; i++) {
145 threads[i] = new MyThread( i );
146 handles[i] = new Fibre( MyThread::main, threads[i] );
147 }
148
149 start = timeHiRes();
150 for(size_t i = 0; i < nthreads; i++) {
151 threads[i]->unpark();
152 }
153
154 the_main.wait();
155 end = timeHiRes();
156
157 for(size_t i = 0; i < nthreads; i++) {
158 threads[i]->unpark();
159 }
160
161 for(size_t i = 0; i < nthreads; i++) {
162 MyThread & thrd = *threads[i];
163 fibre_join( handles[i], nullptr );
164 PRINT( std::cout << i << " joined" << std::endl; )
165 rechecks += thrd.rechecks;
166 // delete( handles[i] );
167 delete( threads[i] );
168 }
169
170 delete[] (threads);
171 delete[] (handles);
172 }
173 }
174
175 std::cout << "Duration (ms) : " << to_miliseconds(end - start) << std::endl;
176 std::cout << "Number of processors : " << nprocs << std::endl;
177 std::cout << "Number of threads : " << nthreads << std::endl;
178 std::cout << "Total Operations(ops) : " << stop_count << std::endl;
179 std::cout << "Threads parking on wait : " << (exhaust ? "yes" : "no") << std::endl;
180 std::cout << "Rechecking : " << rechecks << std::endl;
181
182
183}
Note: See TracBrowser for help on using the repository browser.