Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/readyQ/locality.go

    rf03209d3 r90ecade  
    22
    33import (
    4         "context"
    54        "flag"
    65        "fmt"
    76        "math/rand"
    87        "os"
    9         "syscall"
    108        "sync/atomic"
    119        "time"
    12         "unsafe"
    13         "golang.org/x/sync/semaphore"
    1410        "golang.org/x/text/language"
    1511        "golang.org/x/text/message"
    1612)
    1713
    18 // ==================================================
    19 type MyData struct {
    20         _p1 [16]uint64 // padding
    21         ttid int
    22         id int
    23         data [] uint64
    24         _p2 [16]uint64 // padding
     14func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) {
     15        var s [] uint64 = data
     16        if !share {
     17                s = nil
     18        }
     19
     20        // send the data
     21        select {
     22        case <- stop:
     23                return true, nil
     24        case c <- s:
     25        }
     26
     27        // get the new data chunk
     28        select {
     29        case <- stop:
     30                return true, nil
     31        case n := <- c:
     32                if share {
     33                        return false, n
     34                }
     35                return false, data
     36        }
    2537}
    2638
    27 func NewData(id int, size uint64) (*MyData) {
     39func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) {
    2840        var data [] uint64
    2941        data = make([]uint64, size)
     
    3143                data[i] = 0
    3244        }
    33         return &MyData{[16]uint64{0}, syscall.Gettid(), id, data,[16]uint64{0}}
     45        count := uint64(0)
     46        <- start
     47        for true {
     48                for i := uint64(0); i < cnt; i++ {
     49                        data[rand.Uint64() % size] += 1
     50                }
     51
     52                i := rand.Uint64() % chan_cnt
     53                var closed bool
     54                closed, data = handshake(stop, channels[i], data, share)
     55                count += 1
     56
     57                if  closed { break }
     58                if !clock_mode && count >= stop_count { break }
     59        }
     60
     61        atomic.AddInt64(&threads_left, -1);
     62        result <- count
    3463}
    3564
    36 func (this * MyData) moved( ttid int ) (uint64) {
    37         if this.ttid == ttid {
    38                 return 0
    39         }
    40         this.ttid = ttid
    41         return 1
    42 }
     65func main() {
    4366
    44 func (this * MyData) access( idx uint64 ) {
    45         this.data[idx % uint64(len(this.data))] += 1
    46 }
    47 
    48 // ==================================================
    49 type MyCtx struct {
    50         _p1 [16]uint64 // padding
    51         s * semaphore.Weighted
    52         d unsafe.Pointer
    53         c context.Context
    54         ttid int
    55         id int
    56         _p2 [16]uint64 // padding
    57 }
    58 
    59 func NewCtx( data * MyData, id int ) (MyCtx) {
    60         r := MyCtx{[16]uint64{0},semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id,[16]uint64{0}}
    61         r.s.Acquire(context.Background(), 1)
    62         return r
    63 }
    64 
    65 func (this * MyCtx) moved( ttid int ) (uint64) {
    66         if this.ttid == ttid {
    67                 return 0
    68         }
    69         this.ttid = ttid
    70         return 1
    71 }
    72 
    73 // ==================================================
    74 // Atomic object where a single thread can wait
    75 // May exchanges data
    76 type Spot struct {
    77         _p1 [16]uint64 // padding
    78         ptr uintptr // atomic variable use fo MES
    79         id int      // id for debugging
    80         _p2 [16]uint64 // padding
    81 }
    82 
    83 // Main handshake of the code
    84 // Single seat, first thread arriving waits
    85 // Next threads unblocks current one and blocks in its place
    86 // if share == true, exchange data in the process
    87 func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
    88         new := uintptr(unsafe.Pointer(ctx))
    89         // old_d := ctx.d
    90 
    91         // Attempt to CAS our context into the seat
    92         var raw uintptr
    93         for true {
    94                 raw = this.ptr
    95                 if raw == uintptr(1) { // Seat is closed, return
    96                         return nil, true
    97                 }
    98                 if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
    99                         break // We got the seat
    100                 }
    101         }
    102 
    103         // If we aren't the fist in, wake someone
    104         if raw != uintptr(0) {
    105                 var val *MyCtx
    106                 val = (*MyCtx)(unsafe.Pointer(raw))
    107 
    108                 // If we are sharing, give them our data
    109                 if share {
    110                         // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
    111                         atomic.StorePointer(&val.d, unsafe.Pointer(data))
    112                 }
    113 
    114                 // Wake them up
    115                 // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
    116                 val.s.Release(1)
    117         }
    118 
    119         // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
    120 
    121         // Block once on the seat
    122         ctx.s.Acquire(ctx.c, 1)
    123 
    124         // Someone woke us up, get the new data
    125         ret := (* MyData)(atomic.LoadPointer(&ctx.d))
    126         // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
    127 
    128         return ret, false
    129 }
    130 
    131 // Shutdown the spot
    132 // Wake current thread and mark seat as closed
    133 func (this * Spot) release() {
    134         val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
    135         if val == nil {
    136                 return
    137         }
    138 
    139         // Someone was there, release them
    140         val.s.Release(1)
    141 }
    142 
    143 // ==================================================
    144 // Struct for result, Go doesn't support passing tuple in channels
    145 type Result struct {
    146         count uint64
    147         gmigs uint64
    148         dmigs uint64
    149 }
    150 
    151 func NewResult() (Result) {
    152         return Result{0, 0, 0}
    153 }
    154 
    155 // ==================================================
    156 // Random number generator, Go's native one is to slow and global
    157 func __xorshift64( state * uint64 ) (uint64) {
    158         x := *state
    159         x ^= x << 13
    160         x ^= x >> 7
    161         x ^= x << 17
    162         *state = x
    163         return x
    164 }
    165 
    166 // ==================================================
    167 // Do some work by accessing 'cnt' cells in the array
    168 func work(data * MyData, cnt uint64, state * uint64) {
    169         for i := uint64(0); i < cnt; i++ {
    170                 data.access(__xorshift64(state))
    171         }
    172 }
    173 
    174 // Main body of the threads
    175 func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
    176         // Initialize some data
    177         state := rand.Uint64()    // RNG state
    178         data := NewData(id, size) // Starting piece of data
    179         ctx := NewCtx(data, id)   // Goroutine local context
    180 
    181         // Prepare results
    182         r := NewResult()
    183 
    184         // Wait for start
    185         <- start
    186 
    187         // Main loop
    188         for true {
    189                 // Touch our current data, write to invalidate remote cache lines
    190                 work(data, cnt, &state)
    191 
    192                 // Wait on a random spot
    193                 i := __xorshift64(&state) % uint64(len(channels))
    194                 var closed bool
    195                 data, closed = channels[i].put(&ctx, data, share)
    196 
    197                 // Check if the experiment is over
    198                 if closed { break }                                       // yes, spot was closed
    199                 if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }  // yes, time's up
    200                 if !clock_mode && r.count >= stop_count { break }         // yes, iterations reached
    201 
    202                 // Check everything is consistent
    203                 if uint64(len(data.data)) != size { panic("Data has weird size") }
    204 
    205                 // write down progress and check migrations
    206                 ttid := syscall.Gettid()
    207                 r.count += 1
    208                 r.gmigs += ctx .moved(ttid)
    209                 r.dmigs += data.moved(ttid)
    210         }
    211 
    212         // Mark goroutine as done
    213         atomic.AddInt64(&threads_left, -1);
    214 
    215         // return result
    216         result <- r
    217 }
    218 
    219 // ==================================================
    220 // Program main
    221 func main() {
    222         // Benchmark specific command line arguments
    223         nspotsOpt    := flag.Int   ("n", 0    , "Number of spots where threads sleep (nthreads - nspots are active at the same time)")
    224         work_sizeOpt := flag.Uint64("w", 2    , "Size of the array for each threads, in words (64bit)")
    225         countOpt     := flag.Uint64("c", 2    , "Number of words to touch when working (random pick, cells can be picked more than once)")
     67        work_sizeOpt := flag.Uint64("w", 2    , "Number of words (uint64) per threads")
     68        countOpt     := flag.Uint64("c", 2    , "Number of words (uint64) to touch")
    22669        shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
    22770
    228         // General benchmark initialization and deinitialization
    229         defer bench_init()()
     71        bench_init()
    23072
    231         // Eval command line arguments
    232         nspots:= *nspotsOpt
    23373        size  := *work_sizeOpt
    23474        cnt   := *countOpt
    23575        share := *shareOpt
    23676
    237         if nspots == 0 { nspots = nthreads - nprocs; }
    238 
    239         // Check params
    24077        if ! (nthreads > nprocs) {
    24178                fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
     
    24380        }
    24481
    245         // Make global data
    246         barrierStart := make(chan struct{})         // Barrier used at the start
    247         threads_left = int64(nthreads - nspots)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
    248         result  := make(chan Result)                // Channel for results
    249         channels := make([]Spot, nspots) // Number of spots
     82        barrierStart := make(chan struct{})
     83        barrierStop  := make(chan struct{})
     84        threads_left = int64(nthreads)
     85        result  := make(chan uint64)
     86        channels := make([]chan [] uint64, nthreads - nprocs)
    25087        for i := range channels {
    251                 channels[i] = Spot{[16]uint64{0},uintptr(0), i,[16]uint64{0}}     // init spots
     88                channels[i] = make(chan [] uint64, 1)
    25289        }
    25390
    254         // start the goroutines
    25591        for i := 0; i < nthreads; i++ {
    256                 go local(result, barrierStart, size, cnt, channels, share, i)
     92                go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share)
    25793        }
    25894        fmt.Printf("Starting\n");
    25995
    260         atomic.StoreInt32(&stop, 0)
    26196        start := time.Now()
    262         close(barrierStart) // release barrier
     97        close(barrierStart)
    26398
    264         wait(start, true);  // general benchmark wait
     99        wait(start, true);
    265100
    266         atomic.StoreInt32(&stop, 1)
     101        close(barrierStop)
    267102        end := time.Now()
    268103        delta := end.Sub(start)
     
    270105        fmt.Printf("\nDone\n")
    271106
    272         // release all the blocked threads
    273         for i := range channels {
    274                 channels[i].release()
     107        global_counter := uint64(0)
     108        for i := 0; i < nthreads; i++ {
     109                global_counter += <- result
    275110        }
    276111
    277         // Join and accumulate results
    278         results := NewResult()
    279         for i := 0; i < nthreads; i++ {
    280                 r := <- result
    281                 results.count += r.count
    282                 results.gmigs += r.gmigs
    283                 results.dmigs += r.dmigs
    284         }
    285 
    286         // Print with nice 's, i.e. 1'000'000 instead of 1000000
    287112        p := message.NewPrinter(language.English)
    288         p.Printf("Duration (s)           : %f\n", delta.Seconds());
     113        p.Printf("Duration (ms)          : %f\n", delta.Seconds());
    289114        p.Printf("Number of processors   : %d\n", nprocs);
    290115        p.Printf("Number of threads      : %d\n", nthreads);
    291116        p.Printf("Work size (64bit words): %d\n", size);
    292         p.Printf("Total Operations(ops)  : %15d\n", results.count)
    293         p.Printf("Total G Migrations     : %15d\n", results.gmigs)
    294         p.Printf("Total D Migrations     : %15d\n", results.dmigs)
    295         p.Printf("Ops per second         : %18.2f\n", float64(results.count) / delta.Seconds())
    296         p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(results.count))
    297         p.Printf("Ops per threads        : %15d\n", results.count / uint64(nthreads))
    298         p.Printf("Ops per procs          : %15d\n", results.count / uint64(nprocs))
    299         p.Printf("Ops/sec/procs          : %18.2f\n", (float64(results.count) / float64(nprocs)) / delta.Seconds())
    300         p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(results.count) / float64(nprocs)))
     117        p.Printf("Total Operations(ops)  : %15d\n", global_counter)
     118        p.Printf("Ops per second         : %18.2f\n", float64(global_counter) / delta.Seconds())
     119        p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter))
     120        p.Printf("Ops per threads        : %15d\n", global_counter / uint64(nthreads))
     121        p.Printf("Ops per procs          : %15d\n", global_counter / uint64(nprocs))
     122        p.Printf("Ops/sec/procs          : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds())
     123        p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs)))
    301124}
Note: See TracChangeset for help on using the changeset viewer.