source: benchmark/readyQ/transfer.rs @ 72bd9cd

ADTast-experimentalenumforall-pointer-decaypthread-emulationqualifiedEnum
Last change on this file since 72bd9cd was 821c534, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Implemented transfer in rust and fixed minor issues with rust benchmarks

  • Property mode set to 100644
File size: 7.0 KB
Line 
1#[cfg(debug_assertions)]
2use std::io::{self, Write};
3
4use std::process;
5use std::option;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::time::{Instant,Duration};
9
10use tokio::runtime::Builder;
11use tokio::sync;
12use tokio::task;
13
14use rand::Rng;
15
16use clap::{Arg, App};
17use num_format::{Locale, ToFormattedString};
18
19#[path = "../bench.rs"]
20mod bench;
21
22#[cfg(debug_assertions)]
23macro_rules! debug {
24        ($x:expr) => {
25                println!( $x );
26                io::stdout().flush().unwrap();
27        };
28        ($x:expr, $($more:expr),+) => {
29                println!( $x, $($more), * );
30                io::stdout().flush().unwrap();
31        };
32}
33
34#[cfg(not(debug_assertions))]
35macro_rules! debug {
36    ($x:expr   ) => { () };
37    ($x:expr, $($more:expr),+) => { () };
38}
39
40fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
41        match opt {
42                Some(val) => {
43                        match val {
44                                "yes" => true,
45                                "no"  => false,
46                                "maybe" | "I don't know" | "Can you repeat the question?" => {
47                                        eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
48                                        std::process::exit(1);
49                                },
50                                _ => {
51                                        eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
52                                        std::process::exit(1);
53                                },
54                        }
55                },
56                _ => {
57                        default
58                },
59        }
60}
61
62struct LeaderInfo {
63        id: AtomicUsize,
64        idx: AtomicUsize,
65        seed: u128,
66}
67
68impl LeaderInfo {
69        pub fn new(nthreads: usize) -> LeaderInfo {
70                let this = LeaderInfo{
71                        id: AtomicUsize::new(nthreads),
72                        idx: AtomicUsize::new(0),
73                        seed: process::id() as u128
74                };
75
76                let mut rng = rand::thread_rng();
77
78                for _ in 0..rng.gen_range(0..10) {
79                        this.next( nthreads );
80                }
81
82                this
83        }
84
85        pub fn next(&self, len: usize) {
86                let n = bench::_lehmer64( unsafe {
87                        let r1 = &self.seed as *const u128;
88                        let r2 = r1 as *mut u128;
89                        &mut *r2
90                } ) as usize;
91                self.id.store( n % len , Ordering::SeqCst );
92        }
93}
94
95struct MyThread {
96        id: usize,
97        idx: AtomicUsize,
98        sem: sync::Semaphore,
99}
100
101fn waitgroup(idx: usize, threads: &Vec<Arc<MyThread>>) {
102        let start = Instant::now();
103        for t in threads {
104                debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
105                while t.idx.load(Ordering::Relaxed) != idx {
106                        std::sync::atomic::spin_loop_hint();
107                        if start.elapsed() > Duration::from_secs(5) {
108                                eprintln!("Programs has been blocked for more than 5 secs");
109                                std::process::exit(1);
110                        }
111                }
112        }
113        debug!( "Waiting done" );
114}
115
116fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
117        if !exhaust { return; }
118
119        for i in 0..threads.len() {
120                if i != me {
121                        debug!( "Leader waking {}", i);
122                        threads[i].sem.add_permits(1);
123                }
124        }
125}
126
127fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
128        let nidx = leader.idx.load(Ordering::Relaxed) + 1;
129        this.idx.store(nidx, Ordering::Relaxed);
130        leader.idx.store(nidx, Ordering::Relaxed);
131
132        if nidx as u64 > exp.stop_count {
133                debug!( "Leader {} done", this.id);
134                main_sem.add_permits(1);
135                return;
136        }
137
138        debug!( "====================\nLeader no {} : {}", nidx, this.id);
139
140        waitgroup(nidx, threads);
141
142        leader.next( threads.len() );
143
144        wakegroup(exhaust, this.id, threads);
145
146        debug!( "Leader no {} : {} done\n====================", nidx, this.id);
147}
148
149async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
150        task::yield_now().await;
151
152        if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
153                debug!("Waiting {} recheck", this.id);
154                *rechecks += 1;
155                return;
156        }
157
158        debug!("Waiting {}", this.id);
159
160        debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
161        this.idx.fetch_add(1, Ordering::SeqCst);
162        if exhaust {
163                debug!("Waiting {} sem", this.id);
164                this.sem.acquire().await.forget();
165        }
166        else {
167                debug!("Waiting {} yield", this.id);
168                task::yield_now().await;
169        }
170
171        debug!("Waiting {} done", this.id);
172}
173
174async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
175        assert!( me == threads[me].id );
176
177        debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
178
179        start.wait().await;
180
181        debug!( "Start {}", me );
182
183        let mut rechecks: usize = 0;
184
185        loop {
186                if leader.id.load(Ordering::Relaxed) == me {
187                        lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
188                        task::yield_now().await;
189                }
190                else {
191                        wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
192                }
193                if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count { break; }
194        }
195
196        rechecks
197}
198
199fn main() {
200        let options = App::new("Transfer Tokio")
201                .args(&bench::args())
202                .arg(Arg::with_name("exhaust")  .short("e").long("exhaust")  .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
203                .get_matches();
204
205        let exhaust  = parse_yes_no( options.value_of("exhaust"), false );
206        let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
207        let nprocs   = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
208
209
210        let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
211        if exp.clock_mode {
212                eprintln!("Programs does not support fixed duration mode");
213                std::process::exit(1);
214        }
215
216        println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
217
218        let s = (1000000 as u64).to_formatted_string(&Locale::en);
219        assert_eq!(&s, "1,000,000");
220
221        let main_sem = Arc::new(sync::Semaphore::new(0));
222        let leader = Arc::new(LeaderInfo::new(nthreads));
223        let barr = Arc::new(sync::Barrier::new(nthreads + 1));
224        let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
225                (0..nthreads).map(|i| {
226                        Arc::new(MyThread{
227                                id: i,
228                                idx: AtomicUsize::new(0),
229                                sem: sync::Semaphore::new(0),
230                        })
231                }).collect()
232        );
233
234        let mut rechecks: usize = 0;
235        let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
236
237        let runtime = Builder::new_multi_thread()
238                .worker_threads(nprocs)
239                .enable_all()
240                .build()
241                .unwrap();
242
243        runtime.block_on(async {
244                let threads: Vec<_> = (0..nthreads).map(|i| {
245                        tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
246                }).collect();
247                println!("Starting");
248
249                let start = Instant::now();
250
251                barr.wait().await;
252                debug!("Unlocked all");
253
254
255                main_sem.acquire().await.forget();
256
257                duration = start.elapsed();
258
259                println!("\nDone");
260
261
262                for i in 0..nthreads {
263                        thddata[i].sem.add_permits(1);
264                }
265
266                for t in threads {
267                        rechecks += t.await.unwrap();
268                }
269        });
270
271        println!("Duration (ms)           : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
272        println!("Number of processors    : {}", (nprocs).to_formatted_string(&Locale::en));
273        println!("Number of threads       : {}", (nthreads).to_formatted_string(&Locale::en));
274        println!("Total Operations(ops)   : {:>15}", (exp.stop_count).to_formatted_string(&Locale::en));
275        println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
276        println!("Rechecking              : {}", rechecks );
277}
Note: See TracBrowser for help on using the repository browser.