source: benchmark/readyQ/locality.rs@ 3d19ae6

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 3d19ae6 was 3d19ae6, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added padding to existing locality benchmarks

  • Property mode set to 100644
File size: 9.2 KB
Line 
1use std::ptr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Instant;
5use std::thread::{self, ThreadId};
6
7use tokio::runtime::Builder;
8use tokio::sync;
9
10use clap::{App, Arg};
11use num_format::{Locale, ToFormattedString};
12use rand::Rng;
13
14#[path = "../bench.rs"]
15mod bench;
16
17// ==================================================
/// A block of work data that tasks touch and (optionally) hand to each other.
///
/// Besides the payload it remembers the id of the OS thread that last
/// checked it via [`MyData::moved`], so migrations between threads can be counted.
struct MyData {
    /// Payload cells that `access` increments.
    data: Vec<u64>,
    /// OS thread recorded by the constructor or by the last `moved` call.
    ttid: ThreadId,
    /// Identifier supplied by the creator; never read in this file.
    _id: usize,
}

impl MyData {
    /// Creates a zero-filled buffer of `size` cells, recording the calling
    /// thread as the initial owner.
    fn new(id: usize, size: usize) -> MyData {
        MyData {
            data: vec![0; size],
            ttid: thread::current().id(),
            _id: id,
        }
    }

    /// Reports whether the data is now being used from a different thread.
    ///
    /// Returns `0` when `ttid` matches the recorded thread; otherwise records
    /// `ttid` as the new owner and returns `1`.
    fn moved(&mut self, ttid: ThreadId) -> u64 {
        if self.ttid == ttid {
            return 0;
        }
        self.ttid = ttid;
        1
    }

    /// Increments the cell at `idx` modulo the buffer length.
    ///
    /// An empty buffer has no cell to touch, so the call is a no-op instead of
    /// panicking on a remainder-by-zero.
    fn access(&mut self, idx: usize) {
        let len = self.data.len();
        if len == 0 {
            return;
        }
        self.data[idx % len] += 1;
    }
}
46
47struct MyDataPtr {
48 ptr: *mut MyData,
49}
50
51unsafe impl std::marker::Send for MyDataPtr{}
52
53// ==================================================
54struct MyCtx {
55 _p1: [16]u64,
56 s: sync::Semaphore,
57 d: MyDataPtr,
58 ttid: ThreadId,
59 _id: usize,
60 _p2: [16]u64,
61}
62
63impl MyCtx {
64 fn new(d: *mut MyData, id: usize) -> MyCtx {
65 MyCtx {
66 s: sync::Semaphore::new(0),
67 d: MyDataPtr{ ptr: d },
68 ttid: thread::current().id(),
69 _id: id
70 }
71 }
72
73 fn moved(&mut self, ttid: ThreadId) -> u64 {
74 if self.ttid == ttid {
75 return 0;
76 }
77 self.ttid = ttid;
78 return 1;
79 }
80}
81// ==================================================
82// Atomic object where a single thread can wait
83// May exchanges data
84struct MySpot {
85 ptr: AtomicU64,
86 _id: usize,
87}
88
89impl MySpot {
90 fn new(id: usize) -> MySpot {
91 let r = MySpot{
92 ptr: AtomicU64::new(0),
93 _id: id,
94 };
95 r
96 }
97
98 fn one() -> u64 {
99 1
100 }
101
102 // Main handshake of the code
103 // Single seat, first thread arriving waits
104 // Next threads unblocks current one and blocks in its place
105 // if share == true, exchange data in the process
106 async fn put( &self, ctx: &mut MyCtx, data: MyDataPtr, share: bool) -> (*mut MyData, bool) {
107 {
108 // Attempt to CAS our context into the seat
109 let raw = {
110 loop {
111 let expected = self.ptr.load(Ordering::Relaxed) as u64;
112 if expected == MySpot::one() { // Seat is closed, return
113 let r: *const MyData = ptr::null();
114 return (r as *mut MyData, true);
115 }
116 let got = self.ptr.compare_and_swap(expected, ctx as *mut MyCtx as u64, Ordering::SeqCst);
117 if got == expected {
118 break expected;// We got the seat
119 }
120 }
121 };
122
123 // If we aren't the fist in, wake someone
124 if raw != 0 {
125 let val: &mut MyCtx = unsafe{ &mut *(raw as *mut MyCtx) };
126 // If we are sharing, give them our data
127 if share {
128 val.d.ptr = data.ptr;
129 }
130
131 // Wake them up
132 val.s.add_permits(1);
133 }
134 }
135
136 // Block once on the seat
137 ctx.s.acquire().await.forget();
138
139 // Someone woke us up, get the new data
140 let ret = ctx.d.ptr;
141 return (ret, false);
142 }
143
144 // Shutdown the spot
145 // Wake current thread and mark seat as closed
146 fn release(&self) {
147 let val = self.ptr.swap(MySpot::one(), Ordering::SeqCst);
148 if val == 0 {
149 return
150 }
151
152 // Someone was there, release them
153 unsafe{ &mut *(val as *mut MyCtx) }.s.add_permits(1)
154 }
155}
156
157// ==================================================
/// Tallies produced by one task and summed across all tasks.
///
/// Note: this type shadows the prelude's `Result` inside this file.
struct Result {
    /// Number of completed loop iterations.
    count: u64,
    /// Number of times a task context reported a thread change.
    gmigs: u64,
    /// Number of times a data block reported a thread change.
    dmigs: u64,
}

