5

I need to read struct fields that are set from another goroutine. As far as I know, reading them directly may return stale data, even when I know for sure that there is no concurrent access (the write finishes before the read occurs, signaled via `chan struct{}`).

Will sending a pointer to the struct (created in the 1st goroutine, modified in the 2nd, read by the 3rd) resolve the possible staleness issue, given that I can guarantee no concurrent access?

I would like to avoid copying, because the struct is big and contains a huge `bytes.Buffer` that is filled in the 2nd goroutine and that I need to read from the 3rd.

Locking is an option, but it seems like overkill given that I know there will be no concurrent access.

let4be
  • `I know that there will be no concurrent access` so the go routines are not operating at the same time? – poopoothegorilla May 22 '16 at 06:31
  • They do operate at the same time, but I can easily synchronize the read/write via a simple and fast `chan struct{}` to ensure read happens after write finishes, i.e. reader would wait for writer to finish modifying the struct fields(writing to Bytes.Buffer and setting some simple fields(int) in the struct). By design there is no concurrent access, but reader and writer are always located in different goroutines – let4be May 22 '16 at 06:48
  • 3
    If there are no concurrent access and each goroutine share the address of the same struct in memory, then there's no reason for staleness. – T. Claverie May 22 '16 at 08:54
  • 1
    Always protect data that can be accessed concurrently. If there's no contention, then the performance hit will be negligible anyway. Don't rely on assumptions; doing so causes problems that are difficult to debug. On the other hand, you can always tweak your code if locking causes performance problems. – creker May 22 '16 at 12:27

2 Answers

6

There are many answers to this, and the right one depends on your data structure and program logic.

see: How to lock/synchronize access to a variable in Go during concurrent goroutines?
and: How to use RWMutex in Golang?

1- using Stateful Goroutines and channels
2- using sync.Mutex
3- using sync/atomic
4- using WaitGroup
5- using program logic(Semaphore)
...


1: Stateful goroutines and channels:
I simulated a very similar case (imagine reading from one SSD and writing to another SSD at a different speed).
In this sample, one goroutine (named write) does some work and prepares the data that goes into the big struct, another goroutine (named read) consumes the data taken from the big struct and then does some work, and the manager goroutine is the only one that reads and writes the buffer itself, which guarantees that there is no concurrent access to the same data. The three goroutines communicate through channels. In your case, you can send pointers over the channels, or use a global struct as in this sample.
The output will look like this:
mean= 36.6920166015625 stdev= 6.068973186592054

I hope this helps you get the idea.
Working sample code:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "runtime"
    "sync"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

func main() {
    wg.Add(1)
    go write()
    go read()
    go manage()
    runtime.Gosched()
    stopCh <- <-time.After(5 * time.Second)
    wg.Wait()
    mean := Mean(hist)
    stdev := stdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
}

const N = 1024 * 1024 * 1024

var wg sync.WaitGroup
var stopCh chan time.Time = make(chan time.Time)

var hist []int = make([]int, 65536)

var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), //2GB
}

var rc chan uint16 = make(chan uint16)
var wc chan uint16 = make(chan uint16)

func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}

func manage() {
    dataReady := false
    var data uint16
    for {
        if !dataReady && !s.empty {
            dataReady = true
            data = s.big[s.rpos]
            s.rpos++
            if s.rpos >= N {
                s.rpos = 0
            }
            s.empty = s.rpos == s.wpos
            s.full = next(s.wpos) == s.rpos
        }
        if dataReady {
            select {
            case rc <- data:
                dataReady = false
            default:
                runtime.Gosched()
            }
        }
        if !s.full {
            select {
            case d := <-wc:
                s.big[s.wpos] = d
                s.wpos++
                if s.wpos >= N {
                    s.wpos = 0
                }
                s.empty = s.rpos == s.wpos
                s.full = next(s.wpos) == s.rpos
            default:
                runtime.Gosched()
            }
        }
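        // Exit only once the writer has stopped, the buffer is drained,
        // and the last value has been handed to the reader.
        if s.stopped && s.empty && !dataReady {
            wg.Done()
            return
        }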

    }
}

func read() {
    for {
        d := <-rc
        hist[d]++
    }
}

func write() {
    for {
        wc <- uint16(rand.Intn(65536))
        select {
        case <-stopCh:
            s.stopped = true
            return
        default:
            runtime.Gosched()
        }
    }
}

func stdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}
func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}

5: Another (faster) way for some use cases:
The read, write, and processing jobs that ran in separate goroutines in the first sample can also run in a single goroutine on the shared data structure. The sample below does the same three jobs without channels and without a mutex.

Working sample:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

func manage() {
    for {
        if !s.empty {
            hist[s.big[s.rpos]]++ //sample read job with any time len
            nextPtr(&s.rpos)
        }
        if !s.full && !s.stopped {
            s.big[s.wpos] = uint16(rand.Intn(65536)) //sample write job with any time len
            nextPtr(&s.wpos)
        }
        if s.stopped {
            if s.empty {
                return
            }
        } else {
            s.stopped = time.Since(t0) >= 5*time.Second
        }
    }
}

func main() {
    t0 = time.Now()
    manage()
    mean := Mean(hist)
    stdev := StdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
    d0 := time.Since(t0)
    fmt.Println(d0) //5.8523347s
}

var t0 time.Time

const N = 100 * 1024 * 1024

var hist []int = make([]int, 65536)

var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), // 200 MiB (N uint16 values)
}

func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}
func nextPtr(pos *int) {
    *pos++
    if *pos >= N {
        *pos = 0
    }

    s.empty = s.rpos == s.wpos
    s.full = next(s.wpos) == s.rpos
}

func StdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}
func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}
Community
  • 2
    That code looks too complicated for such a simple thing. Mutexes in go are used for this exact reason - it will be much simpler and probably even faster. Don't use channels just because you can. Also, any use of `runtime.Gosched()` most of the times means you're doing it wrong. You don't need it, remove `default` cases from `select` statements. Also remove `select` statements with just one channel operation and use channels directly instead – creker May 22 '16 at 12:20
  • I can see that your code uses `select` with `runtime.Gosched()` to access channels without blocking but, again, you're doing it wrong. The whole example is too complicated, unnecessary, fragile and simply not the Go way of doing things. And because of the `runtime.Gosched()` I'm not sure how it will behave when GOMAXPROCS is 1. – creker May 22 '16 at 12:34
  • Did you measure the performance? I also didn't but quite sure that mutex will be faster. Channels are expensive by themselves, mutexes are cheap if there's no contention. But it doesn't matter anyway, mutex will make the code much simpler and that's more important. Until benchmark tells you that channels faster and you can justify why performance benefit is worth hurting readability, use mutexes – creker May 22 '16 at 12:57
  • 2
    https://github.com/golang/go/wiki/MutexOrChannel `Use whichever is most expressive and/or most simple`. That's the universal rule for almost everything until you have very good reasons why you need to use more complicated solution. But that's not the only problem with your code, it's just not a good Go code by itself. – creker May 22 '16 at 13:03
  • @creker: This is called Stateful Goroutines: https://gobyexample.com/stateful-goroutines If you comment out all runtime.Gosched() time=5.8523347s and above code time=5.0052863s it is faster with runtime.Gosched() so I keep it The scheduler is instructed to switch the execution to another goroutine. This is exactly needed here, buffer is empty and another goroutine needed to fill the buffer. About channels I do not agree with you. It is very elemental to Golang and concurrency, I’ve been using it before Golang: (Erlang Message Passing) it is very simple and cheap. –  May 22 '16 at 17:47
5

To prevent concurrent modifications to a struct while retaining the ability to read it, you'd typically embed a sync.RWMutex. This case is no exception. You can simply lock your struct for writes while it is in transit and unlock it at a point in time of your convenience.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Big simulates your big struct
type Big struct {
    sync.RWMutex
    value string
}

// pump uses a goroutine to take the slice of pointers to Big,
// locks the underlying structs and sends the pointers to
// the locked instances of Big downstream
func pump(bigs []*Big) chan *Big {

    // We make the channel buffered for this example
    // for illustration purposes
    c := make(chan *Big, 3)

    go func() {
        for _, big := range bigs {
            // We lock the struct before sending it to the channel
            // so it can not be changed via pointer while in transit
            big.Lock()
            c <- big
        }
        close(c)
    }()

    return c
}

// sink reads pointers to the locked instances of Big
// reads them and unlocks them
func sink(c chan *Big) {

    for big := range c {
        fmt.Println(big.value)
        time.Sleep(1 * time.Second)
        big.Unlock()

    }
}

// modify tries to achieve locks to the instances and modify them
func modify(bigs []*Big) {
    for _, big := range bigs {

        big.Lock()
        big.value = "modified"
        big.Unlock()
    }
}

func main() {

    bigs := []*Big{&Big{value: "Foo"}, &Big{value: "Bar"}, &Big{value: "Baz"}}
    c := pump(bigs)

    // For the sake of this example, we wait until all entries are
    // sent into the channel and hence are locked
    time.Sleep(1 * time.Second)

    // Now we try to modify concurrently before we even start to read
    // the struct of which the pointers were sent into the channel
    go modify(bigs)
    sink(c)

    // We use sleep here to keep waiting for modify() to finish simple.
    // Usually, you'd use a sync.WaitGroup
    time.Sleep(1 * time.Second)

    for _, big := range bigs {
        fmt.Println(big.value)
    }

}


Markus W Mahlberg