source: benchmark/readyQ/transfer.go@ f57f6ea0

ADT ast-experimental enum forall-pointer-decay pthread-emulation qualifiedEnum
Last change on this file since f57f6ea0 was 6dc2db9, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Change benchmarks to consistently print duration in ms.

  • Property mode set to 100644
File size: 5.0 KB
Line 
1package main
2
3import (
4 "flag"
5 "fmt"
6 "math/rand"
7 "os"
8 "runtime"
9 "sync/atomic"
10 "time"
11 "golang.org/x/text/language"
12 "golang.org/x/text/message"
13)
14
15type LeaderInfo struct {
16 id uint64
17 idx uint64
18 seed uint64
19}
20
21func __xorshift64( state * uint64 ) (uint64) {
22 x := *state
23 x ^= x << 13
24 x ^= x >> 7
25 x ^= x << 17
26 *state = x
27 return x
28}
29
30func (this * LeaderInfo) next(len uint64) {
31 n := __xorshift64( &this.seed )
32 atomic.StoreUint64( &this.id, n % len )
33}
34
35func NewLeader(size uint64) (*LeaderInfo) {
36 this := &LeaderInfo{0, 0, uint64(os.Getpid())}
37
38 r := rand.Intn(10)
39
40 for i := 0; i < r; i++ {
41 this.next( uint64(nthreads) )
42 }
43
44 return this
45}
46
47type MyThread struct {
48 id uint64
49 idx uint64
50 sem chan struct{}
51}
52
53func waitgroup(idx uint64, threads [] MyThread) {
54 start := time.Now()
55 for i := 0; i < len(threads); i++ {
56 // fmt.Fprintf(os.Stderr, "Waiting for :%d (%d)\n", threads[i].id, atomic.LoadUint64(&threads[i].idx) );
57 for atomic.LoadUint64( &threads[i].idx ) != idx {
58 // hint::spin_loop();
59 end := time.Now()
60 delta := end.Sub(start)
61 if delta.Seconds() > 5 {
62 fmt.Fprintf(os.Stderr, "Programs has been blocked for more than 5 secs")
63 os.Exit(1)
64 }
65 }
66 }
67 // debug!( "Waiting done" );
68}
69
70func wakegroup(exhaust bool, me uint64, threads [] MyThread) {
71 if !exhaust { return; }
72
73 for i := uint64(0); i < uint64(len(threads)); i++ {
74 if i != me {
75 // debug!( "Leader waking {}", i);
76 threads[i].sem <- (struct {}{})
77 }
78 }
79}
80
81func lead(exhaust bool, leader * LeaderInfo, this * MyThread, threads [] MyThread, main_sem chan struct {}) {
82 nidx := atomic.LoadUint64(&leader.idx) + 1;
83 atomic.StoreUint64(&this.idx, nidx);
84 atomic.StoreUint64(&leader.idx, nidx);
85
86 if nidx > stop_count {
87 // debug!( "Leader {} done", this.id);
88 main_sem <- (struct {}{})
89 return
90 }
91
92 // debug!( "====================\nLeader no {} : {}", nidx, this.id);
93
94 waitgroup(nidx, threads);
95
96 leader.next( uint64(len(threads)) );
97
98 wakegroup(exhaust, this.id, threads);
99
100 // debug!( "Leader no {} : {} done\n====================", nidx, this.id);
101}
102
103func waitleader(exhaust bool, leader * LeaderInfo, this * MyThread, rechecks * uint64) {
104 runtime.Gosched()
105
106 if atomic.LoadUint64(&leader.idx) == atomic.LoadUint64(&this.idx) {
107 // debug!("Waiting {} recheck", this.id);
108 *rechecks += uint64(1)
109 return
110 }
111
112 // debug!("Waiting {}", this.id);
113
114 // debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
115 atomic.AddUint64(&this.idx, 1)
116 if exhaust {
117 // debug!("Waiting {} sem", this.id);
118 <- this.sem
119 } else {
120 // debug!("Waiting {} yield", this.id);
121 runtime.Gosched()
122 }
123
124 // debug!("Waiting {} done", this.id);
125}
126
127func transfer_main( result chan uint64, me uint64, leader * LeaderInfo, threads [] MyThread, exhaust bool, start chan struct{}, main_sem chan struct{}) {
128 // assert!( me == threads[me].id );
129
130 // debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
131
132 // Wait for start
133 <- start
134
135 // debug!( "Start {}", me );
136
137 // Prepare results
138 r := uint64(0)
139
140 // Main loop
141 for true {
142 if atomic.LoadUint64(&leader.id) == me {
143 lead( exhaust, leader, &threads[me], threads, main_sem )
144 runtime.Gosched()
145 } else {
146 waitleader( exhaust, leader, &threads[me], &r )
147 }
148 if atomic.LoadUint64(&leader.idx) > stop_count { break; }
149 }
150
151 // return result
152 result <- r
153}
154
155func main() {
156 // Benchmark specific command line arguments
157 exhaustOpt := flag.Bool("e", false, "Whether or not threads that have seen the new epoch should park instead of yielding.")
158
159 // General benchmark initialization and deinitialization
160 defer bench_init()()
161
162 exhaust := *exhaustOpt;
163 if clock_mode {
164 fmt.Fprintf(os.Stderr, "Programs does not support fixed duration mode")
165 os.Exit(1)
166 }
167
168 var es string
169 if exhaust {
170 es = "with"
171 } else {
172 es = "without"
173 }
174 fmt.Printf("Running %d threads on %d processors, doing %d iterations, %s exhaustion\n", nthreads, nprocs, stop_count, es );
175
176 main_sem := make(chan struct{})
177 leader := NewLeader(uint64(nthreads))
178 barr := make(chan struct{})
179 result := make(chan uint64)
180
181 thddata := make([]MyThread, nthreads)
182 for i := range thddata {
183 thddata[i] = MyThread{ uint64(i), 0, make(chan struct {}) }
184 }
185
186 rechecks := uint64(0)
187 for i := range thddata {
188 go transfer_main(result, uint64(i), leader, thddata, exhaust, barr, main_sem)
189 }
190 fmt.Printf("Starting\n");
191
192 start := time.Now()
193 close(barr) // release barrier
194
195 <- main_sem
196
197 end := time.Now()
198 delta := end.Sub(start)
199
200 fmt.Printf("\nDone\n")
201
202 // release all the blocked threads
203 for i := range thddata {
204 close(thddata[i].sem)
205 }
206 for range thddata {
207 rechecks += <- result
208 }
209
210 p := message.NewPrinter(language.English)
211 var ws string
212 if exhaust {
213 ws = "yes"
214 } else {
215 ws = "no"
216 }
217 p.Printf("Duration (ms) : %f\n", delta.Milliseconds() )
218 p.Printf("Number of processors : %d\n", nprocs )
219 p.Printf("Number of threads : %d\n", nthreads )
220 p.Printf("Total Operations(ops) : %15d\n", stop_count )
221 p.Printf("Threads parking on wait : %s\n", ws)
222 p.Printf("Rechecking : %d\n", rechecks )
223}
Note: See TracBrowser for help on using the repository browser.