impl Result {
    /// Returns a tally with every counter at zero.
    fn new() -> Result {
        let (count, gmigs, dmigs) = (0, 0, 0);
        Result { count, gmigs, dmigs }
    }

    /// Folds the counters of `o` into `self`, field by field.
    fn add(&mut self, o: Result) {
        let Result { count, gmigs, dmigs } = o;
        self.count += count;
        self.gmigs += gmigs;
        self.dmigs += dmigs;
    }
}
176
177// ==================================================
// Small per-task pseudo-random generator (xorshift with shifts 13, 7, 17).
// Advances `state` in place and returns the new state as a `usize`.
// A zero state stays zero.
fn __xorshift64(state: &mut u64) -> usize {
    let step1 = *state ^ (*state << 13);
    let step2 = step1 ^ (step1 >> 7);
    let step3 = step2 ^ (step2 << 17);
    *state = step3;
    step3 as usize
}
187
188// ==================================================
189// Do some work by accessing 'cnt' cells in the array
190fn work(data: &mut MyData, cnt: u64, state : &mut u64) {
191 for _ in 0..cnt {
192 data.access(__xorshift64(state))
193 }
194}
195
196async fn local(start: Arc<sync::Barrier>, idata: MyDataPtr, spots : Arc<Vec<MySpot>>, cnt: u64, share: bool, id: usize, exp: Arc<bench::BenchData>) -> Result{
197 let mut state = rand::thread_rng().gen::<u64>();
198 let mut data = idata;
199 let mut ctx = MyCtx::new(data.ptr, id);
200 let _size = unsafe{ &mut *data.ptr }.data.len();
201
202 // Prepare results
203 let mut r = Result::new();
204
205 // Wait for start
206 start.wait().await;
207
208 // Main loop
209 loop {
210 // Touch our current data, write to invalidate remote cache lines
211 work(unsafe{ &mut *data.ptr }, cnt, &mut state);
212
213 // Wait on a random spot
214 let i = (__xorshift64(&mut state) as usize) % spots.len();
215 let closed = {
216 let (d, c) = spots[i].put(&mut ctx, data, share).await;
217 data = MyDataPtr{ ptr: d };
218 c
219 };
220
221 // Check if the experiment is over
222 if closed { break } // yes, spot was closed
223 if exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break } // yes, time's up
224 if !exp.clock_mode && r.count >= exp.stop_count { break } // yes, iterations reached
225
226 assert_ne!(data.ptr as *const MyData, ptr::null());
227
228 let d = unsafe{ &mut *data.ptr };
229
230 // Check everything is consistent
231 debug_assert_eq!(d.data.len(), _size);
232
233 // write down progress and check migrations
234 let ttid = thread::current().id();
235 r.count += 1;
236 r.gmigs += ctx .moved(ttid);
237 r.dmigs += d.moved(ttid);
238 }
239
240 exp.threads_left.fetch_sub(1, Ordering::SeqCst);
241 r
242}
243
244
245// ==================================================
246fn main() {
247 let options = App::new("Locality Tokio")
248 .args(&bench::args())
249 .arg(Arg::with_name("size") .short("w").long("worksize").takes_value(true).default_value("2").help("Size of the array for each threads, in words (64bit)"))
250 .arg(Arg::with_name("work") .short("c").long("workcnt") .takes_value(true).default_value("2").help("Number of words to touch when working (random pick, cells can be picked more than once)"))
251 .arg(Arg::with_name("share").short("s").long("share") .takes_value(true).default_value("true").help("Pass the work data to the next thread when blocking"))
252 .get_matches();
253
254 let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
255 let nprocs = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
256 let wsize = options.value_of("size").unwrap().parse::<usize>().unwrap();
257 let wcnt = options.value_of("work").unwrap().parse::<u64>().unwrap();
258 let share = options.value_of("share").unwrap().parse::<bool>().unwrap();
259
260 // Check params
261 if ! (nthreads > nprocs) {
262 panic!("Must have more threads than procs");
263 }
264
265 let s = (1000000 as u64).to_formatted_string(&Locale::en);
266 assert_eq!(&s, "1,000,000");
267
268 let exp = Arc::new(bench::BenchData::new(options, nprocs));
269 let mut results = Result::new();
270
271 let mut elapsed : std::time::Duration = std::time::Duration::from_secs(0);
272
273 let mut data_arrays : Vec<MyData> = (0..nthreads).map(|i| MyData::new(i, wsize)).rev().collect();
274 let spots : Arc<Vec<MySpot>> = Arc::new((0..nthreads - nprocs).map(|i| MySpot::new(i)).rev().collect());
275 let barr = Arc::new(sync::Barrier::new(nthreads + 1));
276
277 let runtime = Builder::new_multi_thread()
278 .worker_threads(nprocs)
279 .enable_all()
280 .build()
281 .unwrap();
282
283 runtime.block_on(async
284 {
285 let thrds: Vec<_> = (0..nthreads).map(|i| {
286 debug_assert!(i < data_arrays.len());
287
288 runtime.spawn(local(
289 barr.clone(),
290 MyDataPtr{ ptr: &mut data_arrays[i] },
291 spots.clone(),
292 wcnt,
293 share,
294 i,
295 exp.clone(),
296 ))
297 }).collect();
298
299
300 println!("Starting");
301
302 let start = Instant::now();
303 barr.wait().await;
304
305 elapsed = exp.wait(&start).await;
306
307 println!("\nDone");
308
309 // release all the blocked threads
310 for s in &* spots {
311 s.release();
312 }
313
314 println!("Threads released");
315
316 // Join and accumulate results
317 for t in thrds {
318 results.add( t.await.unwrap() );
319 }
320
321 println!("Threads joined");
322 }
323 );
324
325 println!("Duration (ms) : {}", (elapsed.as_millis()).to_formatted_string(&Locale::en));
326 println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en));
327 println!("Number of threads : {}", (nthreads).to_formatted_string(&Locale::en));
328 println!("Work size (64bit words): {}", (wsize).to_formatted_string(&Locale::en));
329 println!("Total Operations(ops) : {:>15}", (results.count).to_formatted_string(&Locale::en));
330 println!("Total G Migrations : {:>15}", (results.gmigs).to_formatted_string(&Locale::en));
331 println!("Total D Migrations : {:>15}", (results.dmigs).to_formatted_string(&Locale::en));
332 println!("Ops per second : {:>15}", (((results.count as f64) / elapsed.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
333 println!("ns per ops : {:>15}", ((elapsed.as_nanos() as f64 / results.count as f64) as u64).to_formatted_string(&Locale::en));
334 println!("Ops per threads : {:>15}", (results.count / nthreads as u64).to_formatted_string(&Locale::en));
335 println!("Ops per procs : {:>15}", (results.count / nprocs as u64).to_formatted_string(&Locale::en));
336 println!("Ops/sec/procs : {:>15}", ((((results.count as f64) / nprocs as f64) / elapsed.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
337 println!("ns per ops/procs : {:>15}", ((elapsed.as_nanos() as f64 / (results.count as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en));
338}
Note: See TracBrowser for help on using the repository browser.