source: benchmark/readyQ/transfer.rs @ de3a579

Last change on this file since de3a579 was 65c9208, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Changed transfer benchmark to be more consistent with other rmit benchmarks

  • Property mode set to 100644
File size: 7.5 KB
Line 
1#[cfg(debug_assertions)]
2use std::io::{self, Write};
3
4use std::process;
5use std::option;
6use std::hint;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::time::{Instant,Duration};
10
11use tokio::runtime::Builder;
12use tokio::sync;
13use tokio::task;
14
15use rand::Rng;
16
17use clap::{Arg, App};
18use num_format::{Locale, ToFormattedString};
19
20#[path = "../bench.rs"]
21mod bench;
22
23#[cfg(debug_assertions)]
24macro_rules! debug {
25        ($x:expr) => {
26                println!( $x );
27                io::stdout().flush().unwrap();
28        };
29        ($x:expr, $($more:expr),+) => {
30                println!( $x, $($more), * );
31                io::stdout().flush().unwrap();
32        };
33}
34
35#[cfg(not(debug_assertions))]
36macro_rules! debug {
37    ($x:expr   ) => { () };
38    ($x:expr, $($more:expr),+) => { () };
39}
40
41fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
42        match opt {
43                Some(val) => {
44                        match val {
45                                "yes" => true,
46                                "Y" => true,
47                                "y" => true,
48                                "no"  => false,
49                                "N"  => false,
50                                "n"  => false,
51                                "maybe" | "I don't know" | "Can you repeat the question?" => {
52                                        eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
53                                        std::process::exit(1);
54                                },
55                                _ => {
56                                        eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
57                                        std::process::exit(1);
58                                },
59                        }
60                },
61                _ => {
62                        default
63                },
64        }
65}
66
67struct LeaderInfo {
68        id: AtomicUsize,
69        idx: AtomicUsize,
70        estop: AtomicBool,
71        seed: u128,
72}
73
74impl LeaderInfo {
75        pub fn new(nthreads: usize) -> LeaderInfo {
76                let this = LeaderInfo{
77                        id: AtomicUsize::new(nthreads),
78                        idx: AtomicUsize::new(0),
79                        estop: AtomicBool::new(false),
80                        seed: process::id() as u128
81                };
82
83                let mut rng = rand::thread_rng();
84
85                for _ in 0..rng.gen_range(0..10) {
86                        this.next( nthreads );
87                }
88
89                this
90        }
91
92        pub fn next(&self, len: usize) {
93                let n = bench::_lehmer64( unsafe {
94                        let r1 = &self.seed as *const u128;
95                        let r2 = r1 as *mut u128;
96                        &mut *r2
97                } ) as usize;
98                self.id.store( n % len , Ordering::SeqCst );
99        }
100}
101
102struct MyThread {
103        id: usize,
104        idx: AtomicUsize,
105        sem: sync::Semaphore,
106}
107
108fn waitgroup(leader: &LeaderInfo, idx: usize, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore) {
109        let start = Instant::now();
110        'outer: for t in threads {
111                debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
112                while t.idx.load(Ordering::Relaxed) != idx {
113                        hint::spin_loop();
114                        if start.elapsed() > Duration::from_secs(5) {
115                                eprintln!("Programs has been blocked for more than 5 secs");
116                                leader.estop.store(true, Ordering::Relaxed);
117                                main_sem.add_permits(1);
118                                break 'outer;
119                        }
120                }
121        }
122        debug!( "Waiting done" );
123}
124
125fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
126        if !exhaust { return; }
127
128        for i in 0..threads.len() {
129                if i != me {
130                        debug!( "Leader waking {}", i);
131                        threads[i].sem.add_permits(1);
132                }
133        }
134}
135
136fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
137        let nidx = leader.idx.load(Ordering::Relaxed) + 1;
138        this.idx.store(nidx, Ordering::Relaxed);
139        leader.idx.store(nidx, Ordering::Relaxed);
140
141        if nidx as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) {
142                debug!( "Leader {} done", this.id);
143                main_sem.add_permits(1);
144                return;
145        }
146
147        debug!( "====================\nLeader no {} : {}", nidx, this.id);
148
149        waitgroup(leader, nidx, threads, main_sem);
150
151        leader.next( threads.len() );
152
153        wakegroup(exhaust, this.id, threads);
154
155        debug!( "Leader no {} : {} done\n====================", nidx, this.id);
156}
157
158async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
159        task::yield_now().await;
160
161        if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
162                debug!("Waiting {} recheck", this.id);
163                *rechecks += 1;
164                return;
165        }
166
167        debug!("Waiting {}", this.id);
168
169        debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
170        this.idx.fetch_add(1, Ordering::SeqCst);
171        if exhaust {
172                debug!("Waiting {} sem", this.id);
173                this.sem.acquire().await.forget();
174        }
175        else {
176                debug!("Waiting {} yield", this.id);
177                task::yield_now().await;
178        }
179
180        debug!("Waiting {} done", this.id);
181}
182
183async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
184        assert!( me == threads[me].id );
185
186        debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
187
188        start.wait().await;
189
190        debug!( "Start {}", me );
191
192        let mut rechecks: usize = 0;
193
194        loop {
195                if leader.id.load(Ordering::Relaxed) == me {
196                        lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
197                        task::yield_now().await;
198                }
199                else {
200                        wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
201                }
202                if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { break; }
203        }
204
205        rechecks
206}
207
208fn main() {
209        let options = App::new("Transfer Tokio")
210                .args(&bench::args())
211                .arg(Arg::with_name("exhaust")  .short("e").long("exhaust")  .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
212                .get_matches();
213
214        let exhaust  = parse_yes_no( options.value_of("exhaust"), false );
215        let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
216        let nprocs   = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
217
218
219        let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
220        if exp.clock_mode {
221                eprintln!("Programs does not support fixed duration mode");
222                std::process::exit(1);
223        }
224
225        println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
226
227        let s = (1000000 as u64).to_formatted_string(&Locale::en);
228        assert_eq!(&s, "1,000,000");
229
230        let main_sem = Arc::new(sync::Semaphore::new(0));
231        let leader = Arc::new(LeaderInfo::new(nthreads));
232        let barr = Arc::new(sync::Barrier::new(nthreads + 1));
233        let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
234                (0..nthreads).map(|i| {
235                        Arc::new(MyThread{
236                                id: i,
237                                idx: AtomicUsize::new(0),
238                                sem: sync::Semaphore::new(0),
239                        })
240                }).collect()
241        );
242
243        let mut rechecks: usize = 0;
244        let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
245
246        let runtime = Builder::new_multi_thread()
247                .worker_threads(nprocs)
248                .enable_all()
249                .build()
250                .unwrap();
251
252        runtime.block_on(async {
253                let threads: Vec<_> = (0..nthreads).map(|i| {
254                        tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
255                }).collect();
256                println!("Starting");
257
258                let start = Instant::now();
259
260                barr.wait().await;
261                debug!("Unlocked all");
262
263
264                main_sem.acquire().await.forget();
265
266                duration = start.elapsed();
267
268                println!("\nDone");
269
270
271                for i in 0..nthreads {
272                        thddata[i].sem.add_permits(1);
273                }
274
275                for t in threads {
276                        rechecks += t.await.unwrap();
277                }
278        });
279
280        println!("Duration (ms)           : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
281        println!("Number of processors    : {}", (nprocs).to_formatted_string(&Locale::en));
282        println!("Number of threads       : {}", (nthreads).to_formatted_string(&Locale::en));
283        println!("Total Operations(ops)   : {:>15}", (leader.idx.load(Ordering::Relaxed) - 1).to_formatted_string(&Locale::en));
284        println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
285        println!("Rechecking              : {}", rechecks );
286        println!("ns per transfer         : {}", ((duration.as_nanos() as f64) / leader.idx.load(Ordering::Relaxed) as f64));
287
288}
Note: See TracBrowser for help on using the repository browser.