| [821c534] | 1 | #[cfg(debug_assertions)]
 | 
|---|
 | 2 | use std::io::{self, Write};
 | 
|---|
 | 3 | 
 | 
|---|
 | 4 | use std::process;
 | 
|---|
 | 5 | use std::option;
 | 
|---|
| [ebb6158] | 6 | use std::hint;
 | 
|---|
| [821c534] | 7 | use std::sync::Arc;
 | 
|---|
| [65c9208] | 8 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 | 
|---|
| [821c534] | 9 | use std::time::{Instant,Duration};
 | 
|---|
 | 10 | 
 | 
|---|
 | 11 | use tokio::runtime::Builder;
 | 
|---|
 | 12 | use tokio::sync;
 | 
|---|
 | 13 | use tokio::task;
 | 
|---|
 | 14 | 
 | 
|---|
 | 15 | use rand::Rng;
 | 
|---|
 | 16 | 
 | 
|---|
 | 17 | use clap::{Arg, App};
 | 
|---|
 | 18 | use num_format::{Locale, ToFormattedString};
 | 
|---|
 | 19 | 
 | 
|---|
 | 20 | #[path = "../bench.rs"]
 | 
|---|
 | 21 | mod bench;
 | 
|---|
 | 22 | 
 | 
|---|
 | 23 | #[cfg(debug_assertions)]
 | 
|---|
 | 24 | macro_rules! debug {
 | 
|---|
 | 25 |         ($x:expr) => {
 | 
|---|
 | 26 |                 println!( $x );
 | 
|---|
 | 27 |                 io::stdout().flush().unwrap();
 | 
|---|
 | 28 |         };
 | 
|---|
 | 29 |         ($x:expr, $($more:expr),+) => {
 | 
|---|
 | 30 |                 println!( $x, $($more), * );
 | 
|---|
 | 31 |                 io::stdout().flush().unwrap();
 | 
|---|
 | 32 |         };
 | 
|---|
 | 33 | }
 | 
|---|
 | 34 | 
 | 
|---|
 | 35 | #[cfg(not(debug_assertions))]
 | 
|---|
 | 36 | macro_rules! debug {
 | 
|---|
 | 37 |     ($x:expr   ) => { () };
 | 
|---|
 | 38 |     ($x:expr, $($more:expr),+) => { () };
 | 
|---|
 | 39 | }
 | 
|---|
 | 40 | 
 | 
|---|
 | 41 | fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
 | 
|---|
 | 42 |         match opt {
 | 
|---|
 | 43 |                 Some(val) => {
 | 
|---|
 | 44 |                         match val {
 | 
|---|
 | 45 |                                 "yes" => true,
 | 
|---|
| [65c9208] | 46 |                                 "Y" => true,
 | 
|---|
 | 47 |                                 "y" => true,
 | 
|---|
| [821c534] | 48 |                                 "no"  => false,
 | 
|---|
| [65c9208] | 49 |                                 "N"  => false,
 | 
|---|
 | 50 |                                 "n"  => false,
 | 
|---|
| [821c534] | 51 |                                 "maybe" | "I don't know" | "Can you repeat the question?" => {
 | 
|---|
 | 52 |                                         eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
 | 
|---|
 | 53 |                                         std::process::exit(1);
 | 
|---|
 | 54 |                                 },
 | 
|---|
 | 55 |                                 _ => {
 | 
|---|
 | 56 |                                         eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
 | 
|---|
 | 57 |                                         std::process::exit(1);
 | 
|---|
 | 58 |                                 },
 | 
|---|
 | 59 |                         }
 | 
|---|
 | 60 |                 },
 | 
|---|
 | 61 |                 _ => {
 | 
|---|
 | 62 |                         default
 | 
|---|
 | 63 |                 },
 | 
|---|
 | 64 |         }
 | 
|---|
 | 65 | }
 | 
|---|
 | 66 | 
 | 
|---|
 | 67 | struct LeaderInfo {
 | 
|---|
 | 68 |         id: AtomicUsize,
 | 
|---|
 | 69 |         idx: AtomicUsize,
 | 
|---|
| [65c9208] | 70 |         estop: AtomicBool,
 | 
|---|
| [821c534] | 71 |         seed: u128,
 | 
|---|
 | 72 | }
 | 
|---|
 | 73 | 
 | 
