Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/readyQ/locality.go

    r90ecade rf03209d3  
    22
    33import (
     4        "context"
    45        "flag"
    56        "fmt"
    67        "math/rand"
    78        "os"
     9        "syscall"
    810        "sync/atomic"
    911        "time"
     12        "unsafe"
     13        "golang.org/x/sync/semaphore"
    1014        "golang.org/x/text/language"
    1115        "golang.org/x/text/message"
    1216)
    1317
    14 func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) {
    15         var s [] uint64 = data
    16         if !share {
    17                 s = nil
    18         }
    19 
    20         // send the data
    21         select {
    22         case <- stop:
    23                 return true, nil
    24         case c <- s:
    25         }
    26 
    27         // get the new data chunk
    28         select {
    29         case <- stop:
    30                 return true, nil
    31         case n := <- c:
    32                 if share {
    33                         return false, n
    34                 }
    35                 return false, data
    36         }
    37 }
    38 
    39 func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) {
     18// ==================================================
     19type MyData struct {
     20        _p1 [16]uint64 // padding
     21        ttid int
     22        id int
     23        data [] uint64
     24        _p2 [16]uint64 // padding
     25}
     26
     27func NewData(id int, size uint64) (*MyData) {
    4028        var data [] uint64
    4129        data = make([]uint64, size)
     
    4331                data[i] = 0
    4432        }
    45         count := uint64(0)
     33        return &MyData{[16]uint64{0}, syscall.Gettid(), id, data,[16]uint64{0}}
     34}
     35
     36func (this * MyData) moved( ttid int ) (uint64) {
     37        if this.ttid == ttid {
     38                return 0
     39        }
     40        this.ttid = ttid
     41        return 1
     42}
     43
     44func (this * MyData) access( idx uint64 ) {
     45        this.data[idx % uint64(len(this.data))] += 1
     46}
     47
     48// ==================================================
     49type MyCtx struct {
     50        _p1 [16]uint64 // padding
     51        s * semaphore.Weighted
     52        d unsafe.Pointer
     53        c context.Context
     54        ttid int
     55        id int
     56        _p2 [16]uint64 // padding
     57}
     58
     59func NewCtx( data * MyData, id int ) (MyCtx) {
     60        r := MyCtx{[16]uint64{0},semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id,[16]uint64{0}}
     61        r.s.Acquire(context.Background(), 1)
     62        return r
     63}
     64
     65func (this * MyCtx) moved( ttid int ) (uint64) {
     66        if this.ttid == ttid {
     67                return 0
     68        }
     69        this.ttid = ttid
     70        return 1
     71}
     72
     73// ==================================================
     74// Atomic object where a single thread can wait
     75// May exchanges data
     76type Spot struct {
     77        _p1 [16]uint64 // padding
     78        ptr uintptr // atomic variable use fo MES
     79        id int      // id for debugging
     80        _p2 [16]uint64 // padding
     81}
     82
     83// Main handshake of the code
     84// Single seat, first thread arriving waits
     85// Next threads unblocks current one and blocks in its place
     86// if share == true, exchange data in the process
     87func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
     88        new := uintptr(unsafe.Pointer(ctx))
     89        // old_d := ctx.d
     90
     91        // Attempt to CAS our context into the seat
     92        var raw uintptr
     93        for true {
     94                raw = this.ptr
     95                if raw == uintptr(1) { // Seat is closed, return
     96                        return nil, true
     97                }
     98                if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
     99                        break // We got the seat
     100                }
     101        }
     102
     103        // If we aren't the fist in, wake someone
     104        if raw != uintptr(0) {
     105                var val *MyCtx
     106                val = (*MyCtx)(unsafe.Pointer(raw))
     107
     108                // If we are sharing, give them our data
     109                if share {
     110                        // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
     111                        atomic.StorePointer(&val.d, unsafe.Pointer(data))
     112                }
     113
     114                // Wake them up
     115                // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
     116                val.s.Release(1)
     117        }
     118
     119        // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
     120
     121        // Block once on the seat
     122        ctx.s.Acquire(ctx.c, 1)
     123
     124        // Someone woke us up, get the new data
     125        ret := (* MyData)(atomic.LoadPointer(&ctx.d))
     126        // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
     127
     128        return ret, false
     129}
     130
     131// Shutdown the spot
     132// Wake current thread and mark seat as closed
     133func (this * Spot) release() {
     134        val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
     135        if val == nil {
     136                return
     137        }
     138
     139        // Someone was there, release them
     140        val.s.Release(1)
     141}
     142
     143// ==================================================
     144// Struct for result, Go doesn't support passing tuple in channels
     145type Result struct {
     146        count uint64
     147        gmigs uint64
     148        dmigs uint64
     149}
     150
     151func NewResult() (Result) {
     152        return Result{0, 0, 0}
     153}
     154
     155// ==================================================
     156// Random number generator, Go's native one is to slow and global
     157func __xorshift64( state * uint64 ) (uint64) {
     158        x := *state
     159        x ^= x << 13
     160        x ^= x >> 7
     161        x ^= x << 17
     162        *state = x
     163        return x
     164}
     165
     166// ==================================================
     167// Do some work by accessing 'cnt' cells in the array
     168func work(data * MyData, cnt uint64, state * uint64) {
     169        for i := uint64(0); i < cnt; i++ {
     170                data.access(__xorshift64(state))
     171        }
     172}
     173
     174// Main body of the threads
     175func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
     176        // Initialize some data
     177        state := rand.Uint64()    // RNG state
     178        data := NewData(id, size) // Starting piece of data
     179        ctx := NewCtx(data, id)   // Goroutine local context
     180
     181        // Prepare results
     182        r := NewResult()
     183
     184        // Wait for start
    46185        <- start
     186
     187        // Main loop
    47188        for true {
    48                 for i := uint64(0); i < cnt; i++ {
    49                         data[rand.Uint64() % size] += 1
    50                 }
    51 
    52                 i := rand.Uint64() % chan_cnt
     189                // Touch our current data, write to invalidate remote cache lines
     190                work(data, cnt, &state)
     191
     192                // Wait on a random spot
     193                i := __xorshift64(&state) % uint64(len(channels))
    53194                var closed bool
    54                 closed, data = handshake(stop, channels[i], data, share)
    55                 count += 1
    56 
    57                 if  closed { break }
    58                 if !clock_mode && count >= stop_count { break }
    59         }
    60 
     195                data, closed = channels[i].put(&ctx, data, share)
     196
     197                // Check if the experiment is over
     198                if closed { break }                                       // yes, spot was closed
     199                if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }  // yes, time's up
     200                if !clock_mode && r.count >= stop_count { break }         // yes, iterations reached
     201
     202                // Check everything is consistent
     203                if uint64(len(data.data)) != size { panic("Data has weird size") }
     204
     205                // write down progress and check migrations
     206                ttid := syscall.Gettid()
     207                r.count += 1
     208                r.gmigs += ctx .moved(ttid)
     209                r.dmigs += data.moved(ttid)
     210        }
     211
     212        // Mark goroutine as done
    61213        atomic.AddInt64(&threads_left, -1);
    62         result <- count
    63 }
    64 
     214
     215        // return result
     216        result <- r
     217}
     218
     219// ==================================================
     220// Program main
    65221func main() {
    66 
    67         work_sizeOpt := flag.Uint64("w", 2    , "Number of words (uint64) per threads")
    68         countOpt     := flag.Uint64("c", 2    , "Number of words (uint64) to touch")
     222        // Benchmark specific command line arguments
     223        nspotsOpt    := flag.Int   ("n", 0    , "Number of spots where threads sleep (nthreads - nspots are active at the same time)")
     224        work_sizeOpt := flag.Uint64("w", 2    , "Size of the array for each threads, in words (64bit)")
     225        countOpt     := flag.Uint64("c", 2    , "Number of words to touch when working (random pick, cells can be picked more than once)")
    69226        shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
    70227
    71         bench_init()
    72 
     228        // General benchmark initialization and deinitialization
     229        defer bench_init()()
     230
     231        // Eval command line arguments
     232        nspots:= *nspotsOpt
    73233        size  := *work_sizeOpt
    74234        cnt   := *countOpt
    75235        share := *shareOpt
    76236
     237        if nspots == 0 { nspots = nthreads - nprocs; }
     238
     239        // Check params
    77240        if ! (nthreads > nprocs) {
    78241                fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
     
    80243        }
    81244
    82         barrierStart := make(chan struct{})
    83         barrierStop  := make(chan struct{})
    84         threads_left = int64(nthreads)
    85         result  := make(chan uint64)
    86         channels := make([]chan [] uint64, nthreads - nprocs)
     245        // Make global data
     246        barrierStart := make(chan struct{})         // Barrier used at the start
     247        threads_left = int64(nthreads - nspots)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
     248        result  := make(chan Result)                // Channel for results
     249        channels := make([]Spot, nspots) // Number of spots
    87250        for i := range channels {
    88                 channels[i] = make(chan [] uint64, 1)
    89         }
    90 
     251                channels[i] = Spot{[16]uint64{0},uintptr(0), i,[16]uint64{0}}     // init spots
     252        }
     253
     254        // start the goroutines
    91255        for i := 0; i < nthreads; i++ {
    92                 go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share)
     256                go local(result, barrierStart, size, cnt, channels, share, i)
    93257        }
    94258        fmt.Printf("Starting\n");
    95259
     260        atomic.StoreInt32(&stop, 0)
    96261        start := time.Now()
    97         close(barrierStart)
    98 
    99         wait(start, true);
    100 
    101         close(barrierStop)
     262        close(barrierStart) // release barrier
     263
     264        wait(start, true);  // general benchmark wait
     265
     266        atomic.StoreInt32(&stop, 1)
    102267        end := time.Now()
    103268        delta := end.Sub(start)
     
    105270        fmt.Printf("\nDone\n")
    106271
    107         global_counter := uint64(0)
     272        // release all the blocked threads
     273        for i := range channels {
     274                channels[i].release()
     275        }
     276
     277        // Join and accumulate results
     278        results := NewResult()
    108279        for i := 0; i < nthreads; i++ {
    109                 global_counter += <- result
    110         }
    111 
     280                r := <- result
     281                results.count += r.count
     282                results.gmigs += r.gmigs
     283                results.dmigs += r.dmigs
     284        }
     285
     286        // Print with nice 's, i.e. 1'000'000 instead of 1000000
    112287        p := message.NewPrinter(language.English)
    113         p.Printf("Duration (ms)          : %f\n", delta.Seconds());
     288        p.Printf("Duration (s)           : %f\n", delta.Seconds());
    114289        p.Printf("Number of processors   : %d\n", nprocs);
    115290        p.Printf("Number of threads      : %d\n", nthreads);
    116291        p.Printf("Work size (64bit words): %d\n", size);
    117         p.Printf("Total Operations(ops)  : %15d\n", global_counter)
    118         p.Printf("Ops per second         : %18.2f\n", float64(global_counter) / delta.Seconds())
    119         p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter))
    120         p.Printf("Ops per threads        : %15d\n", global_counter / uint64(nthreads))
    121         p.Printf("Ops per procs          : %15d\n", global_counter / uint64(nprocs))
    122         p.Printf("Ops/sec/procs          : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds())
    123         p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs)))
    124 }
     292        p.Printf("Total Operations(ops)  : %15d\n", results.count)
     293        p.Printf("Total G Migrations     : %15d\n", results.gmigs)
     294        p.Printf("Total D Migrations     : %15d\n", results.dmigs)
     295        p.Printf("Ops per second         : %18.2f\n", float64(results.count) / delta.Seconds())
     296        p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(results.count))
     297        p.Printf("Ops per threads        : %15d\n", results.count / uint64(nthreads))
     298        p.Printf("Ops per procs          : %15d\n", results.count / uint64(nprocs))
     299        p.Printf("Ops/sec/procs          : %18.2f\n", (float64(results.count) / float64(nprocs)) / delta.Seconds())
     300        p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(results.count) / float64(nprocs)))
     301}
Note: See TracChangeset for help on using the changeset viewer.