source: benchmark/readyQ/transfer.go @ d3aa55e9

Last change on this file since d3aa55e9 was aec2c022, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Clean-up the benchmarks a little

  • Property mode set to 100644
File size: 5.8 KB
Line 
1package main
2
3import (
4        "flag"
5        "fmt"
6        "math/rand"
7        "os"
8        "regexp"
9        "runtime"
10        "sync/atomic"
11        "time"
12        "golang.org/x/text/language"
13        "golang.org/x/text/message"
14)
15
16type LeaderInfo struct {
17        id uint64
18        idx uint64
19        estop uint64
20        seed uint64
21}
22
23func __xorshift64( state * uint64 ) (uint64) {
24        x := *state
25        x ^= x << 13
26        x ^= x >> 7
27        x ^= x << 17
28        *state = x
29        return x
30}
31
32func (this * LeaderInfo) next(len uint64) {
33        n := __xorshift64( &this.seed )
34        atomic.StoreUint64( &this.id, n % len )
35}
36
37func NewLeader(size uint64) (*LeaderInfo) {
38        this := &LeaderInfo{0, 0, 0, uint64(os.Getpid())}
39
40        r := rand.Intn(10)
41
42        for i := 0; i < r; i++ {
43                this.next( uint64(nthreads) )
44        }
45
46        return this
47}
48
49type MyThread struct {
50        id uint64
51        idx uint64
52        sem chan struct{}
53}
54
55func waitgroup(leader * LeaderInfo, idx uint64, threads [] MyThread, main_sem chan struct {}) {
56        start := time.Now()
57        Outer:
58        for i := 0; i < len(threads); i++ {
59                // fmt.Fprintf(os.Stderr, "Waiting for :%d (%d)\n", threads[i].id, atomic.LoadUint64(&threads[i].idx) );
60                for atomic.LoadUint64( &threads[i].idx ) != idx {
61                        // hint::spin_loop();
62                        end := time.Now()
63                        delta := end.Sub(start)
64                        if delta.Seconds() > 5 {
65                                fmt.Fprintf(os.Stderr, "Programs has been blocked for more than 5 secs")
66                                atomic.StoreUint64(&leader.estop, 1);
67                                main_sem <- (struct {}{})
68                                break Outer
69                        }
70                }
71        }
72        // debug!( "Waiting done" );
73}
74
75func wakegroup(exhaust bool, me uint64, threads [] MyThread) {
76        if !exhaust { return; }
77
78        for i := uint64(0); i < uint64(len(threads)); i++ {
79                if i != me {
80                        // debug!( "Leader waking {}", i);
81                        defer func() {
82                                if err := recover(); err != nil {
83                                        fmt.Fprintf(os.Stderr, "Panic occurred: %s\n", err)
84                                }
85                        }()
86                        threads[i].sem <- (struct {}{})
87                }
88        }
89}
90
91func lead(exhaust bool, leader * LeaderInfo, this * MyThread, threads [] MyThread, main_sem chan struct {}) {
92        nidx := atomic.LoadUint64(&leader.idx) + 1;
93        atomic.StoreUint64(&this.idx, nidx);
94        atomic.StoreUint64(&leader.idx, nidx);
95
96        if nidx > stop_count || atomic.LoadUint64(&leader.estop) != 0 {
97                // debug!( "Leader {} done", this.id);
98                main_sem <- (struct {}{})
99                return
100        }
101
102        // debug!( "====================\nLeader no {} : {}", nidx, this.id);
103
104        waitgroup(leader, nidx, threads, main_sem);
105
106        leader.next( uint64(len(threads)) );
107
108        wakegroup(exhaust, this.id, threads);
109
110        // debug!( "Leader no {} : {} done\n====================", nidx, this.id);
111}
112
113func waitleader(exhaust bool, leader * LeaderInfo, this * MyThread, rechecks * uint64) {
114        runtime.Gosched()
115
116        if atomic.LoadUint64(&leader.idx) == atomic.LoadUint64(&this.idx) {
117                // debug!("Waiting {} recheck", this.id);
118                *rechecks += uint64(1)
119                return
120        }
121
122        // debug!("Waiting {}", this.id);
123
124        // debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
125        atomic.AddUint64(&this.idx, 1)
126        if exhaust {
127                // debug!("Waiting {} sem", this.id);
128                <- this.sem
129        } else {
130                // debug!("Waiting {} yield", this.id);
131                runtime.Gosched()
132        }
133
134        // debug!("Waiting {} done", this.id);
135}
136
137func transfer_main( result chan uint64, me uint64, leader * LeaderInfo, threads [] MyThread, exhaust bool, start chan struct{}, main_sem chan struct{}) {
138        // assert!( me == threads[me].id );
139
140        // debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
141
142        // Wait for start
143        <- start
144
145        // debug!( "Start {}", me );
146
147        // Prepare results
148        r := uint64(0)
149
150        // Main loop
151        for true {
152                if atomic.LoadUint64(&leader.id) == me {
153                        lead( exhaust, leader, &threads[me], threads, main_sem )
154                        runtime.Gosched()
155                } else {
156                        waitleader( exhaust, leader, &threads[me], &r )
157                }
158                if atomic.LoadUint64(&leader.idx) > stop_count || atomic.LoadUint64(&leader.estop) != 0 { break; }
159        }
160
161        // return result
162        result <- r
163}
164
165func main() {
166        // Benchmark specific command line arguments
167        exhaustOpt := flag.String("e", "no", "Whether or not threads that have seen the new epoch should park instead of yielding.")
168
169        // General benchmark initialization and deinitialization
170        bench_init()
171
172        exhaustVal := *exhaustOpt;
173
174        var exhaust bool
175        re_yes := regexp.MustCompile("[Yy]|[Yy][Ee][Ss]")
176        re_no  := regexp.MustCompile("[Nn]|[Nn][Oo]")
177        if re_yes.Match([]byte(exhaustVal)) {
178                exhaust = true
179        } else if re_no.Match([]byte(exhaustVal)) {
180                exhaust = false
181        } else {
182                fmt.Fprintf(os.Stderr, "Unrecognized exhaust(-e) option '%s'\n", exhaustVal)
183                os.Exit(1)
184        }
185
186        if clock_mode {
187                fmt.Fprintf(os.Stderr, "Programs does not support fixed duration mode\n")
188                os.Exit(1)
189        }
190
191        var es string
192        if exhaust {
193                es = "with"
194        } else {
195                es = "without"
196        }
197        fmt.Printf("Running %d threads on %d processors, doing %d iterations, %s exhaustion\n", nthreads, nprocs, stop_count, es );
198
199        main_sem := make(chan struct{})
200        leader := NewLeader(uint64(nthreads))
201        barr := make(chan struct{})
202        result := make(chan uint64)
203
204        thddata := make([]MyThread, nthreads)
205        for i := range thddata {
206                thddata[i] = MyThread{ uint64(i), 0, make(chan struct {}) }
207        }
208
209        rechecks := uint64(0)
210        for i := range thddata {
211                go transfer_main(result, uint64(i), leader, thddata, exhaust, barr, main_sem)
212        }
213        fmt.Printf("Starting\n");
214
215        start := time.Now()
216        close(barr) // release barrier
217
218        <- main_sem
219
220        end := time.Now()
221        delta := end.Sub(start)
222
223        fmt.Printf("\nDone\n")
224
225        // release all the blocked threads
226        for i := range thddata {
227                close(thddata[i].sem)
228        }
229        for range thddata {
230                rechecks += <- result
231        }
232
233        p := message.NewPrinter(language.English)
234        var ws string
235        if exhaust {
236                ws = "yes"
237        } else {
238                ws = "no"
239        }
240        p.Printf("Duration (ms)           : %d\n", delta.Milliseconds() )
241        p.Printf("Number of processors    : %d\n", nprocs )
242        p.Printf("Number of threads       : %d\n", nthreads )
243        p.Printf("Total Operations(ops)   : %15d\n", (leader.idx - 1) )
244        p.Printf("Threads parking on wait : %s\n", ws)
245        p.Printf("Rechecking              : %d\n", rechecks )
246        p.Printf("ms per transfer         : %f\n", float64(delta.Milliseconds()) / float64(leader.idx) )
247}
Note: See TracBrowser for help on using the repository browser.