source: benchmark/readyQ/transfer.rs@ a882b68

ADT ast-experimental
Last change on this file since a882b68 was 65c9208, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Changed transfer benchmark to be more consistent with other rmit benchmarks

  • Property mode set to 100644
File size: 7.5 KB
Line 
1#[cfg(debug_assertions)]
2use std::io::{self, Write};
3
4use std::process;
5use std::option;
6use std::hint;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::time::{Instant,Duration};
10
11use tokio::runtime::Builder;
12use tokio::sync;
13use tokio::task;
14
15use rand::Rng;
16
17use clap::{Arg, App};
18use num_format::{Locale, ToFormattedString};
19
20#[path = "../bench.rs"]
21mod bench;
22
// Debug-build tracing: prints the formatted message and flushes stdout
// immediately so that output from concurrently running tasks shows up in order
// even if the program later hangs or aborts.
#[cfg(debug_assertions)]
macro_rules! debug {
    // Message without arguments.
    ($fmt:expr) => {
        println!( $fmt );
        io::stdout().flush().unwrap();
    };
    // Message followed by one or more format arguments.
    ($fmt:expr, $($arg:expr),+) => {
        println!( $fmt, $($arg),+ );
        io::stdout().flush().unwrap();
    };
}
34
// Release-build tracing: every `debug!` invocation expands to the unit value,
// so the arguments are never evaluated and nothing is printed.
#[cfg(not(debug_assertions))]
macro_rules! debug {
    // Message without arguments.
    ($fmt:expr) => {
        ()
    };
    // Message followed by one or more format arguments.
    ($fmt:expr, $($arg:expr),+) => {
        ()
    };
}
40
/// Interprets the value of a yes/no command-line option.
///
/// Returns `default` when `opt` is `None`. Accepts `"yes"`, `"Y"`, `"y"` as
/// `true` and `"no"`, `"N"`, `"n"` as `false`. Any other value prints an error
/// to stderr and terminates the process with exit code 1.
fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
    // No value supplied: fall back to the caller's default.
    let val = match opt {
        Some(text) => text,
        None => return default,
    };

    match val {
        "yes" | "Y" | "y" => true,
        "no" | "N" | "n" => false,
        "maybe" | "I don't know" | "Can you repeat the question?" => {
            eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
            std::process::exit(1);
        }
        _ => {
            eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
            std::process::exit(1);
        }
    }
}
66
67struct LeaderInfo {
68 id: AtomicUsize,
69 idx: AtomicUsize,
70 estop: AtomicBool,
71 seed: u128,
72}
73
74impl LeaderInfo {
75 pub fn new(nthreads: usize) -> LeaderInfo {
76 let this = LeaderInfo{
77 id: AtomicUsize::new(nthreads),
78 idx: AtomicUsize::new(0),
79 estop: AtomicBool::new(false),
80 seed: process::id() as u128
81 };
82
83 let mut rng = rand::thread_rng();
84
85 for _ in 0..rng.gen_range(0..10) {
86 this.next( nthreads );
87 }
88
89 this
90 }
91
92 pub fn next(&self, len: usize) {
93 let n = bench::_lehmer64( unsafe {
94 let r1 = &self.seed as *const u128;
95 let r2 = r1 as *mut u128;
96 &mut *r2
97 } ) as usize;
98 self.id.store( n % len , Ordering::SeqCst );
99 }
100}
101
102struct MyThread {
103 id: usize,
104 idx: AtomicUsize,
105 sem: sync::Semaphore,
106}
107
108fn waitgroup(leader: &LeaderInfo, idx: usize, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore) {
109 let start = Instant::now();
110 'outer: for t in threads {
111 debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
112 while t.idx.load(Ordering::Relaxed) != idx {
113 hint::spin_loop();
114 if start.elapsed() > Duration::from_secs(5) {
115 eprintln!("Programs has been blocked for more than 5 secs");
116 leader.estop.store(true, Ordering::Relaxed);
117 main_sem.add_permits(1);
118 break 'outer;
119 }
120 }
121 }
122 debug!( "Waiting done" );
123}
124
125fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
126 if !exhaust { return; }
127
128 for i in 0..threads.len() {
129 if i != me {
130 debug!( "Leader waking {}", i);
131 threads[i].sem.add_permits(1);
132 }
133 }
134}
135
136fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
137 let nidx = leader.idx.load(Ordering::Relaxed) + 1;
138 this.idx.store(nidx, Ordering::Relaxed);
139 leader.idx.store(nidx, Ordering::Relaxed);
140
141 if nidx as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) {
142 debug!( "Leader {} done", this.id);
143 main_sem.add_permits(1);
144 return;
145 }
146
147 debug!( "====================\nLeader no {} : {}", nidx, this.id);
148
149 waitgroup(leader, nidx, threads, main_sem);
150
151 leader.next( threads.len() );
152
153 wakegroup(exhaust, this.id, threads);
154
155 debug!( "Leader no {} : {} done\n====================", nidx, this.id);
156}
157
158async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
159 task::yield_now().await;
160
161 if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
162 debug!("Waiting {} recheck", this.id);
163 *rechecks += 1;
164 return;
165 }
166
167 debug!("Waiting {}", this.id);
168
169 debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
170 this.idx.fetch_add(1, Ordering::SeqCst);
171 if exhaust {
172 debug!("Waiting {} sem", this.id);
173 this.sem.acquire().await.forget();
174 }
175 else {
176 debug!("Waiting {} yield", this.id);
177 task::yield_now().await;
178 }
179
180 debug!("Waiting {} done", this.id);
181}
182
183async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
184 assert!( me == threads[me].id );
185
186 debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
187
188 start.wait().await;
189
190 debug!( "Start {}", me );
191
192 let mut rechecks: usize = 0;
193
194 loop {
195 if leader.id.load(Ordering::Relaxed) == me {
196 lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
197 task::yield_now().await;
198 }
199 else {
200 wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
201 }
202 if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { break; }
203 }
204
205 rechecks
206}
207
208fn main() {
209 let options = App::new("Transfer Tokio")
210 .args(&bench::args())
211 .arg(Arg::with_name("exhaust") .short("e").long("exhaust") .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
212 .get_matches();
213
214 let exhaust = parse_yes_no( options.value_of("exhaust"), false );
215 let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
216 let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
217
218
219 let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
220 if exp.clock_mode {
221 eprintln!("Programs does not support fixed duration mode");
222 std::process::exit(1);
223 }
224
225 println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
226
227 let s = (1000000 as u64).to_formatted_string(&Locale::en);
228 assert_eq!(&s, "1,000,000");
229
230 let main_sem = Arc::new(sync::Semaphore::new(0));
231 let leader = Arc::new(LeaderInfo::new(nthreads));
232 let barr = Arc::new(sync::Barrier::new(nthreads + 1));
233 let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
234 (0..nthreads).map(|i| {
235 Arc::new(MyThread{
236 id: i,
237 idx: AtomicUsize::new(0),
238 sem: sync::Semaphore::new(0),
239 })
240 }).collect()
241 );
242
243 let mut rechecks: usize = 0;
244 let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
245
246 let runtime = Builder::new_multi_thread()
247 .worker_threads(nprocs)
248 .enable_all()
249 .build()
250 .unwrap();
251
252 runtime.block_on(async {
253 let threads: Vec<_> = (0..nthreads).map(|i| {
254 tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
255 }).collect();
256 println!("Starting");
257
258 let start = Instant::now();
259
260 barr.wait().await;
261 debug!("Unlocked all");
262
263
264 main_sem.acquire().await.forget();
265
266 duration = start.elapsed();
267
268 println!("\nDone");
269
270
271 for i in 0..nthreads {
272 thddata[i].sem.add_permits(1);
273 }
274
275 for t in threads {
276 rechecks += t.await.unwrap();
277 }
278 });
279
280 println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
281 println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
282 println!("Number of threads : {}", (nthreads).to_formatted_string(&Locale::en));
283 println!("Total Operations(ops) : {:>15}", (leader.idx.load(Ordering::Relaxed) - 1).to_formatted_string(&Locale::en));
284 println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
285 println!("Rechecking : {}", rechecks );
286 println!("ns per transfer : {}", ((duration.as_nanos() as f64) / leader.idx.load(Ordering::Relaxed) as f64));
287
288}
Note: See TracBrowser for help on using the repository browser.