Changes in benchmark/readyQ/cycle.rs [7192145:751e2eb]
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/readyQ/cycle.rs
r7192145 r751e2eb 1 #[cfg(any(2 feature = "sync time rt-threaded",3 ))]4 5 extern crate tokio;6 7 use std::io::{self, Write};8 1 use std::sync::Arc; 9 use std::sync::atomic:: {AtomicU64, AtomicBool,Ordering};10 use std::time:: {Instant,Duration};2 use std::sync::atomic::Ordering; 3 use std::time::Instant; 11 4 12 5 use tokio::runtime::Builder; 13 6 use tokio::sync; 14 use tokio::time;15 7 16 extern crate isatty; 17 use isatty::stdout_isatty; 18 19 extern crate num_format; 8 use clap::{Arg, App}; 20 9 use num_format::{Locale, ToFormattedString}; 21 10 22 extern crate clap; 23 use clap::{Arg, App};11 #[path = "../bench.rs"] 12 mod bench; 24 13 25 use std::cell::UnsafeCell; 26 use std::mem::MaybeUninit; 27 use std::ops; 28 29 pub struct InitializeCell<T> { 30 inner: UnsafeCell<MaybeUninit<T>>, 31 } 32 33 unsafe impl<T> Sync for InitializeCell<T> {} 34 35 impl<T> InitializeCell<T> { 36 pub const unsafe fn new_uninitialized() -> InitializeCell<T> { 37 InitializeCell { 38 inner: UnsafeCell::new(MaybeUninit::uninit()), 39 } 40 } 41 pub const fn new(init: T) -> InitializeCell<T> { 42 InitializeCell { 43 inner: UnsafeCell::new(MaybeUninit::new(init)), 44 } 45 } 46 pub unsafe fn init(&self, init: T) { 47 (*self.inner.get()) = MaybeUninit::new(init); 48 } 49 } 50 51 impl<T> ops::Deref for InitializeCell<T> { 52 type Target = T; 53 fn deref(&self) -> &T { 54 unsafe { 55 &*(*self.inner.get()).as_ptr() 56 } 57 } 58 } 59 60 static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() }; 61 static STOP_COUNT: InitializeCell<u64> = unsafe { InitializeCell::new_uninitialized() }; 62 static DURATION: InitializeCell<f64> = unsafe { InitializeCell::new_uninitialized() }; 63 static STOP : AtomicBool = AtomicBool::new(false); 64 static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10); 65 14 // ================================================== 66 15 struct Partner { 67 16 sem: sync::Semaphore, … … 69 18 } 70 19 71 async fn partner_main( result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ){20 async fn partner_main(idx: usize, others: Arc<Vec<Arc<Partner>>>, exp: Arc<bench::BenchData> ) -> u64 { 72 21 let this = &others[idx]; 73 22 let mut count:u64 = 0; … … 77 26 count += 1; 78 27 79 if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; }80 if ! *CLOCK_MODE && count >= *STOP_COUNT{ break; }28 if exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break; } 29 if !exp.clock_mode && count >= exp.stop_count { break; } 81 30 } 82 31 83 THREADS_LEFT.fetch_sub(1, Ordering::SeqCst);84 result.send( count ).unwrap();32 exp.threads_left.fetch_sub(1, Ordering::SeqCst); 33 count 85 34 } 86 35 87 fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> { 88 let mut thddata = Vec::with_capacity(tthreads); 89 for i in 0..tthreads { 90 let pi = (i + nthreads) % tthreads; 91 thddata.push(Arc::new(Partner{ 92 sem: sync::Semaphore::new(0), 93 next: pi, 94 })); 95 } 96 return thddata; 97 } 98 99 async fn wait(start: &Instant, is_tty: bool) { 100 loop { 101 time::sleep(Duration::from_micros(100000)).await; 102 let delta = start.elapsed(); 103 if is_tty { 104 print!(" {:.1}\r", delta.as_secs_f32()); 105 io::stdout().flush().unwrap(); 106 } 107 if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) { 108 break; 109 } 110 else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 { 111 break; 112 } 113 } 114 } 115 36 // ================================================== 116 37 fn main() { 117 38 let options = App::new("Cycle Tokio") 118 .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) 119 .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) 120 .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) 121 .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) 39 .args(&bench::args()) 122 40 .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle")) 123 41 .get_matches(); … … 127 45 let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap(); 128 46 129 if options.is_present("iterations") { 130 unsafe{ 131 CLOCK_MODE.init( false ); 132 STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() ); 133 } 134 } 135 else { 136 unsafe{ 137 CLOCK_MODE.init(true); 138 DURATION .init(options.value_of("duration").unwrap().parse::<f64>().unwrap()); 139 } 140 } 47 let tthreads = nthreads * ring_size; 48 let exp = Arc::new(bench::BenchData::new(options, tthreads)); 141 49 142 50 let s = (1000000 as u64).to_formatted_string(&Locale::en); 143 51 assert_eq!(&s, "1,000,000"); 144 52 145 146 let tthreads = nthreads * ring_size; 147 THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst); 148 let thddata = Arc::new(prep(nthreads, tthreads)); 53 let thddata : Arc<Vec<Arc<Partner>>> = Arc::new( 54 (0..tthreads).map(|i| { 55 let pi = (i + nthreads) % tthreads; 56 Arc::new(Partner{ 57 sem: sync::Semaphore::new(0), 58 next: pi, 59 }) 60 }).collect() 61 ); 149 62 150 63 let mut global_counter :u64 = 0; … … 157 70 158 71 runtime.block_on(async { 159 let mut result : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads); 160 { 161 let mut threads = Vec::with_capacity(tthreads); 162 for i in 0..tthreads { 163 let (s, r) = sync::oneshot::channel::<u64>(); 164 result.push(r); 165 threads.push(tokio::spawn(partner_main(s, i, thddata.clone()))); 166 } 167 println!("Starting"); 72 let threads: Vec<_> = (0..tthreads).map(|i| { 73 tokio::spawn(partner_main(i, thddata.clone(), exp.clone())) 74 }).collect(); 75 println!("Starting"); 168 76 169 let is_tty = stdout_isatty(); 170 let start = Instant::now(); 77 let start = Instant::now(); 171 78 172 173 174 79 for i in 0..nthreads { 80 thddata[i].sem.add_permits(1); 81 } 175 82 176 wait(&start, is_tty).await;83 duration = exp.wait(&start).await; 177 84 178 STOP.store(true, Ordering::SeqCst); 179 duration = start.elapsed(); 85 println!("\nDone"); 180 86 181 println!("\nDone"); 87 for i in 0..tthreads { 88 thddata[i].sem.add_permits(1); 89 } 182 90 183 for i in 0..tthreads { 184 thddata[i].sem.add_permits(1); 185 } 186 187 for _ in 0..tthreads { 188 global_counter += result.pop().unwrap().await.unwrap(); 189 } 91 for t in threads { 92 global_counter += t.await.unwrap(); 190 93 } 191 94 });
Note: See TracChangeset
for help on using the changeset viewer.