Changes in benchmark/readyQ/locality.go [f03209d3:90ecade]
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/readyQ/locality.go
rf03209d3 r90ecade 2 2 3 3 import ( 4 "context"5 4 "flag" 6 5 "fmt" 7 6 "math/rand" 8 7 "os" 9 "syscall"10 8 "sync/atomic" 11 9 "time" 12 "unsafe"13 "golang.org/x/sync/semaphore"14 10 "golang.org/x/text/language" 15 11 "golang.org/x/text/message" 16 12 ) 17 13 18 // ================================================== 19 type MyData struct { 20 _p1 [16]uint64 // padding 21 ttid int 22 id int 23 data [] uint64 24 _p2 [16]uint64 // padding 14 func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) { 15 var s [] uint64 = data 16 if !share { 17 s = nil 18 } 19 20 // send the data 21 select { 22 case <- stop: 23 return true, nil 24 case c <- s: 25 } 26 27 // get the new data chunk 28 select { 29 case <- stop: 30 return true, nil 31 case n := <- c: 32 if share { 33 return false, n 34 } 35 return false, data 36 } 25 37 } 26 38 27 func NewData(id int, size uint64) (*MyData) {39 func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) { 28 40 var data [] uint64 29 41 data = make([]uint64, size) … … 31 43 data[i] = 0 32 44 } 33 return &MyData{[16]uint64{0}, syscall.Gettid(), id, data,[16]uint64{0}} 45 count := uint64(0) 46 <- start 47 for true { 48 for i := uint64(0); i < cnt; i++ { 49 data[rand.Uint64() % size] += 1 50 } 51 52 i := rand.Uint64() % chan_cnt 53 var closed bool 54 closed, data = handshake(stop, channels[i], data, share) 55 count += 1 56 57 if closed { break } 58 if !clock_mode && count >= stop_count { break } 59 } 60 61 atomic.AddInt64(&threads_left, -1); 62 result <- count 34 63 } 35 64 36 func (this * MyData) moved( ttid int ) (uint64) { 37 if this.ttid == ttid { 38 return 0 39 } 40 this.ttid = ttid 41 return 1 42 } 65 func main() { 43 66 44 func (this * MyData) access( idx uint64 ) { 45 this.data[idx % uint64(len(this.data))] += 1 46 } 47 48 // ================================================== 49 type MyCtx struct { 50 _p1 [16]uint64 // padding 51 s * semaphore.Weighted 52 d unsafe.Pointer 53 c context.Context 54 ttid int 55 id int 56 _p2 [16]uint64 // padding 57 } 58 59 func NewCtx( data * MyData, id int ) (MyCtx) { 60 r := MyCtx{[16]uint64{0},semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id,[16]uint64{0}} 61 r.s.Acquire(context.Background(), 1) 62 return r 63 } 64 65 func (this * MyCtx) moved( ttid int ) (uint64) { 66 if this.ttid == ttid { 67 return 0 68 } 69 this.ttid = ttid 70 return 1 71 } 72 73 // ================================================== 74 // Atomic object where a single thread can wait 75 // May exchanges data 76 type Spot struct { 77 _p1 [16]uint64 // padding 78 ptr uintptr // atomic variable use fo MES 79 id int // id for debugging 80 _p2 [16]uint64 // padding 81 } 82 83 // Main handshake of the code 84 // Single seat, first thread arriving waits 85 // Next threads unblocks current one and blocks in its place 86 // if share == true, exchange data in the process 87 func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) { 88 new := uintptr(unsafe.Pointer(ctx)) 89 // old_d := ctx.d 90 91 // Attempt to CAS our context into the seat 92 var raw uintptr 93 for true { 94 raw = this.ptr 95 if raw == uintptr(1) { // Seat is closed, return 96 return nil, true 97 } 98 if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) { 99 break // We got the seat 100 } 101 } 102 103 // If we aren't the fist in, wake someone 104 if raw != uintptr(0) { 105 var val *MyCtx 106 val = (*MyCtx)(unsafe.Pointer(raw)) 107 108 // If we are sharing, give them our data 109 if share { 110 // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data) 111 atomic.StorePointer(&val.d, unsafe.Pointer(data)) 112 } 113 114 // Wake them up 115 // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id) 116 val.s.Release(1) 117 } 118 119 // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id) 120 121 // Block once on the seat 122 ctx.s.Acquire(ctx.c, 1) 123 124 // Someone woke us up, get the new data 125 ret := (* MyData)(atomic.LoadPointer(&ctx.d)) 126 // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d) 127 128 return ret, false 129 } 130 131 // Shutdown the spot 132 // Wake current thread and mark seat as closed 133 func (this * Spot) release() { 134 val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1)))) 135 if val == nil { 136 return 137 } 138 139 // Someone was there, release them 140 val.s.Release(1) 141 } 142 143 // ================================================== 144 // Struct for result, Go doesn't support passing tuple in channels 145 type Result struct { 146 count uint64 147 gmigs uint64 148 dmigs uint64 149 } 150 151 func NewResult() (Result) { 152 return Result{0, 0, 0} 153 } 154 155 // ================================================== 156 // Random number generator, Go's native one is to slow and global 157 func __xorshift64( state * uint64 ) (uint64) { 158 x := *state 159 x ^= x << 13 160 x ^= x >> 7 161 x ^= x << 17 162 *state = x 163 return x 164 } 165 166 // ================================================== 167 // Do some work by accessing 'cnt' cells in the array 168 func work(data * MyData, cnt uint64, state * uint64) { 169 for i := uint64(0); i < cnt; i++ { 170 data.access(__xorshift64(state)) 171 } 172 } 173 174 // Main body of the threads 175 func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) { 176 // Initialize some data 177 state := rand.Uint64() // RNG state 178 data := NewData(id, size) // Starting piece of data 179 ctx := NewCtx(data, id) // Goroutine local context 180 181 // Prepare results 182 r := NewResult() 183 184 // Wait for start 185 <- start 186 187 // Main loop 188 for true { 189 // Touch our current data, write to invalidate remote cache lines 190 work(data, cnt, &state) 191 192 // Wait on a random spot 193 i := __xorshift64(&state) % uint64(len(channels)) 194 var closed bool 195 data, closed = channels[i].put(&ctx, data, share) 196 197 // Check if the experiment is over 198 if closed { break } // yes, spot was closed 199 if clock_mode && atomic.LoadInt32(&stop) == 1 { break } // yes, time's up 200 if !clock_mode && r.count >= stop_count { break } // yes, iterations reached 201 202 // Check everything is consistent 203 if uint64(len(data.data)) != size { panic("Data has weird size") } 204 205 // write down progress and check migrations 206 ttid := syscall.Gettid() 207 r.count += 1 208 r.gmigs += ctx .moved(ttid) 209 r.dmigs += data.moved(ttid) 210 } 211 212 // Mark goroutine as done 213 atomic.AddInt64(&threads_left, -1); 214 215 // return result 216 result <- r 217 } 218 219 // ================================================== 220 // Program main 221 func main() { 222 // Benchmark specific command line arguments 223 nspotsOpt := flag.Int ("n", 0 , "Number of spots where threads sleep (nthreads - nspots are active at the same time)") 224 work_sizeOpt := flag.Uint64("w", 2 , "Size of the array for each threads, in words (64bit)") 225 countOpt := flag.Uint64("c", 2 , "Number of words to touch when working (random pick, cells can be picked more than once)") 67 work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads") 68 countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch") 226 69 shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking") 227 70 228 // General benchmark initialization and deinitialization 229 defer bench_init()() 71 bench_init() 230 72 231 // Eval command line arguments232 nspots:= *nspotsOpt233 73 size := *work_sizeOpt 234 74 cnt := *countOpt 235 75 share := *shareOpt 236 76 237 if nspots == 0 { nspots = nthreads - nprocs; }238 239 // Check params240 77 if ! (nthreads > nprocs) { 241 78 fmt.Fprintf(os.Stderr, "Must have more threads than procs\n") … … 243 80 } 244 81 245 // Make global data246 barrierSt art := make(chan struct{}) // Barrier used at the start247 threads_left = int64(nthreads - nspots) // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)248 result := make(chan Result) // Channel for results249 channels := make([] Spot, nspots) // Number of spots82 barrierStart := make(chan struct{}) 83 barrierStop := make(chan struct{}) 84 threads_left = int64(nthreads) 85 result := make(chan uint64) 86 channels := make([]chan [] uint64, nthreads - nprocs) 250 87 for i := range channels { 251 channels[i] = Spot{[16]uint64{0},uintptr(0), i,[16]uint64{0}} // init spots88 channels[i] = make(chan [] uint64, 1) 252 89 } 253 90 254 // start the goroutines255 91 for i := 0; i < nthreads; i++ { 256 go local(result, barrierStart, size, cnt, channels, share, i)92 go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share) 257 93 } 258 94 fmt.Printf("Starting\n"); 259 95 260 atomic.StoreInt32(&stop, 0)261 96 start := time.Now() 262 close(barrierStart) // release barrier97 close(barrierStart) 263 98 264 wait(start, true); // general benchmark wait99 wait(start, true); 265 100 266 atomic.StoreInt32(&stop, 1)101 close(barrierStop) 267 102 end := time.Now() 268 103 delta := end.Sub(start) … … 270 105 fmt.Printf("\nDone\n") 271 106 272 // release all the blocked threads273 for i := range channels{274 channels[i].release()107 global_counter := uint64(0) 108 for i := 0; i < nthreads; i++ { 109 global_counter += <- result 275 110 } 276 111 277 // Join and accumulate results278 results := NewResult()279 for i := 0; i < nthreads; i++ {280 r := <- result281 results.count += r.count282 results.gmigs += r.gmigs283 results.dmigs += r.dmigs284 }285 286 // Print with nice 's, i.e. 1'000'000 instead of 1000000287 112 p := message.NewPrinter(language.English) 288 p.Printf("Duration ( s): %f\n", delta.Seconds());113 p.Printf("Duration (ms) : %f\n", delta.Seconds()); 289 114 p.Printf("Number of processors : %d\n", nprocs); 290 115 p.Printf("Number of threads : %d\n", nthreads); 291 116 p.Printf("Work size (64bit words): %d\n", size); 292 p.Printf("Total Operations(ops) : %15d\n", results.count) 293 p.Printf("Total G Migrations : %15d\n", results.gmigs) 294 p.Printf("Total D Migrations : %15d\n", results.dmigs) 295 p.Printf("Ops per second : %18.2f\n", float64(results.count) / delta.Seconds()) 296 p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(results.count)) 297 p.Printf("Ops per threads : %15d\n", results.count / uint64(nthreads)) 298 p.Printf("Ops per procs : %15d\n", results.count / uint64(nprocs)) 299 p.Printf("Ops/sec/procs : %18.2f\n", (float64(results.count) / float64(nprocs)) / delta.Seconds()) 300 p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(results.count) / float64(nprocs))) 117 p.Printf("Total Operations(ops) : %15d\n", global_counter) 118 p.Printf("Ops per second : %18.2f\n", float64(global_counter) / delta.Seconds()) 119 p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter)) 120 p.Printf("Ops per threads : %15d\n", global_counter / uint64(nthreads)) 121 p.Printf("Ops per procs : %15d\n", global_counter / uint64(nprocs)) 122 p.Printf("Ops/sec/procs : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds()) 123 p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs))) 301 124 }
Note: See TracChangeset
for help on using the changeset viewer.