source: benchmark/readyQ/locality.go@ aa1d13c

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since aa1d13c was aa1d13c, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Refactored code to track data and goroutine migrations

  • Property mode set to 100644
File size: 5.9 KB
Line 
1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "math/rand"
8 "os"
9 "syscall"
10 "sync/atomic"
11 "time"
12 "unsafe"
13 "golang.org/x/sync/semaphore"
14 "golang.org/x/text/language"
15 "golang.org/x/text/message"
16)
17
18// ==================================================
// MyData is a goroutine's work buffer. It remembers which OS thread last
// observed it so that data migrations between threads can be counted.
type MyData struct {
	// ttid is the OS thread id (syscall.Gettid) recorded at creation and
	// refreshed by moved; id is the index passed to NewData.
	ttid, id int
	// data holds the words incremented by access.
	data []uint64
}
24
25func NewData(id int, size uint64) (*MyData) {
26 var data [] uint64
27 data = make([]uint64, size)
28 for i := uint64(0); i < size; i++ {
29 data[i] = 0
30 }
31 return &MyData{syscall.Gettid(), id, data}
32}
33
34func (this * MyData) moved( ttid int ) (uint64) {
35 if this.ttid == ttid {
36 return 0
37 }
38 this.ttid = ttid
39 return 1
40}
41
42func (this * MyData) access( idx uint64 ) {
43 this.data[idx % uint64(len(this.data))] += 1
44}
45
46// ==================================================
47type MyCtx struct {
48 s * semaphore.Weighted
49 d unsafe.Pointer
50 c context.Context
51 ttid int
52 id int
53}
54
55func NewCtx( sem * semaphore.Weighted, data * MyData, id int ) (MyCtx) {
56 return MyCtx{sem, unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
57}
58
59func (this * MyCtx) moved( ttid int ) (uint64) {
60 if this.ttid == ttid {
61 return 0
62 }
63 this.ttid = ttid
64 return 1
65}
66
67// ==================================================
// Spot is a single-seat rendezvous point shared by the benchmark goroutines.
type Spot struct {
	// ptr encodes the seat: 0 means empty, 1 means closed (set by release),
	// and any other value is the address of the waiting goroutine's MyCtx.
	// It is updated with sync/atomic operations.
	ptr uintptr
	// id identifies the spot (main assigns its index in the channels slice).
	id int
}
72
73// Main handshake of the code
74// Single seat, first thread arriving waits
75// Next threads unblocks current one and blocks in its place
76// if share == true, exchange data in the process
77func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
78 new := uintptr(unsafe.Pointer(ctx))
79 // old_d := ctx.d
80 var raw uintptr
81 for true {
82 raw = this.ptr
83 if raw == uintptr(1) {
84 return nil, true
85 }
86 if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
87 break
88 }
89 }
90
91 if raw != uintptr(0) {
92 var val *MyCtx
93 val = (*MyCtx)(unsafe.Pointer(raw))
94 if share {
95 // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
96 atomic.StorePointer(&val.d, unsafe.Pointer(data))
97 }
98
99 // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
100 val.s.Release(1)
101 }
102
103 // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
104 ctx.s.Acquire(ctx.c, 1)
105 ret := (* MyData)(atomic.LoadPointer(&ctx.d))
106 // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
107
108 return ret, false
109}
110
111func (this * Spot) release() {
112 val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
113 if val == nil {
114 return
115 }
116
117 val.s.Release(1)
118}
119
120// ==================================================
// Result holds the per-goroutine counters reported back to main: completed
// iterations (count), goroutine thread migrations (gmigs) and data-buffer
// thread migrations (dmigs).
type Result struct {
	count, gmigs, dmigs uint64
}
126
127func NewResult() (Result) {
128 return Result{0, 0, 0}
129}
130
131// ==================================================
// __xorshift64 advances the 64-bit xorshift generator stored in *state using
// shifts 13, 7, 17, and returns the new state. A zero state stays zero.
func __xorshift64(state *uint64) (next uint64) {
	next = *state
	next ^= next << 13
	next ^= next >> 7
	next ^= next << 17
	*state = next
	return next
}
140
141func work(data * MyData, cnt uint64, state * uint64) {
142 for i := uint64(0); i < cnt; i++ {
143 data.access(__xorshift64(state))
144 }
145}
146
147func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
148 state := rand.Uint64()
149
150 data := NewData(id, size)
151 sem := semaphore.NewWeighted(1)
152 sem.Acquire(context.Background(), 1)
153 ctx := NewCtx(sem, data, id)
154
155 r := NewResult()
156 <- start
157 for true {
158 work(data, cnt, &state)
159
160 i := __xorshift64(&state) % uint64(len(channels))
161 var closed bool
162 data, closed = channels[i].put(&ctx, data, share)
163
164 if closed { break }
165 if clock_mode && atomic.LoadInt32(&stop) == 1 { break }
166 if !clock_mode && r.count >= stop_count { break }
167 if uint64(len(data.data)) != size {
168 panic("Data has weird size")
169 }
170
171 ttid := syscall.Gettid()
172 r.count += 1
173 r.gmigs += ctx .moved(ttid)
174 r.dmigs += data.moved(ttid)
175 }
176
177 atomic.AddInt64(&threads_left, -1);
178 result <- r
179}
180
181func main() {
182 work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads")
183 countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch")
184 shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking")
185
186 defer bench_init()()
187
188 size := *work_sizeOpt
189 cnt := *countOpt
190 share := *shareOpt
191
192 if ! (nthreads > nprocs) {
193 fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
194 os.Exit(1)
195 }
196
197 barrierStart := make(chan struct{})
198 threads_left = int64(nprocs)
199 result := make(chan Result)
200 channels := make([]Spot, nthreads - nprocs)
201 for i := range channels {
202 channels[i] = Spot{uintptr(0), i}
203 }
204
205 for i := 0; i < nthreads; i++ {
206 go local(result, barrierStart, size, cnt, channels, share, i)
207 }
208 fmt.Printf("Starting\n");
209
210 atomic.StoreInt32(&stop, 0)
211 start := time.Now()
212 close(barrierStart)
213
214 wait(start, true);
215
216 atomic.StoreInt32(&stop, 1)
217 end := time.Now()
218 delta := end.Sub(start)
219
220 fmt.Printf("\nDone\n")
221
222 for i := range channels {
223 channels[i].release()
224 }
225
226 global_result := NewResult()
227 for i := 0; i < nthreads; i++ {
228 r := <- result
229 global_result.count += r.count
230 global_result.gmigs += r.gmigs
231 global_result.dmigs += r.dmigs
232 }
233
234 p := message.NewPrinter(language.English)
235 p.Printf("Duration (ms) : %f\n", delta.Seconds());
236 p.Printf("Number of processors : %d\n", nprocs);
237 p.Printf("Number of threads : %d\n", nthreads);
238 p.Printf("Work size (64bit words): %d\n", size);
239 p.Printf("Total Operations(ops) : %15d\n", global_result.count)
240 p.Printf("Total G Migrations : %15d\n", global_result.gmigs)
241 p.Printf("Total D Migrations : %15d\n", global_result.dmigs)
242 p.Printf("Ops per second : %18.2f\n", float64(global_result.count) / delta.Seconds())
243 p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_result.count))
244 p.Printf("Ops per threads : %15d\n", global_result.count / uint64(nthreads))
245 p.Printf("Ops per procs : %15d\n", global_result.count / uint64(nprocs))
246 p.Printf("Ops/sec/procs : %18.2f\n", (float64(global_result.count) / float64(nprocs)) / delta.Seconds())
247 p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_result.count) / float64(nprocs)))
248}
Note: See TracBrowser for help on using the repository browser.