1 | #[cfg(debug_assertions)] |
---|
2 | use std::io::{self, Write}; |
---|
3 | |
---|
4 | use std::process; |
---|
5 | use std::option; |
---|
6 | use std::hint; |
---|
7 | use std::sync::Arc; |
---|
8 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
---|
9 | use std::time::{Instant,Duration}; |
---|
10 | |
---|
11 | use tokio::runtime::Builder; |
---|
12 | use tokio::sync; |
---|
13 | use tokio::task; |
---|
14 | |
---|
15 | use rand::Rng; |
---|
16 | |
---|
17 | use clap::{Arg, App}; |
---|
18 | use num_format::{Locale, ToFormattedString}; |
---|
19 | |
---|
20 | #[path = "../bench.rs"] |
---|
21 | mod bench; |
---|
22 | |
---|
// Debug-build tracing helper: prints one formatted line to stdout and flushes
// right away so that output from concurrently running tasks shows up promptly.
// Accepts a format string followed by zero or more comma-separated arguments.
#[cfg(debug_assertions)]
macro_rules! debug {
    ($fmt:expr $(, $arg:expr)*) => {
        println!($fmt $(, $arg)*);
        io::stdout().flush().unwrap();
    };
}

// Release-build stand-in with the same call shapes: expands to the unit value,
// so the arguments are never evaluated and nothing is printed.
#[cfg(not(debug_assertions))]
macro_rules! debug {
    ($fmt:expr $(, $arg:expr)*) => {
        ()
    };
}
40 | |
---|
/// Interprets the textual value of the `exhaust` command-line parameter.
///
/// Returns `default` when no value was supplied. `"yes"`, `"Y"` and `"y"` map to
/// `true`; `"no"`, `"N"` and `"n"` map to `false`. Any other value prints a
/// diagnostic on stderr and terminates the process with exit status 1.
fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
    let val = match opt {
        Some(text) => text,
        None => return default,
    };

    match val {
        "yes" | "Y" | "y" => true,
        "no" | "N" | "n" => false,
        "maybe" | "I don't know" | "Can you repeat the question?" => {
            eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
            std::process::exit(1);
        }
        _ => {
            eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
            std::process::exit(1);
        }
    }
}
66 | |
---|
67 | struct LeaderInfo { |
---|
68 | id: AtomicUsize, |
---|
69 | idx: AtomicUsize, |
---|
70 | estop: AtomicBool, |
---|
71 | seed: u128, |
---|
72 | } |
---|
73 | |
---|
74 | impl LeaderInfo { |
---|
75 | pub fn new(nthreads: usize) -> LeaderInfo { |
---|
76 | let this = LeaderInfo{ |
---|
77 | id: AtomicUsize::new(nthreads), |
---|
78 | idx: AtomicUsize::new(0), |
---|
79 | estop: AtomicBool::new(false), |
---|
80 | seed: process::id() as u128 |
---|
81 | }; |
---|
82 | |
---|
83 | let mut rng = rand::thread_rng(); |
---|
84 | |
---|
85 | for _ in 0..rng.gen_range(0..10) { |
---|
86 | this.next( nthreads ); |
---|
87 | } |
---|
88 | |
---|
89 | this |
---|
90 | } |
---|
91 | |
---|
92 | pub fn next(&self, len: usize) { |
---|
93 | let n = bench::_lehmer64( unsafe { |
---|
94 | let r1 = &self.seed as *const u128; |
---|
95 | let r2 = r1 as *mut u128; |
---|
96 | &mut *r2 |
---|
97 | } ) as usize; |
---|
98 | self.id.store( n % len , Ordering::SeqCst ); |
---|
99 | } |
---|
100 | } |
---|
101 | |
---|
102 | struct MyThread { |
---|
103 | id: usize, |
---|
104 | idx: AtomicUsize, |
---|
105 | sem: sync::Semaphore, |
---|
106 | } |
---|
107 | |
---|
108 | fn waitgroup(leader: &LeaderInfo, idx: usize, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore) { |
---|
109 | let start = Instant::now(); |
---|
110 | 'outer: for t in threads { |
---|
111 | debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) ); |
---|
112 | while t.idx.load(Ordering::Relaxed) != idx { |
---|
113 | hint::spin_loop(); |
---|
114 | if start.elapsed() > Duration::from_secs(5) { |
---|
115 | eprintln!("Programs has been blocked for more than 5 secs"); |
---|
116 | leader.estop.store(true, Ordering::Relaxed); |
---|
117 | main_sem.add_permits(1); |
---|
118 | break 'outer; |
---|
119 | } |
---|
120 | } |
---|
121 | } |
---|
122 | debug!( "Waiting done" ); |
---|
123 | } |
---|
124 | |
---|
125 | fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) { |
---|
126 | if !exhaust { return; } |
---|
127 | |
---|
128 | for i in 0..threads.len() { |
---|
129 | if i != me { |
---|
130 | debug!( "Leader waking {}", i); |
---|
131 | threads[i].sem.add_permits(1); |
---|
132 | } |
---|
133 | } |
---|
134 | } |
---|
135 | |
---|
136 | fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) { |
---|
137 | let nidx = leader.idx.load(Ordering::Relaxed) + 1; |
---|
138 | this.idx.store(nidx, Ordering::Relaxed); |
---|
139 | leader.idx.store(nidx, Ordering::Relaxed); |
---|
140 | |
---|
141 | if nidx as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { |
---|
142 | debug!( "Leader {} done", this.id); |
---|
143 | main_sem.add_permits(1); |
---|
144 | return; |
---|
145 | } |
---|
146 | |
---|
147 | debug!( "====================\nLeader no {} : {}", nidx, this.id); |
---|
148 | |
---|
149 | waitgroup(leader, nidx, threads, main_sem); |
---|
150 | |
---|
151 | leader.next( threads.len() ); |
---|
152 | |
---|
153 | wakegroup(exhaust, this.id, threads); |
---|
154 | |
---|
155 | debug!( "Leader no {} : {} done\n====================", nidx, this.id); |
---|
156 | } |
---|
157 | |
---|
158 | async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) { |
---|
159 | task::yield_now().await; |
---|
160 | |
---|
161 | if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) { |
---|
162 | debug!("Waiting {} recheck", this.id); |
---|
163 | *rechecks += 1; |
---|
164 | return; |
---|
165 | } |
---|
166 | |
---|
167 | debug!("Waiting {}", this.id); |
---|
168 | |
---|
169 | debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) ); |
---|
170 | this.idx.fetch_add(1, Ordering::SeqCst); |
---|
171 | if exhaust { |
---|
172 | debug!("Waiting {} sem", this.id); |
---|
173 | this.sem.acquire().await.forget(); |
---|
174 | } |
---|
175 | else { |
---|
176 | debug!("Waiting {} yield", this.id); |
---|
177 | task::yield_now().await; |
---|
178 | } |
---|
179 | |
---|
180 | debug!("Waiting {} done", this.id); |
---|
181 | } |
---|
182 | |
---|
183 | async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{ |
---|
184 | assert!( me == threads[me].id ); |
---|
185 | |
---|
186 | debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore); |
---|
187 | |
---|
188 | start.wait().await; |
---|
189 | |
---|
190 | debug!( "Start {}", me ); |
---|
191 | |
---|
192 | let mut rechecks: usize = 0; |
---|
193 | |
---|
194 | loop { |
---|
195 | if leader.id.load(Ordering::Relaxed) == me { |
---|
196 | lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp ); |
---|
197 | task::yield_now().await; |
---|
198 | } |
---|
199 | else { |
---|
200 | wait( exhaust, &leader, &threads[me], &mut rechecks ).await; |
---|
201 | } |
---|
202 | if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { break; } |
---|
203 | } |
---|
204 | |
---|
205 | rechecks |
---|
206 | } |
---|
207 | |
---|
208 | fn main() { |
---|
209 | let options = App::new("Transfer Tokio") |
---|
210 | .args(&bench::args()) |
---|
211 | .arg(Arg::with_name("exhaust") .short("e").long("exhaust") .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding.")) |
---|
212 | .get_matches(); |
---|
213 | |
---|
214 | let exhaust = parse_yes_no( options.value_of("exhaust"), false ); |
---|
215 | let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap(); |
---|
216 | let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap(); |
---|
217 | |
---|
218 | |
---|
219 | let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100))); |
---|
220 | if exp.clock_mode { |
---|
221 | eprintln!("Programs does not support fixed duration mode"); |
---|
222 | std::process::exit(1); |
---|
223 | } |
---|
224 | |
---|
225 | println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" }); |
---|
226 | |
---|
227 | let s = (1000000 as u64).to_formatted_string(&Locale::en); |
---|
228 | assert_eq!(&s, "1,000,000"); |
---|
229 | |
---|
230 | let main_sem = Arc::new(sync::Semaphore::new(0)); |
---|
231 | let leader = Arc::new(LeaderInfo::new(nthreads)); |
---|
232 | let barr = Arc::new(sync::Barrier::new(nthreads + 1)); |
---|
233 | let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new( |
---|
234 | (0..nthreads).map(|i| { |
---|
235 | Arc::new(MyThread{ |
---|
236 | id: i, |
---|
237 | idx: AtomicUsize::new(0), |
---|
238 | sem: sync::Semaphore::new(0), |
---|
239 | }) |
---|
240 | }).collect() |
---|
241 | ); |
---|
242 | |
---|
243 | let mut rechecks: usize = 0; |
---|
244 | let mut duration : std::time::Duration = std::time::Duration::from_secs(0); |
---|
245 | |
---|
246 | let runtime = Builder::new_multi_thread() |
---|
247 | .worker_threads(nprocs) |
---|
248 | .enable_all() |
---|
249 | .build() |
---|
250 | .unwrap(); |
---|
251 | |
---|
252 | runtime.block_on(async { |
---|
253 | let threads: Vec<_> = (0..nthreads).map(|i| { |
---|
254 | tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone())) |
---|
255 | }).collect(); |
---|
256 | println!("Starting"); |
---|
257 | |
---|
258 | let start = Instant::now(); |
---|
259 | |
---|
260 | barr.wait().await; |
---|
261 | debug!("Unlocked all"); |
---|
262 | |
---|
263 | |
---|
264 | main_sem.acquire().await.forget(); |
---|
265 | |
---|
266 | duration = start.elapsed(); |
---|
267 | |
---|
268 | println!("\nDone"); |
---|
269 | |
---|
270 | |
---|
271 | for i in 0..nthreads { |
---|
272 | thddata[i].sem.add_permits(1); |
---|
273 | } |
---|
274 | |
---|
275 | for t in threads { |
---|
276 | rechecks += t.await.unwrap(); |
---|
277 | } |
---|
278 | }); |
---|
279 | |
---|
280 | println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en)); |
---|
281 | println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en)); |
---|
282 | println!("Number of threads : {}", (nthreads).to_formatted_string(&Locale::en)); |
---|
283 | println!("Total Operations(ops) : {:>15}", (leader.idx.load(Ordering::Relaxed) - 1).to_formatted_string(&Locale::en)); |
---|
284 | println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" }); |
---|
285 | println!("Rechecking : {}", rechecks ); |
---|
286 | println!("ns per transfer : {}", ((duration.as_nanos() as f64) / leader.idx.load(Ordering::Relaxed) as f64)); |
---|
287 | |
---|
288 | } |
---|