source: benchmark/readyQ/locality.go@ 94d93510

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since 94d93510 was 94d93510, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Moved work out so it looks better in profiles

  • Property mode set to 100644
File size: 4.7 KB
Line 
1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "math/rand"
8 "os"
9 "sync/atomic"
10 "time"
11 "unsafe"
12 "golang.org/x/sync/semaphore"
13 "golang.org/x/text/language"
14 "golang.org/x/text/message"
15)
16
17type GoCtx struct {
18 s * semaphore.Weighted
19 d unsafe.Pointer
20 c context.Context
21 id int
22}
23
// Spot is a single-seat rendezvous point shared by the worker threads.
//
// Field order matters: instances are built with a positional composite literal.
type Spot struct {
	// ptr is the seat word and is meant to be accessed with sync/atomic:
	// 0 means empty, 1 means closed, and any other value is the address of
	// the GoCtx of the thread currently waiting in the seat.
	ptr uintptr

	// id is the index of this spot.
	id int
}
28
29// Main handshake of the code
30// Single seat, first thread arriving waits
31// Next threads unblocks current one and blocks in its place
32// if share == true, exchange data in the process
33func (this * Spot) put( ctx * GoCtx, data * [] uint64, share bool) (* [] uint64, bool) {
34 new := uintptr(unsafe.Pointer(ctx))
35 // old_d := ctx.d
36 var raw uintptr
37 for true {
38 raw = this.ptr
39 if raw == uintptr(1) {
40 return nil, true
41 }
42 if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
43 break
44 }
45 }
46
47 if raw != uintptr(0) {
48 var val *GoCtx
49 val = (*GoCtx)(unsafe.Pointer(raw))
50 if share {
51 // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
52 atomic.StorePointer(&val.d, unsafe.Pointer(data))
53 }
54
55 // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
56 val.s.Release(1)
57 }
58
59 // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
60 ctx.s.Acquire(ctx.c, 1)
61 ret := (* [] uint64)(atomic.LoadPointer(&ctx.d))
62 // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
63
64 return ret, false
65}
66
67func (this * Spot) release() {
68 val := (*GoCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
69 if val == nil {
70 return
71 }
72
73 val.s.Release(1)
74}
75
// __xorshift64 advances the 64-bit xorshift generator held in *state, using
// the shift triple (13, 7, 17). It stores the new state back and returns it
// as the next pseudo-random value. A zero state stays zero forever.
func __xorshift64(state *uint64) uint64 {
	s := *state
	s = s ^ (s << 13)
	s = s ^ (s >> 7)
	s = s ^ (s << 17)
	*state = s
	return s
}
84
85func work(data * [] uint64, size uint64, cnt uint64, state * uint64) {
86 for i := uint64(0); i < cnt; i++ {
87 (*data)[__xorshift64(state) % size] += 1
88 }
89}
90
91func local(result chan uint64, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
92 state := rand.Uint64()
93 var my_data [] uint64
94 my_data = make([]uint64, size)
95 for i := uint64(0); i < size; i++ {
96 my_data[i] = 0
97 }
98 data := &my_data
99
100 sem := semaphore.NewWeighted(1)
101 sem.Acquire(context.Background(), 1)
102 ctx := GoCtx{sem, unsafe.Pointer(data), context.Background(), id}
103
104 count := uint64(0)
105 <- start
106 for true {
107 work(data, size, cnt, &state)
108
109 i := __xorshift64(&state) % uint64(len(channels))
110 var closed bool
111 data, closed = channels[i].put(&ctx, data, share)
112 count += 1
113
114 if closed { break }
115 if clock_mode && atomic.LoadInt32(&stop) == 1 { break }
116 if !clock_mode && count >= stop_count { break }
117 if uint64(len(*data)) != size {
118 panic("Data has weird size")
119 }
120 }
121
122 atomic.AddInt64(&threads_left, -1);
123 result <- count
124}
125
126func main() {
127 work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads")
128 countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch")
129 shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking")
130
131 defer bench_init()()
132
133 size := *work_sizeOpt
134 cnt := *countOpt
135 share := *shareOpt
136
137 if ! (nthreads > nprocs) {
138 fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
139 os.Exit(1)
140 }
141
142 barrierStart := make(chan struct{})
143 threads_left = int64(nprocs)
144 result := make(chan uint64)
145 channels := make([]Spot, nthreads - nprocs)
146 for i := range channels {
147 channels[i] = Spot{uintptr(0), i}
148 }
149
150 for i := 0; i < nthreads; i++ {
151 go local(result, barrierStart, size, cnt, channels, share, i)
152 }
153 fmt.Printf("Starting\n");
154
155 atomic.StoreInt32(&stop, 0)
156 start := time.Now()
157 close(barrierStart)
158
159 wait(start, true);
160
161 atomic.StoreInt32(&stop, 1)
162 end := time.Now()
163 delta := end.Sub(start)
164
165 fmt.Printf("\nDone\n")
166
167 for i := range channels {
168 channels[i].release()
169 }
170
171 global_counter := uint64(0)
172 for i := 0; i < nthreads; i++ {
173 r := <- result
174 global_counter += r
175 fmt.Printf("%d\n", r)
176 }
177
178 p := message.NewPrinter(language.English)
179 p.Printf("Duration (ms) : %f\n", delta.Seconds());
180 p.Printf("Number of processors : %d\n", nprocs);
181 p.Printf("Number of threads : %d\n", nthreads);
182 p.Printf("Work size (64bit words): %d\n", size);
183 p.Printf("Total Operations(ops) : %15d\n", global_counter)
184 p.Printf("Ops per second : %18.2f\n", float64(global_counter) / delta.Seconds())
185 p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter))
186 p.Printf("Ops per threads : %15d\n", global_counter / uint64(nthreads))
187 p.Printf("Ops per procs : %15d\n", global_counter / uint64(nprocs))
188 p.Printf("Ops/sec/procs : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds())
189 p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs)))
190}
Note: See TracBrowser for help on using the repository browser.