0

In go-disruptor project. I would like to know how to guarantee that writing and reading of ringBuffer is thread safe. Because I found that the writing and reading of the ringBuffer are not locked, and the data that may be read may not be written into the ringBuffer. Because of the MESI protocol, it does not guarantee good visibility.

func main() {
    writer, reader := disruptor.New(
        disruptor.WithCapacity(BufferSize),
        disruptor.WithConsumerGroup(MyConsumer{}))

    // producer
    go func() {
        reservation := writer.Reserve(1)
        ringBuffer[sequence&RingBufferMask] = 42 // example of incoming value from a network operation such as HTTP, TCP, UDP, etc.
        writer.Commit(reservation, reservation)

        _ = reader.Close() // close the Reader once we're done producing messages
    }()

    reader.Read() // blocks until fully closed
}

type MyConsumer struct{}

func (m MyConsumer) Consume(lowerSequence, upperSequence int64) {
    for sequence := lowerSequence; sequence <= upperSequence; sequence++ {
        index := sequence&RingBufferMask
        message := ringBuffer[index]
        fmt.Println(message)
    }
}

var ringBuffer = [BufferSize]int

const (
    BufferSize = 1024 * 64
    BufferMask = BufferSize - 1
)

I think the ringBuffer read operation needs to be locked to protect the visibility of the internal elements after being written.

I wrote a simple program to simulate go-disruptor production and consumption, and found some concurrency issues when accessing ringBuffer. But I don't know the difference between the writing method and go-disruptor. Below is my program.

    group := &sync.WaitGroup{}
    var nums = [1024 * 64]int64{}
    group.Add(2)
    i := &Cursor{}
    go func() {
        defer group.Done()
        j := i.Load()
        for j < int64(len(nums)) {
            j = i.Load()
            nums[j] = j
            i.Store(j + 1)
        }
    }()
    go func() {
        defer group.Done()
        j := i.Load()
        for j < int64(len(nums)) {
            j = i.Load()
            if j != nums[j] {
                fmt.Println(j, nums[j])
            }
        }
    }()
    group.Wait()

Cursor uses the definition in disruptor.

type Cursor [8]int64 // prevent false sharing of the sequence cursor by padding the CPU cache line with 64 *bytes* of data.

func NewCursor() *Cursor {
    var this Cursor
    this[0] = defaultCursorValue
    return &this
}

func (this *Cursor) Store(value int64) { atomic.StoreInt64(&this[0], value) }
func (this *Cursor) Load() int64       { return atomic.LoadInt64(&this[0]) }

const defaultCursorValue = -1

It prints random data such as

3 0
440 440
512 0
512 0
556 0
637 0
724 724
...
365318663
  • 11
  • 1
  • I don't understand why you think there needs to be a lock involved, but this isn't a forum for general discussion. If you want to learn how the algorithm works, you could start here: https://stackoverflow.com/questions/6559308/how-does-lmaxs-disruptor-pattern-work – JimB May 04 '23 at 18:38
  • totally agree with @JimB - is there a question here? – AthulMuralidhar May 08 '23 at 15:34
  • Thank you for your answers. I wrote a simple program to simulate go-disruptor production and consumption, and found some concurrency issues when accessing ringBuffer. But I don't know the difference between the writing method and go-disruptor. Below is my program. ```go – 365318663 May 08 '23 at 17:47
  • Please see the code and output I added to the question. – 365318663 May 08 '23 at 18:05

0 Answers0