On some machines I have access to, my code gets stuck in read_thread.join(),
but I can't understand why. The vectors_reader class reads vectors_chunks from
a file and keeps them ready for a consumer in a bounded queue.
// Produces vectors_chunk<float> objects and hands them to a consumer through
// queue_. run() is the producing side; is_over() and get_chunk() are the
// consuming side. All members are public in this snippet.
class vectors_reader {
public:
// Threads management
// Queue size at which run() stops producing and waits on condition_full_.
const static int MAX_QUEUE_SIZE = 2;
// Held around every access to queue_ and around updates of read_count_.
std::mutex mutex_;
// Waited on by get_chunk() while queue_ is empty; notified by run() after a push.
std::condition_variable condition_empty_;
// Waited on by run() while queue_ holds MAX_QUEUE_SIZE chunks; notified by
// get_chunk() after a pop.
std::condition_variable condition_full_;
// Chunks that have been read and not yet taken by get_chunk().
std::queue<vectors_chunk<float>> queue_;
// Vectors
// Value passed to the constructor; run() loops until read_count_ equals it.
// NOTE(review): presumably the total number of vectors to read — confirm.
unsigned count_;
// Starts at 0 and grows by chunk_count each time run() pushes a chunk.
unsigned read_count_;
// [...] Member variables for file management (ifstream etc.)
void run() {
while (read_count_ != count_) {
// Wait until queue is not full
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.size() == MAX_QUEUE_SIZE) {
condition_full_.wait(lock);
}
lock.unlock();
// Read chunk
vectors_chunk<float> chunk(dim_, chunk_count, read_count_);
// [...] Read data into chunk
// Push chunk
lock.lock();
read_count_ += chunk_count;
queue_.push(std::move(chunk));
lock.unlock();
condition_empty_.notify_one();
}
std::cout << "Reader exit" << std::endl;
}
// [...] Constructor: pass filename, open file etc.
vectors_reader(unsigned count) : count_(count), read_count_(0) // [...]
// Returns true once read_count_ equals count_ and queue_ is empty, i.e. there
// is nothing left for get_chunk() to hand out. Evaluated under mutex_.
bool is_over() {
    std::lock_guard<std::mutex> guard(mutex_);
    return (count_ == read_count_) && queue_.empty();
}
// Removes and returns the chunk at the front of queue_, blocking while the
// queue is empty. After the pop, one waiter on condition_full_ is notified so
// that run() can continue producing.
vectors_chunk<float> get_chunk() {
    std::unique_lock<std::mutex> guard(mutex_);
    // Sleep until the producer has pushed something.
    condition_empty_.wait(guard, [this] { return !queue_.empty(); });

    vectors_chunk<float> front_chunk(std::move(queue_.front()));
    queue_.pop();

    // Release the mutex before notifying so the woken thread can take it
    // immediately.
    guard.unlock();
    condition_full_.notify_one();
    return front_chunk;
}
This class is used as follows:
// Init reader (constructor arguments are elided in this snippet)
std::shared_ptr<vectors_reader> reader(...);

// Producer: runs the reader loop on its own thread. The shared_ptr is
// captured by value, so the thread holds its own reference to the reader.
std::thread read_thread([reader] {
    reader->run();
    std::cerr << "Thread exit";
});

// Consumer: keep popping chunks until the reader reports that everything has
// been read and the queue has been drained.
while (!reader->is_over()) {
    std::cerr << "Waiting chunk" << std::flush;
    vectors_chunk<float> popped = reader->get_chunk();
    std::cerr << "\rChunk pop: " << popped.offset << " " << popped.count << std::endl;
    // Do something with chunk (also multithreaded)
}

// All chunks consumed: wait for the producer thread to finish.
std::cerr << "Reader join" << std::endl;
read_thread.join();
I get the following output:
Chunk pop: 0 200000
Chunk pop: 200000 200000
Chunk pop: 400000 200000
Reader exit
Chunk pop: 600000 200000
Chunk pop: 800000 200000
Reader join
And my code gets stuck.