Changes in benchmark/readyQ/locality.go [90ecade:f03209d3]
File:
- 1 edited
Legend:
- Unmodified (no marker)
- Added (prefixed with "+")
- Removed (prefixed with "-")
benchmark/readyQ/locality.go
old revision: r90ecade
new revision: rf03209d3

…

import (
+     "context"
      "flag"
      "fmt"
      "math/rand"
      "os"
+     "syscall"
      "sync/atomic"
      "time"
+     "unsafe"
+     "golang.org/x/sync/semaphore"
      "golang.org/x/text/language"
      "golang.org/x/text/message"
)

- func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) {
-     var s [] uint64 = data
-     if !share {
-         s = nil
-     }
-
-     // send the data
-     select {
-     case <- stop:
-         return true, nil
-     case c <- s:
-     }
-
-     // get the new data chunk
-     select {
-     case <- stop:
-         return true, nil
-     case n := <- c:
-         if share {
-             return false, n
-         }
-         return false, data
-     }
- }
-
- func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) {
+ // ==================================================
+ type MyData struct {
+     _p1 [16]uint64 // padding
+     ttid int
+     id int
+     data [] uint64
+     _p2 [16]uint64 // padding
+ }
+
+ func NewData(id int, size uint64) (*MyData) {
      var data [] uint64
      data = make([]uint64, size)
…
          data[i] = 0
      }
-     count := uint64(0)
+     return &MyData{[16]uint64{0}, syscall.Gettid(), id, data,[16]uint64{0}}
+ }
+
+ func (this * MyData) moved( ttid int ) (uint64) {
+     if this.ttid == ttid {
+         return 0
+     }
+     this.ttid = ttid
+     return 1
+ }
+
+ func (this * MyData) access( idx uint64 ) {
+     this.data[idx % uint64(len(this.data))] += 1
+ }
+
+ // ==================================================
+ type MyCtx struct {
+     _p1 [16]uint64 // padding
+     s * semaphore.Weighted
+     d unsafe.Pointer
+     c context.Context
+     ttid int
+     id int
+     _p2 [16]uint64 // padding
+ }
+
+ func NewCtx( data * MyData, id int ) (MyCtx) {
+     r := MyCtx{[16]uint64{0},semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id,[16]uint64{0}}
+     r.s.Acquire(context.Background(), 1)
+     return r
+ }
+
+ func (this * MyCtx) moved( ttid int ) (uint64) {
+     if this.ttid == ttid {
+         return 0
+     }
+     this.ttid = ttid
+     return 1
+ }
+
+ // ==================================================
+ // Atomic object where a single thread can wait
+ // May exchanges data
+ type Spot struct {
+     _p1 [16]uint64 // padding
+     ptr uintptr // atomic variable use fo MES
+     id int // id for debugging
+     _p2 [16]uint64 // padding
+ }
+
+ // Main handshake of the code
+ // Single seat, first thread arriving waits
+ // Next threads unblocks current one and blocks in its place
+ // if share == true, exchange data in the process
+ func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
+     new := uintptr(unsafe.Pointer(ctx))
+     // old_d := ctx.d
+
+     // Attempt to CAS our context into the seat
+     var raw uintptr
+     for true {
+         raw = this.ptr
+         if raw == uintptr(1) { // Seat is closed, return
+             return nil, true
+         }
+         if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
+             break // We got the seat
+         }
+     }
+
+     // If we aren't the fist in, wake someone
+     if raw != uintptr(0) {
+         var val *MyCtx
+         val = (*MyCtx)(unsafe.Pointer(raw))
+
+         // If we are sharing, give them our data
+         if share {
+             // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
+             atomic.StorePointer(&val.d, unsafe.Pointer(data))
+         }
+
+         // Wake them up
+         // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
+         val.s.Release(1)
+     }
+
+     // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
+
+     // Block once on the seat
+     ctx.s.Acquire(ctx.c, 1)
+
+     // Someone woke us up, get the new data
+     ret := (* MyData)(atomic.LoadPointer(&ctx.d))
+     // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
+
+     return ret, false
+ }
+
+ // Shutdown the spot
+ // Wake current thread and mark seat as closed
+ func (this * Spot) release() {
+     val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
+     if val == nil {
+         return
+     }
+
+     // Someone was there, release them
+     val.s.Release(1)
+ }
+
+ // ==================================================
+ // Struct for result, Go doesn't support passing tuple in channels
+ type Result struct {
+     count uint64
+     gmigs uint64
+     dmigs uint64
+ }
+
+ func NewResult() (Result) {
+     return Result{0, 0, 0}
+ }
+
+ // ==================================================
+ // Random number generator, Go's native one is to slow and global
+ func __xorshift64( state * uint64 ) (uint64) {
+     x := *state
+     x ^= x << 13
+     x ^= x >> 7
+     x ^= x << 17
+     *state = x
+     return x
+ }
+
+ // ==================================================
+ // Do some work by accessing 'cnt' cells in the array
+ func work(data * MyData, cnt uint64, state * uint64) {
+     for i := uint64(0); i < cnt; i++ {
+         data.access(__xorshift64(state))
+     }
+ }
+
+ // Main body of the threads
+ func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
+     // Initialize some data
+     state := rand.Uint64() // RNG state
+     data := NewData(id, size) // Starting piece of data
+     ctx := NewCtx(data, id) // Goroutine local context
+
+     // Prepare results
+     r := NewResult()
+
+     // Wait for start
      <- start
+
+     // Main loop
      for true {
-         for i := uint64(0); i < cnt; i++ {
-             data[rand.Uint64() % size] += 1
-         }
-
-         i := rand.Uint64() % chan_cnt
+         // Touch our current data, write to invalidate remote cache lines
+         work(data, cnt, &state)
+
+         // Wait on a random spot
+         i := __xorshift64(&state) % uint64(len(channels))
          var closed bool
-         closed, data = handshake(stop, channels[i], data, share)
-         count += 1
-
-         if closed { break }
-         if !clock_mode && count >= stop_count { break }
-     }
-
+         data, closed = channels[i].put(&ctx, data, share)
+
+         // Check if the experiment is over
+         if closed { break } // yes, spot was closed
+         if clock_mode && atomic.LoadInt32(&stop) == 1 { break } // yes, time's up
+         if !clock_mode && r.count >= stop_count { break } // yes, iterations reached
+
+         // Check everything is consistent
+         if uint64(len(data.data)) != size { panic("Data has weird size") }
+
+         // write down progress and check migrations
+         ttid := syscall.Gettid()
+         r.count += 1
+         r.gmigs += ctx .moved(ttid)
+         r.dmigs += data.moved(ttid)
+     }
+
+     // Mark goroutine as done
      atomic.AddInt64(&threads_left, -1);
-     result <- count
- }
-
+
+     // return result
+     result <- r
+ }
+
+ // ==================================================
+ // Program main
func main() {
-
-     work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads")
-     countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch")
+     // Benchmark specific command line arguments
+     nspotsOpt := flag.Int ("n", 0 , "Number of spots where threads sleep (nthreads - nspots are active at the same time)")
+     work_sizeOpt := flag.Uint64("w", 2 , "Size of the array for each threads, in words (64bit)")
+     countOpt := flag.Uint64("c", 2 , "Number of words to touch when working (random pick, cells can be picked more than once)")
      shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking")

-     bench_init()
-
+     // General benchmark initialization and deinitialization
+     defer bench_init()()
+
+     // Eval command line arguments
+     nspots := *nspotsOpt
      size := *work_sizeOpt
      cnt := *countOpt
      share := *shareOpt

+     if nspots == 0 { nspots = nthreads - nprocs; }
+
+     // Check params
      if ! (nthreads > nprocs) {
          fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
…
      }

-     barrierStart := make(chan struct{})
-     barrierStop := make(chan struct{})
-     threads_left = int64(nthreads)
-     result := make(chan uint64)
-     channels := make([] chan [] uint64, nthreads - nprocs)
+     // Make global data
+     barrierStart := make(chan struct{}) // Barrier used at the start
+     threads_left = int64(nthreads - nspots) // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
+     result := make(chan Result) // Channel for results
+     channels := make([]Spot, nspots) // Number of spots
      for i := range channels {
-         channels[i] = make(chan [] uint64, 1)
-     }
-
+         channels[i] = Spot{[16]uint64{0},uintptr(0), i,[16]uint64{0}} // init spots
+     }
+
+     // start the goroutines
      for i := 0; i < nthreads; i++ {
-         go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share)
+         go local(result, barrierStart, size, cnt, channels, share, i)
      }
      fmt.Printf("Starting\n");

+     atomic.StoreInt32(&stop, 0)
      start := time.Now()
-     close(barrierStart)
-
-     wait(start, true);
-
-     close(barrierStop)
+     close(barrierStart) // release barrier
+
+     wait(start, true); // general benchmark wait
+
+     atomic.StoreInt32(&stop, 1)
      end := time.Now()
      delta := end.Sub(start)
…
      fmt.Printf("\nDone\n")

-     global_counter := uint64(0)
+     // release all the blocked threads
+     for i := range channels {
+         channels[i].release()
+     }
+
+     // Join and accumulate results
+     results := NewResult()
      for i := 0; i < nthreads; i++ {
-         global_counter += <- result
-     }
-
+         r := <- result
+         results.count += r.count
+         results.gmigs += r.gmigs
+         results.dmigs += r.dmigs
+     }
+
+     // Print with nice 's, i.e. 1'000'000 instead of 1000000
      p := message.NewPrinter(language.English)
-     p.Printf("Duration (ms) : %f\n", delta.Seconds());
+     p.Printf("Duration (s) : %f\n", delta.Seconds());
      p.Printf("Number of processors : %d\n", nprocs);
      p.Printf("Number of threads : %d\n", nthreads);
      p.Printf("Work size (64bit words): %d\n", size);
-     p.Printf("Total Operations(ops) : %15d\n", global_counter)
-     p.Printf("Ops per second : %18.2f\n", float64(global_counter) / delta.Seconds())
-     p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter))
-     p.Printf("Ops per threads : %15d\n", global_counter / uint64(nthreads))
-     p.Printf("Ops per procs : %15d\n", global_counter / uint64(nprocs))
-     p.Printf("Ops/sec/procs : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds())
-     p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs)))
- }
+     p.Printf("Total Operations(ops) : %15d\n", results.count)
+     p.Printf("Total G Migrations : %15d\n", results.gmigs)
+     p.Printf("Total D Migrations : %15d\n", results.dmigs)
+     p.Printf("Ops per second : %18.2f\n", float64(results.count) / delta.Seconds())
+     p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(results.count))
+     p.Printf("Ops per threads : %15d\n", results.count / uint64(nthreads))
+     p.Printf("Ops per procs : %15d\n", results.count / uint64(nprocs))
+     p.Printf("Ops/sec/procs : %18.2f\n", (float64(results.count) / float64(nprocs)) / delta.Seconds())
+     p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(results.count) / float64(nprocs)))
+ }
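The centrepiece of the change is the Spot type: a single seat guarded by a CAS on an atomic word, where the first goroutine to arrive parks on its own semaphore and each later arrival wakes the current occupant and parks in its place, optionally handing over its MyData block. The sketch below is not the benchmark's code; it is a simplified illustration of that park-and-swap handshake, assuming Go 1.19+ and substituting sync/atomic's generic Pointer and a one-slot wake channel for the uintptr CAS and golang.org/x/sync/semaphore used above, and leaving out the data exchange and the gettid-based migration counting.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// waiter is the per-goroutine context: a one-slot channel to park on,
// playing the role MyCtx's semaphore plays in the benchmark.
type waiter struct {
    wake chan struct{}
}

// seat holds at most one parked waiter; closedSentinel marks shutdown
// (the benchmark uses the raw value 1 for the same purpose).
type seat struct {
    occupant atomic.Pointer[waiter]
}

var closedSentinel = &waiter{}

// park installs w in the seat, wakes whoever was parked there before,
// then blocks until the next arrival (or release) wakes it.
// It returns false once the seat has been closed.
func (s *seat) park(w *waiter) bool {
    for {
        old := s.occupant.Load()
        if old == closedSentinel {
            return false // seat is closed
        }
        if s.occupant.CompareAndSwap(old, w) {
            if old != nil {
                old.wake <- struct{}{} // unblock the previous occupant
            }
            break
        }
    }
    <-w.wake // block in their place
    return true
}

// release closes the seat and wakes the last occupant, if any.
func (s *seat) release() {
    old := s.occupant.Swap(closedSentinel)
    if old != nil && old != closedSentinel {
        old.wake <- struct{}{}
    }
}

func main() {
    var s seat
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            w := &waiter{wake: make(chan struct{}, 1)}
            rounds := 0
            for s.park(w) {
                rounds++
            }
            fmt.Printf("goroutine %d was parked %d times\n", id, rounds)
        }(i)
    }
    time.Sleep(10 * time.Millisecond) // let the goroutines hand the seat around
    s.release()                       // close the seat so everyone exits
    wg.Wait()
}

In the benchmark's real put, the arriving goroutine additionally publishes its data pointer into the context of the goroutine it is about to wake (atomic.StorePointer on ctx.d), which is how the -s flag keeps the working set with whichever goroutine continues running rather than with the one going to sleep.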