Changeset c5a98f3 for benchmark/readyQ


Timestamp:
Dec 17, 2020, 4:18:15 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
41cde266
Parents:
aa1d13c
Message:

Clean-up and comments

File:
1 edited

Legend:

Unmodified (context, no prefix)
Added (prefixed with +)
Removed (prefixed with -)
  • benchmark/readyQ/locality.go

--- benchmark/readyQ/locality.go (raa1d13c)
+++ benchmark/readyQ/locality.go (rc5a98f3)
 }
 
-func NewCtx( sem * semaphore.Weighted, data * MyData, id int ) (MyCtx) {
-        return MyCtx{sem, unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
+func NewCtx( data * MyData, id int ) (MyCtx) {
+        r := MyCtx{semaphore.NewWeighted(1), unsafe.Pointer(data), context.Background(), syscall.Gettid(), id}
+        r.s.Acquire(context.Background(), 1)
+        return r
 }
 
…
 
 // ==================================================
+// Atomic object where a single thread can wait
+// May exchange data
 type Spot struct {
-        ptr uintptr
-        id int
+        ptr uintptr // atomic variable used for MES
+        id int      // id for debugging
 }
 
…
         new := uintptr(unsafe.Pointer(ctx))
         // old_d := ctx.d
+
+        // Attempt to CAS our context into the seat
         var raw uintptr
         for true {
                 raw = this.ptr
-                if raw == uintptr(1) {
+                if raw == uintptr(1) { // Seat is closed, return
                         return nil, true
                 }
                 if atomic.CompareAndSwapUintptr(&this.ptr, raw, new) {
-                        break
+                        break // We got the seat
                 }
         }
 
+        // If we aren't the first in, wake someone
         if raw != uintptr(0) {
                 var val *MyCtx
                 val = (*MyCtx)(unsafe.Pointer(raw))
+
+                // If we are sharing, give them our data
                 if share {
                         // fmt.Printf("[%d] - %d update %d: %p -> %p\n", this.id, ctx.id, val.id, val.d, data)
…
                 }
 
+                // Wake them up
                 // fmt.Printf("[%d] - %d release %d\n", this.id, ctx.id, val.id)
                 val.s.Release(1)
…
 
         // fmt.Printf("[%d] - %d enter\n", this.id, ctx.id)
+
+        // Block once on the seat
         ctx.s.Acquire(ctx.c, 1)
+
+        // Someone woke us up, get the new data
         ret := (* MyData)(atomic.LoadPointer(&ctx.d))
         // fmt.Printf("[%d] - %d leave: %p -> %p\n", this.id, ctx.id, ret, old_d)
…
 }
 
+// Shutdown the spot
+// Wake current thread and mark seat as closed
 func (this * Spot) release() {
         val := (*MyCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
…
         }
 
+        // Someone was there, release them
         val.s.Release(1)
 }
 
 // ==================================================
+// Struct for result, Go doesn't support passing tuples in channels
 type Result struct {
         count uint64
…
 
 // ==================================================
+// Random number generator, Go's native one is too slow and global
 func __xorshift64( state * uint64 ) (uint64) {
         x := *state
…
 }
 
+// ==================================================
+// Do some work by accessing 'cnt' cells in the array
 func work(data * MyData, cnt uint64, state * uint64) {
         for i := uint64(0); i < cnt; i++ {
…
 }
 
+// Main body of the threads
 func local(result chan Result, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool, id int) {
-        state := rand.Uint64()
-
-        data := NewData(id, size)
-        sem := semaphore.NewWeighted(1)
-        sem.Acquire(context.Background(), 1)
-        ctx := NewCtx(sem, data, id)
-
+        // Initialize some data
+        state := rand.Uint64()    // RNG state
+        data := NewData(id, size) // Starting piece of data
+        ctx := NewCtx(data, id)   // Goroutine local context
+
+        // Prepare results
         r := NewResult()
+
+        // Wait for start
         <- start
+
+        // Main loop
         for true {
+                // Touch our current data, write to invalidate remote cache lines
                 work(data, cnt, &state)
 
+                // Wait on a random spot
                 i := __xorshift64(&state) % uint64(len(channels))
                 var closed bool
                 data, closed = channels[i].put(&ctx, data, share)
 
-                if closed { break }
-                if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }
-                if !clock_mode && r.count >= stop_count { break }
-                if uint64(len(data.data)) != size {
-                        panic("Data has weird size")
-                }
-
+                // Check if the experiment is over
+                if closed { break }                                       // yes, spot was closed
+                if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }  // yes, time's up
+                if !clock_mode && r.count >= stop_count { break }         // yes, iterations reached
+
+                // Check everything is consistent
+                if uint64(len(data.data)) != size { panic("Data has weird size") }
+
+                // write down progress and check migrations
                 ttid := syscall.Gettid()
                 r.count += 1
…
         }
 
+        // Mark goroutine as done
         atomic.AddInt64(&threads_left, -1);
+
+        // return result
         result <- r
 }
 
+// ==================================================
+// Program main
 func main() {
+        // Benchmark specific command line arguments
         work_sizeOpt := flag.Uint64("w", 2    , "Number of words (uint64) per threads")
         countOpt     := flag.Uint64("c", 2    , "Number of words (uint64) to touch")
         shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
 
+        // General benchmark initialization and deinitialization
         defer bench_init()()
 
+        // Eval command line arguments
         size  := *work_sizeOpt
         cnt   := *countOpt
         share := *shareOpt
 
+        // Check params
         if ! (nthreads > nprocs) {
                 fmt.Fprintf(os.Stderr, "Must have more threads than procs\n")
…
         }
 
-        barrierStart := make(chan struct{})
-        threads_left = int64(nprocs)
-        result  := make(chan Result)
-        channels := make([]Spot, nthreads - nprocs)
+        // Make global data
+        barrierStart := make(chan struct{})         // Barrier used at the start
+        threads_left = int64(nprocs)                // Counter for active threads (not 'nthreads' because at all times 'nthreads - nprocs' are blocked)
+        result  := make(chan Result)                // Channel for results
+        channels := make([]Spot, nthreads - nprocs) // Number of spots
         for i := range channels {
-                channels[i] = Spot{uintptr(0), i}
-        }
-
+                channels[i] = Spot{uintptr(0), i}     // init spots
+        }
+
+        // start the goroutines
         for i := 0; i < nthreads; i++ {
                 go local(result, barrierStart, size, cnt, channels, share, i)
…
         atomic.StoreInt32(&stop, 0)
         start := time.Now()
-        close(barrierStart)
-
-        wait(start, true);
+        close(barrierStart) // release barrier
+
+        wait(start, true);  // general benchmark wait
 
         atomic.StoreInt32(&stop, 1)
…
         fmt.Printf("\nDone\n")
 
+        // release all the blocked threads
         for i := range channels {
                 channels[i].release()
         }
 
+        // Join and accumulate results
         global_result := NewResult()
         for i := 0; i < nthreads; i++ {
…
         }
 
+        // Print with nice 's, i.e. 1'000'000 instead of 1000000
         p := message.NewPrinter(language.English)
         p.Printf("Duration (ms)          : %f\n", delta.Seconds());
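
The comments added here describe the core handoff mechanism: a goroutine CASes a pointer to its context into a Spot, wakes the previous occupant through that occupant's pre-acquired weighted semaphore, then blocks on its own semaphore until the next arrival (or release()) wakes it. The pre-acquired semaphore is also why NewCtx no longer takes one as a parameter: each context creates a semaphore that starts "empty", so the first Acquire in put always blocks until a peer's Release. Below is a minimal, self-contained sketch of that pattern, assuming the golang.org/x/sync/semaphore package the benchmark already imports; the names slot, waiter, and park are illustrative and are not taken from the file.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"golang.org/x/sync/semaphore"
)

// waiter plays the role of MyCtx: a per-goroutine context whose semaphore
// is pre-acquired so that the next Acquire blocks until someone Releases it.
type waiter struct {
	s  *semaphore.Weighted
	id int
}

func newWaiter(id int) *waiter {
	w := &waiter{s: semaphore.NewWeighted(1), id: id}
	_ = w.s.Acquire(context.Background(), 1) // pre-acquire, mirrors the new NewCtx
	return w
}

// slot plays the role of Spot: 0 = empty, 1 = closed, otherwise a *waiter.
// As in the benchmark, the pointer is stored as a uintptr; the waiter stays
// alive because its goroutine holds it while parked.
type slot struct {
	ptr uintptr
}

// park swaps the caller into the slot, wakes the previous occupant (if any),
// and blocks until the next arrival or release wakes the caller.
// It returns true once the slot has been closed.
func (sl *slot) park(w *waiter) (closed bool) {
	self := uintptr(unsafe.Pointer(w))
	var prev uintptr
	for {
		prev = atomic.LoadUintptr(&sl.ptr)
		if prev == 1 {
			return true // slot closed, stop
		}
		if atomic.CompareAndSwapUintptr(&sl.ptr, prev, self) {
			break // we now occupy the slot
		}
	}
	if prev != 0 {
		(*waiter)(unsafe.Pointer(prev)).s.Release(1) // wake previous occupant
	}
	_ = w.s.Acquire(context.Background(), 1) // block until woken
	return false
}

// release closes the slot and wakes whoever is parked in it.
func (sl *slot) release() {
	prev := atomic.SwapUintptr(&sl.ptr, 1)
	if prev != 0 && prev != 1 {
		(*waiter)(unsafe.Pointer(prev)).s.Release(1)
	}
}

func main() {
	var sl slot
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			w := newWaiter(id)
			wakeups := 0
			for !sl.park(w) {
				wakeups++ // woken by another goroutine, go around again
			}
			fmt.Printf("goroutine %d: woken %d times before close\n", id, wakeups)
		}(i)
	}
	time.Sleep(10 * time.Millisecond) // let some handoffs happen
	sl.release()                      // close the slot; every goroutine drains and exits
	wg.Wait()
}

At most one goroutine is ever blocked per slot; everyone else is either running or already has a pending Release, which is why closing the slot once is enough for all of them to drain.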
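
The body of __xorshift64 is elided in the hunk above; the new comment only notes that the package-level math/rand functions are too slow here because they share a locked global source, while the benchmark keeps per-goroutine state. For reference, a textbook xorshift64 step looks like the following; the 13/7/17 shift constants are Marsaglia's and may differ from the benchmark's actual implementation.

package main

import "fmt"

// xorshift64: a textbook xorshift step (Marsaglia's 13/7/17 shifts).
// Shown only as an illustration of the kind of generator __xorshift64 is;
// the benchmark's exact constants are not visible in this changeset.
func xorshift64(state *uint64) uint64 {
	x := *state
	x ^= x << 13
	x ^= x >> 7
	x ^= x << 17
	*state = x
	return x
}

func main() {
	state := uint64(42) // per-goroutine seed, must be non-zero
	for i := 0; i < 3; i++ {
		fmt.Println(xorshift64(&state))
	}
}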