1 | #[cfg(any(
|
---|
2 | feature = "sync time rt-threaded",
|
---|
3 | ))]
|
---|
4 |
|
---|
5 | extern crate tokio;
|
---|
6 |
|
---|
7 | use std::io::{self, Write};
|
---|
8 | use std::sync::Arc;
|
---|
9 | use std::sync::atomic::{AtomicU64, AtomicBool,Ordering};
|
---|
10 | use std::time::{Instant,Duration};
|
---|
11 |
|
---|
12 | use tokio::runtime::Builder;
|
---|
13 | use tokio::sync;
|
---|
14 | use tokio::time;
|
---|
15 |
|
---|
16 | extern crate isatty;
|
---|
17 | use isatty::stdout_isatty;
|
---|
18 |
|
---|
19 | extern crate num_format;
|
---|
20 | use num_format::{Locale, ToFormattedString};
|
---|
21 |
|
---|
22 | extern crate clap;
|
---|
23 | use clap::{Arg, App};
|
---|
24 |
|
---|
25 | use std::cell::UnsafeCell;
|
---|
26 | use std::mem::MaybeUninit;
|
---|
27 | use std::ops;
|
---|
28 |
|
---|
/// A cell meant for `static`s that are filled in once during single-threaded
/// start-up and only read afterwards.
///
/// Nothing is checked at run time. Dereferencing a cell created with
/// [`InitializeCell::new_uninitialized`] before [`InitializeCell::init`] has
/// been called reads uninitialised memory. Calling `init` while the cell may
/// be read concurrently is a data race.
pub struct InitializeCell<T> {
    inner: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: once initialised the cell is only read, so sharing
// `&InitializeCell<T>` across threads amounts to sharing `&T` (requires
// `T: Sync`). `init` may move a `T` in from whichever thread calls it
// (requires `T: Send`). Races between `init` and reads are excluded by the
// `unsafe` contract of `init`. The bounds mirror `std::sync::OnceLock`; the
// previous unconditional impl also allowed non-thread-safe `T` to be shared.
unsafe impl<T: Send + Sync> Sync for InitializeCell<T> {}

impl<T> InitializeCell<T> {
    /// Creates a cell that does not hold a value yet.
    ///
    /// # Safety
    /// [`init`](Self::init) must be called before the cell is dereferenced.
    /// Dereferencing it earlier reads uninitialised memory.
    pub const unsafe fn new_uninitialized() -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Creates a cell that already holds `init`.
    pub const fn new(init: T) -> InitializeCell<T> {
        InitializeCell {
            inner: UnsafeCell::new(MaybeUninit::new(init)),
        }
    }

    /// Stores `init` in the cell.
    ///
    /// A previously stored value is overwritten without being dropped.
    ///
    /// # Safety
    /// No reference previously obtained through `Deref` may still be alive,
    /// and no other thread may access the cell at the same time. In practice,
    /// call this only during single-threaded start-up.
    pub unsafe fn init(&self, init: T) {
        *self.inner.get() = MaybeUninit::new(init);
    }
}

impl<T> ops::Deref for InitializeCell<T> {
    type Target = T;

