1 | #[cfg(any( |
2 | feature = "sync time rt-threaded", |
3 | ))] |
4 | |
5 | extern crate tokio; |
6 | |
7 | use std::io::{self, Write}; |
8 | use std::sync::Arc; |
9 | use std::sync::atomic::{AtomicU64, AtomicBool,Ordering}; |
10 | use std::time::{Instant,Duration}; |
11 | |
12 | use tokio::runtime::Builder; |
13 | use tokio::sync; |
14 | use tokio::time; |
15 | |
16 | use isatty::stdout_isatty; |
17 | |
18 | use num_format::{Locale, ToFormattedString}; |
19 | |
20 | use clap::{Arg, App}; |
21 | |
22 | use std::cell::UnsafeCell; |
23 | use std::mem::MaybeUninit; |
24 | use std::ops; |
25 | |
/// A cell that can be created empty (e.g. in a `static`) and filled in later
/// with [`InitializeCell::init`], after which it is read through `Deref`.
///
/// There is no runtime tracking of whether the cell holds a value: whoever
/// calls `new_uninitialized` / `init` is responsible for making every read
/// happen after the write that initializes it.
pub struct InitializeCell<T> {
    inner: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: shared access hands out `&T` (via `Deref`), which needs `T: Sync`,
// and `init` moves a `T` in through `&self` from whichever thread calls it,
// which needs `T: Send`. Without these bounds a type such as
// `InitializeCell<Cell<u8>>` would be `Sync` and permit data races.
// Callers must additionally uphold the write-before-read contract on `init`.
unsafe impl<T: Send + Sync> Sync for InitializeCell<T> {}

impl<T> InitializeCell<T> {
    /// Creates a cell that holds no value yet.
    ///
    /// # Safety
    ///
    /// The cell must be filled with [`InitializeCell::init`] before it is ever
    /// dereferenced; dereferencing an uninitialized cell is undefined behavior.
    pub const unsafe fn new_uninitialized() -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Creates a cell that already holds `init`.
    pub const fn new(init: T) -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::new(init)),
        }
    }

    /// Stores `init` in the cell. Any previously stored value is overwritten
    /// without being dropped.
    ///
    /// # Safety
    ///
    /// While this call runs, no reference obtained through `Deref` may be
    /// alive and no other thread may access the cell. In practice: call it
    /// before starting any thread or task that reads the cell.
    pub unsafe fn init(&self, init: T) {
        // SAFETY: the caller guarantees exclusive access for the duration of
        // the call. `write` neither reads nor drops the old (possibly
        // uninitialized) contents.
        unsafe { self.inner.get().write(MaybeUninit::new(init)) }
    }
}

impl<T> ops::Deref for InitializeCell<T> {
    type Target = T;

