[821c534] | 1 | #[cfg(debug_assertions)]
|
---|
| 2 | use std::io::{self, Write};
|
---|
| 3 |
|
---|
| 4 | use std::process;
|
---|
| 5 | use std::option;
|
---|
[ebb6158] | 6 | use std::hint;
|
---|
[821c534] | 7 | use std::sync::Arc;
|
---|
[65c9208] | 8 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
---|
[821c534] | 9 | use std::time::{Instant,Duration};
|
---|
| 10 |
|
---|
| 11 | use tokio::runtime::Builder;
|
---|
| 12 | use tokio::sync;
|
---|
| 13 | use tokio::task;
|
---|
| 14 |
|
---|
| 15 | use rand::Rng;
|
---|
| 16 |
|
---|
| 17 | use clap::{Arg, App};
|
---|
| 18 | use num_format::{Locale, ToFormattedString};
|
---|
| 19 |
|
---|
| 20 | #[path = "../bench.rs"]
|
---|
| 21 | mod bench;
|
---|
| 22 |
|
---|
/// Debug-build tracing: prints one formatted line and flushes stdout right
/// away, so output from concurrently running tasks shows up as it happens.
#[cfg(debug_assertions)]
macro_rules! debug {
    ($fmt:expr) => {{
        println!($fmt);
        io::stdout().flush().unwrap();
    }};
    ($fmt:expr, $($arg:expr),+) => {{
        println!($fmt, $($arg),+);
        io::stdout().flush().unwrap();
    }};
}

