0

I have the following MpscQueue implementation

EDIT: added an is_running atomic, but problem still persists.

template<typename T>
class MpscQueue {
    public:
    MpscQueue() = default;
    MpscQueue(MpscQueue&&) = delete;

    bool wait_and_pop(T& val, std::atomic<bool>& is_running) {
        std::unique_lock<std::mutex> lock(mutex);

        cond_var.wait(lock,
            [this, &is_running]{ return queue.size() > 0 || !is_running; });

        if (!is_running) return false;

        val = std::move(queue.front());
        queue.pop();
        return true;
    }

    template<typename U>
    void push(U&& val) {
        auto const is_empty = [&]{
            auto const lock = std::unique_lock(mutex);
            auto const res = queue.empty();

            queue.push(std::forward<U>(val));

            return res;
        }();

        if (is_empty) cond_var.notify_one();
    }

    private:
    std::queue<T> queue;
    std::mutex mutex;
    std::condition_variable cond_var;
};

I am attempting to pop a value like this

// At some point earlier
MpscQueue<Message> mailbox;
std::atomic<boo> is_running{true}; // Is set to false at a later time
void run_once() {
    Message m;
    mailbox.wait_and_pop(m, is_running);
    // process_message(std::move(m));
}

The above code run_once is being fed into the thread constructor. My issue is that if I attempt to join the thread that this is on, it gets stuck in the condition variable wait condition. What would be the best way to solve this? I tried passing an atomic by reference as a parameter into wait_and_pop but it did not seem to be updating and also did not seem like a smart implementation decision.

Epsilon
  • 11
  • 3
  • "The best way" is mostly a meaningless description, because you need to define what "best" means to you, and only you can answer this question. The C++ library offers no means for a thread to be notified, in any way, that someone's trying to join it. Therefore it is necessary to implement some custom mechanism to do that so that thread knows to terminate. The "best" way to do that is something only you can define, based on the specific way your program works. – Sam Varshavchik Jan 01 '20 at 19:24
  • Ok I added what I tried, but it did not work. – Epsilon Jan 01 '20 at 19:25
  • Ok it would be UB, but that would not explain the blocking would it? Also I updated it to fix the UB. – Epsilon Jan 01 '20 at 19:38
  • It is hard to say for sure without a [repro]. In particular I am still not sure, how you modify `is_running` and how you notify the thread of the modification. – walnut Jan 01 '20 at 19:45
  • I simply set `is_running=true` in the main thread, but I am not sure what you mean by notifying the thread of modification – Epsilon Jan 01 '20 at 19:47
  • 1
    @Epsilon You need to `notify_*` the `std::condition_variable`, otherwise the thread may never wake up to test the condition... – walnut Jan 01 '20 at 19:47
  • Ok that makes sense. I solved this issue by setting `is_running=true` and then calling `notify_one()`. I appreciate the help. – Epsilon Jan 01 '20 at 19:53
  • 1
    @Epsilon Sorry, I messed up my first comment, so just in case: You need to also make sure that you are modifying `is_runnning` (no matter whether it is atomic or not) while holding the mutex, see [this question](https://stackoverflow.com/questions/41867228/why-do-i-need-to-acquire-a-lock-to-modify-a-shared-atomic-variable-before-noti). – walnut Jan 01 '20 at 20:09
  • Ok will do, thank you – Epsilon Jan 01 '20 at 20:53

0 Answers0