source: benchmark/readyQ/locality.go @ 0285efe

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 0285efe was c5a98f3, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Clean-up and comments

  • Property mode set to 100644
File size: 7.8 KB
Line 
1package main
2
3import (
4        "context"
5        "flag"
6        "fmt"
7        "math/rand"
8        "os"
9        "syscall"
10        "sync/atomic"
11        "time"
12        "unsafe"
13        "golang.org/x/sync/semaphore"
14        "golang.org/x/text/language"
15        "golang.org/x/text/message"
16)
17
18// ==================================================
19type MyData struct {
20        ttid int
21        id int
22        data [] uint64
23}
24
25func NewData(id int, size uint64) (*MyData) {
26        var data [] uint64
27        data = make([]uint64, size)
28        for i := uint64(0); i < size; i++ {
29                data[i] = 0
30        }
31        return &MyData{syscall.Gettid(), id, data}
32}
33
34func (this * MyData) moved( ttid int ) (uint64) {
35        if this.ttid == ttid {
36                return 0
37        }
38        this.ttid = ttid
39        return 1
40}
41
42func (this * MyData) access( idx uint64 ) {
43        this.data[idx % uint64(len(this.data))] += 1
44}
45
46// ==================================================
47type MyCtx struct {
48        s * semaphore.Weighted
49        d unsafe.Pointer
50        c context.Context
51        ttid int
52        id int
53}
54
55func NewCtx( data * MyData, id int ) (MyCtx) {
56        r := MyCtx{semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
57        r.s.Acquire(context.Background(), 1)
58        return r
59}
60
61func (this * MyCtx) moved( ttid int ) (uint64) {
62        if this.ttid == ttid {
63                return 0
64        }
65        this.ttid = ttid
66        return 1
67}
68
69// ==================================================
70// Atomic object where a single thread can wait
71// May exchanges data
72type Spot struct {
73        ptr uintptr // atomic variable use fo MES
74        id int      // id for debugging
75}
76
77// Main handshake of the code
78// Single seat, first thread arriving waits
79// Next threads unblocks current one and blocks in its place
80// if share == true, exchange data in the process
81func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
82        new := uintptr(unsafe.Pointer(ctx))
83        // old_d := ctx.d
84
85        // Attempt to CAS our context into the seat
86        var raw uintptr
87        for true {
88                raw = this.ptr
89                if raw == uintptr(1) { // Seat is closed, return
90                        return nil, true
91                }
92                if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
93                        break // We got the seat
94                }
95        }
96
97        // If we aren't the fist in, wake someone
98        if raw != uintptr(0) {
99                var val *MyCtx
100                val = (*MyCtx)(unsafe.Pointer(raw))
101
102                // If we are sharing, give them our data
103                if share {
104                        // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
105                        atomic.StorePointer(&val.d, unsafe.Pointer(data))
106                }
107
108                // Wake them up
109                // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
110                val.s.Release(1)
111        }
112
113        // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
114
115        // Block once on the seat
116        ctx.s.Acquire(ctx.c, 1)
117
118        // Someone woke us up, get the new data
119        ret := (* MyData)(atomic.LoadPointer(&ctx.d))
120        // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
121
122        return ret, false
123}
124
125// Shutdown the spot
126// Wake current thread and mark seat as closed
127func (this * Spot) release() {
128        val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
129        if val == nil {
130                return
131        }
132
133        // Someone was there, release them
134        val.s.Release(1)
135}
136
137// ==================================================
138// Struct for result, Go doesn't support passing tuple in channels
139type Result struct {
140        count uint64
141        gmigs uint64
142        dmigs uint64
143}
144
145func NewResult() (Result) {
146        return Result{0, 0, 0}
147}
148
149// ==================================================
150// Random number generator, Go's native one is to slow and global
151func __xorshift64( state * uint64 ) (uint64) {
152        x := *state
153        x ^= x << 13
154        x ^= x >> 7
155        x ^= x << 17
156        *state = x
157        return x
158}
159
160// ==================================================
161// Do some work by accessing 'cnt' cells in the array
162func work(data * MyData, cnt uint64, state * uint64) {
163        for i := uint64(0); i < cnt; i++ {
164                data.access(__xorshift64(state))
165        }
166}
167
168// Main body of the threads
169func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
170        // Initialize some data
171        state := rand.Uint64()    // RNG state
172        data := NewData(id, size) // Starting piece of data
173        ctx := NewCtx(data, id)   // Goroutine local context
174
175        // Prepare results
176        r := NewResult()
177
178        // Wait for start
179        <- start
180
181        // Main loop
182        for true {
183                // Touch our current data, write to invalidate remote cache lines
184                work(data, cnt, &state)
185
186                // Wait on a random spot
187                i := __xorshift64(&state) % uint64(len(channels))
188                var closed bool
189                data, closed = channels[i].put(&ctx, data, share)
190
191                // Check if the experiment is over
192                if closed { break }                                       // yes, spot was closed
193                if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }  // yes, time's up
194                if !clock_mode && r.count >= stop_count { break }         // yes, iterations reached
195
196                // Check everything is consistent
197                if uint64(len(data.data)) != size { panic("Data has weird size") }
198
199                // write down progress and check migrations
200                ttid := syscall.Gettid()
201                r.count += 1
202                r.gmigs += ctx .moved(ttid)
203                r.dmigs += data.moved(ttid)
204        }
205
206        // Mark goroutine as done
207        atomic.AddInt64(&threads_left, -1);
208
209        // return result
210        result <- r
211}
212
213// ==================================================
214// Program main
215func main() {
216        // Benchmark specific command line arguments
217        work_sizeOpt := flag.Uint64("w", 2    , "Number of words (uint64) per threads")
218        countOpt     := flag.Uint64("c", 2    , "Number of words (uint64) to touch")
219        shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
220
221        // General benchmark initialization and deinitialization
222        defer bench_init()()
223
224        // Eval command line arguments
225        size  := *work_sizeOpt
226        cnt   := *countOpt
227        share := *shareOpt
228
229        // Check params
230        if ! (nthreads > nprocs) {
231                fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
232                os.Exit(1)
233        }
234
235        // Make global data
236        barrierStart := make(chan struct{})         // Barrier used at the start
237        threads_left = int64(nprocs)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
238        result  := make(chan Result)                // Channel for results
239        channels := make([]Spot, nthreads - nprocs) // Number of spots
240        for i := range channels {
241                channels[i] = Spot{uintptr(0), i}     // init spots
242        }
243
244        // start the goroutines
245        for i := 0; i < nthreads; i++ {
246                go local(result, barrierStart, size, cnt, channels, share, i)
247        }
248        fmt.Printf("Starting\n");
249
250        atomic.StoreInt32(&stop, 0)
251        start := time.Now()
252        close(barrierStart) // release barrier
253
254        wait(start, true);  // general benchmark wait
255
256        atomic.StoreInt32(&stop, 1)
257        end := time.Now()
258        delta := end.Sub(start)
259
260        fmt.Printf("\nDone\n")
261
262        // release all the blocked threads
263        for i := range channels {
264                channels[i].release()
265        }
266
267        // Join and accumulate results
268        global_result := NewResult()
269        for i := 0; i < nthreads; i++ {
270                r := <- result
271                global_result.count += r.count
272                global_result.gmigs += r.gmigs
273                global_result.dmigs += r.dmigs
274        }
275
276        // Print with nice 's, i.e. 1'000'000 instead of 1000000
277        p := message.NewPrinter(language.English)
278        p.Printf("Duration (ms)          : %f\n", delta.Seconds());
279        p.Printf("Number of processors   : %d\n", nprocs);
280        p.Printf("Number of threads      : %d\n", nthreads);
281        p.Printf("Work size (64bit words): %d\n", size);
282        p.Printf("Total Operations(ops)  : %15d\n", global_result.count)
283        p.Printf("Total G Migrations     : %15d\n", global_result.gmigs)
284        p.Printf("Total D Migrations     : %15d\n", global_result.dmigs)
285        p.Printf("Ops per second         : %18.2f\n", float64(global_result.count) / delta.Seconds())
286        p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_result.count))
287        p.Printf("Ops per threads        : %15d\n", global_result.count / uint64(nthreads))
288        p.Printf("Ops per procs          : %15d\n", global_result.count / uint64(nprocs))
289        p.Printf("Ops/sec/procs          : %18.2f\n", (float64(global_result.count) / float64(nprocs)) / delta.Seconds())
290        p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_result.count) / float64(nprocs)))
291}
Note: See TracBrowser for help on using the repository browser.