Changeset 089b1a9


Ignore:
Timestamp:
Dec 18, 2020, 12:26:22 PM (3 years ago)
Author:
Colby Alexander Parsons <caparsons@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
ebd1899
Parents:
636d45f5 (diff), 41cde266 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/readyQ/locality.go

    r636d45f5 r089b1a9  
    22
    33import (
     4        "context"
    45        "flag"
    56        "fmt"
    67        "math/rand"
    78        "os"
     9        "syscall"
    810        "sync/atomic"
    911        "time"
     12        "unsafe"
     13        "golang.org/x/sync/semaphore"
    1014        "golang.org/x/text/language"
    1115        "golang.org/x/text/message"
    1216)
    1317
    14 func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) {
    15         var s [] uint64 = data
    16         if !share {
    17                 s = nil
    18         }
    19 
    20         // send the data
    21         select {
    22         case <- stop:
    23                 return true, nil
    24         case c <- s:
    25         }
    26 
    27         // get the new data chunk
    28         select {
    29         case <- stop:
    30                 return true, nil
    31         case n := <- c:
    32                 if share {
    33                         return false, n
    34                 }
    35                 return false, data
    36         }
    37 }
    38 
    39 func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) {
     18// ==================================================
     19type MyData struct {
     20        ttid int
     21        id int
     22        data [] uint64
     23}
     24
     25func NewData(id int, size uint64) (*MyData) {
    4026        var data [] uint64
    4127        data = make([]uint64, size)
     
    4329                data[i] = 0
    4430        }
    45         count := uint64(0)
     31        return &MyData{syscall.Gettid(), id, data}
     32}
     33
     34func (this * MyData) moved( ttid int ) (uint64) {
     35        if this.ttid == ttid {
     36                return 0
     37        }
     38        this.ttid = ttid
     39        return 1
     40}
     41
     42func (this * MyData) access( idx uint64 ) {
     43        this.data[idx % uint64(len(this.data))] += 1
     44}
     45
     46// ==================================================
     47type MyCtx struct {
     48        s * semaphore.Weighted
     49        d unsafe.Pointer
     50        c context.Context
     51        ttid int
     52        id int
     53}
     54
     55func NewCtx( data * MyData, id int ) (MyCtx) {
     56        r := MyCtx{semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
     57        r.s.Acquire(context.Background(), 1)
     58        return r
     59}
     60
     61func (this * MyCtx) moved( ttid int ) (uint64) {
     62        if this.ttid == ttid {
     63                return 0
     64        }
     65        this.ttid = ttid
     66        return 1
     67}
     68
     69// ==================================================
     70// Atomic object where a single thread can wait
     71// May exchanges data
     72type Spot struct {
     73        ptr uintptr // atomic variable use fo MES
     74        id int      // id for debugging
     75}
     76
     77// Main handshake of the code
     78// Single seat, first thread arriving waits
     79// Next threads unblocks current one and blocks in its place
     80// if share == true, exchange data in the process
     81func (this * Spot) put( ctx * MyCtx, data * MyData, share bool) (* MyData, bool) {
     82        new := uintptr(unsafe.Pointer(ctx))
     83        // old_d := ctx.d
     84
     85        // Attempt to CAS our context into the seat
     86        var raw uintptr
     87        for true {
     88                raw = this.ptr
     89                if raw == uintptr(1) { // Seat is closed, return
     90                        return nil, true
     91                }
     92                if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
     93                        break // We got the seat
     94                }
     95        }
     96
     97        // If we aren't the fist in, wake someone
     98        if raw != uintptr(0) {
     99                var val *MyCtx
     100                val = (*MyCtx)(unsafe.Pointer(raw))
     101
     102                // If we are sharing, give them our data
     103                if share {
     104                        // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
     105                        atomic.StorePointer(&val.d, unsafe.Pointer(data))
     106                }
     107
     108                // Wake them up
     109                // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
     110                val.s.Release(1)
     111        }
     112
     113        // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
     114
     115        // Block once on the seat
     116        ctx.s.Acquire(ctx.c, 1)
     117
     118        // Someone woke us up, get the new data
     119        ret := (* MyData)(atomic.LoadPointer(&ctx.d))
     120        // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
     121
     122        return ret, false
     123}
     124
     125// Shutdown the spot
     126// Wake current thread and mark seat as closed
     127func (this * Spot) release() {
     128        val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
     129        if val == nil {
     130                return
     131        }
     132
     133        // Someone was there, release them
     134        val.s.Release(1)
     135}
     136
     137// ==================================================
     138// Struct for result, Go doesn't support passing tuple in channels
     139type Result struct {
     140        count uint64
     141        gmigs uint64
     142        dmigs uint64
     143}
     144
     145func NewResult() (Result) {
     146        return Result{0, 0, 0}
     147}
     148
     149// ==================================================
     150// Random number generator, Go's native one is to slow and global
     151func __xorshift64( state * uint64 ) (uint64) {
     152        x := *state
     153        x ^= x << 13
     154        x ^= x >> 7
     155        x ^= x << 17
     156        *state = x
     157        return x
     158}
     159
     160// ==================================================
     161// Do some work by accessing 'cnt' cells in the array
     162func work(data * MyData, cnt uint64, state * uint64) {
     163        for i := uint64(0); i < cnt; i++ {
     164                data.access(__xorshift64(state))
     165        }
     166}
     167
     168// Main body of the threads
     169func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
     170        // Initialize some data
     171        state := rand.Uint64()    // RNG state
     172        data := NewData(id, size) // Starting piece of data
     173        ctx := NewCtx(data, id)   // Goroutine local context
     174
     175        // Prepare results
     176        r := NewResult()
     177
     178        // Wait for start
    46179        <- start
     180
     181        // Main loop
    47182        for true {
    48                 for i := uint64(0); i < cnt; i++ {
    49                         data[rand.Uint64() % size] += 1
    50                 }
    51 
    52                 i := rand.Uint64() % chan_cnt
     183                // Touch our current data, write to invalidate remote cache lines
     184                work(data, cnt, &state)
     185
     186                // Wait on a random spot
     187                i := __xorshift64(&state) % uint64(len(channels))
    53188                var closed bool
    54                 closed, data = handshake(stop, channels[i], data, share)
    55                 count += 1
    56 
    57                 if  closed { break }
    58                 if !clock_mode && count >= stop_count { break }
    59         }
    60 
     189                data, closed = channels[i].put(&ctx, data, share)
     190
     191                // Check if the experiment is over
     192                if closed { break }                                       // yes, spot was closed
     193                if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }  // yes, time's up
     194                if !clock_mode && r.count >= stop_count { break }         // yes, iterations reached
     195
     196                // Check everything is consistent
     197                if uint64(len(data.data)) != size { panic("Data has weird size") }
     198
     199                // write down progress and check migrations
     200                ttid := syscall.Gettid()
     201                r.count += 1
     202                r.gmigs += ctx .moved(ttid)
     203                r.dmigs += data.moved(ttid)
     204        }
     205
     206        // Mark goroutine as done
    61207        atomic.AddInt64(&threads_left, -1);
    62         result <- count
    63 }
    64 
     208
     209        // return result
     210        result <- r
     211}
     212
     213// ==================================================
     214// Program main
    65215func main() {
    66 
     216        // Benchmark specific command line arguments
    67217        work_sizeOpt := flag.Uint64("w", 2    , "Number of words (uint64) per threads")
    68218        countOpt     := flag.Uint64("c", 2    , "Number of words (uint64) to touch")
    69219        shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
    70220
    71         bench_init()
    72 
     221        // General benchmark initialization and deinitialization
     222        defer bench_init()()
     223
     224        // Eval command line arguments
    73225        size  := *work_sizeOpt
    74226        cnt   := *countOpt
    75227        share := *shareOpt
    76228
     229        // Check params
    77230        if ! (nthreads > nprocs) {
    78231                fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
     
    80233        }
    81234
    82         barrierStart := make(chan struct{})
    83         barrierStop  := make(chan struct{})
    84         threads_left = int64(nthreads)
    85         result  := make(chan uint64)
    86         channels := make([]chan [] uint64, nthreads - nprocs)
     235        // Make global data
     236        barrierStart := make(chan struct{})         // Barrier used at the start
     237        threads_left = int64(nprocs)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
     238        result  := make(chan Result)                // Channel for results
     239        channels := make([]Spot, nthreads - nprocs) // Number of spots
    87240        for i := range channels {
    88                 channels[i] = make(chan [] uint64, 1)
    89         }
    90 
     241                channels[i] = Spot{uintptr(0), i}     // init spots
     242        }
     243
     244        // start the goroutines
    91245        for i := 0; i < nthreads; i++ {
    92                 go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share)
     246                go local(result, barrierStart, size, cnt, channels, share, i)
    93247        }
    94248        fmt.Printf("Starting\n");
    95249
     250        atomic.StoreInt32(&stop, 0)
    96251        start := time.Now()
    97         close(barrierStart)
    98 
    99         wait(start, true);
    100 
    101         close(barrierStop)
     252        close(barrierStart) // release barrier
     253
     254        wait(start, true);  // general benchmark wait
     255
     256        atomic.StoreInt32(&stop, 1)
    102257        end := time.Now()
    103258        delta := end.Sub(start)
     
    105260        fmt.Printf("\nDone\n")
    106261
    107         global_counter := uint64(0)
     262        // release all the blocked threads
     263        for i := range channels {
     264                channels[i].release()
     265        }
     266
     267        // Join and accumulate results
     268        global_result := NewResult()
    108269        for i := 0; i < nthreads; i++ {
    109                 global_counter += <- result
    110         }
    111 
     270                r := <- result
     271                global_result.count += r.count
     272                global_result.gmigs += r.gmigs
     273                global_result.dmigs += r.dmigs
     274        }
     275
     276        // Print with nice 's, i.e. 1'000'000 instead of 1000000
    112277        p := message.NewPrinter(language.English)
    113278        p.Printf("Duration (ms)          : %f\n", delta.Seconds());
     
    115280        p.Printf("Number of threads      : %d\n", nthreads);
    116281        p.Printf("Work size (64bit words): %d\n", size);
    117         p.Printf("Total Operations(ops)  : %15d\n", global_counter)
    118         p.Printf("Ops per second         : %18.2f\n", float64(global_counter) / delta.Seconds())
    119         p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_counter))
    120         p.Printf("Ops per threads        : %15d\n", global_counter / uint64(nthreads))
    121         p.Printf("Ops per procs          : %15d\n", global_counter / uint64(nprocs))
    122         p.Printf("Ops/sec/procs          : %18.2f\n", (float64(global_counter) / float64(nprocs)) / delta.Seconds())
    123         p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_counter) / float64(nprocs)))
    124 }
     282        p.Printf("Total Operations(ops)  : %15d\n", global_result.count)
     283        p.Printf("Total G Migrations     : %15d\n", global_result.gmigs)
     284        p.Printf("Total D Migrations     : %15d\n", global_result.dmigs)
     285        p.Printf("Ops per second         : %18.2f\n", float64(global_result.count) / delta.Seconds())
     286        p.Printf("ns per ops             : %18.2f\n", float64(delta.Nanoseconds()) / float64(global_result.count))
     287        p.Printf("Ops per threads        : %15d\n", global_result.count / uint64(nthreads))
     288        p.Printf("Ops per procs          : %15d\n", global_result.count / uint64(nprocs))
     289        p.Printf("Ops/sec/procs          : %18.2f\n", (float64(global_result.count) / float64(nprocs)) / delta.Seconds())
     290        p.Printf("ns per ops/procs       : %18.2f\n", float64(delta.Nanoseconds()) / (float64(global_result.count) / float64(nprocs)))
     291}
Note: See TracChangeset for help on using the changeset viewer.