| 1 | use std::sync::Arc;
 | 
|---|
| 2 | use std::sync::atomic::Ordering;
 | 
|---|
| 3 | use std::time::Instant;
 | 
|---|
| 4 | 
 | 
|---|
| 5 | use tokio::runtime::Builder;
 | 
|---|
| 6 | use tokio::sync;
 | 
|---|
| 7 | use tokio::time::{sleep,Duration};
 | 
|---|
| 8 | 
 | 
|---|
| 9 | use rand::Rng;
 | 
|---|
| 10 | 
 | 
|---|
| 11 | use clap::{Arg, App};
 | 
|---|
| 12 | use num_format::{Locale, ToFormattedString};
 | 
|---|
| 13 | 
 | 
|---|
| 14 | #[path = "../bench.rs"]
 | 
|---|
| 15 | mod bench;
 | 
|---|
| 16 | 
 | 
|---|
| 17 | // ==================================================
 | 
|---|
| 18 | struct Churner {
 | 
|---|
| 19 |         start: sync::Notify,
 | 
|---|
| 20 | }
 | 
|---|
| 21 | 
 | 
|---|
| 22 | async fn churn_main( this: Arc<Churner>, spots: Arc<Vec<sync::Semaphore>>, exp: Arc<bench::BenchData>, skip_in: bool ) -> u64 {
 | 
|---|
| 23 |         let mut skip = skip_in;
 | 
|---|
| 24 |         this.start.notified().await;
 | 
|---|
| 25 | 
 | 
|---|
| 26 |         let mut count:u64 = 0;
 | 
|---|
| 27 |         loop {
 | 
|---|
| 28 |                 let r : usize = rand::thread_rng().gen();
 | 
|---|
| 29 | 
 | 
|---|
| 30 |                 let spot : &sync::Semaphore = &spots[r % spots.len()];
 | 
|---|
| 31 |                 if !skip { spot.add_permits(1); }
 | 
|---|
| 32 |                 spot.acquire().await.forget();
 | 
|---|
| 33 |                 skip = false;
 | 
|---|
| 34 | 
 | 
|---|
| 35 |                 count += 1;
 | 
|---|
| 36 |                 if  exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break; }
 | 
|---|
| 37 |                 if !exp.clock_mode && count >= exp.stop_count { break; }
 | 
|---|
| 38 |         }
 | 
|---|
| 39 | 
 | 
|---|
| 40 |         exp.threads_left.fetch_sub(1, Ordering::SeqCst);
 | 
|---|
| 41 |         count
 | 
|---|
| 42 | }
 | 
|---|
| 43 | 
 | 
|---|
| 44 | // ==================================================
 | 
|---|
| 45 | fn main() {
 | 
|---|
| 46 |         let options = App::new("Churn Tokio")
 | 
|---|
| 47 |                 .args(&bench::args())
 | 
|---|
| 48 |                 .arg(Arg::with_name("nspots")  .short("s").long("spots")  .takes_value(true).default_value("1").help("Number of spots in the system"))
 | 
|---|
| 49 |                 .get_matches();
 | 
|---|
| 50 | 
 | 
|---|
| 51 |         let nthreads  = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 52 |         let nprocs    = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 53 |         let nspots    = options.value_of("nspots").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 54 | 
 | 
|---|
| 55 |         let exp = Arc::new(bench::BenchData::new(options, nthreads, None));
 | 
|---|
| 56 | 
 | 
|---|
| 57 |         let s = (1000000 as u64).to_formatted_string(&Locale::en);
 | 
|---|
| 58 |         assert_eq!(&s, "1,000,000");
 | 
|---|
| 59 | 
 | 
|---|
| 60 |         let spots : Arc<Vec<sync::Semaphore>> = Arc::new((0..nspots).map(|_| {
 | 
|---|
| 61 |                 sync::Semaphore::new(0)
 | 
|---|
| 62 |         }).collect());
 | 
|---|
| 63 | 
 | 
|---|
| 64 |         let thddata : Vec<Arc<Churner>> = (0..nthreads).map(|_| {
 | 
|---|
| 65 |                 Arc::new(Churner{
 | 
|---|
| 66 |                         start: sync::Notify::new(),
 | 
|---|
| 67 |                 })
 | 
|---|
| 68 |         }).collect();
 | 
|---|
| 69 | 
 | 
|---|
| 70 |         let mut global_counter :u64 = 0;
 | 
|---|
| 71 |         let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
 | 
|---|
| 72 |         let runtime = Builder::new_multi_thread()
 | 
|---|
| 73 |                 .worker_threads(nprocs)
 | 
|---|
| 74 |                 .enable_all()
 | 
|---|
| 75 |                 .build()
 | 
|---|
| 76 |                 .unwrap();
 | 
|---|
| 77 | 
 | 
|---|
| 78 |         runtime.block_on(async {
 | 
|---|
| 79 |                 let threads: Vec<_> = (0..nthreads).map(|i| {
 | 
|---|
| 80 |                         tokio::spawn(churn_main(thddata[i].clone(), spots.clone(), exp.clone(), i < nspots))
 | 
|---|
| 81 |                 }).collect();
 | 
|---|
| 82 |                 println!("Starting");
 | 
|---|
| 83 | 
 | 
|---|
| 84 |                 sleep(Duration::from_millis(100)).await;
 | 
|---|
| 85 | 
 | 
|---|
| 86 |                 let start = Instant::now();
 | 
|---|
| 87 | 
 | 
|---|
| 88 |                 for i in 0..nthreads {
 | 
|---|
| 89 |                         thddata[i].start.notify_one();
 | 
|---|
| 90 |                 }
 | 
|---|
| 91 | 
 | 
|---|
| 92 |                 duration = exp.wait(&start).await;
 | 
|---|
| 93 | 
 | 
|---|
| 94 |                 println!("\nDone");
 | 
|---|
| 95 | 
 | 
|---|
| 96 |                 for i in 0..nspots {
 | 
|---|
| 97 |                         spots[i].add_permits(10000);
 | 
|---|
| 98 |                 }
 | 
|---|
| 99 | 
 | 
|---|
| 100 |                 for t in threads {
 | 
|---|
| 101 |                         let c = t.await.unwrap();
 | 
|---|
| 102 |                         global_counter += c;
 | 
|---|
| 103 |                 }
 | 
|---|
| 104 |         });
 | 
|---|
| 105 | 
 | 
|---|
| 106 |         println!("Duration (ms)        : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
 | 
|---|
| 107 |         println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
 | 
|---|
| 108 |         println!("Number of threads    : {}", (nthreads).to_formatted_string(&Locale::en));
 | 
|---|
| 109 |         println!("Number of spots      : {}", "6");
 | 
|---|
| 110 |         println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en));
 | 
|---|
| 111 |         println!("Ops per second       : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 112 |         println!("ns per ops           : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 113 |         println!("Ops per threads      : {:>15}", (global_counter / nthreads as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 114 |         println!("Ops per procs        : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 115 |         println!("Ops/sec/procs        : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 116 |         println!("ns per ops/procs     : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 117 | }
 | 
|---|