| 1 | package main
 | 
|---|
| 2 | 
 | 
|---|
| 3 | import (
 | 
|---|
| 4 |         "context"
 | 
|---|
| 5 |         "flag"
 | 
|---|
| 6 |         "fmt"
 | 
|---|
| 7 |         "math/rand"
 | 
|---|
| 8 |         "os"
 | 
|---|
| 9 |         "syscall"
 | 
|---|
| 10 |         "sync/atomic"
 | 
|---|
| 11 |         "time"
 | 
|---|
| 12 |         "unsafe"
 | 
|---|
| 13 |         "golang.org/x/sync/semaphore"
 | 
|---|
| 14 |         "golang.org/x/text/language"
 | 
|---|
| 15 |         "golang.org/x/text/message"
 | 
|---|
| 16 | )
 | 
|---|
| 17 | 
 | 
|---|
| 18 | // ==================================================
 | 
|---|
// MyData is the piece of work data a goroutine touches between handshakes.
// It remembers the OS thread id that last observed it so that migrations of
// the data between threads can be counted (see moved).
//
// The field order must not change: the struct is built with positional
// (unkeyed) composite literals.
type MyData struct {
	_p1  [16]uint64 // leading padding
	ttid int        // thread id recorded at creation and updated by moved
	id   int        // identifier supplied by the creator
	data []uint64   // cells that access increments
	_p2  [16]uint64 // trailing padding
}
| 26 | 
 | 
|---|
| 27 | func NewData(id int, size uint64) (*MyData) {
 | 
|---|
| 28 |         var data [] uint64
 | 
|---|
| 29 |         data = make([]uint64, size)
 | 
|---|
| 30 |         for i := uint64(0); i < size; i++ {
 | 
|---|
| 31 |                 data[i] = 0
 | 
|---|
| 32 |         }
 | 
|---|
| 33 |         return &MyData{[16]uint64{0}, syscall.Gettid(), id, data,[16]uint64{0}}
 | 
|---|
| 34 | }
 | 
|---|
| 35 | 
 | 
|---|
| 36 | func (this * MyData) moved( ttid int ) (uint64) {
 | 
|---|
| 37 |         if this.ttid == ttid {
 | 
|---|
| 38 |                 return 0
 | 
|---|
| 39 |         }
 | 
|---|
| 40 |         this.ttid = ttid
 | 
|---|
| 41 |         return 1
 | 
|---|
| 42 | }
 | 
|---|
| 43 | 
 | 
|---|
| 44 | func (this * MyData) access( idx uint64 ) {
 | 
|---|
| 45 |         this.data[idx % uint64(len(this.data))] += 1
 | 
|---|
| 46 | }
 | 
|---|
| 47 | 
 | 
|---|
| 48 | // ==================================================
 | 
|---|
| 49 | type MyCtx struct {
 | 
|---|
| 50 |         _p1 [16]uint64 // padding
 | 
|---|
| 51 |         s * semaphore.Weighted
 | 
|---|
| 52 |         d unsafe.Pointer
 | 
|---|
| 53 |         c context.Context
 | 
|---|
| 54 |         ttid int
 | 
|---|
| 55 |         id int
 | 
|---|
| 56 |         _p2 [16]uint64 // padding
 | 
|---|
| 57 | }
 | 
|---|
| 58 | 
 | 
|---|
| 59 | func NewCtx( data * MyData, id int ) (MyCtx) {
 | 
|---|
| 60 |         r := MyCtx{[16]uint64{0},semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id,[16]uint64{0}}
 | 
|---|
| 61 |         r.s.Acquire(context.Background(), 1)
 | 
|---|
| 62 |         return r
 | 
|---|
| 63 | }
 | 
|---|
| 64 | 
 | 
|---|
| 65 | func (this * MyCtx) moved( ttid int ) (uint64) {
 | 
|---|
| 66 |         if this.ttid == ttid {
 | 
|---|
| 67 |                 return 0
 | 
|---|
| 68 |         }
 | 
|---|
| 69 |         this.ttid = ttid
 | 
|---|
| 70 |         return 1
 | 
|---|
| 71 | }
 | 
|---|
| 72 | 
 | 
|---|
| 73 | // ==================================================
 | 
|---|
// Spot is an atomic single-seat object on which one goroutine can wait.
// Goroutines may exchange data through it while handing over the seat.
//
// The field order must not change: the struct is built with a positional
// (unkeyed) composite literal.
type Spot struct {
	_p1 [16]uint64 // leading padding
	ptr uintptr    // atomic word: 0 = empty seat, 1 = closed, otherwise address of the waiting MyCtx
	id  int        // id for debugging
	_p2 [16]uint64 // trailing padding
}
| 82 | 
 | 
