| 1 | #[cfg(any(
 | 
|---|
| 2 |         feature = "sync time rt-threaded",
 | 
|---|
| 3 |   ))]
 | 
|---|
| 4 | 
 | 
|---|
| 5 | extern crate tokio;
 | 
|---|
| 6 | 
 | 
|---|
| 7 | use std::io::{self, Write};
 | 
|---|
| 8 | use std::sync::Arc;
 | 
|---|
| 9 | use std::sync::atomic::{AtomicU64, AtomicBool,Ordering};
 | 
|---|
| 10 | use std::time::{Instant,Duration};
 | 
|---|
| 11 | 
 | 
|---|
| 12 | use tokio::runtime::Builder;
 | 
|---|
| 13 | use tokio::sync;
 | 
|---|
| 14 | use tokio::time;
 | 
|---|
| 15 | 
 | 
|---|
| 16 | extern crate isatty;
 | 
|---|
| 17 | use isatty::stdout_isatty;
 | 
|---|
| 18 | 
 | 
|---|
| 19 | extern crate num_format;
 | 
|---|
| 20 | use num_format::{Locale, ToFormattedString};
 | 
|---|
| 21 | 
 | 
|---|
| 22 | extern crate clap;
 | 
|---|
| 23 | use clap::{Arg, App};
 | 
|---|
| 24 | 
 | 
|---|
| 25 | use std::cell::UnsafeCell;
 | 
|---|
| 26 | use std::mem::MaybeUninit;
 | 
|---|
| 27 | use std::ops;
 | 
|---|
| 28 | 
 | 
|---|
/// A write-once cell that can live in a `static` and be filled in at run time.
///
/// The cell does not track whether it has been initialized: dereferencing it
/// before a value was stored is undefined behaviour, and preventing that is
/// the responsibility of whoever calls the `unsafe` constructor/initializer.
/// The stored value is never dropped.
pub struct InitializeCell<T> {
    inner: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: shared access only ever hands out `&T`, which is sound to share
// across threads when `T: Sync`. `T: Send` is required too (the same bounds
// `std::sync::OnceLock` uses), because `init` lets any thread that can see
// the cell place a value into it. Races between `init` and readers are ruled
// out by `init`'s safety contract.
unsafe impl<T: Send + Sync> Sync for InitializeCell<T> {}

impl<T> InitializeCell<T> {
    /// Creates a cell that holds no value yet.
    ///
    /// # Safety
    ///
    /// `init` must be called, and must have completed, before the cell is
    /// dereferenced for the first time; dereferencing an uninitialized cell is
    /// undefined behaviour.
    pub const unsafe fn new_uninitialized() -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Creates a cell that already holds `init`.
    pub const fn new(init: T) -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::new(init)),
        }
    }

    /// Stores `init` in the cell.
    ///
    /// A value stored earlier is overwritten without being dropped.
    ///
    /// # Safety
    ///
    /// While this runs, no reference obtained through `Deref` may be alive
    /// and no other thread may read or initialize the cell. Readers on other
    /// threads must be ordered after this call (for example by being spawned
    /// after it returns).
    pub unsafe fn init(&self, init: T) {
        // The caller guarantees exclusive access for the duration of the write.
        *self.inner.get() = MaybeUninit::new(init);
    }
}

impl<T> ops::Deref for InitializeCell<T> {
    type Target = T;

    fn deref(&self) -> &T {
        // SAFETY: `new` stores a value immediately, and the contract of
        // `new_uninitialized` requires `init` to have run before any
        // dereference, so the contents are initialized here.
        unsafe { &*(*self.inner.get()).as_ptr() }
    }
}
| 59 | 
 | 
|---|
| 60 | static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() };
 | 
|---|
| 61 | static STOP_COUNT: InitializeCell<u64>  = unsafe { InitializeCell::new_uninitialized() };
 | 
|---|
| 62 | static DURATION: InitializeCell<f64>    = unsafe { InitializeCell::new_uninitialized() };
 | 
|---|
| 63 | static STOP         : AtomicBool = AtomicBool::new(false);
 | 
|---|
| 64 | static THREADS_LEFT : AtomicU64  = AtomicU64 ::new(10);
 | 
|---|
| 65 | 
 | 
|---|
| 66 | struct Partner {
 | 
|---|
| 67 |         sem: sync::Semaphore,
 | 
|---|
| 68 |         next: usize,
 | 
|---|
| 69 | }
 | 
