source: benchmark/readyQ/transfer.rs@ f57f6ea0

ADT ast-experimental enum forall-pointer-decay pthread-emulation qualifiedEnum
Last change on this file since f57f6ea0 was ebb6158, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Minor fixes to warnings, printing and ridiculous go/rust requirements.

  • Property mode set to 100644
File size: 7.0 KB
Line 
1#[cfg(debug_assertions)]
2use std::io::{self, Write};
3
4use std::process;
5use std::option;
6use std::hint;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Instant,Duration};
10
11use tokio::runtime::Builder;
12use tokio::sync;
13use tokio::task;
14
15use rand::Rng;
16
17use clap::{Arg, App};
18use num_format::{Locale, ToFormattedString};
19
20#[path = "../bench.rs"]
21mod bench;
22
23#[cfg(debug_assertions)]
24macro_rules! debug {
25 ($x:expr) => {
26 println!( $x );
27 io::stdout().flush().unwrap();
28 };
29 ($x:expr, $($more:expr),+) => {
30 println!( $x, $($more), * );
31 io::stdout().flush().unwrap();
32 };
33}
34
35#[cfg(not(debug_assertions))]
36macro_rules! debug {
37 ($x:expr ) => { () };
38 ($x:expr, $($more:expr),+) => { () };
39}
40
41fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
42 match opt {
43 Some(val) => {
44 match val {
45 "yes" => true,
46 "no" => false,
47 "maybe" | "I don't know" | "Can you repeat the question?" => {
48 eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
49 std::process::exit(1);
50 },
51 _ => {
52 eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
53 std::process::exit(1);
54 },
55 }
56 },
57 _ => {
58 default
59 },
60 }
61}
62
63struct LeaderInfo {
64 id: AtomicUsize,
65 idx: AtomicUsize,
66 seed: u128,
67}
68
69impl LeaderInfo {
70 pub fn new(nthreads: usize) -> LeaderInfo {
71 let this = LeaderInfo{
72 id: AtomicUsize::new(nthreads),
73 idx: AtomicUsize::new(0),
74 seed: process::id() as u128
75 };
76
77 let mut rng = rand::thread_rng();
78
79 for _ in 0..rng.gen_range(0..10) {
80 this.next( nthreads );
81 }
82
83 this
84 }
85
86 pub fn next(&self, len: usize) {
87 let n = bench::_lehmer64( unsafe {
88 let r1 = &self.seed as *const u128;
89 let r2 = r1 as *mut u128;
90 &mut *r2
91 } ) as usize;
92 self.id.store( n % len , Ordering::SeqCst );
93 }
94}
95
96struct MyThread {
97 id: usize,
98 idx: AtomicUsize,
99 sem: sync::Semaphore,
100}
101
102fn waitgroup(idx: usize, threads: &Vec<Arc<MyThread>>) {
103 let start = Instant::now();
104 for t in threads {
105 debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
106 while t.idx.load(Ordering::Relaxed) != idx {
107 hint::spin_loop();
108 if start.elapsed() > Duration::from_secs(5) {
109 eprintln!("Programs has been blocked for more than 5 secs");
110 std::process::exit(1);
111 }
112 }
113 }
114 debug!( "Waiting done" );
115}
116
117fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
118 if !exhaust { return; }
119
120 for i in 0..threads.len() {
121 if i != me {
122 debug!( "Leader waking {}", i);
123 threads[i].sem.add_permits(1);
124 }
125 }
126}
127
128fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
129 let nidx = leader.idx.load(Ordering::Relaxed) + 1;
130 this.idx.store(nidx, Ordering::Relaxed);
131 leader.idx.store(nidx, Ordering::Relaxed);
132
133 if nidx as u64 > exp.stop_count {
134 debug!( "Leader {} done", this.id);
135 main_sem.add_permits(1);
136 return;
137 }
138
139 debug!( "====================\nLeader no {} : {}", nidx, this.id);
140
141 waitgroup(nidx, threads);
142
143 leader.next( threads.len() );
144
145 wakegroup(exhaust, this.id, threads);
146
147 debug!( "Leader no {} : {} done\n====================", nidx, this.id);
148}
149
150async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
151 task::yield_now().await;
152
153 if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
154 debug!("Waiting {} recheck", this.id);
155 *rechecks += 1;
156 return;
157 }
158
159 debug!("Waiting {}", this.id);
160
161 debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
162 this.idx.fetch_add(1, Ordering::SeqCst);
163 if exhaust {
164 debug!("Waiting {} sem", this.id);
165 this.sem.acquire().await.forget();
166 }
167 else {
168 debug!("Waiting {} yield", this.id);
169 task::yield_now().await;
170 }
171
172 debug!("Waiting {} done", this.id);
173}
174
175async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
176 assert!( me == threads[me].id );
177
178 debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
179
180 start.wait().await;
181
182 debug!( "Start {}", me );
183
184 let mut rechecks: usize = 0;
185
186 loop {
187 if leader.id.load(Ordering::Relaxed) == me {
188 lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
189 task::yield_now().await;
190 }
191 else {
192 wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
193 }
194 if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count { break; }
195 }
196
197 rechecks
198}
199
200fn main() {
201 let options = App::new("Transfer Tokio")
202 .args(&bench::args())
203 .arg(Arg::with_name("exhaust") .short("e").long("exhaust") .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
204 .get_matches();
205
206 let exhaust = parse_yes_no( options.value_of("exhaust"), false );
207 let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
208 let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
209
210
211 let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
212 if exp.clock_mode {
213 eprintln!("Programs does not support fixed duration mode");
214 std::process::exit(1);
215 }
216
217 println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
218
219 let s = (1000000 as u64).to_formatted_string(&Locale::en);
220 assert_eq!(&s, "1,000,000");
221
222 let main_sem = Arc::new(sync::Semaphore::new(0));
223 let leader = Arc::new(LeaderInfo::new(nthreads));
224 let barr = Arc::new(sync::Barrier::new(nthreads + 1));
225 let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
226 (0..nthreads).map(|i| {
227 Arc::new(MyThread{
228 id: i,
229 idx: AtomicUsize::new(0),
230 sem: sync::Semaphore::new(0),
231 })
232 }).collect()
233 );
234
235 let mut rechecks: usize = 0;
236 let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
237
238 let runtime = Builder::new_multi_thread()
239 .worker_threads(nprocs)
240 .enable_all()
241 .build()
242 .unwrap();
243
244 runtime.block_on(async {
245 let threads: Vec<_> = (0..nthreads).map(|i| {
246 tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
247 }).collect();
248 println!("Starting");
249
250 let start = Instant::now();
251
252 barr.wait().await;
253 debug!("Unlocked all");
254
255
256 main_sem.acquire().await.forget();
257
258 duration = start.elapsed();
259
260 println!("\nDone");
261
262
263 for i in 0..nthreads {
264 thddata[i].sem.add_permits(1);
265 }
266
267 for t in threads {
268 rechecks += t.await.unwrap();
269 }
270 });
271
272 println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
273 println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
274 println!("Number of threads : {}", (nthreads).to_formatted_string(&Locale::en));
275 println!("Total Operations(ops) : {:>15}", (exp.stop_count).to_formatted_string(&Locale::en));
276 println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
277 println!("Rechecking : {}", rechecks );
278}
Note: See TracBrowser for help on using the repository browser.