source: benchmark/readyQ/transfer.go @ 0218d51

ADTast-experimentalenumforall-pointer-decaypthread-emulationqualifiedEnum
Last change on this file since 0218d51 was 0218d51, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Implemeted go transfer benchmark

  • Property mode set to 100644
File size: 5.0 KB
Line 
1package main
2
3import (
4        "flag"
5        "fmt"
6        "math/rand"
7        "os"
8        "runtime"
9        "sync/atomic"
10        "time"
11        "golang.org/x/text/language"
12        "golang.org/x/text/message"
13)
14
15type LeaderInfo struct {
16        id uint64
17        idx uint64
18        seed uint64
19}
20
21func __xorshift64( state * uint64 ) (uint64) {
22        x := *state
23        x ^= x << 13
24        x ^= x >> 7
25        x ^= x << 17
26        *state = x
27        return x
28}
29
30func (this * LeaderInfo) next(len uint64) {
31        n := __xorshift64( &this.seed )
32        atomic.StoreUint64( &this.id, n % len )
33}
34
35func NewLeader(size uint64) (*LeaderInfo) {
36        this := &LeaderInfo{0, 0, uint64(os.Getpid())}
37
38        r := rand.Intn(10)
39
40        for i := 0; i < r; i++ {
41                this.next( uint64(nthreads) )
42        }
43
44        return this
45}
46
47type MyThread struct {
48        id uint64
49        idx uint64
50        sem chan struct{}
51}
52
53func waitgroup(idx uint64, threads [] MyThread) {
54        start := time.Now()
55        for i := 0; i < len(threads); i++ {
56                // fmt.Fprintf(os.Stderr, "Waiting for :%d (%d)\n", threads[i].id, atomic.LoadUint64(&threads[i].idx) );
57                for atomic.LoadUint64( &threads[i].idx ) != idx {
58                        // hint::spin_loop();
59                        end := time.Now()
60                        delta := end.Sub(start)
61                        if delta.Seconds() > 5 {
62                                fmt.Fprintf(os.Stderr, "Programs has been blocked for more than 5 secs")
63                                os.Exit(1)
64                        }
65                }
66        }
67        // debug!( "Waiting done" );
68}
69
70func wakegroup(exhaust bool, me uint64, threads [] MyThread) {
71        if !exhaust { return; }
72
73        for i := uint64(0); i < uint64(len(threads)); i++ {
74                if i != me {
75                        // debug!( "Leader waking {}", i);
76                        threads[i].sem <- (struct {}{})
77                }
78        }
79}
80
81func lead(exhaust bool, leader * LeaderInfo, this * MyThread, threads [] MyThread, main_sem chan struct {}) {
82        nidx := atomic.LoadUint64(&leader.idx) + 1;
83        atomic.StoreUint64(&this.idx, nidx);
84        atomic.StoreUint64(&leader.idx, nidx);
85
86        if nidx > stop_count {
87                // debug!( "Leader {} done", this.id);
88                main_sem <- (struct {}{})
89                return
90        }
91
92        // debug!( "====================\nLeader no {} : {}", nidx, this.id);
93
94        waitgroup(nidx, threads);
95
96        leader.next( uint64(len(threads)) );
97
98        wakegroup(exhaust, this.id, threads);
99
100        // debug!( "Leader no {} : {} done\n====================", nidx, this.id);
101}
102
103func waitleader(exhaust bool, leader * LeaderInfo, this * MyThread, rechecks * uint64) {
104        runtime.Gosched()
105
106        if atomic.LoadUint64(&leader.idx) == atomic.LoadUint64(&this.idx) {
107                // debug!("Waiting {} recheck", this.id);
108                *rechecks += uint64(1)
109                return
110        }
111
112        // debug!("Waiting {}", this.id);
113
114        // debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
115        atomic.AddUint64(&this.idx, 1)
116        if exhaust {
117                // debug!("Waiting {} sem", this.id);
118                <- this.sem
119        } else {
120                // debug!("Waiting {} yield", this.id);
121                runtime.Gosched()
122        }
123
124        // debug!("Waiting {} done", this.id);
125}
126
127func transfer_main( result chan uint64, me uint64, leader * LeaderInfo, threads [] MyThread, exhaust bool, start chan struct{}, main_sem chan struct{}) {
128        // assert!( me == threads[me].id );
129
130        // debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
131
132        // Wait for start
133        <- start
134
135        // debug!( "Start {}", me );
136
137        // Prepare results
138        r := uint64(0)
139
140        // Main loop
141        for true {
142                if atomic.LoadUint64(&leader.id) == me {
143                        lead( exhaust, leader, &threads[me], threads, main_sem )
144                        runtime.Gosched()
145                } else {
146                        waitleader( exhaust, leader, &threads[me], &r )
147                }
148                if atomic.LoadUint64(&leader.idx) > stop_count { break; }
149        }
150
151        // return result
152        result <- r
153}
154
155func main() {
156        // Benchmark specific command line arguments
157        exhaustOpt := flag.Bool("e", false, "Whether or not threads that have seen the new epoch should park instead of yielding.")
158
159        // General benchmark initialization and deinitialization
160        defer bench_init()()
161
162        exhaust := *exhaustOpt;
163        if clock_mode {
164                fmt.Fprintf(os.Stderr, "Programs does not support fixed duration mode")
165                os.Exit(1)
166        }
167
168        var es string
169        if exhaust {
170                es = "with"
171        } else {
172                es = "without"
173        }
174        fmt.Printf("Running %d threads on %d processors, doing %d iterations, %s exhaustion\n", nthreads, nprocs, stop_count, es );
175
176        main_sem := make(chan struct{})
177        leader := NewLeader(uint64(nthreads))
178        barr := make(chan struct{})
179        result := make(chan uint64)
180
181        thddata := make([]MyThread, nthreads)
182        for i := range thddata {
183                thddata[i] = MyThread{ uint64(i), 0, make(chan struct {}) }
184        }
185
186        rechecks := uint64(0)
187        for i := range thddata {
188                go transfer_main(result, uint64(i), leader, thddata, exhaust, barr, main_sem)
189        }
190        fmt.Printf("Starting\n");
191
192        start := time.Now()
193        close(barr) // release barrier
194
195        <- main_sem
196
197        end := time.Now()
198        delta := end.Sub(start)
199
200        fmt.Printf("\nDone\n")
201
202        // release all the blocked threads
203        for i := range thddata {
204                close(thddata[i].sem)
205        }
206        for range thddata {
207                rechecks += <- result
208        }
209
210        p := message.NewPrinter(language.English)
211        var ws string
212        if exhaust {
213                ws = "yes"
214        } else {
215                ws = "no"
216        }
217        p.Printf("Duration (s)            : %f\n", delta.Seconds() )
218        p.Printf("Number of processors    : %d\n", nprocs )
219        p.Printf("Number of threads       : %d\n", nthreads )
220        p.Printf("Total Operations(ops)   : %15d\n", stop_count )
221        p.Printf("Threads parking on wait : %s\n", ws)
222        p.Printf("Rechecking              : %d\n", rechecks )
223}
Note: See TracBrowser for help on using the repository browser.