|---|
| 83 | // Main handshake of the code
 | 
|---|
| 84 | // Single seat, first thread arriving waits
 | 
|---|
| 85 | // Next threads unblocks current one and blocks in its place
 | 
|---|
| 86 | // if share == true, exchange data in the process
 | 
|---|
| 87 | func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
 | 
|---|
| 88 |         new := uintptr(unsafe.Pointer(ctx))
 | 
|---|
| 89 |         // old_d := ctx.d
 | 
|---|
| 90 | 
 | 
|---|
| 91 |         // Attempt to CAS our context into the seat
 | 
|---|
| 92 |         var raw uintptr
 | 
|---|
| 93 |         for true {
 | 
|---|
| 94 |                 raw = this.ptr
 | 
|---|
| 95 |                 if raw == uintptr(1) { // Seat is closed, return
 | 
|---|
| 96 |                         return nil, true
 | 
|---|
| 97 |                 }
 | 
|---|
| 98 |                 if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
 | 
|---|
| 99 |                         break // We got the seat
 | 
|---|
| 100 |                 }
 | 
|---|
| 101 |         }
 | 
|---|
| 102 | 
 | 
|---|
| 103 |         // If we aren't the fist in, wake someone
 | 
|---|
| 104 |         if raw != uintptr(0) {
 | 
|---|
| 105 |                 var val *MyCtx
 | 
|---|
| 106 |                 val = (*MyCtx)(unsafe.Pointer(raw))
 | 
|---|
| 107 | 
 | 
|---|
| 108 |                 // If we are sharing, give them our data
 | 
|---|
| 109 |                 if share {
 | 
|---|
| 110 |                         // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
 | 
|---|
| 111 |                         atomic.StorePointer(&val.d, unsafe.Pointer(data))
 | 
|---|
| 112 |                 }
 | 
|---|
| 113 | 
 | 
|---|
| 114 |                 // Wake them up
 | 
|---|
| 115 |                 // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
 | 
|---|
| 116 |                 val.s.Release(1)
 | 
|---|
| 117 |         }
 | 
|---|
| 118 | 
 | 
|---|
| 119 |         // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
 | 
|---|
| 120 | 
 | 
|---|
| 121 |         // Block once on the seat
 | 
|---|
| 122 |         ctx.s.Acquire(ctx.c, 1)
 | 
|---|
| 123 | 
 | 
|---|
| 124 |         // Someone woke us up, get the new data
 | 
|---|
| 125 |         ret := (* MyData)(atomic.LoadPointer(&ctx.d))
 | 
|---|
| 126 |         // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
 | 
|---|
| 127 | 
 | 
|---|
| 128 |         return ret, false
 | 
|---|
| 129 | }
 | 
|---|
| 130 | 
 | 
|---|
| 131 | // Shutdown the spot
 | 
|---|
| 132 | // Wake current thread and mark seat as closed
 | 
|---|
| 133 | func (this * Spot) release() {
 | 
|---|
| 134 |         val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
 | 
|---|
| 135 |         if val == nil {
 | 
|---|
| 136 |                 return
 | 
|---|
| 137 |         }
 | 
|---|
| 138 | 
 | 
|---|
| 139 |         // Someone was there, release them
 | 
|---|
| 140 |         val.s.Release(1)
 | 
|---|
| 141 | }
 | 
|---|
| 142 | 
 | 
|---|
| 143 | // ==================================================
 | 
|---|
// Result bundles the counters one goroutine reports back over the result
// channel (a struct is used to send several values through a channel).
//
// The field order must not change: the struct is built with a positional
// (unkeyed) composite literal.
type Result struct {
	count uint64 // completed handshake iterations
	gmigs uint64 // times the goroutine was seen on a new OS thread
	dmigs uint64 // times the data was seen on a new OS thread
}
| 150 | 
 | 
|---|
| 151 | func NewResult() (Result) {
 | 
|---|
| 152 |         return Result{0, 0, 0}
 | 
|---|
| 153 | }
 | 
|---|
| 154 | 
 | 
|---|
| 155 | // ==================================================
 | 
|---|
// __xorshift64 is a small per-caller random number generator, used because
// Go's native one is too slow and global. It advances *state with the
// 13/7/17 xorshift steps and returns the new state. A state of zero stays
// zero forever.
func __xorshift64(state *uint64) uint64 {
	*state ^= *state << 13
	*state ^= *state >> 7
	*state ^= *state << 17
	return *state
}
| 165 | 
 | 
|---|
| 166 | // ==================================================
 | 
|---|
| 167 | // Do some work by accessing 'cnt' cells in the array
 | 
