Changeset 2dd0689 for benchmark/readyQ
- Timestamp:
- Dec 16, 2020, 2:37:31 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- fd84538
- Parents:
- 72b1800
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/readyQ/locality.go
r72b1800 r2dd0689 2 2 3 3 import ( 4 "context" 4 5 "flag" 5 6 "fmt" … … 8 9 "sync/atomic" 9 10 "time" 11 "unsafe" 12 "golang.org/x/sync/semaphore" 10 13 "golang.org/x/text/language" 11 14 "golang.org/x/text/message" 12 15 ) 13 16 14 func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) { 15 var s [] uint64 = data 16 if !share { 17 s = nil 17 type GoCtx struct { 18 s * semaphore.Weighted 19 d * [] uint64 20 } 21 22 type Spot struct { 23 ptr uintptr 24 } 25 26 func (this * Spot) put( s * semaphore.Weighted, data [] uint64, share bool) ([] uint64) { 27 ctx := GoCtx{s, &data} 28 29 var raw uintptr 30 for true { 31 raw = this.ptr 32 if raw == uintptr(1) { 33 return nil 34 } 35 if atomic.CompareAndSwapUintptr(&this.ptr, raw, uintptr(unsafe.Pointer(&ctx))) { 36 break 37 } 18 38 } 19 39 20 // send the data 21 select { 22 case <- stop: 23 return true, nil 24 case c <- s: 40 if raw != uintptr(0) { 41 val := (*GoCtx)(unsafe.Pointer(raw)) 42 if share { 43 val.d = &data 44 } 45 46 val.s.Release(1) 25 47 } 26 48 27 // get the new data chunk 28 select { 29 case <- stop: 30 return true, nil 31 case n := <- c: 32 if share { 33 return false, n 34 } 35 return false, data 36 } 49 ctx.s.Acquire(context.Background(), 1) 50 51 return *ctx.d 37 52 } 38 53 39 func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) { 54 func (this * Spot) release() { 55 val := (*GoCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1)))) 56 if val == nil { 57 return 58 } 59 60 val.s.Release(1) 61 } 62 63 64 65 func local(result chan uint64, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool) { 66 lrand := rand.New(rand.NewSource(rand.Int63())) 40 67 var data [] uint64 41 68 data = make([]uint64, size) … … 43 70 data[i] = 0 44 71 } 72 73 sem := semaphore.NewWeighted(1) 74 sem.Acquire(context.Background(), 1) 75 45 76 count := uint64(0) 46 77 <- start 47 78 for true { 48 79 for i := uint64(0); i < cnt; i++ { 49 data[ rand.Uint64() % size] += 180 data[lrand.Uint64() % size] += 1 50 81 } 51 82 52 i := rand.Uint64() % chan_cnt 53 var closed bool 54 closed, data = handshake(stop, channels[i], data, share) 83 i := lrand.Int() % len(channels) 84 data = channels[i].put(sem, data, share) 55 85 count += 1 56 86 57 if clo sed{ break }87 if clock_mode && atomic.LoadInt32(&stop) == 1 { break } 58 88 if !clock_mode && count >= stop_count { break } 89 if uint64(len(data)) != size { 90 panic("Data has weird size") 91 } 59 92 } 60 93 … … 64 97 65 98 func main() { 66 67 99 work_sizeOpt := flag.Uint64("w", 2 , "Number of words (uint64) per threads") 68 100 countOpt := flag.Uint64("c", 2 , "Number of words (uint64) to touch") 69 101 shareOpt := flag.Bool ("s", false, "Pass the work data to the next thread when blocking") 70 102 71 bench_init()103 defer bench_init()() 72 104 73 105 size := *work_sizeOpt … … 81 113 82 114 barrierStart := make(chan struct{}) 83 barrierStop := make(chan struct{})84 115 threads_left = int64(nthreads) 85 116 result := make(chan uint64) 86 channels := make([] chan [] uint64, nthreads - nprocs)117 channels := make([]Spot, nthreads - nprocs) 87 118 for i := range channels { 88 channels[i] = make(chan [] uint64, 1)119 channels[i] = Spot{uintptr(0)} 89 120 } 90 121 91 122 for i := 0; i < nthreads; i++ { 92 go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share)123 go local(result, barrierStart, size, cnt, channels, share) 93 124 } 94 125 fmt.Printf("Starting\n"); 95 126 127 atomic.StoreInt32(&stop, 0) 96 128 start := time.Now() 97 129 close(barrierStart) … … 99 131 wait(start, true); 100 132 101 close(barrierStop)133 atomic.StoreInt32(&stop, 1) 102 134 end := time.Now() 103 135 delta := end.Sub(start) 104 136 105 137 fmt.Printf("\nDone\n") 138 139 for i := range channels { 140 channels[i].release() 141 } 106 142 107 143 global_counter := uint64(0)
Note: See TracChangeset
for help on using the changeset viewer.