In the go-disruptor project, I would like to know how writes to and reads from the ringBuffer are guaranteed to be thread safe. Neither operation is protected by a lock, so it seems that a reader could access a slot before the data written to it is visible. As far as I understand, the MESI protocol alone does not guarantee that visibility.
    func main() {
        writer, reader := disruptor.New(
            disruptor.WithCapacity(BufferSize),
            disruptor.WithConsumerGroup(MyConsumer{}))

        // producer
        go func() {
            reservation := writer.Reserve(1)
            // index by the reserved sequence (the original snippet used an undefined `sequence`)
            ringBuffer[reservation&RingBufferMask] = 42 // example of incoming value from a network operation such as HTTP, TCP, UDP, etc.
            writer.Commit(reservation, reservation)

            _ = reader.Close() // close the Reader once we're done producing messages
        }()

        reader.Read() // blocks until fully closed
    }

    type MyConsumer struct{}

    func (m MyConsumer) Consume(lowerSequence, upperSequence int64) {
        for sequence := lowerSequence; sequence <= upperSequence; sequence++ {
            index := sequence & RingBufferMask
            message := ringBuffer[index]
            fmt.Println(message)
        }
    }

    var ringBuffer [BufferSize]int

    const (
        BufferSize     = 1024 * 64
        RingBufferMask = BufferSize - 1 // name matches its uses in main and Consume
    )

I think reads from the ringBuffer need to be locked so that its elements are guaranteed to be visible after they have been written.
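To make the idea concrete, a locked read of this kind might look like the following minimal sketch. The `lockedRing` type, its `Set` and `Get` methods, and the small `bufferSize` are hypothetical names for illustration only; they are not part of go-disruptor.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	bufferSize = 8 // hypothetical small capacity; must be a power of two
	bufferMask = bufferSize - 1
)

// lockedRing is a hypothetical ring buffer whose slots are guarded by a mutex.
type lockedRing struct {
	mu    sync.RWMutex
	slots [bufferSize]int
}

// Set stores value in the slot for sequence while holding the write lock.
func (r *lockedRing) Set(sequence int64, value int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.slots[sequence&bufferMask] = value
}

// Get reads the slot for sequence while holding the read lock.
func (r *lockedRing) Get(sequence int64) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.slots[sequence&bufferMask]
}

func main() {
	var ring lockedRing
	ring.Set(0, 42)
	ring.Set(9, 7) // sequence 9 wraps around to slot 1
	fmt.Println(ring.Get(0), ring.Get(9))
}
```

Note that a lock like this only makes a completed write visible; on its own it does not tell the reader whether a given slot has been written yet.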
I wrote a simple program that simulates go-disruptor's production and consumption, and found concurrency issues when accessing the ring buffer. However, I don't understand how my approach differs from what go-disruptor does. My program is below.

    group := &sync.WaitGroup{}
    var nums = [1024 * 64]int64{}
    group.Add(2)
    i := &Cursor{}
    go func() {
        defer group.Done()
        j := i.Load()
        for j < int64(len(nums)) {
            j = i.Load()
            nums[j] = j
            i.Store(j + 1)
        }
    }()
    go func() {
        defer group.Done()
        j := i.Load()
        for j < int64(len(nums)) {
            j = i.Load()
            if j != nums[j] {
                fmt.Println(j, nums[j])
            }
        }
    }()
    group.Wait()

Cursor uses the definition from go-disruptor:

    type Cursor [8]int64 // prevent false sharing of the sequence cursor by padding the CPU cache line with 64 *bytes* of data.

    func NewCursor() *Cursor {
        var this Cursor
        this[0] = defaultCursorValue
        return &this
    }

    func (this *Cursor) Store(value int64) { atomic.StoreInt64(&this[0], value) }
    func (this *Cursor) Load() int64       { return atomic.LoadInt64(&this[0]) }

    const defaultCursorValue = -1

The program prints seemingly random output, such as:

    3 0
    440 440
    512 0
    512 0
    556 0
    637 0
    724 724
    ...
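
For comparison, here is a minimal sketch of a publish-after-write pattern. It assumes that the cursor holds the highest index that has already been written, rather than the next index to write, and that the reader never touches a slot beyond that value. The names `countMismatches`, `published` and `size` are made up for this illustration; this is a sketch of the general technique, not go-disruptor's actual implementation. Under the Go memory model, an atomic load that observes an atomic store is synchronized after it, so the plain write made before the store should be visible to the reader without a lock.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

const size = 1024 * 64

// countMismatches runs one writer and one reader over a shared array and
// returns how many slots the reader saw holding an unexpected value.
// Assumption: published holds the highest index that is safe to read
// (-1 means nothing has been published yet).
func countMismatches() int {
	var nums [size]int64
	var published int64 = -1

	var group sync.WaitGroup
	group.Add(2)

	go func() { // writer
		defer group.Done()
		for j := int64(0); j < size; j++ {
			nums[j] = j + 1                   // 1. fill the slot with a plain write
			atomic.StoreInt64(&published, j) // 2. only then publish its index
		}
	}()

	mismatches := 0
	go func() { // reader
		defer group.Done()
		for next := int64(0); next < size; {
			upper := atomic.LoadInt64(&published)
			if upper < next {
				runtime.Gosched() // nothing new yet; let the writer run
				continue
			}
			// Every slot in [next, upper] was written before it was published.
			for ; next <= upper; next++ {
				if nums[next] != next+1 {
					mismatches++
				}
			}
		}
	}()

	group.Wait()
	return mismatches
}

func main() {
	fmt.Println("mismatches:", countMismatches())
}
```

Slots store `j + 1` instead of `j` so that an unwritten (zero) slot can never look like a correct value. The key difference from the program above is that the reader only reads indexes up to the published value, whereas the program above reads the slot at the cursor value itself, which is the slot the writer has not filled yet.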