source: benchmark/readyQ/locality.rs @ 49ce636

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 49ce636 was 720b1a9, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Implemented locality benchmark in rust

  • Property mode set to 100644
File size: 11.0 KB
Line 
1use std::io::{self, Write};
2use std::ptr;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
5use std::time::{Instant,Duration};
6use std::thread::{self, ThreadId};
7
8use tokio::runtime::Builder;
9use tokio::sync;
10use tokio::time;
11
12use clap::{Arg, App, ArgMatches};
13use isatty::stdout_isatty;
14use num_format::{Locale, ToFormattedString};
15use rand::Rng;
16
17// ==================================================
18struct BenchData {
19        clock_mode: bool,
20        stop: AtomicBool,
21        stop_count: u64,
22        duration: f64,
23        threads_left: AtomicU64,
24}
25
26impl BenchData {
27        fn new(options: ArgMatches, nthreads: usize) -> BenchData {
28                let (clock_mode, stop_count, duration) = if options.is_present("iterations") {
29                        (false,
30                        options.value_of("iterations").unwrap().parse::<u64>().unwrap(),
31                        -1.0)
32                } else {
33                        (true,
34                        std::u64::MAX,
35                        options.value_of("duration").unwrap().parse::<f64>().unwrap())
36                };
37
38                BenchData{
39                        clock_mode: clock_mode,
40                        stop: AtomicBool::new(false),
41                        stop_count: stop_count,
42                        duration: duration,
43                        threads_left: AtomicU64::new(nthreads as u64),
44                }
45        }
46}
47
48async fn wait(bench: &BenchData, start: &Instant, is_tty: bool) {
49        loop {
50                time::sleep(Duration::from_micros(100000)).await;
51                let delta = start.elapsed();
52                if is_tty {
53                        print!(" {:.1}\r", delta.as_secs_f32());
54                        io::stdout().flush().unwrap();
55                }
56                if bench.clock_mode && delta >= Duration::from_secs_f64(bench.duration)  {
57                        break;
58                }
59                else if !bench.clock_mode && bench.threads_left.load(Ordering::Relaxed) == 0 {
60                        break;
61                }
62        }
63}
64
65// ==================================================
66struct MyData {
67        data: Vec<u64>,
68        ttid: ThreadId,
69        _id: usize,
70}
71
72impl MyData {
73        fn new(id: usize, size: usize) -> MyData {
74                MyData {
75                        data: vec![0; size],
76                        ttid: thread::current().id(),
77                        _id: id,
78                }
79        }
80
81        fn moved(&mut self, ttid: ThreadId) -> u64 {
82                if self.ttid == ttid {
83                        return 0;
84                }
85                self.ttid = ttid;
86                return 1;
87        }
88
89        fn access(&mut self, idx: usize) {
90                let l = self.data.len();
91                self.data[idx % l] += 1;
92        }
93}
94
95struct MyDataPtr {
96        ptr: *mut MyData,
97}
98
99unsafe impl std::marker::Send for MyDataPtr{}
100
101// ==================================================
102struct MyCtx {
103        s: sync::Semaphore,
104        d: MyDataPtr,
105        ttid: ThreadId,
106        _id: usize,
107}
108
109impl MyCtx {
110        fn new(d: *mut MyData, id: usize) -> MyCtx {
111                MyCtx {
112                        s: sync::Semaphore::new(0),
113                        d: MyDataPtr{ ptr: d },
114                        ttid: thread::current().id(),
115                        _id: id
116                }
117        }
118
119        fn moved(&mut self, ttid: ThreadId) -> u64 {
120                if self.ttid == ttid {
121                        return 0;
122                }
123                self.ttid = ttid;
124                return 1;
125        }
126}
127// ==================================================
128// Atomic object where a single thread can wait
129// May exchanges data
130struct MySpot {
131        ptr: AtomicU64,
132        _id: usize,
133}
134
135impl MySpot {
136        fn new(id: usize) -> MySpot {
137                let r = MySpot{
138                        ptr: AtomicU64::new(0),
139                        _id: id,
140                };
141                r
142        }
143
144        fn one() -> u64 {
145                1
146        }
147
148        // Main handshake of the code
149        // Single seat, first thread arriving waits
150        // Next threads unblocks current one and blocks in its place
151        // if share == true, exchange data in the process
152        async fn put( &self, ctx: &mut MyCtx, data: MyDataPtr, share: bool) -> (*mut MyData, bool) {
153                {
154                        // Attempt to CAS our context into the seat
155                        let raw = {
156                                loop {
157                                        let expected = self.ptr.load(Ordering::Relaxed) as u64;
158                                        if expected == MySpot::one() { // Seat is closed, return
159                                                let r: *const MyData = ptr::null();
160                                                return (r as *mut MyData, true);
161                                        }
162                                        let got = self.ptr.compare_and_swap(expected, ctx as *mut MyCtx as u64, Ordering::SeqCst);
163                                        if got == expected {
164                                                break expected;// We got the seat
165                                        }
166                                }
167                        };
168
169                        // If we aren't the fist in, wake someone
170                        if raw != 0 {
171                                let val: &mut MyCtx = unsafe{ &mut *(raw as *mut MyCtx) };
172                                // If we are sharing, give them our data
173                                if share {
174                                        val.d.ptr = data.ptr;
175                                }
176
177                                // Wake them up
178                                val.s.add_permits(1);
179                        }
180                }
181
182                // Block once on the seat
183                ctx.s.acquire().await.forget();
184
185                // Someone woke us up, get the new data
186                let ret = ctx.d.ptr;
187                return (ret, false);
188        }
189
190        // Shutdown the spot
191        // Wake current thread and mark seat as closed
192        fn release(&self) {
193                let val = self.ptr.swap(MySpot::one(), Ordering::SeqCst);
194                if val == 0 {
195                        return
196                }
197
198                // Someone was there, release them
199                unsafe{ &mut *(val as *mut MyCtx) }.s.add_permits(1)
200        }
201}
202
203// ==================================================
204// Struct for result, Go doesn't support passing tuple in channels
205struct Result {
206        count: u64,
207        gmigs: u64,
208        dmigs: u64,
209}
210
211impl Result {
212        fn new() -> Result {
213                Result{ count: 0, gmigs: 0, dmigs: 0}
214        }
215
216        fn add(&mut self, o: Result) {
217                self.count += o.count;
218                self.gmigs += o.gmigs;
219                self.dmigs += o.dmigs;
220        }
221}
222
223// ==================================================
224// Random number generator, Go's native one is to slow and global
225fn __xorshift64( state: &mut u64 ) -> usize {
226        let mut x = *state;
227        x ^= x << 13;
228        x ^= x >> 7;
229        x ^= x << 17;
230        *state = x;
231        x as usize
232}
233
234// ==================================================
235// Do some work by accessing 'cnt' cells in the array
236fn work(data: &mut MyData, cnt: u64, state : &mut u64) {
237        for _ in 0..cnt {
238                data.access(__xorshift64(state))
239        }
240}
241
242async fn local(start: Arc<sync::Barrier>, idata: MyDataPtr, spots : Arc<Vec<MySpot>>, cnt: u64, share: bool, id: usize, bench: Arc<BenchData>) -> Result{
243        let mut state = rand::thread_rng().gen::<u64>();
244        let mut data = idata;
245        let mut ctx = MyCtx::new(data.ptr, id);
246        let _size = unsafe{ &mut *data.ptr }.data.len();
247
248        // Prepare results
249        let mut r = Result::new();
250
251        // Wait for start
252        start.wait().await;
253
254        // Main loop
255        loop {
256                // Touch our current data, write to invalidate remote cache lines
257                work(unsafe{ &mut *data.ptr }, cnt, &mut state);
258
259                // Wait on a random spot
260                let i = (__xorshift64(&mut state) as usize) % spots.len();
261                let closed = {
262                        let (d, c) = spots[i].put(&mut ctx, data, share).await;
263                        data = MyDataPtr{ ptr: d };
264                        c
265                };
266
267                // Check if the experiment is over
268                if closed { break }                                                   // yes, spot was closed
269                if  bench.clock_mode && bench.stop.load(Ordering::Relaxed) { break }  // yes, time's up
270                if !bench.clock_mode && r.count >= bench.stop_count { break }         // yes, iterations reached
271
272                assert_ne!(data.ptr as *const MyData, ptr::null());
273
274                let d = unsafe{ &mut *data.ptr };
275
276                // Check everything is consistent
277                debug_assert_eq!(d.data.len(), _size);
278
279                // write down progress and check migrations
280                let ttid = thread::current().id();
281                r.count += 1;
282                r.gmigs += ctx .moved(ttid);
283                r.dmigs += d.moved(ttid);
284        }
285
286        bench.threads_left.fetch_sub(1, Ordering::SeqCst);
287        r
288}
289
290
291// ==================================================
292fn main() {
293        let options = App::new("Locality Tokio")
294                .arg(Arg::with_name("duration")  .short("d").long("duration")  .takes_value(true).default_value("5").help("Duration of the experiments in seconds"))
295                .arg(Arg::with_name("iterations").short("i").long("iterations").takes_value(true).conflicts_with("duration").help("Number of iterations of the experiments"))
296                .arg(Arg::with_name("nthreads")  .short("t").long("nthreads")  .takes_value(true).default_value("1").help("Number of threads to use"))
297                .arg(Arg::with_name("nprocs")    .short("p").long("nprocs")    .takes_value(true).default_value("1").help("Number of processors to use"))
298                .arg(Arg::with_name("size")      .short("w").long("worksize")  .takes_value(true).default_value("2").help("Size of the array for each threads, in words (64bit)"))
299                .arg(Arg::with_name("work")      .short("c").long("workcnt")   .takes_value(true).default_value("2").help("Number of words to touch when working (random pick, cells can be picked more than once)"))
300                .arg(Arg::with_name("share")     .short("s").long("share")     .takes_value(true).default_value("true").help("Pass the work data to the next thread when blocking"))
301                .get_matches();
302
303        let nthreads   = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
304        let nprocs     = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
305        let wsize      = options.value_of("size").unwrap().parse::<usize>().unwrap();
306        let wcnt       = options.value_of("work").unwrap().parse::<u64>().unwrap();
307        let share      = options.value_of("share").unwrap().parse::<bool>().unwrap();
308
309        // Check params
310        if ! (nthreads > nprocs) {
311                panic!("Must have more threads than procs");
312        }
313
314        let s = (1000000 as u64).to_formatted_string(&Locale::en);
315        assert_eq!(&s, "1,000,000");
316
317        let bench = Arc::new(BenchData::new(options, nprocs));
318        let mut results = Result::new();
319
320        let mut elapsed : std::time::Duration = std::time::Duration::from_secs(0);
321
322        let mut data_arrays : Vec<MyData> = (0..nthreads).map(|i| MyData::new(i, wsize)).rev().collect();
323        let spots : Arc<Vec<MySpot>> = Arc::new((0..nthreads - nprocs).map(|i| MySpot::new(i)).rev().collect());
324        let barr = Arc::new(sync::Barrier::new(nthreads + 1));
325
326        let runtime = Builder::new_multi_thread()
327                .worker_threads(nprocs)
328                .enable_all()
329                .build()
330                .unwrap();
331
332        runtime.block_on(async
333                {
334                        let thrds: Vec<_> = (0..nthreads).map(|i| {
335                                debug_assert!(i < data_arrays.len());
336
337                                runtime.spawn(local(
338                                        barr.clone(),
339                                        MyDataPtr{ ptr: &mut data_arrays[i] },
340                                        spots.clone(),
341                                        wcnt,
342                                        share,
343                                        i,
344                                        bench.clone(),
345                                ))
346                        }).collect();
347
348
349                        println!("Starting");
350
351                        let is_tty = stdout_isatty();
352                        let start = Instant::now();
353                        barr.wait().await;
354
355                        wait(&bench, &start, is_tty).await;
356
357                        bench.stop.store(true, Ordering::SeqCst);
358                        elapsed = start.elapsed();
359
360                        println!("\nDone");
361
362                        // release all the blocked threads
363                        for s in &* spots {
364                                s.release();
365                        }
366
367                        println!("Threads released");
368
369                        // Join and accumulate results
370                        for t in thrds {
371                                results.add( t.await.unwrap() );
372                        }
373
374                        println!("Threads joined");
375                }
376        );
377
378        println!("Duration (ms)          : {}", (elapsed.as_millis()).to_formatted_string(&Locale::en));
379        println!("Number of processors   : {}", (nprocs).to_formatted_string(&Locale::en));
380        println!("Number of threads      : {}", (nthreads).to_formatted_string(&Locale::en));
381        println!("Work size (64bit words): {}", (wsize).to_formatted_string(&Locale::en));
382        println!("Total Operations(ops)  : {:>15}", (results.count).to_formatted_string(&Locale::en));
383        println!("Total G Migrations     : {:>15}", (results.gmigs).to_formatted_string(&Locale::en));
384        println!("Total D Migrations     : {:>15}", (results.dmigs).to_formatted_string(&Locale::en));
385        println!("Ops per second         : {:>15}", (((results.count as f64) / elapsed.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
386        println!("ns per ops             : {:>15}", ((elapsed.as_nanos() as f64 / results.count as f64) as u64).to_formatted_string(&Locale::en));
387        println!("Ops per threads        : {:>15}", (results.count / nthreads as u64).to_formatted_string(&Locale::en));
388        println!("Ops per procs          : {:>15}", (results.count / nprocs as u64).to_formatted_string(&Locale::en));
389        println!("Ops/sec/procs          : {:>15}", ((((results.count as f64) / nprocs as f64) / elapsed.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
390        println!("ns per ops/procs       : {:>15}", ((elapsed.as_nanos() as f64 / (results.count as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en));
391}
Note: See TracBrowser for help on using the repository browser.