1 | #[cfg(any( |
---|
2 | feature = "sync time rt-threaded", |
---|
3 | ))] |
---|
4 | |
---|
5 | extern crate tokio; |
---|
6 | |
---|
7 | use std::io::{self, Write}; |
---|
8 | use std::sync::Arc; |
---|
9 | use std::sync::atomic::{AtomicU64, AtomicBool,Ordering}; |
---|
10 | use std::time::{Instant,Duration}; |
---|
11 | |
---|
12 | use tokio::runtime::Builder; |
---|
13 | use tokio::sync; |
---|
14 | use tokio::time; |
---|
15 | |
---|
16 | use isatty::stdout_isatty; |
---|
17 | |
---|
18 | use num_format::{Locale, ToFormattedString}; |
---|
19 | |
---|
20 | use clap::{Arg, App}; |
---|
21 | |
---|
22 | use std::cell::UnsafeCell; |
---|
23 | use std::mem::MaybeUninit; |
---|
24 | use std::ops; |
---|
25 | |
---|
/// A set-once cell intended to back `static` configuration values that are
/// written during start-up and only read afterwards.
///
/// The cell performs no synchronization and does not track whether it holds
/// a value. Those invariants are the caller's responsibility and are spelled
/// out on [`InitializeCell::new_uninitialized`] and [`InitializeCell::init`].
pub struct InitializeCell<T> {
    inner: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: sharing the cell hands out `&T` to several threads (requires
// `T: Sync`) and `init` may move a `T` in from a different thread than the one
// that reads it (requires `T: Send`). Freedom from data races between `init`
// and readers is not enforced here; it is part of `init`'s safety contract.
unsafe impl<T: Send + Sync> Sync for InitializeCell<T> {}

impl<T> InitializeCell<T> {
    /// Creates a cell that holds no value yet.
    ///
    /// # Safety
    ///
    /// The cell must be given a value with [`InitializeCell::init`] before it
    /// is dereferenced; dereferencing an uninitialized cell is undefined
    /// behavior.
    pub const unsafe fn new_uninitialized() -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Creates a cell that already holds `init`.
    pub const fn new(init: T) -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::new(init)),
        }
    }

    /// Stores `init` in the cell through a shared reference.
    ///
    /// Any value already stored is overwritten without being dropped.
    ///
    /// # Safety
    ///
    /// No other thread may access the cell while this call runs, and no
    /// reference previously obtained through `Deref` may still be alive.
    pub unsafe fn init(&self, init: T) {
        *self.inner.get() = MaybeUninit::new(init);
    }
}

impl<T> ops::Deref for InitializeCell<T> {
    type Target = T;

