source: benchmark/readyQ/locality.go@ 8235415

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 8235415 was 8235415, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

forgot to comment debug only var

  • Property mode set to 100644
File size: 4.6 KB
Line 
1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "math/rand"
8 "os"
9 "sync/atomic"
10 "time"
11 "unsafe"
12 "golang.org/x/sync/semaphore"
13 "golang.org/x/text/language"
14 "golang.org/x/text/message"
15)
16
17type GoCtx struct {
18 s * semaphore.Weighted
19 d unsafe.Pointer
20 c context.Context
21 id int
22}
23
// Spot is a single seat that at most one goroutine can wait in at a time.
type Spot struct {
	// ptr encodes the seat state: 0 means empty, 1 means closed, and any
	// other value is the address of the waiting goroutine's GoCtx.
	ptr uintptr
	// id is the index of this spot (only referenced by debug output).
	id int
}
28
29// Main handshake of the code
30// Single seat, first thread arriving waits
31// Next threads unblocks current one and blocks in its place
32// if share == true, exchange data in the process
33func (this * Spot) put( ctx * GoCtx, data * [] uint64, share bool) (* [] uint64, bool) {
34 new := uintptr(unsafe.Pointer(ctx))
35 // old_d := ctx.d
36 var raw uintptr
37 for true {
38 raw = this.ptr
39 if raw == uintptr(1) {
40 return nil, true
41 }
42 if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
43 break
44 }
45 }
46
47 if raw != uintptr(0) {
48 var val *GoCtx
49 val = (*GoCtx)(unsafe.Pointer(raw))
50 if share {
51 // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
52 atomic.StorePointer(&val.d, unsafe.Pointer(data))
53 }
54
55 // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
56 val.s.Release(1)
57 }
58
59 // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
60 ctx.s.Acquire(ctx.c, 1)
61 ret := (* [] uint64)(atomic.LoadPointer(&ctx.d))
62 // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
63
64 return ret, false
65}
66
67func (this * Spot) release() {
68 val := (*GoCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
69 if val == nil {
70 return
71 }
72
73 val.s.Release(1)
74}
75
// __xorshift64 advances the 64-bit xorshift generator stored in *state using
// the shift triple (13, 7, 17) and returns the new state. A zero state stays
// zero.
func __xorshift64(state *uint64) uint64 {
	*state ^= *state << 13
	*state ^= *state >> 7
	*state ^= *state << 17
	return *state
}
84
85func local(result chan uint64, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
86 state := rand.Uint64()
87 var my_data [] uint64
88 my_data = make([]uint64, size)
89 for i := uint64(0); i < size; i++ {
90 my_data[i] = 0
91 }
92 data := &my_data
93
94 sem := semaphore.NewWeighted(1)
95 sem.Acquire(context.Background(), 1)
96 ctx := GoCtx{sem, unsafe.Pointer(data), context.Background(), id}
97
98 count := uint64(0)
99 <- start
100 for true {
101 for i := uint64(0); i < cnt; i++ {
102 (*data)[__xorshift64(&state) % size] += 1
103 }
104
105 i := __xorshift64(&state) % uint64(len(channels))
106 var closed bool
107 data, closed = channels[i].put(&ctx, data, share)
108 count += 1
109
110 if closed { break }
111 if clock_mode && atomic.LoadInt32(&stop) == 1 { break }
112 if !clock_mode && count >= stop_count { break }
113 if uint64(len(*data)) != size {
114 panic("Data has weird size")
115 }
116 }
117
118 atomic.AddInt64(&threads_left, -1);
119 result <- count
120}
121
122func main() {
123 work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads")
124 countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch")
125 shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking")
126
127 defer bench_init()()
128
129 size := *work_sizeOpt
130 cnt := *countOpt
131 share := *shareOpt
132
133 if ! (nthreads > nprocs) {
134 fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
135 os.Exit(1)
136 }
137
138 barrierStart := make(chan struct{})
139 threads_left = int64(nprocs)
140 result := make(chan uint64)
141 channels := make([]Spot, nthreads - nprocs)
142 for i := range channels {
143 channels[i] = Spot{uintptr(0), i}
144 }
145
146 for i := 0; i < nthreads; i++ {
147 go local(result, barrierStart, size, cnt, channels, share, i)
148 }
149 fmt.Printf("Starting\n");
150
151 atomic.StoreInt32(&stop, 0)
152 start := time.Now()
153 close(barrierStart)
154
155 wait(start, true);
156
157 atomic.StoreInt32(&stop, 1)
158 end := time.Now()
159 delta := end.Sub(start)
160
161 fmt.Printf("\nDone\n")
162
163 for i := range channels {
164 channels[i].release()
165 }
166
167 global_counter := uint64(0)
168 for i := 0; i < nthreads; i++ {
169 r := <- result
170 global_counter += r
171 fmt.Printf("%d\n", r)
172 }
173
174 p := message.NewPrinter(language.English)
175 p.Printf("Duration (ms) : %f\n", delta.Seconds());
176 p.Printf("Number of processors : %d\n", nprocs);
177 p.Printf("Number of threads : %d\n", nthreads);
178 p.Printf("Work size (64bit words): %d\n", size);
179 p.Printf("Total Operations(ops) : %15d\n", global_counter)
180 p.Printf("Ops per second : %18.2f\n", float64(global_counter) / delta.Seconds())
181 p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter))
182 p.Printf("Ops per threads : %15d\n", global_counter / uint64(nthreads))
183 p.Printf("Ops per procs : %15d\n", global_counter / uint64(nprocs))
184 p.Printf("Ops/sec/procs : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds())
185 p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs)))
186}
Note: See TracBrowser for help on using the repository browser.