    /// Returns a reference to the stored value (unchecked; see the type docs).
    fn deref(&self) -> &T {
        // SAFETY: relies on the type-level contract. The cell was built with
        // `new`, or with `new_uninitialized` followed by `init`, and no `init`
        // runs concurrently with this read.
        unsafe { &*(*self.inner.get()).as_ptr() }
    }
}
|
---|
59 |
|
---|
60 | static CLOCK_MODE: InitializeCell<bool> = unsafe { InitializeCell::new_uninitialized() };
|
---|
61 | static STOP_COUNT: InitializeCell<u64> = unsafe { InitializeCell::new_uninitialized() };
|
---|
62 | static DURATION: InitializeCell<f64> = unsafe { InitializeCell::new_uninitialized() };
|
---|
63 | static STOP : AtomicBool = AtomicBool::new(false);
|
---|
64 | static THREADS_LEFT : AtomicU64 = AtomicU64 ::new(10);
|
---|
65 |
|
---|
66 | struct Partner {
|
---|
67 | sem: sync::Semaphore,
|
---|
68 | next: usize,
|
---|
69 | }
|
---|
70 |
|
---|
71 | async fn partner_main(result: sync::oneshot::Sender<u64>, idx: usize, others: Arc<Vec<Arc<Partner>>> ) {
|
---|
72 | let this = &others[idx];
|
---|
73 | let mut count:u64 = 0;
|
---|
74 | loop {
|
---|
75 | this.sem.acquire().await.forget();
|
---|
76 | others[this.next].sem.add_permits(1);
|
---|
77 | count += 1;
|
---|
78 |
|
---|
79 | if *CLOCK_MODE && STOP.load(Ordering::Relaxed) { break; }
|
---|
80 | if !*CLOCK_MODE && count >= *STOP_COUNT { break; }
|
---|
81 | }
|
---|
82 |
|
---|
83 | THREADS_LEFT.fetch_sub(1, Ordering::SeqCst);
|
---|
84 | result.send( count ).unwrap();
|
---|
85 | }
|
---|
86 |
|
---|
87 | fn prep(nthreads: usize, tthreads: usize) -> Vec<Arc<Partner>> {
|
---|
88 | let mut thddata = Vec::with_capacity(tthreads);
|
---|
89 | for i in 0..tthreads {
|
---|
90 | let pi = (i + nthreads) % tthreads;
|
---|
91 | thddata.push(Arc::new(Partner{
|
---|
92 | sem: sync::Semaphore::new(0),
|
---|
93 | next: pi,
|
---|
94 | }));
|
---|
95 | }
|
---|
96 | return thddata;
|
---|
97 | }
|
---|
98 |
|
---|
99 | async fn wait(start: &Instant, is_tty: bool) {
|
---|
100 | loop {
|
---|
101 | time::sleep(Duration::from_micros(100000)).await;
|
---|
102 | let delta = start.elapsed();
|
---|
103 | if is_tty {
|
---|
104 | print!(" {:.1}\r", delta.as_secs_f32());
|
---|
105 | io::stdout().flush().unwrap();
|
---|
106 | }
|
---|
107 | if *CLOCK_MODE && delta >= Duration::from_secs_f64(*DURATION) {
|
---|
108 | break;
|
---|
109 | }
|
---|
110 | else if !*CLOCK_MODE && THREADS_LEFT.load(Ordering::Relaxed) == 0 {
|
---|
111 | break;
|
---|
112 | }
|
---|
113 | }
|
---|
114 | }
|
---|
115 |
|
---|
116 | fn main() {
|
---|
117 | let options = App::new("Cycle Tokio")
|
---|
118 | .arg(Arg::with_name("duration") .short("d").long("duration") .takes_value(true).default_value("5").help("Duration of the experiments in seconds"))
|
---|
119 | .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments"))
|
---|
120 | .arg(Arg::with_name("nthreads") .short("t").long("nthreads") .takes_value(true).default_value("1").help("Number of threads to use"))
|
---|
121 | .arg(Arg::with_name("nprocs") .short("p").long("nprocs") .takes_value(true).default_value("1").help("Number of processors to use"))
|
---|
122 | .arg(Arg::with_name("ringsize") .short("r").long("ringsize") .takes_value(true).default_value("1").help("Number of threads in a cycle"))
|
---|
123 | .get_matches();
|
---|
124 |
|
---|
125 | let ring_size = options.value_of("ringsize").unwrap().parse::<usize>().unwrap();
|
---|
126 | let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
|
---|
127 | let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
|
---|
128 |
|
---|
129 | if options.is_present("iterations") {
|
---|
130 | unsafe{
|
---|
131 | CLOCK_MODE.init( false );
|
---|
132 | STOP_COUNT.init( options.value_of("iterations").unwrap().parse::<u64>().unwrap() );
|
---|
133 | }
|
---|
134 | }
|
---|
135 | else {
|
---|
136 | unsafe{
|
---|
137 | CLOCK_MODE.init(true);
|
---|
138 | DURATION .init(options.value_of("duration").unwrap().parse::<f64>().unwrap());
|
---|
139 | }
|
---|
140 | }
|
---|
141 |
|
---|
142 | let s = (1000000 as u64).to_formatted_string(&Locale::en);
|
---|
143 | assert_eq!(&s, "1,000,000");
|
---|
144 |
|
---|
145 |
|
---|
146 | let tthreads = nthreads * ring_size;
|
---|
147 | THREADS_LEFT.store(tthreads as u64, Ordering::SeqCst);
|
---|
148 | let thddata = Arc::new(prep(nthreads, tthreads));
|
---|
149 |
|
---|
150 | let mut global_counter :u64 = 0;
|
---|
151 | let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
|
---|
152 | let runtime = Builder::new_multi_thread()
|
---|
153 | .worker_threads(nprocs)
|
---|
154 | .enable_all()
|
---|
155 | .build()
|
---|
156 | .unwrap();
|
---|
157 |
|
---|
158 | runtime.block_on(async {
|
---|
159 | let mut result : Vec<sync::oneshot::Receiver::<u64>> = Vec::with_capacity(tthreads);
|
---|
160 | {
|
---|
161 | let mut threads = Vec::with_capacity(tthreads);
|
---|
162 | for i in 0..tthreads {
|
---|
163 | let (s, r) = sync::oneshot::channel::<u64>();
|
---|
164 | result.push(r);
|
---|
165 | threads.push(tokio::spawn(partner_main(s, i, thddata.clone())));
|
---|
166 | }
|
---|
167 | println!("Starting");
|
---|
168 |
|
---|
169 | let is_tty = stdout_isatty();
|
---|
170 | let start = Instant::now();
|
---|
171 |
|
---|
172 | for i in 0..nthreads {
|
---|
173 | thddata[i].sem.add_permits(1);
|
---|
174 | }
|
---|
175 |
|
---|
176 | wait(&start, is_tty).await;
|
---|
177 |
|
---|
178 | STOP.store(true, Ordering::SeqCst);
|
---|
179 | duration = start.elapsed();
|
---|
180 |
|
---|
181 | println!("\nDone");
|
---|
182 |
|
---|
183 | for i in 0..tthreads {
|
---|
184 | thddata[i].sem.add_permits(1);
|
---|
185 | }
|
---|
186 |
|
---|
187 | for _ in 0..tthreads {
|
---|
188 | global_counter += result.pop().unwrap().await.unwrap();
|
---|
189 | }
|
---|
190 | }
|
---|
191 | });
|
---|
192 |
|
---|
193 | println!("Duration (ms) : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
|
---|
194 | println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
|
---|
195 | println!("Number of threads : {}", (tthreads).to_formatted_string(&Locale::en));
|
---|
196 | println!("Cycle size (# thrds) : {}", (ring_size).to_formatted_string(&Locale::en));
|
---|
197 | println!("Total Operations(ops): {:>15}", (global_counter).to_formatted_string(&Locale::en));
|
---|
198 | println!("Ops per second : {:>15}", (((global_counter as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
|
---|
199 | println!("ns per ops : {:>15}", ((duration.as_nanos() as f64 / global_counter as f64) as u64).to_formatted_string(&Locale::en));
|
---|
200 | println!("Ops per threads : {:>15}", (global_counter / tthreads as u64).to_formatted_string(&Locale::en));
|
---|
201 | println!("Ops per procs : {:>15}", (global_counter / nprocs as u64).to_formatted_string(&Locale::en));
|
---|
202 | println!("Ops/sec/procs : {:>15}", ((((global_counter as f64) / nprocs as f64) / duration.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
|
---|
203 | println!("ns per ops/procs : {:>15}", ((duration.as_nanos() as f64 / (global_counter as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en));
|
---|
204 | }
|
---|