|---|
| 168 | func work(data * MyData, cnt uint64, state * uint64) {
 | 
|---|
| 169 |         for i := uint64(0); i < cnt; i++ {
 | 
|---|
| 170 |                 data.access(__xorshift64(state))
 | 
|---|
| 171 |         }
 | 
|---|
| 172 | }
 | 
|---|
| 173 | 
 | 
|---|
| 174 | // Main body of the threads
 | 
|---|
| 175 | func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
 | 
|---|
| 176 |         // Initialize some data
 | 
|---|
| 177 |         state := rand.Uint64()    // RNG state
 | 
|---|
| 178 |         data := NewData(id, size) // Starting piece of data
 | 
|---|
| 179 |         ctx := NewCtx(data, id)   // Goroutine local context
 | 
|---|
| 180 | 
 | 
|---|
| 181 |         // Prepare results
 | 
|---|
| 182 |         r := NewResult()
 | 
|---|
| 183 | 
 | 
|---|
| 184 |         // Wait for start
 | 
|---|
| 185 |         <- start
 | 
|---|
| 186 | 
 | 
|---|
| 187 |         // Main loop
 | 
|---|
| 188 |         for true {
 | 
|---|
| 189 |                 // Touch our current data, write to invalidate remote cache lines
 | 
|---|
| 190 |                 work(data, cnt, &state)
 | 
|---|
| 191 | 
 | 
|---|
| 192 |                 // Wait on a random spot
 | 
|---|
| 193 |                 i := __xorshift64(&state) % uint64(len(channels))
 | 
|---|
| 194 |                 var closed bool
 | 
|---|
| 195 |                 data, closed = channels[i].put(&ctx, data, share)
 | 
|---|
| 196 | 
 | 
|---|
| 197 |                 // Check if the experiment is over
 | 
|---|
| 198 |                 if closed { break }                                       // yes, spot was closed
 | 
|---|
| 199 |                 if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }  // yes, time's up
 | 
|---|
| 200 |                 if !clock_mode && r.count >= stop_count { break }         // yes, iterations reached
 | 
|---|
| 201 | 
 | 
|---|
| 202 |                 // Check everything is consistent
 | 
|---|
| 203 |                 if uint64(len(data.data)) != size { panic("Data has weird size") }
 | 
|---|
| 204 | 
 | 
|---|
| 205 |                 // write down progress and check migrations
 | 
|---|
| 206 |                 ttid := syscall.Gettid()
 | 
|---|
| 207 |                 r.count += 1
 | 
|---|
| 208 |                 r.gmigs += ctx .moved(ttid)
 | 
|---|
| 209 |                 r.dmigs += data.moved(ttid)
 | 
|---|
| 210 |         }
 | 
|---|
| 211 | 
 | 
|---|
| 212 |         // Mark goroutine as done
 | 
|---|
| 213 |         atomic.AddInt64(&threads_left, -1);
 | 
|---|
| 214 | 
 | 
|---|
| 215 |         // return result
 | 
|---|
| 216 |         result <- r
 | 
|---|
| 217 | }
 | 
|---|
| 218 | 
 | 
|---|
| 219 | // ==================================================
 | 
|---|
| 220 | // Program main
 | 