|---|
| 70 | 
 | 
|---|
| 71 | async fn partner_main(result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ) {
 | 
|---|
| 72 |         let this = &others[idx];
 | 
|---|
| 73 |         let mut count:u64 = 0;
 | 
|---|
| 74 |         loop {
 | 
|---|
| 75 |                 this.sem.acquire().await.forget();
 | 
|---|
| 76 |                 others[this.next].sem.add_permits(1);
 | 
|---|
| 77 |                 count += 1;
 | 
|---|
| 78 | 
 | 
|---|
| 79 |                 if  *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; }
 | 
|---|
| 80 |                 if !*CLOCK_MODE && count >= *STOP_COUNT { break; }
 | 
|---|
| 81 |         }
 | 
|---|
| 82 | 
 | 
|---|
| 83 |         THREADS_LEFT.fetch_sub(1, Ordering::SeqCst);
 | 
|---|
| 84 |         result.send( count ).unwrap();
 | 
|---|
| 85 | }
 | 
|---|
| 86 | 
 | 
|---|
| 87 | fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> {
 | 
|---|
| 88 |         let mut thddata = Vec::with_capacity(tthreads);
 | 
|---|
| 89 |         for i in 0..tthreads {
 | 
|---|
| 90 |                 let pi = (i + nthreads) % tthreads;
 | 
|---|
| 91 |                 thddata.push(Arc::new(Partner{
 | 
|---|
| 92 |                         sem: sync::Semaphore::new(0),
 | 
|---|
| 93 |                         next: pi,
 | 
|---|
| 94 |                 }));
 | 
|---|
| 95 |         }
 | 
|---|
| 96 |         return thddata;
 | 
|---|
| 97 | }
 | 
|---|
| 98 | 
 | 
|---|
| 99 | async fn wait(start: &Instant, is_tty: bool) {
 | 
|---|
| 100 |         loop {
 | 
|---|
| 101 |                 time::sleep(Duration::from_micros(100000)).await;
 | 
|---|
| 102 |                 let delta = start.elapsed();
 | 
|---|
| 103 |                 if is_tty {
 | 
|---|
| 104 |                         print!(" {:.1}\r", delta.as_secs_f32());
 | 
|---|
| 105 |                         io::stdout().flush().unwrap();
 | 
|---|
| 106 |                 }
 | 
|---|
| 107 |                 if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION)  {
 | 
|---|
| 108 |                         break;
 | 
|---|
| 109 |                 }
 | 
|---|
| 110 |                 else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 {
 | 
|---|
| 111 |                         break;
 | 
|---|
| 112 |                 }
 | 
|---|
| 113 |         }
 | 
|---|
| 114 | }
 | 
|---|
| 115 | 
 | 
|---|
| 116 | fn main() {
 | 
|---|
| 117 |         let options = App::new("Cycle Tokio")
 | 
|---|
| 118 |                 .arg(Arg::with_name("duration")  .short("d").long("duration")  .takes_value(true).default_value("5").help("Duration of the experiments in seconds"))
 | 
|---|
| 119 |                 .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments"))
 | 
|---|
| 120 |                 .arg(Arg::with_name("nthreads")  .short("t").long("nthreads")  .takes_value(true).default_value("1").help("Number of threads to use"))
 | 
|---|
| 121 |                 .arg(Arg::with_name("nprocs")    .short("p").long("nprocs")    .takes_value(true).default_value("1").help("Number of processors to use"))
 | 
|---|
| 122 |                 .arg(Arg::with_name("ringsize")  .short("r").long("ringsize")  .takes_value(true).default_value("1").help("Number of threads in a cycle"))
 | 
|---|
| 123 |                 .get_matches();
 | 
|---|
| 124 | 
 | 
|---|
| 125 |         let ring_size = options.value_of("ringsize").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 126 |         let nthreads  = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 127 |         let nprocs    = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
 | 
|---|
| 128 | 
 | 
|---|
| 129 |         if options.is_present("iterations") {
 | 
|---|
| 130 |                 unsafe{
 | 
|---|
| 131 |                         CLOCK_MODE.init( false );
 | 
|---|
| 132 |                         STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() );
 | 
|---|
| 133 |                 }
 | 
|---|
| 134 |         }
 | 