|---|
 | 74 | impl LeaderInfo {
 | 
|---|
 | 75 |         pub fn new(nthreads: usize) -> LeaderInfo {
 | 
|---|
 | 76 |                 let this = LeaderInfo{
 | 
|---|
 | 77 |                         id: AtomicUsize::new(nthreads),
 | 
|---|
 | 78 |                         idx: AtomicUsize::new(0),
 | 
|---|
| [65c9208] | 79 |                         estop: AtomicBool::new(false),
 | 
|---|
| [821c534] | 80 |                         seed: process::id() as u128
 | 
|---|
 | 81 |                 };
 | 
|---|
 | 82 | 
 | 
|---|
 | 83 |                 let mut rng = rand::thread_rng();
 | 
|---|
 | 84 | 
 | 
|---|
 | 85 |                 for _ in 0..rng.gen_range(0..10) {
 | 
|---|
 | 86 |                         this.next( nthreads );
 | 
|---|
 | 87 |                 }
 | 
|---|
 | 88 | 
 | 
|---|
 | 89 |                 this
 | 
|---|
 | 90 |         }
 | 
|---|
 | 91 | 
 | 
|---|
 | 92 |         pub fn next(&self, len: usize) {
 | 
|---|
 | 93 |                 let n = bench::_lehmer64( unsafe {
 | 
|---|
 | 94 |                         let r1 = &self.seed as *const u128;
 | 
|---|
 | 95 |                         let r2 = r1 as *mut u128;
 | 
|---|
 | 96 |                         &mut *r2
 | 
|---|
 | 97 |                 } ) as usize;
 | 
|---|
 | 98 |                 self.id.store( n % len , Ordering::SeqCst );
 | 
|---|
 | 99 |         }
 | 
|---|
 | 100 | }
 | 
|---|
 | 101 | 
 | 
|---|
 | 102 | struct MyThread {
 | 
|---|
 | 103 |         id: usize,
 | 
|---|
 | 104 |         idx: AtomicUsize,
 | 
|---|
 | 105 |         sem: sync::Semaphore,
 | 
|---|
 | 106 | }
 | 
|---|
 | 107 | 
 | 
|---|
| [65c9208] | 108 | fn waitgroup(leader: &LeaderInfo, idx: usize, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore) {
 | 
|---|
| [821c534] | 109 |         let start = Instant::now();
 | 
|---|
| [65c9208] | 110 |         'outer: for t in threads {
 | 
|---|
| [821c534] | 111 |                 debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
 | 
|---|
 | 112 |                 while t.idx.load(Ordering::Relaxed) != idx {
 | 
|---|
| [ebb6158] | 113 |                         hint::spin_loop();
 | 
|---|
| [821c534] | 114 |                         if start.elapsed() > Duration::from_secs(5) {
 | 
|---|
 | 115 |                                 eprintln!("Programs has been blocked for more than 5 secs");
 | 
|---|
| [65c9208] | 116 |                                 leader.estop.store(true, Ordering::Relaxed);
 | 
|---|
 | 117 |                                 main_sem.add_permits(1);
 | 
|---|
 | 118 |                                 break 'outer;
 | 
|---|
| [821c534] | 119 |                         }
 | 
|---|
 | 120 |                 }
 | 
|---|
 | 121 |         }
 | 
|---|
 | 122 |         debug!( "Waiting done" );
 | 
|---|
 | 123 | }
 | 
|---|
 | 124 | 
 | 
|---|
 | 125 | fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
 | 
|---|
 | 126 |         if !exhaust { return; }
 | 
|---|
 | 127 | 
 | 
|---|
 | 128 |         for i in 0..threads.len() {
 | 
|---|
 | 129 |                 if i != me {
 | 
|---|
 | 130 |                         debug!( "Leader waking {}", i);
 | 
|---|
 | 131 |                         threads[i].sem.add_permits(1);
 | 
|---|
 | 132 |                 }
 | 
|---|
 | 133 |         }
 | 
|---|
 | 134 | }
 | 
|---|
 | 135 | 
 | 
|---|
 | 136 | fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
 | 
|---|
 | 137 |         let nidx = leader.idx.load(Ordering::Relaxed) + 1;
 | 
|---|
 | 138 |         this.idx.store(nidx, Ordering::Relaxed);
 | 
|---|
 | 139 |         leader.idx.store(nidx, Ordering::Relaxed);
 | 
|---|
 | 140 | 
 | 
|---|
| [65c9208] | 141 |         if nidx as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) {
 | 
|---|
| [821c534] | 142 |                 debug!( "Leader {} done", this.id);
 | 
|---|
 | 143 |                 main_sem.add_permits(1);
 | 
|---|
 | 144 |                 return;
 | 
|---|
 | 145 |         }
 | 
