source: benchmark/readyQ/locality.go @ aa1d13c

arm-ehjacob/cs343-translationnew-ast-unique-expr
Last change on this file since aa1d13c was aa1d13c, checked in by Thierry Delisle <tdelisle@…>, 10 months ago

Refactored code to track data and goroutine migrations

  • Property mode set to 100644
File size: 5.9 KB
Line 
1package main
2
3import (
4        "context"
5        "flag"
6        "fmt"
7        "math/rand"
8        "os"
9        "syscall"
10        "sync/atomic"
11        "time"
12        "unsafe"
13        "golang.org/x/sync/semaphore"
14        "golang.org/x/text/language"
15        "golang.org/x/text/message"
16)
17
18// ==================================================
19type MyData struct {
20        ttid int
21        id int
22        data [] uint64
23}
24
25func NewData(id int, size uint64) (*MyData) {
26        var data [] uint64
27        data = make([]uint64, size)
28        for i := uint64(0); i < size; i++ {
29                data[i] = 0
30        }
31        return &MyData{syscall.Gettid(), id, data}
32}
33
34func (this * MyData) moved( ttid int ) (uint64) {
35        if this.ttid == ttid {
36                return 0
37        }
38        this.ttid = ttid
39        return 1
40}
41
42func (this * MyData) access( idx uint64 ) {
43        this.data[idx % uint64(len(this.data))] += 1
44}
45
46// ==================================================
47type MyCtx struct {
48        s * semaphore.Weighted
49        d unsafe.Pointer
50        c context.Context
51        ttid int
52        id int
53}
54
55func NewCtx( sem * semaphore.Weighted, data * MyData, id int ) (MyCtx) {
56        return MyCtx{sem, unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
57}
58
59func (this * MyCtx) moved( ttid int ) (uint64) {
60        if this.ttid == ttid {
61                return 0
62        }
63        this.ttid = ttid
64        return 1
65}
66
67// ==================================================
68type Spot struct {
69        ptr uintptr
70        id int
71}
72
73// Main handshake of the code
74// Single seat, first thread arriving waits
75// Next threads unblocks current one and blocks in its place
76// if share == true, exchange data in the process
77func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
78        new := uintptr(unsafe.Pointer(ctx))
79        // old_d := ctx.d
80        var raw uintptr
81        for true {
82                raw = this.ptr
83                if raw == uintptr(1) {
84                        return nil, true
85                }
86                if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
87                        break
88                }
89        }
90
91        if raw != uintptr(0) {
92                var val *MyCtx
93                val = (*MyCtx)(unsafe.Pointer(raw))
94                if share {
95                        // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
96                        atomic.StorePointer(&val.d, unsafe.Pointer(data))
97                }
98
99                // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
100                val.s.Release(1)
101        }
102
103        // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
104        ctx.s.Acquire(ctx.c, 1)
105        ret := (* MyData)(atomic.LoadPointer(&ctx.d))
106        // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
107
108        return ret, false
109}
110
111func (this * Spot) release() {
112        val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
113        if val == nil {
114                return
115        }
116
117        val.s.Release(1)
118}
119
120// ==================================================
121type Result struct {
122        count uint64
123        gmigs uint64
124        dmigs uint64
125}
126
127func NewResult() (Result) {
128        return Result{0, 0, 0}
129}
130
131// ==================================================
132func __xorshift64( state * uint64 ) (uint64) {
133        x := *state
134        x ^= x << 13
135        x ^= x >> 7
136        x ^= x << 17
137        *state = x
138        return x
139}
140
141func work(data * MyData, cnt uint64, state * uint64) {
142        for i := uint64(0); i < cnt; i++ {
143                data.access(__xorshift64(state))
144        }
145}
146
147func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
148        state := rand.Uint64()
149
150        data := NewData(id, size)
151        sem := semaphore.NewWeighted(1)
152        sem.Acquire(context.Background(), 1)
153        ctx := NewCtx(sem, data, id)
154
155        r := NewResult()
156        <- start
157        for true {
158                work(data, cnt, &state)
159
160                i := __xorshift64(&state) % uint64(len(channels))
161                var closed bool
162                data, closed = channels[i].put(&ctx, data, share)
163
164                if closed { break }
165                if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }
166                if !clock_mode && r.count >= stop_count { break }
167                if uint64(len(data.data)) != size {
168                        panic("Data has weird size")
169                }
170
171                ttid := syscall.Gettid()
172                r.count += 1
173                r.gmigs += ctx .moved(ttid)
174                r.dmigs += data.moved(ttid)
175        }
176
177        atomic.AddInt64(&threads_left, -1);
178        result <- r
179}
180
181func main() {
182        work_sizeOpt := flag.Uint64("w", 2    , "Number of words (uint64) per threads")
183        countOpt     := flag.Uint64("c", 2    , "Number of words (uint64) to touch")
184        shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
185
186        defer bench_init()()
187
188        size  := *work_sizeOpt
189        cnt   := *countOpt
190        share := *shareOpt
191
192        if ! (nthreads > nprocs) {
193                fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
194                os.Exit(1)
195        }
196
197        barrierStart := make(chan struct{})
198        threads_left = int64(nprocs)
199        result  := make(chan Result)
200        channels := make([]Spot, nthreads - nprocs)
201        for i := range channels {
202                channels[i] = Spot{uintptr(0), i}
203        }
204
205        for i := 0; i < nthreads; i++ {
206                go local(result, barrierStart, size, cnt, channels, share, i)
207        }
208        fmt.Printf("Starting\n");
209
210        atomic.StoreInt32(&stop, 0)
211        start := time.Now()
212        close(barrierStart)
213
214        wait(start, true);
215
216        atomic.StoreInt32(&stop, 1)
217        end := time.Now()
218        delta := end.Sub(start)
219
220        fmt.Printf("\nDone\n")
221
222        for i := range channels {
223                channels[i].release()
224        }
225
226        global_result := NewResult()
227        for i := 0; i < nthreads; i++ {
228                r := <- result
229                global_result.count += r.count
230                global_result.gmigs += r.gmigs
231                global_result.dmigs += r.dmigs
232        }
233
234        p := message.NewPrinter(language.English)
235        p.Printf("Duration (ms)          : %f\n", delta.Seconds());
236        p.Printf("Number of processors   : %d\n", nprocs);
237        p.Printf("Number of threads      : %d\n", nthreads);
238        p.Printf("Work size (64bit words): %d\n", size);
239        p.Printf("Total Operations(ops)  : %15d\n", global_result.count)
240        p.Printf("Total G Migrations     : %15d\n", global_result.gmigs)
241        p.Printf("Total D Migrations     : %15d\n", global_result.dmigs)
242        p.Printf("Ops per second         : %18.2f\n", float64(global_result.count) / delta.Seconds())
243        p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_result.count))
244        p.Printf("Ops per threads        : %15d\n", global_result.count / uint64(nthreads))
245        p.Printf("Ops per procs          : %15d\n", global_result.count / uint64(nprocs))
246        p.Printf("Ops/sec/procs          : %18.2f\n", (float64(global_result.count) / float64(nprocs)) / delta.Seconds())
247        p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_result.count) / float64(nprocs)))
248}
Note: See TracBrowser for help on using the repository browser.