    /// Returns a reference to the stored value.
    fn deref(&self) -> &T {
        // SAFETY: the constructor/`init` contracts require the cell to be
        // initialized, and not concurrently written, whenever it is read.
        unsafe { &*(*self.inner.get()).as_ptr() }
    }
}
---|
56 | |
---|
57 | static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() }; |
---|
58 | static STOP_COUNT: InitializeCell<u64> = unsafe { InitializeCell::new_uninitialized() }; |
---|
59 | static DURATION: InitializeCell<f64> = unsafe { InitializeCell::new_uninitialized() }; |
---|
60 | static STOP : AtomicBool = AtomicBool::new(false); |
---|
61 | static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10); |
---|
62 | |
---|
63 | struct Partner { |
---|
64 | sem: sync::Semaphore, |
---|
65 | next: usize, |
---|
66 | } |
---|
67 | |
---|
68 | async fn partner_main(result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ) { |
---|
69 | let this = &others[idx]; |
---|
70 | let mut count:u64 = 0; |
---|
71 | loop { |
---|
72 | this.sem.acquire().await.forget(); |
---|
73 | others[this.next].sem.add_permits(1); |
---|
74 | count += 1; |
---|
75 | |
---|
76 | if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; } |
---|
77 | if !*CLOCK_MODE && count >= *STOP_COUNT { break; } |
---|
78 | } |
---|
79 | |
---|
80 | THREADS_LEFT.fetch_sub(1, Ordering::SeqCst); |
---|
81 | result.send( count ).unwrap(); |
---|
82 | } |
---|
83 | |
---|
84 | fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> { |
---|
85 | let mut thddata = Vec::with_capacity(tthreads); |
---|
86 | for i in 0..tthreads { |
---|
87 | let pi = (i + nthreads) % tthreads; |
---|
88 | thddata.push(Arc::new(Partner{ |
---|
89 | sem: sync::Semaphore::new(0), |
---|
90 | next: pi, |
---|
91 | })); |
---|
92 | } |
---|
93 | return thddata; |
---|
94 | } |
---|
95 | |
---|
96 | async fn wait(start: &Instant, is_tty: bool) { |
---|
97 | loop { |
---|
98 | time::sleep(Duration::from_micros(100000)).await; |
---|
99 | let delta = start.elapsed(); |
---|
100 | if is_tty { |
---|
101 | print!(" {:.1}\r", delta.as_secs_f32()); |
---|
102 | io::stdout().flush().unwrap(); |
---|
103 | } |
---|
104 | if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) { |
---|
105 | break; |
---|
106 | } |
---|
107 | else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 { |
---|
108 | break; |
---|
109 | } |
---|
110 | } |
---|
111 | } |
---|
112 | |
---|
113 | fn main() { |
---|
114 | let options = App::new("Cycle Tokio") |
---|
115 | .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) |
---|
116 | .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) |
---|
117 | .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) |
---|
118 | .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) |
---|
119 | .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle")) |
---|
120 | .get_matches(); |
---|
121 | |
---|
122 | let ring_size = options.value_of("ringsize").unwrap().parse::<usize>().unwrap(); |
---|
123 | let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap(); |
---|
124 | let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap(); |
---|
125 | |
---|
126 | if options.is_present("iterations") { |
---|
127 | unsafe{ |
---|
128 | CLOCK_MODE.init( false ); |
---|
129 | STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() ); |
---|
130 | } |
---|
131 | } |
---|
132 | else { |
---|
133 | unsafe{ |
---|
134 | CLOCK_MODE.init(true); |
---|
135 | DURATION .init(options.value_of("duration").unwrap().parse::<f64>().unwrap()); |
---|
136 | } |
---|
137 | } |
---|
138 | |
---|
139 | let s = (1000000 as u64).to_formatted_string(&Locale::en); |
---|
140 | assert_eq!(&s, "1,000,000"); |
---|
141 | |
---|
142 | |
---|
143 | let tthreads = nthreads * ring_size; |
---|
144 | THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst); |
---|
145 | let thddata = Arc::new(prep(nthreads, tthreads)); |
---|
146 | |
---|
147 | let mut global_counter :u64 = 0; |
---|
148 | let mut duration : std::time::Duration = std::time::Duration::from_secs(0); |
---|
149 | let runtime = Builder::new_multi_thread() |
---|
150 | .worker_threads(nprocs) |
---|
151 | .enable_all() |
---|
152 | .build() |
---|
153 | .unwrap(); |
---|
154 | |
---|
155 | runtime.block_on(async { |
---|
156 | let mut result : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads); |
---|
157 | { |
---|
158 | let mut threads = Vec::with_capacity(tthreads); |
---|
159 | for i in 0..tthreads { |
---|
160 | let (s, r) = sync::oneshot::channel::<u64>(); |
---|
161 | result.push(r); |
---|
162 | threads.push(tokio::spawn(partner_main(s, i, thddata.clone()))); |
---|
163 | } |
---|
164 | println!("Starting"); |
---|
165 | |
---|
166 | let is_tty = stdout_isatty(); |
---|
167 | let start = Instant::now(); |
---|
168 | |
---|
169 | for i in 0..nthreads { |
---|
170 | thddata[i].sem.add_permits(1); |
---|
171 | } |
---|
172 | |
---|
173 | wait(&start, is_tty).await; |
---|
174 | |
---|
175 | STOP.store(true, Ordering::SeqCst); |
---|
176 | duration = start.elapsed(); |
---|
177 | |
---|
178 | println!("\nDone"); |
---|
179 | |
---|
180 | for i in 0..tthreads { |
---|
181 | thddata[i].sem.add_permits(1); |
---|
182 | } |
---|
183 | |
---|
184 | for _ in 0..tthreads { |
---|
185 | global_counter += result.pop().unwrap().await.unwrap(); |
---|
186 | } |
---|
187 | } |
---|
188 | }); |
---|
189 | |
---|
190 | println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en)); |
---|
191 | println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en)); |
---|
192 | println!("Number of threads : {}", (tthreads).to_formatted_string(&Locale::en)); |
---|
193 | println!("Cycle size (# thrds) : {}", (ring_size).to_formatted_string(&Locale::en)); |
---|
194 | println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en)); |
---|
195 | println!("Ops per second : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); |
---|
196 | println!("ns per ops : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en)); |
---|
197 | println!("Ops per threads : {:>15}", (global_counter / tthreads as u64).to_formatted_string(&Locale::en)); |
---|
198 | println!("Ops per procs : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en)); |
---|
199 | println!("Ops/sec/procs : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); |
---|
200 | println!("ns per ops/procs : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en)); |
---|
201 | } |
---|