    /// Returns a reference to the stored value.
    ///
    /// The cell must have been initialized (by `new` or `init`); this method
    /// cannot detect an uninitialized cell.
    fn deref(&self) -> &T {
        // SAFETY: relies on the `new_uninitialized`/`init` contracts: the cell
        // holds an initialized `T` and no `init` is concurrently writing it.
        unsafe { &*(*self.inner.get()).as_ptr() }
    }
}
56 | |
57 | static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() }; |
58 | static STOP_COUNT: InitializeCell<u64> = unsafe { InitializeCell::new_uninitialized() }; |
59 | static DURATION: InitializeCell<f64> = unsafe { InitializeCell::new_uninitialized() }; |
60 | static STOP : AtomicBool = AtomicBool::new(false); |
61 | static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10); |
62 | |
63 | struct Partner { |
64 | sem: sync::Semaphore, |
65 | next: usize, |
66 | } |
67 | |
68 | async fn partner_main(result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ) { |
69 | let this = &others[idx]; |
70 | let mut count:u64 = 0; |
71 | loop { |
72 | this.sem.acquire().await.forget(); |
73 | others[this.next].sem.add_permits(1); |
74 | count += 1; |
75 | |
76 | if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; } |
77 | if !*CLOCK_MODE && count >= *STOP_COUNT { break; } |
78 | } |
79 | |
80 | THREADS_LEFT.fetch_sub(1, Ordering::SeqCst); |
81 | result.send( count ).unwrap(); |
82 | } |
83 | |
84 | fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> { |
85 | let mut thddata = Vec::with_capacity(tthreads); |
86 | for i in 0..tthreads { |
87 | let pi = (i + nthreads) % tthreads; |
88 | thddata.push(Arc::new(Partner{ |
89 | sem: sync::Semaphore::new(0), |
90 | next: pi, |
91 | })); |
92 | } |
93 | return thddata; |
94 | } |
95 | |
96 | async fn wait(start: &Instant, is_tty: bool) { |
97 | loop { |
98 | time::sleep(Duration::from_micros(100000)).await; |
99 | let delta = start.elapsed(); |
100 | if is_tty { |
101 | print!(" {:.1}\r", delta.as_secs_f32()); |
102 | io::stdout().flush().unwrap(); |
103 | } |
104 | if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) { |
105 | break; |
106 | } |
107 | else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 { |
108 | break; |
109 | } |
110 | } |
111 | } |
112 | |
113 | fn main() { |
114 | let options = App::new("Cycle Tokio") |
115 | .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds")) |
116 | .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments")) |
117 | .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use")) |
118 | .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use")) |
119 | .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle")) |
120 | .get_matches(); |
121 | |
122 | let ring_size = options.value_of("ringsize").unwrap().parse::<usize>().unwrap(); |
123 | let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap(); |
124 | let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap(); |
125 | |
126 | if options.is_present("iterations") { |
127 | unsafe{ |
128 | CLOCK_MODE.init( false ); |
129 | STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() ); |
130 | } |
131 | } |
132 | else { |
133 | unsafe{ |
134 | CLOCK_MODE.init(true); |
135 | DURATION .init(options.value_of("duration").unwrap().parse::<f64>().unwrap()); |
136 | } |
137 | } |
138 | |
139 | let s = (1000000 as u64).to_formatted_string(&Locale::en); |
140 | assert_eq!(&s, "1,000,000"); |
141 | |
142 | |
143 | let tthreads = nthreads * ring_size; |
144 | THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst); |
145 | let thddata = Arc::new(prep(nthreads, tthreads)); |
146 | |
147 | let mut global_counter :u64 = 0; |
148 | let mut duration : std::time::Duration = std::time::Duration::from_secs(0); |
149 | let runtime = Builder::new_multi_thread() |
150 | .worker_threads(nprocs) |
151 | .enable_all() |
152 | .build() |
153 | .unwrap(); |
154 | |
155 | runtime.block_on(async { |
156 | let mut result : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads); |
157 | { |
158 | let mut threads = Vec::with_capacity(tthreads); |
159 | for i in 0..tthreads { |
160 | let (s, r) = sync::oneshot::channel::<u64>(); |
161 | result.push(r); |
162 | threads.push(tokio::spawn(partner_main(s, i, thddata.clone()))); |
163 | } |
164 | println!("Starting"); |
165 | |
166 | let is_tty = stdout_isatty(); |
167 | let start = Instant::now(); |
168 | |
169 | for i in 0..nthreads { |
170 | thddata[i].sem.add_permits(1); |
171 | } |
172 | |
173 | wait(&start, is_tty).await; |
174 | |
175 | STOP.store(true, Ordering::SeqCst); |
176 | duration = start.elapsed(); |
177 | |
178 | println!("\nDone"); |
179 | |
180 | for i in 0..tthreads { |
181 | thddata[i].sem.add_permits(1); |
182 | } |
183 | |
184 | for _ in 0..tthreads { |
185 | global_counter += result.pop().unwrap().await.unwrap(); |
186 | } |
187 | } |
188 | }); |
189 | |
190 | println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en)); |
191 | println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en)); |
192 | println!("Number of threads : {}", (tthreads).to_formatted_string(&Locale::en)); |
193 | println!("Cycle size (# thrds) : {}", (ring_size).to_formatted_string(&Locale::en)); |
194 | println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en)); |
195 | println!("Ops per second : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); |
196 | println!("ns per ops : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en)); |
197 | println!("Ops per threads : {:>15}", (global_counter / tthreads as u64).to_formatted_string(&Locale::en)); |
198 | println!("Ops per procs : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en)); |
199 | println!("Ops/sec/procs : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en)); |
200 | println!("ns per ops/procs : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en)); |
201 | } |