[7192145] | 1 | #[cfg(any( |
---|
| 2 | feature = "sync time rt-threaded", |
---|
| 3 | ))] |
---|
| 4 | |
---|
| 5 | extern crate tokio; |
---|
| 6 | |
---|
| 7 | use std::io::{self, Write}; |
---|
| 8 | use std::sync::Arc; |
---|
| 9 | use std::sync::atomic::{AtomicU64, AtomicBool,Ordering}; |
---|
| 10 | use std::time::{Instant,Duration}; |
---|
| 11 | |
---|
| 12 | use tokio::runtime::Builder; |
---|
| 13 | use tokio::sync; |
---|
| 14 | use tokio::time; |
---|
| 15 | |
---|
| 16 | extern crate isatty; |
---|
| 17 | use isatty::stdout_isatty; |
---|
| 18 | |
---|
| 19 | extern crate num_format; |
---|
| 20 | use num_format::{Locale, ToFormattedString}; |
---|
| 21 | |
---|
| 22 | extern crate clap; |
---|
| 23 | use clap::{Arg, App}; |
---|
| 24 | |
---|
| 25 | use std::cell::UnsafeCell; |
---|
| 26 | use std::mem::MaybeUninit; |
---|
| 27 | use std::ops; |
---|
| 28 | |
---|
/// A cell that can live in a `static` and be filled in at runtime.
///
/// The cell is created either already holding a value ([`InitializeCell::new`])
/// or empty ([`InitializeCell::new_uninitialized`]) and filled later with
/// [`InitializeCell::init`]. Reads go through `Deref`, which performs no check:
/// dereferencing a cell that was never initialized is undefined behavior.
/// The cell never drops the value it holds.
pub struct InitializeCell<T> {
    inner: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: shared access only hands out `&T` (through `Deref`), which is sound to
// share across threads only when `T: Sync`. A value written by `init` on one
// thread may be observed from another, so `T: Send` is required as well (the
// same bounds std uses for `OnceLock`). Mutation only happens through the
// `unsafe fn init`, whose caller must rule out concurrent access.
unsafe impl<T: Send + Sync> Sync for InitializeCell<T> {}

impl<T> InitializeCell<T> {
    /// Creates a cell that holds no value yet.
    ///
    /// # Safety
    ///
    /// The cell must be filled with [`InitializeCell::init`] before it is
    /// dereferenced; reading it earlier is undefined behavior.
    pub const unsafe fn new_uninitialized() -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Creates a cell that already holds `init`.
    pub const fn new(init: T) -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::new(init)),
        }
    }

    /// Stores `init` in the cell.
    ///
    /// Any previously stored value is overwritten without being dropped.
    ///
    /// # Safety
    ///
    /// No reference to the cell's contents (for example one obtained through
    /// `Deref`) may be alive, and no other thread may access the cell, while
    /// this write happens.
    pub unsafe fn init(&self, init: T) {
        // The caller guarantees exclusive access for the duration of this write.
        (*self.inner.get()) = MaybeUninit::new(init);
    }
}

impl<T> ops::Deref for InitializeCell<T> {
    type Target = T;

