| 1 | use std::sync::Arc;
 | 
|---|
| 2 | use std::sync::atomic::Ordering;
 | 
|---|
| 3 | use std::time::Instant;
 | 
|---|
| 4 | 
 | 
|---|
| 5 | use tokio::runtime::Builder;
 | 
|---|
| 6 | use tokio::sync;
 | 
|---|
| 7 | 
 | 
|---|
| 8 | use clap::{Arg, App};
 | 
|---|
| 9 | use num_format::{Locale, ToFormattedString};
 | 
|---|
| 10 | 
 | 
|---|
| 11 | #[path = "../bench.rs"]
 | 
|---|
| 12 | mod bench;
 | 
|---|
| 13 | 
 | 
|---|
| 14 | // ==================================================
 | 
|---|
| 15 | struct Partner {
 | 
|---|
| 16 |         sem: sync::Semaphore,
 | 
|---|
| 17 |         next: usize,
 | 
|---|
| 18 | }
 | 
|---|
| 19 | 
 | 
|---|
| 20 | async fn partner_main(idx: usize, others: Arc<Vec<Arc<Partner>>>, exp: Arc<bench::BenchData> ) -> u64 {
 | 
|---|
| 21 |         let this = &others[idx];
 | 
|---|
| 22 |         let mut count:u64 = 0;
 | 
|---|
| 23 |         loop {
 | 
|---|
| 24 |                 this.sem.acquire().await.forget();
 | 
|---|
| 25 |                 others[this.next].sem.add_permits(1);
 | 
|---|
| 26 |                 count += 1;
 | 
|---|
| 27 | 
 | 
|---|
| 28 |                 if  exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break; }
 | 
|---|
| 29 |                 if !exp.clock_mode && count >= exp.stop_count { break; }
 | 
|---|
| 30 |         }
 | 
|---|
| 31 | 
 | 
|---|
| 32 |         exp.threads_left.fetch_sub(1, Ordering::SeqCst);
 | 
|---|
| 33 |         count
 | 
|---|
| 34 | }
 | 
|---|
| 35 | 
 | 
|---|
| 36 | // ==================================================
 | 
|---|
| 37 | fn main() {
 | 
|---|
| 38 |         let options = App::new("Cycle Tokio")
 | 
|---|
| 39 |                 .args(&bench::args())
 | 
|---|
| 40 |                 .arg(Arg::with_name("ringsize")  .short("r").long("ringsize")  .takes_value(true).default_value("1").help("Number of threads in a cycle"))
 | 
|---|
| 41 |                 .get_matches();
 | 
|---|
| 42 | 
 | 
|---|
| 43 |         let ring_size = options.value_of("ringsize").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 44 |         let nthreads  = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 45 |         let nprocs    = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 46 | 
 | 
|---|
| 47 |         let tthreads = nthreads * ring_size;
 | 
|---|
| 48 |         let exp = Arc::new(bench::BenchData::new(options, tthreads));
 | 
|---|
| 49 | 
 | 
|---|
| 50 |         let s = (1000000 as u64).to_formatted_string(&Locale::en);
 | 
|---|
| 51 |         assert_eq!(&s, "1,000,000");
 | 
|---|
| 52 | 
 | 
|---|
| 53 |         let thddata : Arc<Vec<Arc<Partner>>> = Arc::new(
 | 
|---|
| 54 |                 (0..tthreads).map(|i| {
 | 
|---|
| 55 |                         let pi = (i + nthreads) % tthreads;
 | 
|---|
| 56 |                         Arc::new(Partner{
 | 
|---|
| 57 |                                 sem: sync::Semaphore::new(0),
 | 
|---|
| 58 |                                 next: pi,
 | 
|---|
| 59 |                         })
 | 
|---|
| 60 |                 }).collect()
 | 
|---|
| 61 |         );
 | 
|---|
| 62 | 
 | 
|---|
| 63 |         let mut global_counter :u64 = 0;
 | 
|---|
| 64 |         let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
 | 
|---|
| 65 |         let runtime = Builder::new_multi_thread()
 | 
|---|
| 66 |                 .worker_threads(nprocs)
 | 
|---|
| 67 |                 .enable_all()
 | 
|---|
| 68 |                 .build()
 | 
|---|
| 69 |                 .unwrap();
 | 
|---|
| 70 | 
 | 
|---|
| 71 |         runtime.block_on(async {
 | 
|---|
| 72 |                 let threads: Vec<_> = (0..tthreads).map(|i| {
 | 
|---|
| 73 |                         tokio::spawn(partner_main(i, thddata.clone(), exp.clone()))
 | 
|---|
| 74 |                 }).collect();
 | 
|---|
| 75 |                 println!("Starting");
 | 
|---|
| 76 | 
 | 
|---|
| 77 |                 let start = Instant::now();
 | 
|---|
| 78 | 
 | 
|---|
| 79 |                 for i in 0..nthreads {
 | 
|---|
| 80 |                         thddata[i].sem.add_permits(1);
 | 
|---|
| 81 |                 }
 | 
|---|
| 82 | 
 | 
|---|
| 83 |                 duration = exp.wait(&start).await;
 | 
|---|
| 84 | 
 | 
|---|
| 85 |                 println!("\nDone");
 | 
|---|
| 86 | 
 | 
|---|
| 87 |                 for i in 0..tthreads {
 | 
|---|
| 88 |                         thddata[i].sem.add_permits(1);
 | 
|---|
| 89 |                 }
 | 
|---|
| 90 | 
 | 
|---|
| 91 |                 for t in threads {
 | 
|---|
| 92 |                         global_counter += t.await.unwrap();
 | 
|---|
| 93 |                 }
 | 
|---|
| 94 |         });
 | 
|---|
| 95 | 
 | 
|---|
| 96 |         println!("Duration (ms)        : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
 | 
|---|
| 97 |         println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
 | 
|---|
| 98 |         println!("Number of threads    : {}", (tthreads).to_formatted_string(&Locale::en));
 | 
|---|
| 99 |         println!("Cycle size (# thrds) : {}", (ring_size).to_formatted_string(&Locale::en));
 | 
|---|
| 100 |         println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en));
 | 
|---|
| 101 |         println!("Ops per second       : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 102 |         println!("ns per ops           : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 103 |         println!("Ops per threads      : {:>15}", (global_counter / tthreads as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 104 |         println!("Ops per procs        : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 105 |         println!("Ops/sec/procs        : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 106 |         println!("ns per ops/procs     : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 107 | }
 | 
|---|