1 | #[cfg(debug_assertions)]
|
---|
2 | use std::io::{self, Write};
|
---|
3 |
|
---|
4 | use std::process;
|
---|
5 | use std::option;
|
---|
6 | use std::hint;
|
---|
7 | use std::sync::Arc;
|
---|
8 | use std::sync::atomic::{AtomicUsize, Ordering};
|
---|
9 | use std::time::{Instant,Duration};
|
---|
10 |
|
---|
11 | use tokio::runtime::Builder;
|
---|
12 | use tokio::sync;
|
---|
13 | use tokio::task;
|
---|
14 |
|
---|
15 | use rand::Rng;
|
---|
16 |
|
---|
17 | use clap::{Arg, App};
|
---|
18 | use num_format::{Locale, ToFormattedString};
|
---|
19 |
|
---|
20 | #[path = "../bench.rs"]
|
---|
21 | mod bench;
|
---|
22 |
|
---|
// Debug-build tracing macro: prints a formatted line to stdout and flushes
// immediately so that interleaved output from concurrent tasks is not held
// back in the stdout buffer.
//
// Accepts a format expression optionally followed by comma-separated
// arguments (no trailing comma), exactly like a restricted `println!`.
// Panics if flushing stdout fails.
#[cfg(debug_assertions)]
macro_rules! debug {
    ($fmt:expr $(, $arg:expr)*) => {
        println!($fmt $(, $arg)*);
        io::stdout().flush().unwrap();
    };
}
|
---|
34 |
|
---|
// Release-build tracing macro: accepts the same invocations as the
// debug-build variant (a format expression optionally followed by
// comma-separated arguments) but expands to the unit value, so the
// arguments are never evaluated and nothing is printed.
#[cfg(not(debug_assertions))]
macro_rules! debug {
    ($fmt:expr $(, $arg:expr)*) => {
        ()
    };
}
|
---|
40 |
|
---|
/// Interprets the value of the `exhaust` command-line parameter.
///
/// Returns `default` when no value was supplied, `true` for `"yes"` and
/// `false` for `"no"`. Any other value prints a diagnostic to stderr and
/// terminates the process with exit status 1, so the function does not
/// return in that case.
fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
    let val = match opt {
        Some(text) => text,
        None => return default,
    };

    if val == "yes" {
        return true;
    }
    if val == "no" {
        return false;
    }

    // Everything past this point is a fatal usage error; only the wording of
    // the message differs.
    if matches!(val, "maybe" | "I don't know" | "Can you repeat the question?") {
        eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
    } else {
        eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
    }
    std::process::exit(1)
}
|
---|
62 |
|
---|
63 | struct LeaderInfo {
|
---|
64 | id: AtomicUsize,
|
---|
65 | idx: AtomicUsize,
|
---|
66 | seed: u128,
|
---|
67 | }
|
---|
68 |
|
---|
69 | impl LeaderInfo {
|
---|
70 | pub fn new(nthreads: usize) -> LeaderInfo {
|
---|
71 | let this = LeaderInfo{
|
---|
72 | id: AtomicUsize::new(nthreads),
|
---|
73 | idx: AtomicUsize::new(0),
|
---|
74 | seed: process::id() as u128
|
---|
75 | };
|
---|
76 |
|
---|
77 | let mut rng = rand::thread_rng();
|
---|
78 |
|
---|
79 | for _ in 0..rng.gen_range(0..10) {
|
---|
80 | this.next( nthreads );
|
---|
81 | }
|
---|
82 |
|
---|
83 | this
|
---|
84 | }
|
---|
85 |
|
---|
86 | pub fn next(&self, len: usize) {
|
---|
87 | let n = bench::_lehmer64( unsafe {
|
---|
88 | let r1 = &self.seed as *const u128;
|
---|
89 | let r2 = r1 as *mut u128;
|
---|
90 | &mut *r2
|
---|
91 | } ) as usize;
|
---|
92 | self.id.store( n % len , Ordering::SeqCst );
|
---|
93 | }
|
---|
94 | }
|
---|
95 |
|
---|
96 | struct MyThread {
|
---|
97 | id: usize,
|
---|
98 | idx: AtomicUsize,
|
---|
99 | sem: sync::Semaphore,
|
---|
100 | }
|
---|
101 |
|
---|
102 | fn waitgroup(idx: usize, threads: &Vec<Arc<MyThread>>) {
|
---|
103 | let start = Instant::now();
|
---|
104 | for t in threads {
|
---|
105 | debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
|
---|
106 | while t.idx.load(Ordering::Relaxed) != idx {
|
---|
107 | hint::spin_loop();
|
---|
108 | if start.elapsed() > Duration::from_secs(5) {
|
---|
109 | eprintln!("Programs has been blocked for more than 5 secs");
|
---|
110 | std::process::exit(1);
|
---|
111 | }
|
---|
112 | }
|
---|
113 | }
|
---|
114 | debug!( "Waiting done" );
|
---|
115 | }
|
---|
116 |
|
---|
117 | fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
|
---|
118 | if !exhaust { return; }
|
---|
119 |
|
---|
120 | for i in 0..threads.len() {
|
---|
121 | if i != me {
|
---|
122 | debug!( "Leader waking {}", i);
|
---|
123 | threads[i].sem.add_permits(1);
|
---|
124 | }
|
---|
125 | }
|
---|
126 | }
|
---|
127 |
|
---|
128 | fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
|
---|
129 | let nidx = leader.idx.load(Ordering::Relaxed) + 1;
|
---|
130 | this.idx.store(nidx, Ordering::Relaxed);
|
---|
131 | leader.idx.store(nidx, Ordering::Relaxed);
|
---|
132 |
|
---|
133 | if nidx as u64 > exp.stop_count {
|
---|
134 | debug!( "Leader {} done", this.id);
|
---|
135 | main_sem.add_permits(1);
|
---|
136 | return;
|
---|
137 | }
|
---|
138 |
|
---|
139 | debug!( "====================\nLeader no {} : {}", nidx, this.id);
|
---|
140 |
|
---|
141 | waitgroup(nidx, threads);
|
---|
142 |
|
---|
143 | leader.next( threads.len() );
|
---|
144 |
|
---|
145 | wakegroup(exhaust, this.id, threads);
|
---|
146 |
|
---|
147 | debug!( "Leader no {} : {} done\n====================", nidx, this.id);
|
---|
148 | }
|
---|
149 |
|
---|
150 | async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
|
---|
151 | task::yield_now().await;
|
---|
152 |
|
---|
153 | if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
|
---|
154 | debug!("Waiting {} recheck", this.id);
|
---|
155 | *rechecks += 1;
|
---|
156 | return;
|
---|
157 | }
|
---|
158 |
|
---|
159 | debug!("Waiting {}", this.id);
|
---|
160 |
|
---|
161 | debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
|
---|
162 | this.idx.fetch_add(1, Ordering::SeqCst);
|
---|
163 | if exhaust {
|
---|
164 | debug!("Waiting {} sem", this.id);
|
---|
165 | this.sem.acquire().await.forget();
|
---|
166 | }
|
---|
167 | else {
|
---|
168 | debug!("Waiting {} yield", this.id);
|
---|
169 | task::yield_now().await;
|
---|
170 | }
|
---|
171 |
|
---|
172 | debug!("Waiting {} done", this.id);
|
---|
173 | }
|
---|
174 |
|
---|
175 | async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
|
---|
176 | assert!( me == threads[me].id );
|
---|
177 |
|
---|
178 | debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
|
---|
179 |
|
---|
180 | start.wait().await;
|
---|
181 |
|
---|
182 | debug!( "Start {}", me );
|
---|
183 |
|
---|
184 | let mut rechecks: usize = 0;
|
---|
185 |
|
---|
186 | loop {
|
---|
187 | if leader.id.load(Ordering::Relaxed) == me {
|
---|
188 | lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
|
---|
189 | task::yield_now().await;
|
---|
190 | }
|
---|
191 | else {
|
---|
192 | wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
|
---|
193 | }
|
---|
194 | if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count { break; }
|
---|
195 | }
|
---|
196 |
|
---|
197 | rechecks
|
---|
198 | }
|
---|
199 |
|
---|
200 | fn main() {
|
---|
201 | let options = App::new("Transfer Tokio")
|
---|
202 | .args(&bench::args())
|
---|
203 | .arg(Arg::with_name("exhaust") .short("e").long("exhaust") .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
|
---|
204 | .get_matches();
|
---|
205 |
|
---|
206 | let exhaust = parse_yes_no( options.value_of("exhaust"), false );
|
---|
207 | let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
|
---|
208 | let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
|
---|
209 |
|
---|
210 |
|
---|
211 | let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
|
---|
212 | if exp.clock_mode {
|
---|
213 | eprintln!("Programs does not support fixed duration mode");
|
---|
214 | std::process::exit(1);
|
---|
215 | }
|
---|
216 |
|
---|
217 | println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
|
---|
218 |
|
---|
219 | let s = (1000000 as u64).to_formatted_string(&Locale::en);
|
---|
220 | assert_eq!(&s, "1,000,000");
|
---|
221 |
|
---|
222 | let main_sem = Arc::new(sync::Semaphore::new(0));
|
---|
223 | let leader = Arc::new(LeaderInfo::new(nthreads));
|
---|
224 | let barr = Arc::new(sync::Barrier::new(nthreads + 1));
|
---|
225 | let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
|
---|
226 | (0..nthreads).map(|i| {
|
---|
227 | Arc::new(MyThread{
|
---|
228 | id: i,
|
---|
229 | idx: AtomicUsize::new(0),
|
---|
230 | sem: sync::Semaphore::new(0),
|
---|
231 | })
|
---|
232 | }).collect()
|
---|
233 | );
|
---|
234 |
|
---|
235 | let mut rechecks: usize = 0;
|
---|
236 | let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
|
---|
237 |
|
---|
238 | let runtime = Builder::new_multi_thread()
|
---|
239 | .worker_threads(nprocs)
|
---|
240 | .enable_all()
|
---|
241 | .build()
|
---|
242 | .unwrap();
|
---|
243 |
|
---|
244 | runtime.block_on(async {
|
---|
245 | let threads: Vec<_> = (0..nthreads).map(|i| {
|
---|
246 | tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
|
---|
247 | }).collect();
|
---|
248 | println!("Starting");
|
---|
249 |
|
---|
250 | let start = Instant::now();
|
---|
251 |
|
---|
252 | barr.wait().await;
|
---|
253 | debug!("Unlocked all");
|
---|
254 |
|
---|
255 |
|
---|
256 | main_sem.acquire().await.forget();
|
---|
257 |
|
---|
258 | duration = start.elapsed();
|
---|
259 |
|
---|
260 | println!("\nDone");
|
---|
261 |
|
---|
262 |
|
---|
263 | for i in 0..nthreads {
|
---|
264 | thddata[i].sem.add_permits(1);
|
---|
265 | }
|
---|
266 |
|
---|
267 | for t in threads {
|
---|
268 | rechecks += t.await.unwrap();
|
---|
269 | }
|
---|
270 | });
|
---|
271 |
|
---|
272 | println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
|
---|
273 | println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
|
---|
274 | println!("Number of threads : {}", (nthreads).to_formatted_string(&Locale::en));
|
---|
275 | println!("Total Operations(ops) : {:>15}", (exp.stop_count).to_formatted_string(&Locale::en));
|
---|
276 | println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
|
---|
277 | println!("Rechecking : {}", rechecks );
|
---|
278 | }
|
---|