    /// Returns a reference to the stored value.
    ///
    /// No initialization check is performed: calling this on a cell created by
    /// `new_uninitialized` that was never passed to `init` is undefined behavior.
    fn deref(&self) -> &T {
        // SAFETY: relies on the contract of `new_uninitialized`/`init`: a value
        // has been written and no `init` runs concurrently with this read.
        unsafe { &*(*self.inner.get()).as_ptr() }
    }
}
---|
| 59 | |
---|
| 60 | static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() }; |
---|
| 61 | static STOP_COUNT: InitializeCell<u64> = unsafe { InitializeCell::new_uninitialized() }; |
---|
| 62 | static DURATION: InitializeCell<f64> = unsafe { InitializeCell::new_uninitialized() }; |
---|
| 63 | static STOP : AtomicBool = AtomicBool::new(false); |
---|
| 64 | static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10); |
---|
| 65 | |
---|
| 66 | struct Partner { |
---|
| 67 | sem: sync::Semaphore, |
---|
| 68 | next: usize, |
---|
| 69 | } |
---|
| 70 | |
---|
| 71 | async fn partner_main(result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ) { |
---|
| 72 | let this = &others[idx]; |
---|
| 73 | let mut count:u64 = 0; |
---|
| 74 | loop { |
---|
| 75 | this.sem.acquire().await.forget(); |
---|
| 76 | others[this.next].sem.add_permits(1); |
---|
| 77 | count += 1; |
---|
| 78 | |
---|
| 79 | if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; } |
---|
| 80 | if !*CLOCK_MODE && count >= *STOP_COUNT { break; } |
---|
| 81 | } |
---|
| 82 | |
---|
| 83 | THREADS_LEFT.fetch_sub(1, Ordering::SeqCst); |
---|
| 84 | result.send( count ).unwrap(); |
---|
| 85 | } |
---|
| 86 | |
---|
| 87 | fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> { |
---|
| 88 | let mut thddata = Vec::with_capacity(tthreads); |
---|
| 89 | for i in 0..tthreads { |
---|
| 90 | let pi = (i + nthreads) % tthreads; |
---|
| 91 | thddata.push(Arc::new(Partner{ |
---|
| 92 | sem: sync::Semaphore::new(0), |
---|
| 93 | next: pi, |
---|
| 94 | })); |
---|
| 95 | } |
---|
| 96 | return thddata; |
---|
| 97 | } |
---|
| 98 | |
---|
| 99 | async fn wait(start: &Instant, is_tty: bool) { |
---|
| 100 | loop { |
---|
| 101 | time::sleep(Duration::from_micros(100000)).await; |
---|
| 102 | let delta = start.elapsed(); |
---|
| 103 | if is_tty { |
---|
| 104 | print!(" {:.1}\r", delta.as_secs_f32()); |
---|
| 105 | io::stdout().flush().unwrap(); |
---|
| 106 | } |
---|
| 107 | if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) { |
---|
| 108 | break; |
---|
| 109 | } |
---|
| 110 | else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 { |
---|
| 111 | break; |
---|
| 112 | } |
---|
| 113 | } |
---|
| 114 | } |
---|
| 115 | |
---|
| 116 | fn main() { |
---|
| 117 | let options = App::new("Cycle Tokio") |
---|
| 118 | .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) |
---|
| 119 | .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) |
---|
| 120 | .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) |
---|
| 121 | .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) |
---|
| 122 | .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle")) |
---|
| 123 | .get_matches(); |
---|
| 124 | |
---|
| 125 | let ring_size = options.value_of("ringsize").unwrap().parse::<usize>().unwrap(); |
---|
| 126 | let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap(); |
---|
| 127 | let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap(); |
---|
| 128 | |
---|
| 129 | if options.is_present("iterations") { |
---|
| 130 | unsafe{ |
---|
| 131 | CLOCK_MODE.init( false ); |
---|
| 132 | STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() ); |
---|
| 133 | } |
---|
| 134 | } |
---|
| 135 | else { |
---|
| 136 | unsafe{ |
---|
| 137 | CLOCK_MODE.init(true); |
---|
| 138 | DURATION .init(options.value_of("duration").unwrap().parse::<f64>().unwrap()); |
---|
| 139 | } |
---|
| 140 | } |
---|
| 141 | |
---|
| 142 | let s = (1000000 as u64).to_formatted_string(&Locale::en); |
---|
| 143 | assert_eq!(&s, "1,000,000"); |
---|
| 144 | |
---|
| 145 | |
---|
| 146 | let tthreads = nthreads * ring_size; |
---|
| 147 | THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst); |
---|
| 148 | let thddata = Arc::new(prep(nthreads, tthreads)); |
---|
| 149 | |
---|
| 150 | let mut global_counter :u64 = 0; |
---|
| 151 | let mut duration : std::time::Duration = std::time::Duration::from_secs(0); |
---|
| 152 | let runtime = Builder::new_multi_thread() |
---|
| 153 | .worker_threads(nprocs) |
---|
| 154 | .enable_all() |
---|
| 155 | .build() |
---|
| 156 | .unwrap(); |
---|
| 157 | |
---|
| 158 | runtime.block_on(async { |
---|
| 159 | let mut result : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads); |
---|
| 160 | { |
---|
| 161 | let mut threads = Vec::with_capacity(tthreads); |
---|
| 162 | for i in 0..tthreads { |
---|
| 163 | let (s, r) = sync::oneshot::channel::<u64>(); |
---|
| 164 | result.push(r); |
---|
| 165 | threads.push(tokio::spawn(partner_main(s, i, thddata.clone()))); |
---|
| 166 | } |
---|
| 167 | println!("Starting"); |
---|
| 168 | |
---|
| 169 | let is_tty = stdout_isatty(); |
---|
| 170 | let start = Instant::now(); |
---|
| 171 | |
---|
| 172 | for i in 0..nthreads { |
---|
| 173 | thddata[i].sem.add_permits(1); |
---|
| 174 | } |
---|
| 175 | |
---|
| 176 | wait(&start, is_tty).await; |
---|
| 177 | |
---|
| 178 | STOP.store(true, Ordering::SeqCst); |
---|
| 179 | duration = start.elapsed(); |
---|
| 180 | |
---|
| 181 | println!("\nDone"); |
---|
| 182 | |
---|
| 183 | for i in 0..tthreads { |
---|
| 184 | thddata[i].sem.add_permits(1); |
---|
| 185 | } |
---|
| 186 | |
---|
| 187 | for _ in 0..tthreads { |
---|
| 188 | global_counter += result.pop().unwrap().await.unwrap(); |
---|
| 189 | } |
---|
| 190 | } |
---|
| 191 | }); |
---|
| 192 | |
---|
| 193 | println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en)); |
---|
| 194 | println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en)); |
---|
| 195 | println!("Number of threads : {}", (tthreads).to_formatted_string(&Locale::en)); |
---|
| 196 | println!("Cycle size (# thrds) : {}", (ring_size).to_formatted_string(&Locale::en)); |
---|
| 197 | println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en)); |
---|
| 198 | println!("Ops per second : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); |
---|
| 199 | println!("ns per ops : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en)); |
---|
| 200 | println!("Ops per threads : {:>15}", (global_counter / tthreads as u64).to_formatted_string(&Locale::en)); |
---|
| 201 | println!("Ops per procs : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en)); |
---|
| 202 | println!("Ops/sec/procs : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); |
---|
| 203 | println!("ns per ops/procs : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en)); |
---|
| 204 | } |
---|