|---|
| 221 | func main() {
 | 
|---|
| 222 |         // Benchmark specific command line arguments
 | 
|---|
| 223 |         nspotsOpt    := flag.Int   ("n", 0    , "Number of spots where threads sleep (nthreads - nspots are active at the same time)")
 | 
|---|
| 224 |         work_sizeOpt := flag.Uint64("w", 2    , "Size of the array for each threads, in words (64bit)")
 | 
|---|
| 225 |         countOpt     := flag.Uint64("c", 2    , "Number of words to touch when working (random pick, cells can be picked more than once)")
 | 
|---|
| 226 |         shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
 | 
|---|
| 227 | 
 | 
|---|
| 228 |         // General benchmark initialization and deinitialization
 | 
|---|
| 229 |         defer bench_init()()
 | 
|---|
| 230 | 
 | 
|---|
| 231 |         // Eval command line arguments
 | 
|---|
| 232 |         nspots:= *nspotsOpt
 | 
|---|
| 233 |         size  := *work_sizeOpt
 | 
|---|
| 234 |         cnt   := *countOpt
 | 
|---|
| 235 |         share := *shareOpt
 | 
|---|
| 236 | 
 | 
|---|
| 237 |         if nspots == 0 { nspots = nthreads - nprocs; }
 | 
|---|
| 238 | 
 | 
|---|
| 239 |         // Check params
 | 
|---|
| 240 |         if ! (nthreads > nprocs) {
 | 
|---|
| 241 |                 fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
 | 
|---|
| 242 |                 os.Exit(1)
 | 
|---|
| 243 |         }
 | 
|---|
| 244 | 
 | 
|---|
| 245 |         // Make global data
 | 
|---|
| 246 |         barrierStart := make(chan struct{})         // Barrier used at the start
 | 
|---|
| 247 |         threads_left = int64(nthreads - nspots)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
 | 
|---|
| 248 |         result  := make(chan Result)                // Channel for results
 | 
|---|
| 249 |         channels := make([]Spot, nspots) // Number of spots
 | 
|---|
| 250 |         for i := range channels {
 | 
|---|
| 251 |                 channels[i] = Spot{[16]uint64{0},uintptr(0), i,[16]uint64{0}}     // init spots
 | 
|---|
| 252 |         }
 | 
|---|
| 253 | 
 | 
|---|
| 254 |         // start the goroutines
 | 
|---|
| 255 |         for i := 0; i < nthreads; i++ {
 | 
|---|
| 256 |                 go local(result, barrierStart, size, cnt, channels, share, i)
 | 
|---|
| 257 |         }
 | 
|---|
| 258 |         fmt.Printf("Starting\n");
 | 
|---|
| 259 | 
 | 
|---|
| 260 |         atomic.StoreInt32(&stop, 0)
 | 
|---|
| 261 |         start := time.Now()
 | 
|---|
| 262 |         close(barrierStart) // release barrier
 | 
|---|
| 263 | 
 | 
|---|
| 264 |         wait(start, true);  // general benchmark wait
 | 
|---|
| 265 | 
 | 
|---|
| 266 |         atomic.StoreInt32(&stop, 1)
 | 
|---|
| 267 |         end := time.Now()
 | 
|---|
| 268 |         delta := end.Sub(start)
 | 
|---|
| 269 | 
 | 
|---|
| 270 |         fmt.Printf("\nDone\n")
 | 
|---|
| 271 | 
 | 
|---|
| 272 |         // release all the blocked threads
 | 
|---|
| 273 |         for i := range channels {
 | 
|---|
| 274 |                 channels[i].release()
 | 
|---|
| 275 |         }
 | 
|---|
| 276 | 
 | 
|---|
| 277 |         // Join and accumulate results
 | 
|---|
| 278 |         results := NewResult()
 | 
|---|
| 279 |         for i := 0; i < nthreads; i++ {
 | 
|---|
| 280 |                 r := <- result
 | 
|---|
| 281 |                 results.count += r.count
 | 
|---|
| 282 |                 results.gmigs += r.gmigs
 | 
|---|
| 283 |                 results.dmigs += r.dmigs
 | 
|---|
| 284 |         }
 | 
|---|
| 285 | 
 | 
|---|
| 286 |         // Print with nice 's, i.e. 1'000'000 instead of 1000000
 | 
|---|
| 287 |         p := message.NewPrinter(language.English)
 | 
|---|
| 288 |         p.Printf("Duration (ms)          : %d\n", delta.Milliseconds());
 | 
|---|
| 289 |         p.Printf("Number of processors   : %d\n", nprocs);
 | 
|---|
| 290 |         p.Printf("Number of threads      : %d\n", nthreads);
 | 
|---|
| 291 |         p.Printf("Work size (64bit words): %d\n", size);
 | 
|---|
| 292 |         if share {
 | 
|---|
| 293 |                 p.Printf("Data sharing           : On\n");
 | 
|---|
| 294 |         } else {
 | 
|---|
| 295 |                 p.Printf("Data sharing           : Off\n");
 | 
|---|
| 296 |         }
 | 
|---|
| 297 |         p.Printf("Total Operations(ops)  : %15d\n", results.count)
 | 
|---|
| 298 |         p.Printf("Total G Migrations     : %15d\n", results.gmigs)
 | 
|---|
| 299 |         p.Printf("Total D Migrations     : %15d\n", results.dmigs)
 | 
|---|
| 300 |         p.Printf("Ops per second         : %18.2f\n", float64(results.count) / delta.Seconds())
 | 
|---|
| 301 |         p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(results.count))
 | 
|---|
| 302 |         p.Printf("Ops per threads        : %15d\n", results.count / uint64(nthreads))
 | 
|---|
| 303 |         p.Printf("Ops per procs          : %15d\n", results.count / uint64(nprocs))
 | 
|---|
| 304 |         p.Printf("Ops/sec/procs          : %18.2f\n", (float64(results.count) / float64(nprocs)) / delta.Seconds())
 | 
|---|
| 305 |         p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(results.count) / float64(nprocs)))
 | 
|---|
| 306 | }
 | 
|---|