source: benchmark/readyQ/transfer.go@ 8fd1b7c

ADT ast-experimental
Last change on this file since 8fd1b7c was aec2c022, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Clean-up the benchmarks a little

  • Property mode set to 100644
File size: 5.8 KB
Line 
1package main
2
3import (
4 "flag"
5 "fmt"
6 "math/rand"
7 "os"
8 "regexp"
9 "runtime"
10 "sync/atomic"
11 "time"
12 "golang.org/x/text/language"
13 "golang.org/x/text/message"
14)
15
// LeaderInfo is the state shared by every benchmark thread.
//
//   - id:    index of the thread that currently holds leadership; rewritten
//     by next and polled by transfer_main.
//   - idx:   epoch counter, bumped by one each time lead runs.
//   - estop: emergency-stop flag; waitgroup stores 1 here when it times out.
//   - seed:  state of the xorshift64 generator used to pick the next leader.
//
// The field order matters: NewLeader's callers and siblings may build this
// struct with a positional literal.
type LeaderInfo struct {
	id, idx     uint64
	estop, seed uint64
}
22
// __xorshift64 advances the 64-bit xorshift generator stored in *state by one
// step (shifts 13 left, 7 right, 17 left), writes the new state back and
// returns it. A state of zero is a fixed point: it always yields zero.
func __xorshift64(state *uint64) uint64 {
	v := *state
	v = v ^ (v << 13)
	v = v ^ (v >> 7)
	v = v ^ (v << 17)
	*state = v
	return v
}
31
32func (this * LeaderInfo) next(len uint64) {
33 n := __xorshift64( &this.seed )
34 atomic.StoreUint64( &this.id, n % len )
35}
36
37func NewLeader(size uint64) (*LeaderInfo) {
38 this := &LeaderInfo{0, 0, 0, uint64(os.Getpid())}
39
40 r := rand.Intn(10)
41
42 for i := 0; i < r; i++ {
43 this.next( uint64(nthreads) )
44 }
45
46 return this
47}
48
// MyThread is the per-thread record of the benchmark.
//
//   - id:  the thread's position in the threads slice (main stores the slice
//     index here).
//   - idx: the last epoch this thread has caught up with; compared against
//     LeaderInfo.idx by waitgroup and waitleader.
//   - sem: channel the thread blocks on while parked; the leader sends on it
//     to wake the thread and main closes it at shutdown.
//
// The field order matters: main builds values with a positional literal.
type MyThread struct {
	id, idx uint64
	sem     chan struct{}
}
54
55func waitgroup(leader * LeaderInfo, idx uint64, threads [] MyThread, main_sem chan struct {}) {
56 start := time.Now()
57 Outer:
58 for i := 0; i < len(threads); i++ {
59 // fmt.Fprintf(os.Stderr, "Waiting for :%d (%d)\n", threads[i].id, atomic.LoadUint64(&threads[i].idx) );
60 for atomic.LoadUint64( &threads[i].idx ) != idx {
61 // hint::spin_loop();
62 end := time.Now()
63 delta := end.Sub(start)
64 if delta.Seconds() > 5 {
65 fmt.Fprintf(os.Stderr, "Programs has been blocked for more than 5 secs")
66 atomic.StoreUint64(&leader.estop, 1);
67 main_sem <- (struct {}{})
68 break Outer
69 }
70 }
71 }
72 // debug!( "Waiting done" );
73}
74
75func wakegroup(exhaust bool, me uint64, threads [] MyThread) {
76 if !exhaust { return; }
77
78 for i := uint64(0); i < uint64(len(threads)); i++ {
79 if i != me {
80 // debug!( "Leader waking {}", i);
81 defer func() {
82 if err := recover(); err != nil {
83 fmt.Fprintf(os.Stderr, "Panic occurred: %s\n", err)
84 }
85 }()
86 threads[i].sem <- (struct {}{})
87 }
88 }
89}
90
91func lead(exhaust bool, leader * LeaderInfo, this * MyThread, threads [] MyThread, main_sem chan struct {}) {
92 nidx := atomic.LoadUint64(&leader.idx) + 1;
93 atomic.StoreUint64(&this.idx, nidx);
94 atomic.StoreUint64(&leader.idx, nidx);
95
96 if nidx > stop_count || atomic.LoadUint64(&leader.estop) != 0 {
97 // debug!( "Leader {} done", this.id);
98 main_sem <- (struct {}{})
99 return
100 }
101
102 // debug!( "====================\nLeader no {} : {}", nidx, this.id);
103
104 waitgroup(leader, nidx, threads, main_sem);
105
106 leader.next( uint64(len(threads)) );
107
108 wakegroup(exhaust, this.id, threads);
109
110 // debug!( "Leader no {} : {} done\n====================", nidx, this.id);
111}
112
113func waitleader(exhaust bool, leader * LeaderInfo, this * MyThread, rechecks * uint64) {
114 runtime.Gosched()
115
116 if atomic.LoadUint64(&leader.idx) == atomic.LoadUint64(&this.idx) {
117 // debug!("Waiting {} recheck", this.id);
118 *rechecks += uint64(1)
119 return
120 }
121
122 // debug!("Waiting {}", this.id);
123
124 // debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
125 atomic.AddUint64(&this.idx, 1)
126 if exhaust {
127 // debug!("Waiting {} sem", this.id);
128 <- this.sem
129 } else {
130 // debug!("Waiting {} yield", this.id);
131 runtime.Gosched()
132 }
133
134 // debug!("Waiting {} done", this.id);
135}
136
137func transfer_main( result chan uint64, me uint64, leader * LeaderInfo, threads [] MyThread, exhaust bool, start chan struct{}, main_sem chan struct{}) {
138 // assert!( me == threads[me].id );
139
140 // debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
141
142 // Wait for start
143 <- start
144
145 // debug!( "Start {}", me );
146
147 // Prepare results
148 r := uint64(0)
149
150 // Main loop
151 for true {
152 if atomic.LoadUint64(&leader.id) == me {
153 lead( exhaust, leader, &threads[me], threads, main_sem )
154 runtime.Gosched()
155 } else {
156 waitleader( exhaust, leader, &threads[me], &r )
157 }
158 if atomic.LoadUint64(&leader.idx) > stop_count || atomic.LoadUint64(&leader.estop) != 0 { break; }
159 }
160
161 // return result
162 result <- r
163}
164
165func main() {
166 // Benchmark specific command line arguments
167 exhaustOpt := flag.String("e", "no", "Whether or not threads that have seen the new epoch should park instead of yielding.")
168
169 // General benchmark initialization and deinitialization
170 bench_init()
171
172 exhaustVal := *exhaustOpt;
173
174 var exhaust bool
175 re_yes := regexp.MustCompile("[Yy]|[Yy][Ee][Ss]")
176 re_no := regexp.MustCompile("[Nn]|[Nn][Oo]")
177 if re_yes.Match([]byte(exhaustVal)) {
178 exhaust = true
179 } else if re_no.Match([]byte(exhaustVal)) {
180 exhaust = false
181 } else {
182 fmt.Fprintf(os.Stderr, "Unrecognized exhaust(-e) option '%s'\n", exhaustVal)
183 os.Exit(1)
184 }
185
186 if clock_mode {
187 fmt.Fprintf(os.Stderr, "Programs does not support fixed duration mode\n")
188 os.Exit(1)
189 }
190
191 var es string
192 if exhaust {
193 es = "with"
194 } else {
195 es = "without"
196 }
197 fmt.Printf("Running %d threads on %d processors, doing %d iterations, %s exhaustion\n", nthreads, nprocs, stop_count, es );
198
199 main_sem := make(chan struct{})
200 leader := NewLeader(uint64(nthreads))
201 barr := make(chan struct{})
202 result := make(chan uint64)
203
204 thddata := make([]MyThread, nthreads)
205 for i := range thddata {
206 thddata[i] = MyThread{ uint64(i), 0, make(chan struct {}) }
207 }
208
209 rechecks := uint64(0)
210 for i := range thddata {
211 go transfer_main(result, uint64(i), leader, thddata, exhaust, barr, main_sem)
212 }
213 fmt.Printf("Starting\n");
214
215 start := time.Now()
216 close(barr) // release barrier
217
218 <- main_sem
219
220 end := time.Now()
221 delta := end.Sub(start)
222
223 fmt.Printf("\nDone\n")
224
225 // release all the blocked threads
226 for i := range thddata {
227 close(thddata[i].sem)
228 }
229 for range thddata {
230 rechecks += <- result
231 }
232
233 p := message.NewPrinter(language.English)
234 var ws string
235 if exhaust {
236 ws = "yes"
237 } else {
238 ws = "no"
239 }
240 p.Printf("Duration (ms) : %d\n", delta.Milliseconds() )
241 p.Printf("Number of processors : %d\n", nprocs )
242 p.Printf("Number of threads : %d\n", nthreads )
243 p.Printf("Total Operations(ops) : %15d\n", (leader.idx - 1) )
244 p.Printf("Threads parking on wait : %s\n", ws)
245 p.Printf("Rechecking : %d\n", rechecks )
246 p.Printf("ms per transfer : %f\n", float64(delta.Milliseconds()) / float64(leader.idx) )
247}
Note: See TracBrowser for help on using the repository browser.