Changeset 751e2eb for benchmark/readyQ
- Timestamp:
- Dec 18, 2020, 2:53:20 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 5d369c7
- Parents:
- 49ce636
- Location:
- benchmark/readyQ
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/readyQ/cycle.rs
r49ce636 r751e2eb 1 #[cfg(any(2 feature = "sync time rt-threaded",3 ))]4 5 extern crate tokio;6 7 use std::io::{self, Write};8 1 use std::sync::Arc; 9 use std::sync::atomic:: {AtomicU64, AtomicBool,Ordering};10 use std::time:: {Instant,Duration};2 use std::sync::atomic::Ordering; 3 use std::time::Instant; 11 4 12 5 use tokio::runtime::Builder; 13 6 use tokio::sync; 14 use tokio::time;15 7 16 use isatty::stdout_isatty; 17 8 use clap::{Arg, App}; 18 9 use num_format::{Locale, ToFormattedString}; 19 10 20 use clap::{Arg, App}; 11 #[path = "../bench.rs"] 12 mod bench; 21 13 22 use std::cell::UnsafeCell; 23 use std::mem::MaybeUninit; 24 use std::ops; 25 26 pub struct InitializeCell<T> { 27 inner: UnsafeCell<MaybeUninit<T>>, 28 } 29 30 unsafe impl<T> Sync for InitializeCell<T> {} 31 32 impl<T> InitializeCell<T> { 33 pub const unsafe fn new_uninitialized() -> InitializeCell<T> { 34 InitializeCell { 35 inner: UnsafeCell::new(MaybeUninit::uninit()), 36 } 37 } 38 pub const fn new(init: T) -> InitializeCell<T> { 39 InitializeCell { 40 inner: UnsafeCell::new(MaybeUninit::new(init)), 41 } 42 } 43 pub unsafe fn init(&self, init: T) { 44 (*self.inner.get()) = MaybeUninit::new(init); 45 } 46 } 47 48 impl<T> ops::Deref for InitializeCell<T> { 49 type Target = T; 50 fn deref(&self) -> &T { 51 unsafe { 52 &*(*self.inner.get()).as_ptr() 53 } 54 } 55 } 56 57 static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() }; 58 static STOP_COUNT: InitializeCell<u64> = unsafe { InitializeCell::new_uninitialized() }; 59 static DURATION: InitializeCell<f64> = unsafe { InitializeCell::new_uninitialized() }; 60 static STOP : AtomicBool = AtomicBool::new(false); 61 static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10); 62 14 // ================================================== 63 15 struct Partner { 64 16 sem: sync::Semaphore, … … 66 18 } 67 19 68 async fn partner_main( result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ){20 async fn partner_main(idx: usize, others: Arc<Vec<Arc<Partner>>>, exp: Arc<bench::BenchData> ) -> u64 { 69 21 let this = &others[idx]; 70 22 let mut count:u64 = 0; … … 74 26 count += 1; 75 27 76 if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; }77 if ! *CLOCK_MODE && count >= *STOP_COUNT{ break; }28 if exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break; } 29 if !exp.clock_mode && count >= exp.stop_count { break; } 78 30 } 79 31 80 THREADS_LEFT.fetch_sub(1, Ordering::SeqCst);81 result.send( count ).unwrap();32 exp.threads_left.fetch_sub(1, Ordering::SeqCst); 33 count 82 34 } 83 35 84 fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> { 85 let mut thddata = Vec::with_capacity(tthreads); 86 for i in 0..tthreads { 87 let pi = (i + nthreads) % tthreads; 88 thddata.push(Arc::new(Partner{ 89 sem: sync::Semaphore::new(0), 90 next: pi, 91 })); 92 } 93 return thddata; 94 } 95 96 async fn wait(start: &Instant, is_tty: bool) { 97 loop { 98 time::sleep(Duration::from_micros(100000)).await; 99 let delta = start.elapsed(); 100 if is_tty { 101 print!(" {:.1}\r", delta.as_secs_f32()); 102 io::stdout().flush().unwrap(); 103 } 104 if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) { 105 break; 106 } 107 else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 { 108 break; 109 } 110 } 111 } 112 36 // ================================================== 113 37 fn main() { 114 38 let options = App::new("Cycle Tokio") 115 .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) 116 .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) 117 .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) 118 .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) 39 .args(&bench::args()) 119 40 .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle")) 120 41 .get_matches(); … … 124 45 let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap(); 125 46 126 if options.is_present("iterations") { 127 unsafe{ 128 CLOCK_MODE.init( false ); 129 STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() ); 130 } 131 } 132 else { 133 unsafe{ 134 CLOCK_MODE.init(true); 135 DURATION .init(options.value_of("duration").unwrap().parse::<f64>().unwrap()); 136 } 137 } 47 let tthreads = nthreads * ring_size; 48 let exp = Arc::new(bench::BenchData::new(options, tthreads)); 138 49 139 50 let s = (1000000 as u64).to_formatted_string(&Locale::en); 140 51 assert_eq!(&s, "1,000,000"); 141 52 142 143 let tthreads = nthreads * ring_size; 144 THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst); 145 let thddata = Arc::new(prep(nthreads, tthreads)); 53 let thddata : Arc<Vec<Arc<Partner>>> = Arc::new( 54 (0..tthreads).map(|i| { 55 let pi = (i + nthreads) % tthreads; 56 Arc::new(Partner{ 57 sem: sync::Semaphore::new(0), 58 next: pi, 59 }) 60 }).collect() 61 ); 146 62 147 63 let mut global_counter :u64 = 0; … … 154 70 155 71 runtime.block_on(async { 156 let mut result : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads); 157 { 158 let mut threads = Vec::with_capacity(tthreads); 159 for i in 0..tthreads { 160 let (s, r) = sync::oneshot::channel::<u64>(); 161 result.push(r); 162 threads.push(tokio::spawn(partner_main(s, i, thddata.clone()))); 163 } 164 println!("Starting"); 72 let threads: Vec<_> = (0..tthreads).map(|i| { 73 tokio::spawn(partner_main(i, thddata.clone(), exp.clone())) 74 }).collect(); 75 println!("Starting"); 165 76 166 let is_tty = stdout_isatty(); 167 let start = Instant::now(); 77 let start = Instant::now(); 168 78 169 170 171 79 for i in 0..nthreads { 80 thddata[i].sem.add_permits(1); 81 } 172 82 173 wait(&start, is_tty).await;83 duration = exp.wait(&start).await; 174 84 175 STOP.store(true, Ordering::SeqCst); 176 duration = start.elapsed(); 85 println!("\nDone"); 177 86 178 println!("\nDone"); 87 for i in 0..tthreads { 88 thddata[i].sem.add_permits(1); 89 } 179 90 180 for i in 0..tthreads { 181 thddata[i].sem.add_permits(1); 182 } 183 184 for _ in 0..tthreads { 185 global_counter += result.pop().unwrap().await.unwrap(); 186 } 91 for t in threads { 92 global_counter += t.await.unwrap(); 187 93 } 188 94 }); -
benchmark/readyQ/locality.rs
r49ce636 r751e2eb 1 use std::io::{self, Write};2 1 use std::ptr; 3 2 use std::sync::Arc; 4 use std::sync::atomic::{AtomicU64, AtomicBool,Ordering};5 use std::time:: {Instant,Duration};3 use std::sync::atomic::{AtomicU64, Ordering}; 4 use std::time::Instant; 6 5 use std::thread::{self, ThreadId}; 7 6 8 7 use tokio::runtime::Builder; 9 8 use tokio::sync; 10 use tokio::time; 11 12 use clap::{Arg, App, ArgMatches}; 13 use isatty::stdout_isatty; 9 10 use clap::{App, Arg}; 14 11 use num_format::{Locale, ToFormattedString}; 15 12 use rand::Rng; 16 13 17 // ================================================== 18 struct BenchData { 19 clock_mode: bool, 20 stop: AtomicBool, 21 stop_count: u64, 22 duration: f64, 23 threads_left: AtomicU64, 24 } 25 26 impl BenchData { 27 fn new(options: ArgMatches, nthreads: usize) -> BenchData { 28 let (clock_mode, stop_count, duration) = if options.is_present("iterations") { 29 (false, 30 options.value_of("iterations").unwrap().parse::<u64>().unwrap(), 31 -1.0) 32 } else { 33 (true, 34 std::u64::MAX, 35 options.value_of("duration").unwrap().parse::<f64>().unwrap()) 36 }; 37 38 BenchData{ 39 clock_mode: clock_mode, 40 stop: AtomicBool::new(false), 41 stop_count: stop_count, 42 duration: duration, 43 threads_left: AtomicU64::new(nthreads as u64), 44 } 45 } 46 } 47 48 async fn wait(bench: &BenchData, start: &Instant, is_tty: bool) { 49 loop { 50 time::sleep(Duration::from_micros(100000)).await; 51 let delta = start.elapsed(); 52 if is_tty { 53 print!(" {:.1}\r", delta.as_secs_f32()); 54 io::stdout().flush().unwrap(); 55 } 56 if bench.clock_mode && delta >= Duration::from_secs_f64(bench.duration) { 57 break; 58 } 59 else if !bench.clock_mode && bench.threads_left.load(Ordering::Relaxed) == 0 { 60 break; 61 } 62 } 63 } 14 #[path = "../bench.rs"] 15 mod bench; 64 16 65 17 // ================================================== … … 240 192 } 241 193 242 async fn local(start: Arc<sync::Barrier>, idata: MyDataPtr, spots : Arc<Vec<MySpot>>, cnt: u64, share: bool, id: usize, bench: Arc<BenchData>) -> Result{194 async fn local(start: Arc<sync::Barrier>, idata: MyDataPtr, spots : Arc<Vec<MySpot>>, cnt: u64, share: bool, id: usize, exp: Arc<bench::BenchData>) -> Result{ 243 195 let mut state = rand::thread_rng().gen::<u64>(); 244 196 let mut data = idata; … … 267 219 // Check if the experiment is over 268 220 if closed { break } // yes, spot was closed 269 if bench.clock_mode && bench.stop.load(Ordering::Relaxed) { break } // yes, time's up270 if ! bench.clock_mode && r.count >= bench.stop_count { break } // yes, iterations reached221 if exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break } // yes, time's up 222 if !exp.clock_mode && r.count >= exp.stop_count { break } // yes, iterations reached 271 223 272 224 assert_ne!(data.ptr as *const MyData, ptr::null()); … … 284 236 } 285 237 286 bench.threads_left.fetch_sub(1, Ordering::SeqCst);238 exp.threads_left.fetch_sub(1, Ordering::SeqCst); 287 239 r 288 240 } … … 292 244 fn main() { 293 245 let options = App::new("Locality Tokio") 294 .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) 295 .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) 296 .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) 297 .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) 246 .args(&bench::args()) 298 247 .arg(Arg::with_name("size") .short("w").long("worksize") .takes_value(true).default_value("2").help("Size of the array for each threads, in words (64bit)")) 299 248 .arg(Arg::with_name("work") .short("c").long("workcnt") .takes_value(true).default_value("2").help("Number of words to touch when working (random pick, cells can be picked more than once)")) … … 315 264 assert_eq!(&s, "1,000,000"); 316 265 317 let bench = Arc::new(BenchData::new(options, nprocs));266 let exp = Arc::new(bench::BenchData::new(options, nprocs)); 318 267 let mut results = Result::new(); 319 268 … … 342 291 share, 343 292 i, 344 bench.clone(),293 exp.clone(), 345 294 )) 346 295 }).collect(); … … 349 298 println!("Starting"); 350 299 351 let is_tty = stdout_isatty();352 300 let start = Instant::now(); 353 301 barr.wait().await; 354 302 355 wait(&bench, &start, is_tty).await; 356 357 bench.stop.store(true, Ordering::SeqCst); 358 elapsed = start.elapsed(); 303 elapsed = exp.wait(&start).await; 359 304 360 305 println!("\nDone");
Note: See TracChangeset
for help on using the changeset viewer.