0

I need a thread safe queue that yields data if it's not empty, and waits for a data to arrive. With a timeout. I have it like this

void ThreadsafeQueue::enqueue(data_t& data)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(data);

    m_condvar.notify_one();
}

boost::optional<data_t> ThreadsafeQueue::dequeue()
{
    std::unique_lock<std::mutex> lock(m_mutex);
    const std::chrono::seconds DEFAULT_DELAY(10);
    if ((!m_queue.empty()) || m_condvar.wait_for(lock, DEFAULT_DELAY) == std::cv_status::no_timeout)
    {
        const auto data = m_queue.front();
        m_queue.pop();
        return data;
    }

    return boost::none;
}

But, for some reason sometimes it sometimes enters the if statement with no_timeout when actually it wasn't notified. And tries to use front() on an empty queue.

Am I doing it wrong?

Eugene Mart
  • 185
  • 1
  • 10

2 Answers2

2

Conditional variables are sometimes awoken by the system even when not notified. That is why, after awoken, you need to check if the condition was actually met or otherwise wait again.

Attersson
  • 4,755
  • 1
  • 15
  • 29
2

Since it's possible that multiple threads may be awoken by kicking a condition variable, or that spurious wake-ups can sometimes happen(1), you should check the predicate on wake-up to ensure it's true.

In other words, your dequeue code would be better off as something like:

boost::optional<data_t> ThreadsafeQueue::dequeue() {
    std::unique_lock<std::mutex> lock(m_mutex);
    static const std::chrono::seconds DEFAULT_DELAY(10);

    // Only wait on empty queue, return none on timeout.
    if (m_queue.empty())
        if (m_condvar.wait_for(lock, DEFAULT_DELAY) == std::cv_status::timeout)
            return boost::none;

    // Catch spurious wake-up.
    if (m_queue.empty())
        return boost::none;

    // Have item in queue, extract and return it.
    const auto data = m_queue.front();
    m_queue.pop();
    return data;
}

(1) As per here (my emphasis):

Atomically releases lock, blocks the current executing thread, and adds it to the list of threads waiting on *this. The thread will be unblocked when notify_all() or notify_one() is executed, or when the relative timeout rel_time expires. It may also be unblocked spuriously.

paxdiablo
  • 854,327
  • 234
  • 1,573
  • 1,953