Changeset c5a98f3 for benchmark/readyQ
- Timestamp: Dec 17, 2020, 4:18:15 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 41cde266
- Parents: aa1d13c
- File: 1 edited
benchmark/readyQ/locality.go
--- benchmark/readyQ/locality.go (aa1d13c)
+++ benchmark/readyQ/locality.go (c5a98f3)
@@ -53,6 +53,8 @@
 }
 
-func NewCtx( sem * semaphore.Weighted, data * MyData, id int ) (MyCtx) {
-	return MyCtx{sem, unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
+func NewCtx( data * MyData, id int ) (MyCtx) {
+	r := MyCtx{semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
+	r.s.Acquire(context.Background(), 1)
+	return r
 }
@@ -66,7 +68,9 @@
 
 // ==================================================
+// Atomic object where a single thread can wait
+// May exchange data
 type Spot struct {
-	ptr uintptr
-	id int
+	ptr uintptr // atomic variable used for MES
+	id int      // id for debugging
 }
 
@@ -78,18 +82,23 @@
 	new := uintptr(unsafe.Pointer(ctx))
 	// old_d := ctx.d
+
+	// Attempt to CAS our context into the seat
 	var raw uintptr
 	for true {
 		raw = this.ptr
-		if raw == uintptr(1) {
+		if raw == uintptr(1) { // Seat is closed, return
 			return nil, true
 		}
 		if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
-			break
+			break // We got the seat
 		}
 	}
 
+	// If we aren't the first in, wake someone
 	if raw != uintptr(0) {
 		var val *MyCtx
 		val = (*MyCtx)(unsafe.Pointer(raw))
+
+		// If we are sharing, give them our data
 		if share {
 			// fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
@@ -97,4 +106,5 @@
 		}
 
+		// Wake them up
 		// fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
 		val.s.Release(1)
@@ -102,5 +112,9 @@
 
 	// fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
+
+	// Block once on the seat
 	ctx.s.Acquire(ctx.c, 1)
+
+	// Someone woke us up, get the new data
 	ret := (* MyData)(atomic.LoadPointer(&ctx.d))
 	// fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
@@ -109,4 +123,6 @@
 }
 
+// Shut down the spot
+// Wake current thread and mark seat as closed
 func (this * Spot) release() {
 	val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
@@ -115,8 +131,10 @@
 	}
 
+	// Someone was there, release them
 	val.s.Release(1)
 }
 
 // ==================================================
+// Struct for results, Go doesn't support passing tuples in channels
 type Result struct {
 	count uint64
@@ -130,4 +148,5 @@
 
 // ==================================================
+// Random number generator, Go's native one is too slow and global
 func __xorshift64( state * uint64 ) (uint64) {
 	x := *state
@@ -139,4 +158,6 @@
 }
 
+// ==================================================
+// Do some work by accessing 'cnt' cells in the array
 func work(data * MyData, cnt uint64, state * uint64) {
 	for i := uint64(0); i < cnt; i++ {
@@ -145,28 +166,36 @@
 }
 
+// Main body of the threads
 func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
-	state := rand.Uint64()
-
-	data := NewData(id, size)
-	sem := semaphore.NewWeighted(1)
-	sem.Acquire(context.Background(), 1)
-	ctx := NewCtx(sem, data, id)
-
+	// Initialize some data
+	state := rand.Uint64()    // RNG state
+	data := NewData(id, size) // Starting piece of data
+	ctx := NewCtx(data, id)   // Goroutine local context
+
+	// Prepare results
 	r := NewResult()
+
+	// Wait for start
 	<- start
+
+	// Main loop
 	for true {
+		// Touch our current data, write to invalidate remote cache lines
 		work(data, cnt, &state)
 
+		// Wait on a random spot
 		i := __xorshift64(&state) % uint64(len(channels))
 		var closed bool
 		data, closed = channels[i].put(&ctx, data, share)
 
-		if closed { break }
-		if clock_mode && atomic.LoadInt32(&stop) == 1 { break }
-		if !clock_mode && r.count >= stop_count { break }
-		if uint64(len(data.data)) != size {
-			panic("Data has weird size")
-		}
-
+		// Check if the experiment is over
+		if closed { break }                                     // yes, spot was closed
+		if clock_mode && atomic.LoadInt32(&stop) == 1 { break } // yes, time's up
+		if !clock_mode && r.count >= stop_count { break }       // yes, iterations reached
+
+		// Check everything is consistent
+		if uint64(len(data.data)) != size { panic("Data has weird size") }
+
+		// Write down progress and check migrations
 		ttid := syscall.Gettid()
 		r.count += 1
@@ -175,19 +204,28 @@
 	}
 
+	// Mark goroutine as done
 	atomic.AddInt64(&threads_left, -1);
+
+	// Return result
 	result <- r
 }
 
+// ==================================================
+// Program main
 func main() {
+	// Benchmark-specific command line arguments
 	work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads")
 	countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch")
 	shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking")
 
+	// General benchmark initialization and deinitialization
 	defer bench_init()()
 
+	// Eval command line arguments
 	size := *work_sizeOpt
 	cnt := *countOpt
 	share := *shareOpt
 
+	// Check params
 	if ! (nthreads > nprocs) {
 		fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
@@ -195,12 +233,14 @@
 	}
 
-	barrierStart := make(chan struct{})
-	threads_left = int64(nprocs)
-	result := make(chan Result)
-	channels := make([]Spot, nthreads - nprocs)
+	// Make global data
+	barrierStart := make(chan struct{})         // Barrier used at the start
+	threads_left = int64(nprocs)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
+	result := make(chan Result)                 // Channel for results
+	channels := make([]Spot, nthreads - nprocs) // Number of spots
 	for i := range channels {
-		channels[i] = Spot{uintptr(0), i}
-	}
-
+		channels[i] = Spot{uintptr(0), i} // init spots
+	}
+
+	// Start the goroutines
 	for i := 0; i < nthreads; i++ {
 		go local(result, barrierStart, size, cnt, channels, share, i)
@@ -210,7 +250,7 @@
 	atomic.StoreInt32(&stop, 0)
 	start := time.Now()
-	close(barrierStart)
-
-	wait(start, true);
+	close(barrierStart) // release barrier
+
+	wait(start, true); // general benchmark wait
 
 	atomic.StoreInt32(&stop, 1)
@@ -220,8 +260,10 @@
 	fmt.Printf("\nDone\n")
 
+	// Release all the blocked threads
 	for i := range channels {
 		channels[i].release()
 	}
 
+	// Join and accumulate results
 	global_result := NewResult()
 	for i := 0; i < nthreads; i++ {
@@ -232,4 +274,5 @@
 	}
 
+	// Print with nice 's, i.e. 1'000'000 instead of 1000000
 	p := message.NewPrinter(language.English)
 	p.Printf("Duration (ms) : %f\n", delta.Seconds());
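The heart of the change is the Spot seat: the first goroutine to CAS itself into the slot parks on its semaphore (which NewCtx now creates pre-acquired, so the next Acquire blocks), and the next arriver hands over its data pointer and releases it. The following is a minimal, self-contained sketch of that pattern, not the benchmark's exact code: the names waiter, seat, and newWaiter and the *int payload are stand-ins for MyCtx, Spot, and *MyData, and the reads of the seat word are made explicitly atomic.

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync/atomic"
	"unsafe"

	"golang.org/x/sync/semaphore"
)

// waiter plays the role of MyCtx: a drained semaphore to park on,
// plus a data slot the waker can overwrite.
type waiter struct {
	s *semaphore.Weighted
	d unsafe.Pointer // *int here; *MyData in the benchmark
}

func newWaiter() *waiter {
	w := &waiter{s: semaphore.NewWeighted(1)}
	w.s.Acquire(context.Background(), 1) // drain it: the next Acquire blocks
	return w
}

// seat plays the role of Spot: 0 = empty, 1 = closed, otherwise a *waiter.
type seat struct{ ptr uintptr }

// put CASes the caller into the seat, hands our data to any previous
// occupant and wakes it, then parks until someone does the same for us.
func (s *seat) put(w *waiter, data *int) (*int, bool) {
	mine := uintptr(unsafe.Pointer(w))
	var raw uintptr
	for {
		raw = atomic.LoadUintptr(&s.ptr)
		if raw == 1 {
			return nil, true // seat is closed
		}
		if atomic.CompareAndSwapUintptr(&s.ptr, raw, mine) {
			break // we hold the seat
		}
	}
	if raw != 0 { // someone was parked: give them our data and wake them
		prev := (*waiter)(unsafe.Pointer(raw))
		atomic.StorePointer(&prev.d, unsafe.Pointer(data))
		prev.s.Release(1)
	}
	w.s.Acquire(context.Background(), 1) // park
	return (*int)(atomic.LoadPointer(&w.d)), false
}

// release closes the seat and wakes whoever is parked on it.
func (s *seat) release() {
	raw := atomic.SwapUintptr(&s.ptr, 1)
	if raw != 0 && raw != 1 {
		(*waiter)(unsafe.Pointer(raw)).s.Release(1)
	}
}

func main() {
	var st seat
	va, vb := 1, 2
	done := make(chan struct{})

	go func() {
		got, _ := st.put(newWaiter(), &va)       // parks first
		fmt.Println("first arriver received", *got) // prints 2
		st.release()                             // shut down, waking the second arriver
		close(done)
	}()

	// Wait until the goroutine is seated before arriving second.
	for atomic.LoadUintptr(&st.ptr) == 0 {
		runtime.Gosched()
	}
	got, closed := st.put(newWaiter(), &vb)     // wakes the first, then parks
	fmt.Println("second arriver:", got, closed) // nil false: woken by shutdown
	<-done
}

In this sketch a waiter woken by release sees a nil payload because nothing was handed over; the benchmark instead relies on its main-loop stop checks after waking, and only a goroutine that arrives at an already-closed seat takes the closed=true path.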
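The changeset viewer elides the body of __xorshift64. For reference, a standard xorshift64 step (Marsaglia's 13/7/17 shift triple, an assumption here, not confirmed by the diff) is small enough to inline and needs no locking because each goroutine owns its own state:

package main

import "fmt"

// xorshift64: one step of a simple, fast PRNG. The state must be seeded
// non-zero, otherwise the stream is all zeros. Shift constants 13/7/17
// are the common Marsaglia triple, assumed rather than taken from the diff.
func xorshift64(state *uint64) uint64 {
	x := *state
	x ^= x << 13
	x ^= x >> 7
	x ^= x << 17
	*state = x
	return x
}

func main() {
	state := uint64(42)
	for i := 0; i < 3; i++ {
		fmt.Println(xorshift64(&state))
	}
}

Compared with the package-level rand.Uint64(), which serializes callers through a locked global source, per-goroutine state keeps the generator off the benchmark's hot path.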