#[cfg(any( feature = "sync time rt-threaded", ))] extern crate tokio; use std::io::{self, Write}; use std::sync::Arc; use std::sync::atomic::{AtomicU64, AtomicBool,Ordering}; use std::time::{Instant,Duration}; use tokio::runtime::Builder; use tokio::sync; use tokio::time; use isatty::stdout_isatty; use num_format::{Locale, ToFormattedString}; use clap::{Arg, App}; use std::cell::UnsafeCell; use std::mem::MaybeUninit; use std::ops; pub struct InitializeCell { inner: UnsafeCell>, } unsafe impl Sync for InitializeCell {} impl InitializeCell { pub const unsafe fn new_uninitialized() -> InitializeCell { InitializeCell { inner: UnsafeCell::new(MaybeUninit::uninit()), } } pub const fn new(init: T) -> InitializeCell { InitializeCell { inner: UnsafeCell::new(MaybeUninit::new(init)), } } pub unsafe fn init(&self, init: T) { (*self.inner.get()) = MaybeUninit::new(init); } } impl ops::Deref for InitializeCell { type Target = T; fn deref(&self) -> &T { unsafe { &*(*self.inner.get()).as_ptr() } } } static CLOCK_MODE: InitializeCell = unsafe { InitializeCell::new_uninitialized() }; static STOP_COUNT: InitializeCell = unsafe { InitializeCell::new_uninitialized() }; static DURATION: InitializeCell = unsafe { InitializeCell::new_uninitialized() }; static STOP : AtomicBool = AtomicBool::new(false); static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10); struct Partner { sem: sync::Semaphore, next: usize, } async fn partner_main(result: sync::oneshot::Sender, idx: usize, others: Arc>> ) { let this = &others[idx]; let mut count:u64 = 0; loop { this.sem.acquire().await.forget(); others[this.next].sem.add_permits(1); count += 1; if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; } if !*CLOCK_MODE && count >= *STOP_COUNT { break; } } THREADS_LEFT.fetch_sub(1, Ordering::SeqCst); result.send( count ).unwrap(); } fn prep(nthreads: usize, tthreads: usize) -> Vec> { let mut thddata = Vec::with_capacity(tthreads); for i in 0..tthreads { let pi = (i + nthreads) % tthreads; thddata.push(Arc::new(Partner{ sem: sync::Semaphore::new(0), next: pi, })); } return thddata; } async fn wait(start: &Instant, is_tty: bool) { loop { time::sleep(Duration::from_micros(100000)).await; let delta = start.elapsed(); if is_tty { print!(" {:.1}\r", delta.as_secs_f32()); io::stdout().flush().unwrap(); } if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) { break; } else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 { break; } } } fn main() { let options = App::new("Cycle Tokio") .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle")) .get_matches(); let ring_size = options.value_of("ringsize").unwrap().parse::().unwrap(); let nthreads = options.value_of("nthreads").unwrap().parse::().unwrap(); let nprocs = options.value_of("nprocs").unwrap().parse::().unwrap(); if options.is_present("iterations") { unsafe{ CLOCK_MODE.init( false ); STOP_COUNT.init( options.value_of("iterations").unwrap().parse::().unwrap() ); } } else { unsafe{ CLOCK_MODE.init(true); DURATION .init(options.value_of("duration").unwrap().parse::().unwrap()); } } let s = (1000000 as u64).to_formatted_string(&Locale::en); assert_eq!(&s, "1,000,000"); let tthreads = nthreads * ring_size; THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst); let thddata = Arc::new(prep(nthreads, tthreads)); let mut global_counter :u64 = 0; let mut duration : std::time::Duration = std::time::Duration::from_secs(0); let runtime = Builder::new_multi_thread() .worker_threads(nprocs) .enable_all() .build() .unwrap(); runtime.block_on(async { let mut result : Vec> = Vec::with_capacity(tthreads); { let mut threads = Vec::with_capacity(tthreads); for i in 0..tthreads { let (s, r) = sync::oneshot::channel::(); result.push(r); threads.push(tokio::spawn(partner_main(s, i, thddata.clone()))); } println!("Starting"); let is_tty = stdout_isatty(); let start = Instant::now(); for i in 0..nthreads { thddata[i].sem.add_permits(1); } wait(&start, is_tty).await; STOP.store(true, Ordering::SeqCst); duration = start.elapsed(); println!("\nDone"); for i in 0..tthreads { thddata[i].sem.add_permits(1); } for _ in 0..tthreads { global_counter += result.pop().unwrap().await.unwrap(); } } }); println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en)); println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en)); println!("Number of threads : {}", (tthreads).to_formatted_string(&Locale::en)); println!("Cycle size (# thrds) : {}", (ring_size).to_formatted_string(&Locale::en)); println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en)); println!("Ops per second : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); println!("ns per ops : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en)); println!("Ops per threads : {:>15}", (global_counter / tthreads as u64).to_formatted_string(&Locale::en)); println!("Ops per procs : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en)); println!("Ops/sec/procs : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); println!("ns per ops/procs : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en)); }