I am trying to implement the producer <-> consumer pattern in C++. Whenever I read about this pattern, a potential deadlock that has to be avoided always seems to be mentioned. However, I have implemented it below without using any mutex. What is wrong with my code?
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <vector>
/// Ring buffer of `size` ints that hands out raw slot pointers.
///
/// In this file it is shared between one producer thread (getWritePos /
/// finishedWriting) and one consumer thread (getReadPos / finishedReading).
/// No mutex is used; coordination relies solely on the two atomic counters.
class CircularBuffer
{
public:
    /// Sizes the backing storage to hold `size` elements.
    CircularBuffer();

    /// Waits until fewer than `size` elements are unread, then returns a
    /// pointer to the slot at index `writePos % size`.
    int* getWritePos();

    /// Increments the write counter, making the slot just written count as unread.
    void finishedWriting();

    /// Waits until at least one element is unread, then returns a pointer to
    /// the slot at index `readPos % size`.
    int* getReadPos();

    /// Increments the read counter, marking the slot just read as consumed.
    void finishedReading();

private:
    /// Polls (sleeping between checks) while `writePos - readPos >= size`.
    void waitForReaderToCatchUp();

    /// Polls (sleeping between checks) while `writePos - readPos < 1`.
    void waitForWriterToCatchUp();

    /// Number of slots in `data`.
    const int size = 5;

    /// Backing storage; resized to `size` by the constructor.
    std::vector<int> data;

    // Counts of completed writes/reads. They are atomic because both threads
    // access them and plain int access is not necessarily atomic.
    // Direct brace-initialization is used because `std::atomic<int> x = 0;`
    // is copy-initialization of a non-copyable type, which is ill-formed
    // before C++17.
    std::atomic<int> writePos{0};
    std::atomic<int> readPos{0};
};
// Creates the backing storage with `size` value-initialized (zero) slots.
// `size` is declared before `data` in the class, so it is already
// initialized when `data` is constructed.
CircularBuffer::CircularBuffer()
    : data(size)
{
}
// Returns once fewer than `size` elements are pending (written but not yet
// read). Until then, re-checks the counters after each 10 ns sleep request.
void
CircularBuffer::waitForReaderToCatchUp() {
    for (;;) {
        const int pending = writePos - readPos;
        if (pending < size) {
            return;
        }
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
    }
}
// Waits for a free slot, then returns the address of the slot selected by
// the current write counter modulo the buffer size.
int*
CircularBuffer::getWritePos() {
    waitForReaderToCatchUp();
    const int slot = writePos.load() % size;
    return &data[slot];
}
// Advances the write counter by one.
void
CircularBuffer::finishedWriting() {
    writePos.fetch_add(1);
}
// Returns once at least one element is pending (written but not yet read).
// Until then, re-checks the counters after each 10 ns sleep request.
void
CircularBuffer::waitForWriterToCatchUp() {
    while (writePos - readPos < 1) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
    }
}
// Waits for an unread element, then returns the address of the slot selected
// by the current read counter modulo the buffer size.
int*
CircularBuffer::getReadPos() {
    waitForWriterToCatchUp();
    const int slot = readPos.load() % size;
    return &data[slot];
}
// Advances the read counter by one.
void
CircularBuffer::finishedReading() {
    readPos.fetch_add(1);
}
// Minimum delay, in milliseconds, before each item is produced.
const int produceMinTime = 100;

// Producer thread body: writes the values 0..14 into the buffer, one at a
// time, pausing produceMinTime plus a pseudo-random 0..999 ms before each.
//
// cb: buffer to write into; must not be null (it is dereferenced unchecked).
void produce(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        const int jitterMs = std::rand() % 1000;
        std::this_thread::sleep_for(std::chrono::milliseconds(produceMinTime + jitterMs));
        int *slot = cb->getWritePos();
        // Plain assignment instead of memcpy(p, &i, 4): no hard-coded
        // sizeof(int) and no dependency on <cstring>.
        *slot = i;
        cb->finishedWriting();
    }
}
// Consumer thread body: fifteen times, sleeps for one second, reads the next
// value from the buffer, prints it, and then releases the slot.
//
// cb: buffer to read from; must not be null (it is dereferenced unchecked).
void consume(CircularBuffer *cb) {
    int remaining = 15;
    while (remaining-- > 0) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        const int value = *cb->getReadPos();
        std::cout << "Value: " << value << std::endl;
        cb->finishedReading();
    }
}
int main()
{
CircularBuffer cb;
std::thread t1(produce, &cb);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::thread t2(consume, &cb);
t1.join();
t2.join();
int k;
std::cin >> k;
}