0

I am trying to implement a producer <-> consumer pattern in C++. When I read about this pattern they always seems to mention a potential deadlock that has to be avoided. However I have implemented this below without using any mutex below. What is wrong with my code?

#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>

class CircularBuffer
{
public:
    CircularBuffer();
    int*          getWritePos();
    void      finishedWriting();
    int*           getReadPos();
    void      finishedReading();
private:
    void waitForReaderToCatchUp();
    void waitForWriterToCatchUp();

    const int size = 5;
    std::vector<int> data;
    // Changed from int since these variables are shared between the two threads and assignment is not necessarily atomic: 
    std::atomic<int> writePos = 0;
    std::atomic<int> readPos = 0;
};

CircularBuffer::CircularBuffer() {
    data.resize(size);
}

void
CircularBuffer::waitForReaderToCatchUp() {
    int unread = writePos - readPos;
    while (unread >= size) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getWritePos() {
    waitForReaderToCatchUp();
    int pos = writePos % size;
    return &data[pos];
}

void
CircularBuffer::finishedWriting() {
    writePos++;
}

void
CircularBuffer::waitForWriterToCatchUp() {
    int unread = writePos - readPos;
    while (unread < 1) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getReadPos() {
    waitForWriterToCatchUp();
    int pos = readPos % size;
    return &data[pos];
}

void
CircularBuffer::finishedReading() {
    readPos++;
}

const int produceMinTime = 100;

void produce(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        int r = rand() % 1000;
        std::this_thread::sleep_for(std::chrono::milliseconds(produceMinTime + r));
        int *p = cb->getWritePos();
        memcpy(p, &i, 4);
        cb->finishedWriting();
    }
}

void consume(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int *p = cb->getReadPos();
        int j = *p;
        std::cout << "Value: " << j << std::endl;
        cb->finishedReading();
    }
}

int main()
{
    CircularBuffer cb;
    std::thread t1(produce, &cb);
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    std::thread t2(consume, &cb);

    t1.join();
    t2.join();
    int k;
    std::cin >> k;
}
Andy
  • 3,251
  • 4
  • 32
  • 53
  • 2
    I don't understand what your question is? If you don't have anything like a mutex then you can't have deadlock. – Lou Franco Feb 28 '20 at 15:11
  • 2
    not sure about deadlocks in this code - you'd have to ask the person who raised it - but any time you see `this_thread::sleep_for()` questions should be asked – UKMonkey Feb 28 '20 at 15:13
  • 3
    You won't have a deadlock but you have much worse: undefined behavior because of data races. You _need_ synchronisation primitives to prevent them. – Mat Feb 28 '20 at 15:13
  • 1
    The code does not work, the problem is that it has not been tested well enough. Data race bugs occur only once a ~month, give or take a month. Another notable flaw is that a millisecond is an eternity on modern machines. Don't write it yourself, google "c++ thread-safe circular buffer" to find code. – Hans Passant Feb 28 '20 at 15:22
  • @HansPassant: thank you. I changed it to 10 nanoseconds. – Andy Feb 29 '20 at 09:31

1 Answers1

1

std::vector<int> is not a thread-safe data structure. So, if you access it from two threads simultaneously, that would be considered undefined behavior. You could crash, have other problems, or it could seemingly work (but still be wrong).

The ints inside the vector, and the ones representing your positions are also not thread-safe -- read/write isn't necessarily atomic (there are lock-free ways to do that).

So, you could totally implement something like this lock-free, but not this way. Some info here: https://www.infoq.com/news/2014/10/cpp-lock-free-programming/

Generally, you want to look at the primitives in std::atomic: https://en.cppreference.com/w/cpp/atomic/atomic

Also see: Ring buffer with atomic indexes

Lou Franco
  • 87,846
  • 14
  • 132
  • 192
  • Thank you for your help! If I change writePos and readPos to atomic that would fix the problem in your second paragraph? – Andy Feb 29 '20 at 08:49
  • Regarding std::vector not being threadsafe. From what I understand reading from different threads is OK? In my case I both read and write. However I never write and read the same element at the same time. The producer is always at least 1 element ahead of the consumer. Also the vector is never resized. Is this not also OK? https://stackoverflow.com/questions/32694114/can-i-make-a-thread-safe-stdatomicvectorint – Andy Feb 29 '20 at 09:19
  • https://www.quora.com/Are-iterator-container-threads-safe-in-C++11 – Andy Feb 29 '20 at 09:56
  • https://en.cppreference.com/w/cpp/container#Thread_safety: "All const member functions can be called concurrently by different threads on the same container.", "operator[], behave as const for the purposes of thread safety", "Different elements in the same container can be modified concurrently by different threads". – Andy Feb 29 '20 at 10:03
  • Great. Then you still need to deal with the pos vars. – Lou Franco Feb 29 '20 at 13:52