Changeset 2dd0689


Ignore:
Timestamp:
Dec 16, 2020, 2:37:31 PM (12 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
fd84538
Parents:
72b1800
Message:

Fix implementation of locality to properly use spots.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/readyQ/locality.go

    r72b1800 r2dd0689  
    22
    33import (
     4        "context"
    45        "flag"
    56        "fmt"
     
    89        "sync/atomic"
    910        "time"
     11        "unsafe"
     12        "golang.org/x/sync/semaphore"
    1013        "golang.org/x/text/language"
    1114        "golang.org/x/text/message"
    1215)
    1316
    14 func handshake(stop chan struct {}, c chan [] uint64, data [] uint64, share bool) (bool, [] uint64) {
    15         var s [] uint64 = data
    16         if !share {
    17                 s = nil
     17type GoCtx struct {
     18        s * semaphore.Weighted
     19        d * [] uint64
     20}
     21
     22type Spot struct {
     23        ptr uintptr
     24}
     25
     26func (this * Spot) put( s * semaphore.Weighted, data [] uint64, share bool) ([] uint64) {
     27        ctx := GoCtx{s, &data}
     28
     29        var raw uintptr
     30        for true {
     31                raw = this.ptr
     32                if raw == uintptr(1) {
     33                        return nil
     34                }
     35                if atomic.CompareAndSwapUintptr(&this.ptr, raw, uintptr(unsafe.Pointer(&ctx))) {
     36                        break
     37                }
    1838        }
    1939
    20         // send the data
    21         select {
    22         case <- stop:
    23                 return true, nil
    24         case c <- s:
     40        if raw != uintptr(0) {
     41                val := (*GoCtx)(unsafe.Pointer(raw))
     42                if share {
     43                        val.d = &data
     44                }
     45
     46                val.s.Release(1)
    2547        }
    2648
    27         // get the new data chunk
    28         select {
    29         case <- stop:
    30                 return true, nil
    31         case n := <- c:
    32                 if share {
    33                         return false, n
    34                 }
    35                 return false, data
    36         }
     49        ctx.s.Acquire(context.Background(), 1)
     50
     51        return *ctx.d
    3752}
    3853
    39 func local(result chan uint64, start chan struct{}, stop chan struct{}, size uint64, cnt uint64, channels []chan [] uint64, chan_cnt uint64, share bool) {
     54func (this * Spot) release() {
     55        val := (*GoCtx)(unsafe.Pointer(atomic.SwapUintptr(&this.ptr, uintptr(1))))
     56        if val == nil {
     57                return
     58        }
     59
     60        val.s.Release(1)
     61}
     62
     63
     64
     65func local(result chan uint64, start chan struct{}, size uint64, cnt uint64, channels [] Spot, share bool) {
     66        lrand := rand.New(rand.NewSource(rand.Int63()))
    4067        var data [] uint64
    4168        data = make([]uint64, size)
     
    4370                data[i] = 0
    4471        }
     72
     73        sem := semaphore.NewWeighted(1)
     74        sem.Acquire(context.Background(), 1)
     75
    4576        count := uint64(0)
    4677        <- start
    4778        for true {
    4879                for i := uint64(0); i < cnt; i++ {
    49                         data[rand.Uint64() % size] += 1
     80                        data[lrand.Uint64() % size] += 1
    5081                }
    5182
    52                 i := rand.Uint64() % chan_cnt
    53                 var closed bool
    54                 closed, data = handshake(stop, channels[i], data, share)
     83                i := lrand.Int() % len(channels)
     84                data = channels[i].put(sem, data, share)
    5585                count += 1
    5686
    57                 if  closed { break }
     87                if  clock_mode && atomic.LoadInt32(&stop) == 1 { break }
    5888                if !clock_mode && count >= stop_count { break }
     89                if uint64(len(data)) != size {
     90                        panic("Data has weird size")
     91                }
    5992        }
    6093
     
    6497
    6598func main() {
    66 
    6799        work_sizeOpt := flag.Uint64("w", 2    , "Number of words (uint64) per threads")
    68100        countOpt     := flag.Uint64("c", 2    , "Number of words (uint64) to touch")
    69101        shareOpt     := flag.Bool  ("s", false, "Pass the work data to the next thread when blocking")
    70102
    71         bench_init()
     103        defer bench_init()()
    72104
    73105        size  := *work_sizeOpt
     
    81113
    82114        barrierStart := make(chan struct{})
    83         barrierStop  := make(chan struct{})
    84115        threads_left = int64(nthreads)
    85116        result  := make(chan uint64)
    86         channels := make([]chan [] uint64, nthreads - nprocs)
     117        channels := make([]Spot, nthreads - nprocs)
    87118        for i := range channels {
    88                 channels[i] = make(chan [] uint64, 1)
     119                channels[i] = Spot{uintptr(0)}
    89120        }
    90121
    91122        for i := 0; i < nthreads; i++ {
    92                 go local(result, barrierStart, barrierStop, size, cnt, channels, uint64(nthreads - nprocs), share)
     123                go local(result, barrierStart, size, cnt, channels, share)
    93124        }
    94125        fmt.Printf("Starting\n");
    95126
     127        atomic.StoreInt32(&stop, 0)
    96128        start := time.Now()
    97129        close(barrierStart)
     
    99131        wait(start, true);
    100132
    101         close(barrierStop)
     133        atomic.StoreInt32(&stop, 1)
    102134        end := time.Now()
    103135        delta := end.Sub(start)
    104136
    105137        fmt.Printf("\nDone\n")
     138
     139        for i := range channels {
     140                channels[i].release()
     141        }
    106142
    107143        global_counter := uint64(0)
Note: See TracChangeset for help on using the changeset viewer.