|---|
 | 146 | 
 | 
|---|
 | 147 |         debug!( "====================\nLeader no {} : {}", nidx, this.id);
 | 
|---|
 | 148 | 
 | 
|---|
| [65c9208] | 149 |         waitgroup(leader, nidx, threads, main_sem);
 | 
|---|
| [821c534] | 150 | 
 | 
|---|
 | 151 |         leader.next( threads.len() );
 | 
|---|
 | 152 | 
 | 
|---|
 | 153 |         wakegroup(exhaust, this.id, threads);
 | 
|---|
 | 154 | 
 | 
|---|
 | 155 |         debug!( "Leader no {} : {} done\n====================", nidx, this.id);
 | 
|---|
 | 156 | }
 | 
|---|
 | 157 | 
 | 
|---|
 | 158 | async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
 | 
|---|
 | 159 |         task::yield_now().await;
 | 
|---|
 | 160 | 
 | 
|---|
 | 161 |         if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
 | 
|---|
 | 162 |                 debug!("Waiting {} recheck", this.id);
 | 
|---|
 | 163 |                 *rechecks += 1;
 | 
|---|
 | 164 |                 return;
 | 
|---|
 | 165 |         }
 | 
|---|
 | 166 | 
 | 
|---|
 | 167 |         debug!("Waiting {}", this.id);
 | 
|---|
 | 168 | 
 | 
|---|
 | 169 |         debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
 | 
|---|
 | 170 |         this.idx.fetch_add(1, Ordering::SeqCst);
 | 
|---|
 | 171 |         if exhaust {
 | 
|---|
 | 172 |                 debug!("Waiting {} sem", this.id);
 | 
|---|
 | 173 |                 this.sem.acquire().await.forget();
 | 
|---|
 | 174 |         }
 | 
|---|
 | 175 |         else {
 | 
|---|
 | 176 |                 debug!("Waiting {} yield", this.id);
 | 
|---|
 | 177 |                 task::yield_now().await;
 | 
|---|
 | 178 |         }
 | 
|---|
 | 179 | 
 | 
|---|
 | 180 |         debug!("Waiting {} done", this.id);
 | 
|---|
 | 181 | }
 | 
|---|
 | 182 | 
 | 
|---|
 | 183 | async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
 | 
|---|
 | 184 |         assert!( me == threads[me].id );
 | 
|---|
 | 185 | 
 | 
|---|
 | 186 |         debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
 | 
|---|
 | 187 | 
 | 
|---|
 | 188 |         start.wait().await;
 | 
|---|
 | 189 | 
 | 
|---|
 | 190 |         debug!( "Start {}", me );
 | 
|---|
 | 191 | 
 | 
|---|
 | 192 |         let mut rechecks: usize = 0;
 | 
|---|
 | 193 | 
 | 
|---|
 | 194 |         loop {
 | 
|---|
 | 195 |                 if leader.id.load(Ordering::Relaxed) == me {
 | 
|---|
 | 196 |                         lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
 | 
|---|
 | 197 |                         task::yield_now().await;
 | 
|---|
 | 198 |                 }
 | 
|---|
 | 199 |                 else {
 | 
|---|
 | 200 |                         wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
 | 
|---|
 | 201 |                 }
 | 
|---|
| [65c9208] | 202 |                 if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { break; }
 | 
|---|
| [821c534] | 203 |         }
 | 
|---|
 | 204 | 
 | 
|---|
 | 205 |         rechecks
 | 
|---|
 | 206 | }
 | 
|---|
 | 207 | 
 | 
