0

On some machines I have access to, my code gets stuck in read_thread.join() but I can't understand why. The vectors_reader class reads vectors_chunks from a file an maintains them ready for a consumer in a bounded queue.

class vectors_reader {
public:
  // Threads management
  const static int MAX_QUEUE_SIZE = 2;
  std::mutex mutex_;
  std::condition_variable condition_empty_;
  std::condition_variable condition_full_;
  std::queue<vectors_chunk<float>> queue_;
  // Vectors
  unsigned count_;
  unsigned read_count_;
  // [...] Member variables for file management (ifstream etc.)

  void run() {
    while (read_count_ != count_) {
        // Wait until queue is not full
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.size() == MAX_QUEUE_SIZE) {
            condition_full_.wait(lock);
        }
        lock.unlock();

        // Read chunk
        vectors_chunk<float> chunk(dim_, chunk_count, read_count_);
        // [...] Read data into chunk

        // Push chunk
        lock.lock();
        read_count_ += chunk_count;
        queue_.push(std::move(chunk));
        lock.unlock();
        condition_empty_.notify_one();
    }
    std::cout << "Reader exit" << std::endl;
  }

  // [...] Constructor: pass filename, open file etc.
  vectors_reader(unsigned count) : count_(count), read_count_(0) // [...]

  bool is_over() {
    bool over = false;
    std::unique_lock<std::mutex> lock(mutex_);
    over = (count_ == read_count_) && queue_.empty();
    lock.unlock();
    return over;
  }

  vectors_chunk<float> get_chunk() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.empty()) {
        condition_empty_.wait(lock);
    }
    vectors_chunk<float> chunk(std::move(queue_.front()));
    queue_.pop();

    lock.unlock();
    condition_full_.notify_one();
    return chunk;
}

This class is used as follows:

// Init reader
std::shared_ptr<vectors_reader> reader(...);

std::thread read_thread([reader] {
    reader->run();
    std::cerr << "Thread exit";
});

while (!reader->is_over()) {
    std::cerr << "Waiting chunk";
    std::cerr.flush();
    vectors_chunk<float> chunk = reader->get_chunk();
    std::cerr << "\rChunk pop: " << chunk.offset << " " << chunk.count << std::endl;
    // Do something with chunk (also multithreaded)
}
std::cerr << "Reader join" << std::endl;
read_thread.join();

I get the following output:

Chunk pop: 0 200000
Chunk pop: 200000 200000
Chunk pop: 400000 200000
Reader exit
Chunk pop: 600000 200000
Chunk pop: 800000 200000
Reader join

And my code gets stuck.

Xion345
  • 1,627
  • 12
  • 26

0 Answers0