Changeset 089b1a9a
- Timestamp:
- Dec 18, 2020, 12:26:22 PM
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- ebd1899
- Parents:
- 636d45f5, 41cde266
Note: this is a merge changeset; the changes displayed below correspond to the merge itself, not to the full set of changes relative to either parent.
- File:
- 1 edited
Legend:
- Unmodified (context lines, prefixed with a space)
- Added (lines prefixed with '+')
- Removed (lines prefixed with '-')
benchmark/readyQ/locality.go
--- locality.go (r636d45f5)
+++ locality.go (r089b1a9a)

 import (
+    "context"
     "flag"
     "fmt"
     "math/rand"
     "os"
+    "syscall"
     "sync/atomic"
     "time"
+    "unsafe"
+    "golang.org/x/sync/semaphore"
     "golang.org/x/text/language"
     "golang.org/x/text/message"
 )

-func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) {
-    var s [] uint64 = data
-    if !share {
-        s = nil
-    }
-
-    // send the data
-    select {
-    case <- stop:
-        return true, nil
-    case c <- s:
-    }
-
-    // get the new data chunk
-    select {
-    case <- stop:
-        return true, nil
-    case n := <- c:
-        if share {
-            return false, n
-        }
-        return false, data
-    }
-}
-
-func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) {
+// ==================================================
+type MyData struct {
+    ttid int
+    id int
+    data [] uint64
+}
+
+func NewData(id int, size uint64) (*MyData) {
     var data [] uint64
     data = make([]uint64, size)
…
         data[i] = 0
     }
-    count := uint64(0)
+    return &MyData{syscall.Gettid(), id, data}
+}
+
+func (this * MyData) moved( ttid int ) (uint64) {
+    if this.ttid == ttid {
+        return 0
+    }
+    this.ttid = ttid
+    return 1
+}
+
+func (this * MyData) access( idx uint64 ) {
+    this.data[idx % uint64(len(this.data))] += 1
+}
+
+// ==================================================
+type MyCtx struct {
+    s * semaphore.Weighted
+    d unsafe.Pointer
+    c context.Context
+    ttid int
+    id int
+}
+
+func NewCtx( data * MyData, id int ) (MyCtx) {
+    r := MyCtx{semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
+    r.s.Acquire(context.Background(), 1)
+    return r
+}
+
+func (this * MyCtx) moved( ttid int ) (uint64) {
+    if this.ttid == ttid {
+        return 0
+    }
+    this.ttid = ttid
+    return 1
+}
+
+// ==================================================
+// Atomic object where a single thread can wait
+// May exchange data
+type Spot struct {
+    ptr uintptr // atomic variable used for MES
+    id int      // id for debugging
+}
+
+// Main handshake of the code
+// Single seat, first thread arriving waits
+// Next thread unblocks the current one and blocks in its place
+// if share == true, exchange data in the process
+func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
+    new := uintptr(unsafe.Pointer(ctx))
+    // old_d := ctx.d
+
+    // Attempt to CAS our context into the seat
+    var raw uintptr
+    for true {
+        raw = this.ptr
+        if raw == uintptr(1) { // Seat is closed, return
+            return nil, true
+        }
+        if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
+            break // We got the seat
+        }
+    }
+
+    // If we aren't the first in, wake someone
+    if raw != uintptr(0) {
+        var val *MyCtx
+        val = (*MyCtx)(unsafe.Pointer(raw))
+
+        // If we are sharing, give them our data
+        if share {
+            // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
+            atomic.StorePointer(&val.d, unsafe.Pointer(data))
+        }
+
+        // Wake them up
+        // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
+        val.s.Release(1)
+    }
+
+    // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
+
+    // Block once on the seat
+    ctx.s.Acquire(ctx.c, 1)
+
+    // Someone woke us up, get the new data
+    ret := (* MyData)(atomic.LoadPointer(&ctx.d))
+    // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
+
+    return ret, false
+}
+
+// Shutdown the spot
+// Wake current thread and mark seat as closed
+func (this * Spot) release() {
+    val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
+    if val == nil {
+        return
+    }
+
+    // Someone was there, release them
+    val.s.Release(1)
+}
+
+// ==================================================
+// Struct for result, Go doesn't support passing tuples in channels
+type Result struct {
+    count uint64
+    gmigs uint64
+    dmigs uint64
+}
+
+func NewResult() (Result) {
+    return Result{0, 0, 0}
+}
+
+// ==================================================
+// Random number generator, Go's native one is too slow and global
+func __xorshift64( state * uint64 ) (uint64) {
+    x := *state
+    x ^= x << 13
+    x ^= x >> 7
+    x ^= x << 17
+    *state = x
+    return x
+}
+
+// ==================================================
+// Do some work by accessing 'cnt' cells in the array
+func work(data * MyData, cnt uint64, state * uint64) {
+    for i := uint64(0); i < cnt; i++ {
+        data.access(__xorshift64(state))
+    }
+}
+
+// Main body of the threads
+func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
+    // Initialize some data
+    state := rand.Uint64()    // RNG state
+    data := NewData(id, size) // Starting piece of data
+    ctx := NewCtx(data, id)   // Goroutine local context
+
+    // Prepare results
+    r := NewResult()
+
+    // Wait for start
     <- start
+
+    // Main loop
     for true {
-        for i := uint64(0); i < cnt; i++ {
-            data[rand.Uint64() % size] += 1
-        }
-
-        i := rand.Uint64() % chan_cnt
+        // Touch our current data, write to invalidate remote cache lines
+        work(data, cnt, &state)
+
+        // Wait on a random spot
+        i := __xorshift64(&state) % uint64(len(channels))
         var closed bool
-        closed, data = handshake(stop, channels[i], data, share)
-        count += 1
-
-        if closed { break }
-        if !clock_mode && count >= stop_count { break }
-    }
-
+        data, closed = channels[i].put(&ctx, data, share)
+
+        // Check if the experiment is over
+        if closed { break }                                     // yes, spot was closed
+        if clock_mode && atomic.LoadInt32(&stop) == 1 { break } // yes, time's up
+        if !clock_mode && r.count >= stop_count { break }       // yes, iterations reached
+
+        // Check everything is consistent
+        if uint64(len(data.data)) != size { panic("Data has weird size") }
+
+        // write down progress and check migrations
+        ttid := syscall.Gettid()
+        r.count += 1
+        r.gmigs += ctx.moved(ttid)
+        r.dmigs += data.moved(ttid)
+    }
+
+    // Mark goroutine as done
     atomic.AddInt64(&threads_left, -1);
-    result <- count
-}
-
+
+    // return result
+    result <- r
+}
+
+// ==================================================
+// Program main
 func main() {
     // Benchmark specific command line arguments
     work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads")
     countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch")
     shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking")

-    bench_init()
-
+    // General benchmark initialization and deinitialization
+    defer bench_init()()
+
+    // Eval command line arguments
     size := *work_sizeOpt
     cnt := *countOpt
     share := *shareOpt

+    // Check params
     if ! (nthreads > nprocs) {
         fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
…
     }

-    barrierStart := make(chan struct{})
-    barrierStop := make(chan struct{})
-    threads_left = int64(nthreads)
-    result := make(chan uint64)
-    channels := make([] chan [] uint64, nthreads - nprocs)
+    // Make global data
+    barrierStart := make(chan struct{})         // Barrier used at the start
+    threads_left = int64(nprocs)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
+    result := make(chan Result)                 // Channel for results
+    channels := make([]Spot, nthreads - nprocs) // Number of spots
     for i := range channels {
-        channels[i] = make(chan [] uint64, 1)
-    }
-
+        channels[i] = Spot{uintptr(0), i} // init spots
+    }
+
+    // start the goroutines
     for i := 0; i < nthreads; i++ {
-        go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share)
+        go local(result, barrierStart, size, cnt, channels, share, i)
     }
     fmt.Printf("Starting\n");

+    atomic.StoreInt32(&stop, 0)
     start := time.Now()
-    close(barrierStart)
-
-    wait(start, true);
-
-    close(barrierStop)
+    close(barrierStart) // release barrier
+
+    wait(start, true); // general benchmark wait
+
+    atomic.StoreInt32(&stop, 1)
     end := time.Now()
     delta := end.Sub(start)
…
     fmt.Printf("\nDone\n")

-    global_counter := uint64(0)
+    // release all the blocked threads
+    for i := range channels {
+        channels[i].release()
+    }
+
+    // Join and accumulate results
+    global_result := NewResult()
     for i := 0; i < nthreads; i++ {
-        global_counter += <- result
-    }
-
+        r := <- result
+        global_result.count += r.count
+        global_result.gmigs += r.gmigs
+        global_result.dmigs += r.dmigs
+    }
+
+    // Print with nice 's, i.e. 1'000'000 instead of 1000000
     p := message.NewPrinter(language.English)
     p.Printf("Duration (ms) : %f\n", delta.Seconds());
…
     p.Printf("Number of threads : %d\n", nthreads);
     p.Printf("Work size (64bit words): %d\n", size);
-    p.Printf("Total Operations(ops) : %15d\n", global_counter)
-    p.Printf("Ops per second : %18.2f\n", float64(global_counter) / delta.Seconds())
-    p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter))
-    p.Printf("Ops per threads : %15d\n", global_counter / uint64(nthreads))
-    p.Printf("Ops per procs : %15d\n", global_counter / uint64(nprocs))
-    p.Printf("Ops/sec/procs : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds())
-    p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs)))
-}
+    p.Printf("Total Operations(ops) : %15d\n", global_result.count)
+    p.Printf("Total G Migrations : %15d\n", global_result.gmigs)
+    p.Printf("Total D Migrations : %15d\n", global_result.dmigs)
+    p.Printf("Ops per second : %18.2f\n", float64(global_result.count) / delta.Seconds())
+    p.Printf("ns per ops : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_result.count))
+    p.Printf("Ops per threads : %15d\n", global_result.count / uint64(nthreads))
+    p.Printf("Ops per procs : %15d\n", global_result.count / uint64(nprocs))
+    p.Printf("Ops/sec/procs : %18.2f\n", (float64(global_result.count) / float64(nprocs)) / delta.Seconds())
+    p.Printf("ns per ops/procs : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_result.count) / float64(nprocs)))
+}
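The core of this change is the Spot type: the old version parked goroutines on buffered channels, while the new one parks them on a single CAS'd word plus one semaphore per goroutine. Below is a minimal, self-contained sketch of that single-seat handshake, runnable on its own; the names (seat, waiter, exchange) are illustrative and not part of the benchmark, and only golang.org/x/sync/semaphore is assumed.

package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "unsafe"

    "golang.org/x/sync/semaphore"
)

// waiter plays the role of the benchmark's MyCtx: a semaphore to park on,
// plus the slot a successor fills with its payload before waking us.
type waiter struct {
    sem     *semaphore.Weighted
    payload unsafe.Pointer
}

// seat plays the role of Spot: 0 = empty, 1 = closed, otherwise a *waiter.
type seat struct {
    ptr uintptr
}

// exchange installs the caller in the seat; if a predecessor was parked
// there, it hands over 'give' and wakes it, then parks until the next
// arrival (or until the seat is closed). The second result is false if
// the seat was already closed.
func (s *seat) exchange(me *waiter, give unsafe.Pointer) (unsafe.Pointer, bool) {
    self := uintptr(unsafe.Pointer(me))
    for {
        old := atomic.LoadUintptr(&s.ptr)
        if old == 1 {
            return nil, false // seat closed
        }
        if !atomic.CompareAndSwapUintptr(&s.ptr, old, self) {
            continue // lost the race, retry
        }
        if old != 0 { // a waiter was parked: pass the payload, wake it
            prev := (*waiter)(unsafe.Pointer(old))
            atomic.StorePointer(&prev.payload, give)
            prev.sem.Release(1)
        }
        break
    }
    me.sem.Acquire(context.Background(), 1) // park until woken
    return atomic.LoadPointer(&me.payload), true
}

// close marks the seat closed and wakes whoever is parked, mirroring
// Spot.release().
func (s *seat) close() {
    old := atomic.SwapUintptr(&s.ptr, 1)
    if old != 0 && old != 1 {
        (*waiter)(unsafe.Pointer(old)).sem.Release(1)
    }
}

func main() {
    var st seat
    results := make(chan string)
    const n = 3
    for i := 0; i < n; i++ {
        go func(id int) {
            msg := fmt.Sprintf("payload %d", id)
            // Like NewCtx, start with the semaphore drained and the
            // payload slot pointing at our own data.
            w := &waiter{semaphore.NewWeighted(1), unsafe.Pointer(&msg)}
            w.sem.Acquire(context.Background(), 1)
            got, _ := st.exchange(w, unsafe.Pointer(&msg))
            results <- fmt.Sprintf("goroutine %d woke with %q", id, *(*string)(got))
        }(i)
    }
    // Every arrival wakes its predecessor, so after n-1 results exactly
    // one goroutine is still parked; close the seat to flush it out,
    // just as the benchmark's main releases all spots once the timer fires.
    for i := 0; i < n-1; i++ {
        fmt.Println(<-results)
    }
    st.close()
    fmt.Println(<-results)
}

As in the benchmark's put, the payload travels as an argument rather than being read back out of the waiter slot, so a fast successor cannot overwrite it before it is handed to the predecessor.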
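The other substitution is the random-number generator: per the diff's own comment, Go's native source is "too slow and global" (the top-level math/rand functions serialize callers), so each goroutine advances a private xorshift64 state instead. A standalone copy for experimentation; the seed here is arbitrary, and the state must be non-zero:

package main

import "fmt"

// Standalone copy of the benchmark's __xorshift64: Marsaglia's 64-bit
// xorshift with the 13/7/17 shift triple.
func xorshift64(state *uint64) uint64 {
    x := *state
    x ^= x << 13
    x ^= x >> 7
    x ^= x << 17
    *state = x
    return x
}

func main() {
    state := uint64(42) // arbitrary non-zero seed
    for i := 0; i < 5; i++ {
        fmt.Println(xorshift64(&state))
    }
}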
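One line in main worth pausing on is defer bench_init()(). Assuming bench_init, which lives in the benchmark's shared harness and is not shown in this diff, returns a cleanup function, this runs the initialization immediately and defers the returned function until main exits. A minimal illustration of the pattern, with hypothetical names:

package main

import "fmt"

// setup runs initialization now and returns the matching teardown,
// so callers can write `defer setup()()`.
func setup() func() {
    fmt.Println("init")
    return func() { fmt.Println("deinit") }
}

func main() {
    defer setup()() // setup() runs here; its result runs when main returns
    fmt.Println("work")
}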