source: benchmark/readyQ/locality.rs @ 3d19ae6

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 3d19ae6 was 3d19ae6, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Added padding to existing locality benchmarks

  • Property mode set to 100644
File size: 9.2 KB
Line 
1use std::ptr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Instant;
5use std::thread::{self, ThreadId};
6
7use tokio::runtime::Builder;
8use tokio::sync;
9
10use clap::{App, Arg};
11use num_format::{Locale, ToFormattedString};
12use rand::Rng;
13
14#[path = "../bench.rs"]
15mod bench;
16
17// ==================================================
18struct MyData {
19        data: Vec<u64>,
20        ttid: ThreadId,
21        _id: usize,
22}
23
24impl MyData {
25        fn new(id: usize, size: usize) -> MyData {
26                MyData {
27                        data: vec![0; size],
28                        ttid: thread::current().id(),
29                        _id: id,
30                }
31        }
32
33        fn moved(&mut self, ttid: ThreadId) -> u64 {
34                if self.ttid == ttid {
35                        return 0;
36                }
37                self.ttid = ttid;
38                return 1;
39        }
40
41        fn access(&mut self, idx: usize) {
42                let l = self.data.len();
43                self.data[idx % l] += 1;
44        }
45}
46
47struct MyDataPtr {
48        ptr: *mut MyData,
49}
50
51unsafe impl std::marker::Send for MyDataPtr{}
52
53// ==================================================
54struct MyCtx {
55        _p1: [16]u64,
56        s: sync::Semaphore,
57        d: MyDataPtr,
58        ttid: ThreadId,
59        _id: usize,
60        _p2: [16]u64,
61}
62
63impl MyCtx {
64        fn new(d: *mut MyData, id: usize) -> MyCtx {
65                MyCtx {
66                        s: sync::Semaphore::new(0),
67                        d: MyDataPtr{ ptr: d },
68                        ttid: thread::current().id(),
69                        _id: id
70                }
71        }
72
73        fn moved(&mut self, ttid: ThreadId) -> u64 {
74                if self.ttid == ttid {
75                        return 0;
76                }
77                self.ttid = ttid;
78                return 1;
79        }
80}
81// ==================================================
82// Atomic object where a single thread can wait
83// May exchanges data
84struct MySpot {
85        ptr: AtomicU64,
86        _id: usize,
87}
88
89impl MySpot {
90        fn new(id: usize) -> MySpot {
91                let r = MySpot{
92                        ptr: AtomicU64::new(0),
93                        _id: id,
94                };
95                r
96        }
97
98        fn one() -> u64 {
99                1
100        }
101
102        // Main handshake of the code
103        // Single seat, first thread arriving waits
104        // Next threads unblocks current one and blocks in its place
105        // if share == true, exchange data in the process
106        async fn put( &self, ctx: &mut MyCtx, data: MyDataPtr, share: bool) -> (*mut MyData, bool) {
107                {
108                        // Attempt to CAS our context into the seat
109                        let raw = {
110                                loop {
111                                        let expected = self.ptr.load(Ordering::Relaxed) as u64;
112                                        if expected == MySpot::one() { // Seat is closed, return
113                                                let r: *const MyData = ptr::null();
114                                                return (r as *mut MyData, true);
115                                        }
116                                        let got = self.ptr.compare_and_swap(expected, ctx as *mut MyCtx as u64, Ordering::SeqCst);
117                                        if got == expected {
118                                                break expected;// We got the seat
119                                        }
120                                }
121                        };
122
123                        // If we aren't the fist in, wake someone
124                        if raw != 0 {
125                                let val: &mut MyCtx = unsafe{ &mut *(raw as *mut MyCtx) };
126                                // If we are sharing, give them our data
127                                if share {
128                                        val.d.ptr = data.ptr;
129                                }
130
131                                // Wake them up
132                                val.s.add_permits(1);
133                        }
134                }
135
136                // Block once on the seat
137                ctx.s.acquire().await.forget();
138
139                // Someone woke us up, get the new data
140                let ret = ctx.d.ptr;
141                return (ret, false);
142        }
143
144        // Shutdown the spot
145        // Wake current thread and mark seat as closed
146        fn release(&self) {
147                let val = self.ptr.swap(MySpot::one(), Ordering::SeqCst);
148                if val == 0 {
149                        return
150                }
151
152                // Someone was there, release them
153                unsafe{ &mut *(val as *mut MyCtx) }.s.add_permits(1)
154        }
155}
156
157// ==================================================
158// Struct for result, Go doesn't support passing tuple in channels
159struct Result {
160        count: u64,
161        gmigs: u64,
162        dmigs: u64,
163}
164
165impl Result {
166        fn new() -> Result {
167                Result{ count: 0, gmigs: 0, dmigs: 0}
168        }
169
170        fn add(&mut self, o: Result) {
171                self.count += o.count;
172                self.gmigs += o.gmigs;
173                self.dmigs += o.dmigs;
174        }
175}
176
177// ==================================================
178// Random number generator, Go's native one is to slow and global
179fn __xorshift64( state: &mut u64 ) -> usize {
180        let mut x = *state;
181        x ^= x << 13;
182        x ^= x >> 7;
183        x ^= x << 17;
184        *state = x;
185        x as usize
186}
187
188// ==================================================
189// Do some work by accessing 'cnt' cells in the array
190fn work(data: &mut MyData, cnt: u64, state : &mut u64) {
191        for _ in 0..cnt {
192                data.access(__xorshift64(state))
193        }
194}
195
196async fn local(start: Arc<sync::Barrier>, idata: MyDataPtr, spots : Arc<Vec<MySpot>>, cnt: u64, share: bool, id: usize, exp: Arc<bench::BenchData>) -> Result{
197        let mut state = rand::thread_rng().gen::<u64>();
198        let mut data = idata;
199        let mut ctx = MyCtx::new(data.ptr, id);
200        let _size = unsafe{ &mut *data.ptr }.data.len();
201
202        // Prepare results
203        let mut r = Result::new();
204
205        // Wait for start
206        start.wait().await;
207
208        // Main loop
209        loop {
210                // Touch our current data, write to invalidate remote cache lines
211                work(unsafe{ &mut *data.ptr }, cnt, &mut state);
212
213                // Wait on a random spot
214                let i = (__xorshift64(&mut state) as usize) % spots.len();
215                let closed = {
216                        let (d, c) = spots[i].put(&mut ctx, data, share).await;
217                        data = MyDataPtr{ ptr: d };
218                        c
219                };
220
221                // Check if the experiment is over
222                if closed { break }                                                   // yes, spot was closed
223                if  exp.clock_mode && exp.stop.load(Ordering::Relaxed) { break }  // yes, time's up
224                if !exp.clock_mode && r.count >= exp.stop_count { break }         // yes, iterations reached
225
226                assert_ne!(data.ptr as *const MyData, ptr::null());
227
228                let d = unsafe{ &mut *data.ptr };
229
230                // Check everything is consistent
231                debug_assert_eq!(d.data.len(), _size);
232
233                // write down progress and check migrations
234                let ttid = thread::current().id();
235                r.count += 1;
236                r.gmigs += ctx .moved(ttid);
237                r.dmigs += d.moved(ttid);
238        }
239
240        exp.threads_left.fetch_sub(1, Ordering::SeqCst);
241        r
242}
243
244
245// ==================================================
246fn main() {
247        let options = App::new("Locality Tokio")
248                .args(&bench::args())
249                .arg(Arg::with_name("size") .short("w").long("worksize").takes_value(true).default_value("2").help("Size of the array for each threads, in words (64bit)"))
250                .arg(Arg::with_name("work") .short("c").long("workcnt") .takes_value(true).default_value("2").help("Number of words to touch when working (random pick, cells can be picked more than once)"))
251                .arg(Arg::with_name("share").short("s").long("share")   .takes_value(true).default_value("true").help("Pass the work data to the next thread when blocking"))
252                .get_matches();
253
254        let nthreads   = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
255        let nprocs     = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
256        let wsize      = options.value_of("size").unwrap().parse::<usize>().unwrap();
257        let wcnt       = options.value_of("work").unwrap().parse::<u64>().unwrap();
258        let share      = options.value_of("share").unwrap().parse::<bool>().unwrap();
259
260        // Check params
261        if ! (nthreads > nprocs) {
262                panic!("Must have more threads than procs");
263        }
264
265        let s = (1000000 as u64).to_formatted_string(&Locale::en);
266        assert_eq!(&s, "1,000,000");
267
268        let exp = Arc::new(bench::BenchData::new(options, nprocs));
269        let mut results = Result::new();
270
271        let mut elapsed : std::time::Duration = std::time::Duration::from_secs(0);
272
273        let mut data_arrays : Vec<MyData> = (0..nthreads).map(|i| MyData::new(i, wsize)).rev().collect();
274        let spots : Arc<Vec<MySpot>> = Arc::new((0..nthreads - nprocs).map(|i| MySpot::new(i)).rev().collect());
275        let barr = Arc::new(sync::Barrier::new(nthreads + 1));
276
277        let runtime = Builder::new_multi_thread()
278                .worker_threads(nprocs)
279                .enable_all()
280                .build()
281                .unwrap();
282
283        runtime.block_on(async
284                {
285                        let thrds: Vec<_> = (0..nthreads).map(|i| {
286                                debug_assert!(i < data_arrays.len());
287
288                                runtime.spawn(local(
289                                        barr.clone(),
290                                        MyDataPtr{ ptr: &mut data_arrays[i] },
291                                        spots.clone(),
292                                        wcnt,
293                                        share,
294                                        i,
295                                        exp.clone(),
296                                ))
297                        }).collect();
298
299
300                        println!("Starting");
301
302                        let start = Instant::now();
303                        barr.wait().await;
304
305                        elapsed = exp.wait(&start).await;
306
307                        println!("\nDone");
308
309                        // release all the blocked threads
310                        for s in &* spots {
311                                s.release();
312                        }
313
314                        println!("Threads released");
315
316                        // Join and accumulate results
317                        for t in thrds {
318                                results.add( t.await.unwrap() );
319                        }
320
321                        println!("Threads joined");
322                }
323        );
324
325        println!("Duration (ms)          : {}", (elapsed.as_millis()).to_formatted_string(&Locale::en));
326        println!("Number of processors   : {}", (nprocs).to_formatted_string(&Locale::en));
327        println!("Number of threads      : {}", (nthreads).to_formatted_string(&Locale::en));
328        println!("Work size (64bit words): {}", (wsize).to_formatted_string(&Locale::en));
329        println!("Total Operations(ops)  : {:>15}", (results.count).to_formatted_string(&Locale::en));
330        println!("Total G Migrations     : {:>15}", (results.gmigs).to_formatted_string(&Locale::en));
331        println!("Total D Migrations     : {:>15}", (results.dmigs).to_formatted_string(&Locale::en));
332        println!("Ops per second         : {:>15}", (((results.count as f64) / elapsed.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
333        println!("ns per ops             : {:>15}", ((elapsed.as_nanos() as f64 / results.count as f64) as u64).to_formatted_string(&Locale::en));
334        println!("Ops per threads        : {:>15}", (results.count / nthreads as u64).to_formatted_string(&Locale::en));
335        println!("Ops per procs          : {:>15}", (results.count / nprocs as u64).to_formatted_string(&Locale::en));
336        println!("Ops/sec/procs          : {:>15}", ((((results.count as f64) / nprocs as f64) / elapsed.as_secs() as f64) as u64).to_formatted_string(&Locale::en));
337        println!("ns per ops/procs       : {:>15}", ((elapsed.as_nanos() as f64 / (results.count as f64 / nprocs as f64)) as u64).to_formatted_string(&Locale::en));
338}
Note: See TracBrowser for help on using the repository browser.