Changes in benchmark/readyQ/cycle.rs [751e2eb:7192145]
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/readyQ/cycle.rs
r751e2eb r7192145 1 #[cfg(any( 2 feature = "sync time rt-threaded", 3 ))] 4 5 extern crate tokio; 6 7 use std::io::{self, Write}; 1 8 use std::sync::Arc; 2 use std::sync::atomic:: Ordering;3 use std::time:: Instant;9 use std::sync::atomic::{AtomicU64, AtomicBool,Ordering}; 10 use std::time::{Instant,Duration}; 4 11 5 12 use tokio::runtime::Builder; 6 13 use tokio::sync; 7 14 use tokio::time; 15 16 extern crate isatty; 17 use isatty::stdout_isatty; 18 19 extern crate num_format; 20 use num_format::{Locale, ToFormattedString}; 21 22 extern crate clap; 8 23 use clap::{Arg, App}; 9 use num_format::{Locale, ToFormattedString}; 10 11 #[path = "../bench.rs"] 12 mod bench; 13 14 // ================================================== 24 25 use std::cell::UnsafeCell; 26 use std::mem::MaybeUninit; 27 use std::ops; 28 29 pub struct InitializeCell<T> { 30 inner: UnsafeCell<MaybeUninit<T>>, 31 } 32 33 unsafe impl<T> Sync for InitializeCell<T> {} 34 35 impl<T> InitializeCell<T> { 36 pub const unsafe fn new_uninitialized() -> InitializeCell<T> { 37 InitializeCell { 38 inner: UnsafeCell::new(MaybeUninit::uninit()), 39 } 40 } 41 pub const fn new(init: T) -> InitializeCell<T> { 42 InitializeCell { 43 inner: UnsafeCell::new(MaybeUninit::new(init)), 44 } 45 } 46 pub unsafe fn init(&self, init: T) { 47 (*self.inner.get()) = MaybeUninit::new(init); 48 } 49 } 50 51 impl<T> ops::Deref for InitializeCell<T> { 52 type Target = T; 53 fn deref(&self) -> &T { 54 unsafe { 55 &*(*self.inner.get()).as_ptr() 56 } 57 } 58 } 59 60 static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() }; 61 static STOP_COUNT: InitializeCell<u64> = unsafe { InitializeCell::new_uninitialized() }; 62 static DURATION: InitializeCell<f64> = unsafe { InitializeCell::new_uninitialized() }; 63 static STOP : AtomicBool = AtomicBool::new(false); 64 static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10); 65 15 66 struct Partner { 16 67 sem: sync::Semaphore, … … 18 69 } 19 70 20 async fn partner_main( idx: usize, others: Arc<Vec<Arc<Partner>>>, exp: Arc<bench::BenchData> ) -> u64{71 async fn partner_main(result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ) { 21 72 let this = &others[idx]; 22 73 let mut count:u64 = 0; … … 26 77 count += 1; 27 78 28 if exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break; } 29 if !exp.clock_mode && count >= exp.stop_count { break; } 30 } 31 32 exp.threads_left.fetch_sub(1, Ordering::SeqCst); 33 count 34 } 35 36 // ================================================== 79 if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; } 80 if !*CLOCK_MODE && count >= *STOP_COUNT { break; } 81 } 82 83 THREADS_LEFT.fetch_sub(1, Ordering::SeqCst); 84 result.send( count ).unwrap(); 85 } 86 87 fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> { 88 let mut thddata = Vec::with_capacity(tthreads); 89 for i in 0..tthreads { 90 let pi = (i + nthreads) % tthreads; 91 thddata.push(Arc::new(Partner{ 92 sem: sync::Semaphore::new(0), 93 next: pi, 94 })); 95 } 96 return thddata; 97 } 98 99 async fn wait(start: &Instant, is_tty: bool) { 100 loop { 101 time::sleep(Duration::from_micros(100000)).await; 102 let delta = start.elapsed(); 103 if is_tty { 104 print!(" {:.1}\r", delta.as_secs_f32()); 105 io::stdout().flush().unwrap(); 106 } 107 if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) { 108 break; 109 } 110 else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 { 111 break; 112 } 113 } 114 } 115 37 116 fn main() { 38 117 let options = App::new("Cycle Tokio") 39 .args(&bench::args()) 118 .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) 119 .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) 120 .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) 121 .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) 40 122 .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle")) 41 123 .get_matches(); … … 45 127 let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap(); 46 128 47 let tthreads = nthreads * ring_size; 48 let exp = Arc::new(bench::BenchData::new(options, tthreads)); 129 if options.is_present("iterations") { 130 unsafe{ 131 CLOCK_MODE.init( false ); 132 STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() ); 133 } 134 } 135 else { 136 unsafe{ 137 CLOCK_MODE.init(true); 138 DURATION .init(options.value_of("duration").unwrap().parse::<f64>().unwrap()); 139 } 140 } 49 141 50 142 let s = (1000000 as u64).to_formatted_string(&Locale::en); 51 143 assert_eq!(&s, "1,000,000"); 52 144 53 let thddata : Arc<Vec<Arc<Partner>>> = Arc::new( 54 (0..tthreads).map(|i| { 55 let pi = (i + nthreads) % tthreads; 56 Arc::new(Partner{ 57 sem: sync::Semaphore::new(0), 58 next: pi, 59 }) 60 }).collect() 61 ); 145 146 let tthreads = nthreads * ring_size; 147 THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst); 148 let thddata = Arc::new(prep(nthreads, tthreads)); 62 149 63 150 let mut global_counter :u64 = 0; … … 70 157 71 158 runtime.block_on(async { 72 let threads: Vec<_> = (0..tthreads).map(|i| { 73 tokio::spawn(partner_main(i, thddata.clone(), exp.clone())) 74 }).collect(); 75 println!("Starting"); 76 77 let start = Instant::now(); 78 79 for i in 0..nthreads { 80 thddata[i].sem.add_permits(1); 81 } 82 83 duration = exp.wait(&start).await; 84 85 println!("\nDone"); 86 87 for i in 0..tthreads { 88 thddata[i].sem.add_permits(1); 89 } 90 91 for t in threads { 92 global_counter += t.await.unwrap(); 159 let mut result : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads); 160 { 161 let mut threads = Vec::with_capacity(tthreads); 162 for i in 0..tthreads { 163 let (s, r) = sync::oneshot::channel::<u64>(); 164 result.push(r); 165 threads.push(tokio::spawn(partner_main(s, i, thddata.clone()))); 166 } 167 println!("Starting"); 168 169 let is_tty = stdout_isatty(); 170 let start = Instant::now(); 171 172 for i in 0..nthreads { 173 thddata[i].sem.add_permits(1); 174 } 175 176 wait(&start, is_tty).await; 177 178 STOP.store(true, Ordering::SeqCst); 179 duration = start.elapsed(); 180 181 println!("\nDone"); 182 183 for i in 0..tthreads { 184 thddata[i].sem.add_permits(1); 185 } 186 187 for _ in 0..tthreads { 188 global_counter += result.pop().unwrap().await.unwrap(); 189 } 93 190 } 94 191 });
Note: See TracChangeset
for help on using the changeset viewer.