/// Release-build tracing: expands to `()` and never evaluates its arguments.
#[cfg(not(debug_assertions))]
macro_rules! debug {
    ($fmt:expr) => { () };
    ($fmt:expr, $($arg:expr),+) => { () };
}
|
---|
| 40 |
|
---|
/// Interprets a yes/no command-line value.
///
/// Returns `default` when no value was supplied. `yes`/`Y`/`y` map to `true`
/// and `no`/`N`/`n` map to `false`. Any other value prints an error that names
/// the `exhaust` parameter and terminates the process with exit code 1.
fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
    let val = match opt {
        None => return default,
        Some(v) => v,
    };

    match val {
        "yes" | "Y" | "y" => true,
        "no" | "N" | "n" => false,
        "maybe" | "I don't know" | "Can you repeat the question?" => {
            eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
            std::process::exit(1)
        }
        other => {
            eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", other);
            std::process::exit(1)
        }
    }
}
|
---|
| 66 |
|
---|
| 67 | struct LeaderInfo {
|
---|
| 68 | id: AtomicUsize,
|
---|
| 69 | idx: AtomicUsize,
|
---|
[65c9208] | 70 | estop: AtomicBool,
|
---|
[821c534] | 71 | seed: u128,
|
---|
| 72 | }
|
---|
| 73 |
|
---|
| 74 | impl LeaderInfo {
|
---|
| 75 | pub fn new(nthreads: usize) -> LeaderInfo {
|
---|
| 76 | let this = LeaderInfo{
|
---|
| 77 | id: AtomicUsize::new(nthreads),
|
---|
| 78 | idx: AtomicUsize::new(0),
|
---|
[65c9208] | 79 | estop: AtomicBool::new(false),
|
---|
[821c534] | 80 | seed: process::id() as u128
|
---|
| 81 | };
|
---|
| 82 |
|
---|
| 83 | let mut rng = rand::thread_rng();
|
---|
| 84 |
|
---|
| 85 | for _ in 0..rng.gen_range(0..10) {
|
---|
| 86 | this.next( nthreads );
|
---|
| 87 | }
|
---|
| 88 |
|
---|
| 89 | this
|
---|
| 90 | }
|
---|
| 91 |
|
---|
| 92 | pub fn next(&self, len: usize) {
|
---|
| 93 | let n = bench::_lehmer64( unsafe {
|
---|
| 94 | let r1 = &self.seed as *const u128;
|
---|
| 95 | let r2 = r1 as *mut u128;
|
---|
| 96 | &mut *r2
|
---|
| 97 | } ) as usize;
|
---|
| 98 | self.id.store( n % len , Ordering::SeqCst );
|
---|
| 99 | }
|
---|
| 100 | }
|
---|
| 101 |
|
---|
| 102 | struct MyThread {
|
---|
| 103 | id: usize,
|
---|
| 104 | idx: AtomicUsize,
|
---|
| 105 | sem: sync::Semaphore,
|
---|
| 106 | }
|
---|
| 107 |
|
---|
[65c9208] | 108 | fn waitgroup(leader: &LeaderInfo, idx: usize, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore) {
|
---|
[821c534] | 109 | let start = Instant::now();
|
---|
[65c9208] | 110 | 'outer: for t in threads {
|
---|
[821c534] | 111 | debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
|
---|
| 112 | while t.idx.load(Ordering::Relaxed) != idx {
|
---|
[ebb6158] | 113 | hint::spin_loop();
|
---|
[821c534] | 114 | if start.elapsed() > Duration::from_secs(5) {
|
---|
| 115 | eprintln!("Programs has been blocked for more than 5 secs");
|
---|
[65c9208] | 116 | leader.estop.store(true, Ordering::Relaxed);
|
---|
| 117 | main_sem.add_permits(1);
|
---|
| 118 | break 'outer;
|
---|
[821c534] | 119 | }
|
---|
| 120 | }
|
---|
| 121 | }
|
---|
| 122 | debug!( "Waiting done" );
|
---|
| 123 | }
|
---|
| 124 |
|
---|
| 125 | fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
|
---|
| 126 | if !exhaust { return; }
|
---|
| 127 |
|
---|
| 128 | for i in 0..threads.len() {
|
---|
| 129 | if i != me {
|
---|
| 130 | debug!( "Leader waking {}", i);
|
---|
| 131 | threads[i].sem.add_permits(1);
|
---|
| 132 | }
|
---|
| 133 | }
|
---|
| 134 | }
|
---|
| 135 |
|
---|
| 136 | fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
|
---|
| 137 | let nidx = leader.idx.load(Ordering::Relaxed) + 1;
|
---|
| 138 | this.idx.store(nidx, Ordering::Relaxed);
|
---|
| 139 | leader.idx.store(nidx, Ordering::Relaxed);
|
---|
| 140 |
|
---|
[65c9208] | 141 | if nidx as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) {
|
---|
[821c534] | 142 | debug!( "Leader {} done", this.id);
|
---|
| 143 | main_sem.add_permits(1);
|
---|
| 144 | return;
|
---|
| 145 | }
|
---|
| 146 |
|
---|
| 147 | debug!( "====================\nLeader no {} : {}", nidx, this.id);
|
---|
| 148 |
|
---|
[65c9208] | 149 | waitgroup(leader, nidx, threads, main_sem);
|
---|
[821c534] | 150 |
|
---|
| 151 | leader.next( threads.len() );
|
---|
| 152 |
|
---|
| 153 | wakegroup(exhaust, this.id, threads);
|
---|
| 154 |
|
---|
| 155 | debug!( "Leader no {} : {} done\n====================", nidx, this.id);
|
---|
| 156 | }
|
---|
| 157 |
|
---|
| 158 | async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
|
---|
| 159 | task::yield_now().await;
|
---|
| 160 |
|
---|
| 161 | if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
|
---|
| 162 | debug!("Waiting {} recheck", this.id);
|
---|
| 163 | *rechecks += 1;
|
---|
| 164 | return;
|
---|
| 165 | }
|
---|
| 166 |
|
---|
| 167 | debug!("Waiting {}", this.id);
|
---|
| 168 |
|
---|
| 169 | debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
|
---|
| 170 | this.idx.fetch_add(1, Ordering::SeqCst);
|
---|
| 171 | if exhaust {
|
---|
| 172 | debug!("Waiting {} sem", this.id);
|
---|
| 173 | this.sem.acquire().await.forget();
|
---|
| 174 | }
|
---|
| 175 | else {
|
---|
| 176 | debug!("Waiting {} yield", this.id);
|
---|
| 177 | task::yield_now().await;
|
---|
| 178 | }
|
---|
| 179 |
|
---|
| 180 | debug!("Waiting {} done", this.id);
|
---|
| 181 | }
|
---|
| 182 |
|
---|
| 183 | async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
|
---|
| 184 | assert!( me == threads[me].id );
|
---|
| 185 |
|
---|
| 186 | debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
|
---|
| 187 |
|
---|
| 188 | start.wait().await;
|
---|
| 189 |
|
---|
| 190 | debug!( "Start {}", me );
|
---|
| 191 |
|
---|
| 192 | let mut rechecks: usize = 0;
|
---|
| 193 |
|
---|
| 194 | loop {
|
---|
| 195 | if leader.id.load(Ordering::Relaxed) == me {
|
---|
| 196 | lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
|
---|
| 197 | task::yield_now().await;
|
---|
| 198 | }
|
---|
| 199 | else {
|
---|
| 200 | wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
|
---|
| 201 | }
|
---|
[65c9208] | 202 | if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { break; }
|
---|
[821c534] | 203 | }
|
---|
| 204 |
|
---|
| 205 | rechecks
|
---|
| 206 | }
|
---|
| 207 |
|
---|
| 208 | fn main() {
|
---|
| 209 | let options = App::new("Transfer Tokio")
|
---|
| 210 | .args(&bench::args())
|
---|
| 211 | .arg(Arg::with_name("exhaust") .short("e").long("exhaust") .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
|
---|
| 212 | .get_matches();
|
---|
| 213 |
|
---|
| 214 | let exhaust = parse_yes_no( options.value_of("exhaust"), false );
|
---|
| 215 | let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
|
---|
| 216 | let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
|
---|
| 217 |
|
---|
| 218 |
|
---|
| 219 | let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
|
---|
| 220 | if exp.clock_mode {
|
---|
| 221 | eprintln!("Programs does not support fixed duration mode");
|
---|
| 222 | std::process::exit(1);
|
---|
| 223 | }
|
---|
| 224 |
|
---|
| 225 | println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
|
---|
| 226 |
|
---|
| 227 | let s = (1000000 as u64).to_formatted_string(&Locale::en);
|
---|
| 228 | assert_eq!(&s, "1,000,000");
|
---|
| 229 |
|
---|
| 230 | let main_sem = Arc::new(sync::Semaphore::new(0));
|
---|
| 231 | let leader = Arc::new(LeaderInfo::new(nthreads));
|
---|
| 232 | let barr = Arc::new(sync::Barrier::new(nthreads + 1));
|
---|
| 233 | let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
|
---|
| 234 | (0..nthreads).map(|i| {
|
---|
| 235 | Arc::new(MyThread{
|
---|
| 236 | id: i,
|
---|
| 237 | idx: AtomicUsize::new(0),
|
---|
| 238 | sem: sync::Semaphore::new(0),
|
---|
| 239 | })
|
---|
| 240 | }).collect()
|
---|
| 241 | );
|
---|
| 242 |
|
---|
| 243 | let mut rechecks: usize = 0;
|
---|
| 244 | let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
|
---|
| 245 |
|
---|
| 246 | let runtime = Builder::new_multi_thread()
|
---|
| 247 | .worker_threads(nprocs)
|
---|
| 248 | .enable_all()
|
---|
| 249 | .build()
|
---|
| 250 | .unwrap();
|
---|
| 251 |
|
---|
| 252 | runtime.block_on(async {
|
---|
| 253 | let threads: Vec<_> = (0..nthreads).map(|i| {
|
---|
| 254 | tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
|
---|
| 255 | }).collect();
|
---|
| 256 | println!("Starting");
|
---|
| 257 |
|
---|
| 258 | let start = Instant::now();
|
---|
| 259 |
|
---|
| 260 | barr.wait().await;
|
---|
| 261 | debug!("Unlocked all");
|
---|
| 262 |
|
---|
| 263 |
|
---|
| 264 | main_sem.acquire().await.forget();
|
---|
| 265 |
|
---|
| 266 | duration = start.elapsed();
|
---|
| 267 |
|
---|
| 268 | println!("\nDone");
|
---|
| 269 |
|
---|
| 270 |
|
---|
| 271 | for i in 0..nthreads {
|
---|
| 272 | thddata[i].sem.add_permits(1);
|
---|
| 273 | }
|
---|
| 274 |
|
---|
| 275 | for t in threads {
|
---|
| 276 | rechecks += t.await.unwrap();
|
---|
| 277 | }
|
---|
| 278 | });
|
---|
| 279 |
|
---|
| 280 | println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
|
---|
| 281 | println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
|
---|
| 282 | println!("Number of threads : {}", (nthreads).to_formatted_string(&Locale::en));
|
---|
[65c9208] | 283 | println!("Total Operations(ops) : {:>15}", (leader.idx.load(Ordering::Relaxed) - 1).to_formatted_string(&Locale::en));
|
---|
[821c534] | 284 | println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
|
---|
| 285 | println!("Rechecking : {}", rechecks );
|
---|
[65c9208] | 286 | println!("ns per transfer : {}", ((duration.as_nanos() as f64) / leader.idx.load(Ordering::Relaxed) as f64));
|
---|
| 287 |
|
---|
[821c534] | 288 | }
|
---|