|---|
 | 208 | fn main() {
 | 
|---|
 | 209 |         let options = App::new("Transfer Tokio")
 | 
|---|
 | 210 |                 .args(&bench::args())
 | 
|---|
 | 211 |                 .arg(Arg::with_name("exhaust")  .short("e").long("exhaust")  .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
 | 
|---|
 | 212 |                 .get_matches();
 | 
|---|
 | 213 | 
 | 
|---|
 | 214 |         let exhaust  = parse_yes_no( options.value_of("exhaust"), false );
 | 
|---|
 | 215 |         let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
 | 
|---|
 | 216 |         let nprocs   = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
 | 
|---|
 | 217 | 
 | 
|---|
 | 218 | 
 | 
|---|
 | 219 |         let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
 | 
|---|
 | 220 |         if exp.clock_mode {
 | 
|---|
 | 221 |                 eprintln!("Programs does not support fixed duration mode");
 | 
|---|
 | 222 |                 std::process::exit(1);
 | 
|---|
 | 223 |         }
 | 
|---|
 | 224 | 
 | 
|---|
 | 225 |         println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
 | 
|---|
 | 226 | 
 | 
|---|
 | 227 |         let s = (1000000 as u64).to_formatted_string(&Locale::en);
 | 
|---|
 | 228 |         assert_eq!(&s, "1,000,000");
 | 
|---|
 | 229 | 
 | 
|---|
 | 230 |         let main_sem = Arc::new(sync::Semaphore::new(0));
 | 
|---|
 | 231 |         let leader = Arc::new(LeaderInfo::new(nthreads));
 | 
|---|
 | 232 |         let barr = Arc::new(sync::Barrier::new(nthreads + 1));
 | 
|---|
 | 233 |         let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
 | 
|---|
 | 234 |                 (0..nthreads).map(|i| {
 | 
|---|
 | 235 |                         Arc::new(MyThread{
 | 
|---|
 | 236 |                                 id: i,
 | 
|---|
 | 237 |                                 idx: AtomicUsize::new(0),
 | 
|---|
 | 238 |                                 sem: sync::Semaphore::new(0),
 | 
|---|
 | 239 |                         })
 | 
|---|
 | 240 |                 }).collect()
 | 
|---|
 | 241 |         );
 | 
|---|
 | 242 | 
 | 
|---|
 | 243 |         let mut rechecks: usize = 0;
 | 
|---|
 | 244 |         let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
 | 
|---|
 | 245 | 
 | 
|---|
 | 246 |         let runtime = Builder::new_multi_thread()
 | 
|---|
 | 247 |                 .worker_threads(nprocs)
 | 
|---|
 | 248 |                 .enable_all()
 | 
|---|
 | 249 |                 .build()
 | 
|---|
 | 250 |                 .unwrap();
 | 
|---|
 | 251 | 
 | 
|---|
 | 252 |         runtime.block_on(async {
 | 
|---|
 | 253 |                 let threads: Vec<_> = (0..nthreads).map(|i| {
 | 
|---|
 | 254 |                         tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
 | 
|---|
 | 255 |                 }).collect();
 | 
|---|
 | 256 |                 println!("Starting");
 | 
|---|
 | 257 | 
 | 
|---|
 | 258 |                 let start = Instant::now();
 | 
|---|
 | 259 | 
 | 
|---|
 | 260 |                 barr.wait().await;
 | 
|---|
 | 261 |                 debug!("Unlocked all");
 | 
|---|
 | 262 | 
 | 
|---|
 | 263 | 
 | 
|---|
 | 264 |                 main_sem.acquire().await.forget();
 | 
|---|
 | 265 | 
 | 
|---|
 | 266 |                 duration = start.elapsed();
 | 
|---|
 | 267 | 
 | 
|---|
 | 268 |                 println!("\nDone");
 | 
|---|
 | 269 | 
 | 
|---|
 | 270 | 
 | 
|---|
 | 271 |                 for i in 0..nthreads {
 | 
|---|
 | 272 |                         thddata[i].sem.add_permits(1);
 | 
|---|
 | 273 |                 }
 | 
|---|
 | 274 | 
 | 
|---|
 | 275 |                 for t in threads {
 | 
|---|
 | 276 |                         rechecks += t.await.unwrap();
 | 
|---|
 | 277 |                 }
 | 
|---|
 | 278 |         });
 | 
|---|
 | 279 | 
 | 
|---|
 | 280 |         println!("Duration (ms)           : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
 | 
|---|
 | 281 |         println!("Number of processors    : {}", (nprocs).to_formatted_string(&Locale::en));
 | 
|---|
 | 282 |         println!("Number of threads       : {}", (nthreads).to_formatted_string(&Locale::en));
 | 
|---|
| [65c9208] | 283 |         println!("Total Operations(ops)   : {:>15}", (leader.idx.load(Ordering::Relaxed) - 1).to_formatted_string(&Locale::en));
 | 
|---|
| [821c534] | 284 |         println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
 | 
|---|
 | 285 |         println!("Rechecking              : {}", rechecks );
 | 
|---|
| [65c9208] | 286 |         println!("ns per transfer         : {}", ((duration.as_nanos() as f64) / leader.idx.load(Ordering::Relaxed) as f64));
 | 
|---|
 | 287 | 
 | 
|---|
| [821c534] | 288 | }
 | 
|---|