|---|
| 135 |         else {
 | 
|---|
| 136 |                 unsafe{
 | 
|---|
| 137 |                         CLOCK_MODE.init(true);
 | 
|---|
| 138 |                         DURATION  .init(options.value_of("duration").unwrap().parse::<f64>().unwrap());
 | 
|---|
| 139 |                 }
 | 
|---|
| 140 |         }
 | 
|---|
| 141 | 
 | 
|---|
| 142 |         let s = (1000000 as u64).to_formatted_string(&Locale::en);
 | 
|---|
| 143 |         assert_eq!(&s, "1,000,000");
 | 
|---|
| 144 | 
 | 
|---|
| 145 | 
 | 
|---|
| 146 |         let tthreads = nthreads * ring_size;
 | 
|---|
| 147 |         THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst);
 | 
|---|
| 148 |         let thddata = Arc::new(prep(nthreads, tthreads));
 | 
|---|
| 149 | 
 | 
|---|
| 150 |         let mut global_counter :u64 = 0;
 | 
|---|
| 151 |         let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
 | 
|---|
| 152 |         let runtime = Builder::new_multi_thread()
 | 
|---|
| 153 |                 .worker_threads(nprocs)
 | 
|---|
| 154 |                 .enable_all()
 | 
|---|
| 155 |                 .build()
 | 
|---|
| 156 |                 .unwrap();
 | 
|---|
| 157 | 
 | 
|---|
| 158 |         runtime.block_on(async {
 | 
|---|
| 159 |                 let mut result  : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads);
 | 
|---|
| 160 |                 {
 | 
|---|
| 161 |                         let mut threads = Vec::with_capacity(tthreads);
 | 
|---|
| 162 |                         for i in 0..tthreads {
 | 
|---|
| 163 |                                 let (s, r) = sync::oneshot::channel::<u64>();
 | 
|---|
| 164 |                                 result.push(r);
 | 
|---|
| 165 |                                 threads.push(tokio::spawn(partner_main(s, i, thddata.clone())));
 | 
|---|
| 166 |                         }
 | 
|---|
| 167 |                         println!("Starting");
 | 
|---|
| 168 | 
 | 
|---|
| 169 |                         let is_tty = stdout_isatty();
 | 
|---|
| 170 |                         let start = Instant::now();
 | 
|---|
| 171 | 
 | 
|---|
| 172 |                         for i in 0..nthreads {
 | 
|---|
| 173 |                                 thddata[i].sem.add_permits(1);
 | 
|---|
| 174 |                         }
 | 
|---|
| 175 | 
 | 
|---|
| 176 |                         wait(&start, is_tty).await;
 | 
|---|
| 177 | 
 | 
|---|
| 178 |                         STOP.store(true, Ordering::SeqCst);
 | 
|---|
| 179 |                         duration = start.elapsed();
 | 
|---|
| 180 | 
 | 
|---|
| 181 |                         println!("\nDone");
 | 
|---|
| 182 | 
 | 
|---|
| 183 |                         for i in 0..tthreads {
 | 
|---|
| 184 |                                 thddata[i].sem.add_permits(1);
 | 
|---|
| 185 |                         }
 | 
|---|
| 186 | 
 | 
|---|
| 187 |                         for _ in 0..tthreads {
 | 
|---|
| 188 |                                 global_counter += result.pop().unwrap().await.unwrap();
 | 
|---|
| 189 |                         }
 | 
|---|
| 190 |                 }
 | 
|---|
| 191 |         });
 | 
|---|
| 192 | 
 | 
|---|
| 193 |         println!("Duration (ms)        : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
 | 
|---|
| 194 |         println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
 | 
|---|
| 195 |         println!("Number of threads    : {}", (tthreads).to_formatted_string(&Locale::en));
 | 
|---|
| 196 |         println!("Cycle size (# thrds) : {}", (ring_size).to_formatted_string(&Locale::en));
 | 
|---|
| 197 |         println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en));
 | 
|---|
| 198 |         println!("Ops per second       : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 199 |         println!("ns per ops           : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 200 |         println!("Ops per threads      : {:>15}", (global_counter / tthreads as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 201 |         println!("Ops per procs        : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 202 |         println!("Ops/sec/procs        : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 203 |         println!("ns per ops/procs     : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en));
 | 
|---|
| 204 | }
 | 
|---|