source: benchmark/readyQ/transfer.rs @ 5695645

ADTast-experimentalpthread-emulationqualifiedEnum
Last change on this file since 5695645 was ebb6158, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Minor fixes to warnings, printing and ridiculous go/rust requirements.

  • Property mode set to 100644
File size: 7.0 KB
Line 
1#[cfg(debug_assertions)]
2use std::io::{self, Write};
3
4use std::process;
5use std::option;
6use std::hint;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Instant,Duration};
10
11use tokio::runtime::Builder;
12use tokio::sync;
13use tokio::task;
14
15use rand::Rng;
16
17use clap::{Arg, App};
18use num_format::{Locale, ToFormattedString};
19
20#[path = "../bench.rs"]
21mod bench;
22
23#[cfg(debug_assertions)]
24macro_rules! debug {
25        ($x:expr) => {
26                println!( $x );
27                io::stdout().flush().unwrap();
28        };
29        ($x:expr, $($more:expr),+) => {
30                println!( $x, $($more), * );
31                io::stdout().flush().unwrap();
32        };
33}
34
35#[cfg(not(debug_assertions))]
36macro_rules! debug {
37    ($x:expr   ) => { () };
38    ($x:expr, $($more:expr),+) => { () };
39}
40
41fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
42        match opt {
43                Some(val) => {
44                        match val {
45                                "yes" => true,
46                                "no"  => false,
47                                "maybe" | "I don't know" | "Can you repeat the question?" => {
48                                        eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
49                                        std::process::exit(1);
50                                },
51                                _ => {
52                                        eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
53                                        std::process::exit(1);
54                                },
55                        }
56                },
57                _ => {
58                        default
59                },
60        }
61}
62
63struct LeaderInfo {
64        id: AtomicUsize,
65        idx: AtomicUsize,
66        seed: u128,
67}
68
69impl LeaderInfo {
70        pub fn new(nthreads: usize) -> LeaderInfo {
71                let this = LeaderInfo{
72                        id: AtomicUsize::new(nthreads),
73                        idx: AtomicUsize::new(0),
74                        seed: process::id() as u128
75                };
76
77                let mut rng = rand::thread_rng();
78
79                for _ in 0..rng.gen_range(0..10) {
80                        this.next( nthreads );
81                }
82
83                this
84        }
85
86        pub fn next(&self, len: usize) {
87                let n = bench::_lehmer64( unsafe {
88                        let r1 = &self.seed as *const u128;
89                        let r2 = r1 as *mut u128;
90                        &mut *r2
91                } ) as usize;
92                self.id.store( n % len , Ordering::SeqCst );
93        }
94}
95
96struct MyThread {
97        id: usize,
98        idx: AtomicUsize,
99        sem: sync::Semaphore,
100}
101
102fn waitgroup(idx: usize, threads: &Vec<Arc<MyThread>>) {
103        let start = Instant::now();
104        for t in threads {
105                debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
106                while t.idx.load(Ordering::Relaxed) != idx {
107                        hint::spin_loop();
108                        if start.elapsed() > Duration::from_secs(5) {
109                                eprintln!("Programs has been blocked for more than 5 secs");
110                                std::process::exit(1);
111                        }
112                }
113        }
114        debug!( "Waiting done" );
115}
116
117fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
118        if !exhaust { return; }
119
120        for i in 0..threads.len() {
121                if i != me {
122                        debug!( "Leader waking {}", i);
123                        threads[i].sem.add_permits(1);
124                }
125        }
126}
127
128fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
129        let nidx = leader.idx.load(Ordering::Relaxed) + 1;
130        this.idx.store(nidx, Ordering::Relaxed);
131        leader.idx.store(nidx, Ordering::Relaxed);
132
133        if nidx as u64 > exp.stop_count {
134                debug!( "Leader {} done", this.id);
135                main_sem.add_permits(1);
136                return;
137        }
138
139        debug!( "====================\nLeader no {} : {}", nidx, this.id);
140
141        waitgroup(nidx, threads);
142
143        leader.next( threads.len() );
144
145        wakegroup(exhaust, this.id, threads);
146
147        debug!( "Leader no {} : {} done\n====================", nidx, this.id);
148}
149
150async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
151        task::yield_now().await;
152
153        if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
154                debug!("Waiting {} recheck", this.id);
155                *rechecks += 1;
156                return;
157        }
158
159        debug!("Waiting {}", this.id);
160
161        debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
162        this.idx.fetch_add(1, Ordering::SeqCst);
163        if exhaust {
164                debug!("Waiting {} sem", this.id);
165                this.sem.acquire().await.forget();
166        }
167        else {
168                debug!("Waiting {} yield", this.id);
169                task::yield_now().await;
170        }
171
172        debug!("Waiting {} done", this.id);
173}
174
175async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
176        assert!( me == threads[me].id );
177
178        debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
179
180        start.wait().await;
181
182        debug!( "Start {}", me );
183
184        let mut rechecks: usize = 0;
185
186        loop {
187                if leader.id.load(Ordering::Relaxed) == me {
188                        lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
189                        task::yield_now().await;
190                }
191                else {
192                        wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
193                }
194                if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count { break; }
195        }
196
197        rechecks
198}
199
200fn main() {
201        let options = App::new("Transfer Tokio")
202                .args(&bench::args())
203                .arg(Arg::with_name("exhaust")  .short("e").long("exhaust")  .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
204                .get_matches();
205
206        let exhaust  = parse_yes_no( options.value_of("exhaust"), false );
207        let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
208        let nprocs   = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
209
210
211        let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
212        if exp.clock_mode {
213                eprintln!("Programs does not support fixed duration mode");
214                std::process::exit(1);
215        }
216
217        println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
218
219        let s = (1000000 as u64).to_formatted_string(&Locale::en);
220        assert_eq!(&s, "1,000,000");
221
222        let main_sem = Arc::new(sync::Semaphore::new(0));
223        let leader = Arc::new(LeaderInfo::new(nthreads));
224        let barr = Arc::new(sync::Barrier::new(nthreads + 1));
225        let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
226                (0..nthreads).map(|i| {
227                        Arc::new(MyThread{
228                                id: i,
229                                idx: AtomicUsize::new(0),
230                                sem: sync::Semaphore::new(0),
231                        })
232                }).collect()
233        );
234
235        let mut rechecks: usize = 0;
236        let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
237
238        let runtime = Builder::new_multi_thread()
239                .worker_threads(nprocs)
240                .enable_all()
241                .build()
242                .unwrap();
243
244        runtime.block_on(async {
245                let threads: Vec<_> = (0..nthreads).map(|i| {
246                        tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
247                }).collect();
248                println!("Starting");
249
250                let start = Instant::now();
251
252                barr.wait().await;
253                debug!("Unlocked all");
254
255
256                main_sem.acquire().await.forget();
257
258                duration = start.elapsed();
259
260                println!("\nDone");
261
262
263                for i in 0..nthreads {
264                        thddata[i].sem.add_permits(1);
265                }
266
267                for t in threads {
268                        rechecks += t.await.unwrap();
269                }
270        });
271
272        println!("Duration (ms)           : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
273        println!("Number of processors    : {}", (nprocs).to_formatted_string(&Locale::en));
274        println!("Number of threads       : {}", (nthreads).to_formatted_string(&Locale::en));
275        println!("Total Operations(ops)   : {:>15}", (exp.stop_count).to_formatted_string(&Locale::en));
276        println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
277        println!("Rechecking              : {}", rechecks );
278}
Note: See TracBrowser for help on using the repository browser.