Problem

I think I'm misunderstanding the CV-Mutex design pattern because I'm creating a program that seems to not need a mutex, only CV.


Goal Overview

I am parsing a feed from a website for two different accounts: Alice and Bob. The parsing task is slow, so I have two separate threads, each dedicated to handling the feed for one account.

I then have a thread that receives messages from the network and assigns the work to either the threadA or threadB, depending on who the update message is for. That way the reader/network thread isn't stalled, and the messages for Alice are in-order and the messages for Bob are in-order, too.

I don't care if Alice thread is a little bit behind Bob thread chronologically, as long as the individual account feeds are in-order.


Implementation Details

This is very similar to a thread pool, except the threads are essentially locked to a fixed-size array of size 2, and I use the same thread for each feed.

I create an AccountThread class which maintains a queue of JSON messages to be processed as soon as possible within the class. Here is the code for that:

#include <queue>
#include <string>
#include <condition_variable>
#include <mutex>

using namespace std;
class AccountThread {

public:
    AccountThread(const string& name) : name(name) { }

    void add_message(const string& d) {
        this->message_queue.push(d);
        this->cv.notify_all(); // could also do notify_one but whatever
    }

    void run_parsing_loop() {
        while (true) {
            std::unique_lock<std::mutex> mlock(lock_mutex);
            cv.wait(mlock, [&] {
                return this->is_dead || this->message_queue.size() > 0;
            });

            if (this->is_dead) { break; }

            const auto message = this->message_queue.front();
            this->message_queue.pop();

            // Do message parsing...
        }
    }

    void kill_thread() {
        this->is_dead = true;
    }


private:
    const string& name;

    condition_variable cv;
    mutex lock_mutex;
    queue<string> message_queue;

    // To Kill Thread if Needed
    bool is_dead;
};

I can add the main.cpp code, but it's essentially just a reader loop that calls thread.add_message(message) based on what the account name is.


Question

Why do I need the lock_mutex here? I don't see its purpose, since this class is essentially single-threaded. Is there a better design pattern for this? I feel like if I'm including a variable that I don't really need, such as the mutex, then I'm using the wrong design pattern for this task.

I'm just adapting the code from some article I saw online about a threadpool implementation and was curious.

halfer
JoeVictor
  • In this case `is_dead` must be protected by mutex otherwise there will be a race condition. Same for other objects being modified from different threads. – user7860670 Sep 19 '22 at 18:01
  • If you do not protect `message_queue` from simultaneous adding/removing things will be very, very bad. – user4581301 Sep 19 '22 at 18:02
  • See https://en.cppreference.com/w/cpp/thread/condition_variable and note the use of the words "must" and "has to" - correct behaviour is simply not guaranteed if you don't lock the mutex. – Jesper Juhl Sep 19 '22 at 18:02
  • @user7860670 i actually don't need that, but thanks noted. – JoeVictor Sep 19 '22 at 18:03
  • @user4581301 what if I guarantee through structure of the code that there is only one reader thread and one consumer thread? So no way for simultaneous adding or removing – JoeVictor Sep 19 '22 at 18:04
  • Wait a second. You have no thread? `condition_variable`'s not all that useful without a thread. – user4581301 Sep 19 '22 at 18:04
  • You have to ensure that it is not possible for the producer to update while the consumer is consuming. Doable, but kind-of defeats the point of threading. You're more in a State Machine world than a threading world at that point. – user4581301 Sep 19 '22 at 18:07
  • So I guess use a thread-safe queue right? That's the only race condition I'm seeing – JoeVictor Sep 19 '22 at 18:07
  • And if you have threads involved you DO need to protect `is_dead`. One of the worries is that the compiler sees that nothing in `run_parsing_loop` ever changes `is_dead` and optimizes the test out. – user4581301 Sep 19 '22 at 18:08
  • yeah I'm taking out `is_dead` there's really no point in running this script if one of the threads is dead. Gonna use [tbb's Concurrent Queue](https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html) since it's thread safe and the `pop()` method there automatically waits until there's something there – JoeVictor Sep 19 '22 at 18:11
  • if you want to submit an answer, I'll accept it – JoeVictor Sep 19 '22 at 18:11
  • A lock-free queue would be a fabulous tool for this job. I don't think it'll be that easy to make with a queue of `std::string`. Lot of stuff going on in a `string` that isn't thread safe. – user4581301 Sep 19 '22 at 18:12
  • the strings are read only though? I update internal state using the contents, and copy the string message over into the queue on addition. How could any issues arise? – JoeVictor Sep 19 '22 at 18:14
  • I'm not going to go as far as say it's impossible, but copying the string in or constructing it in place is not something that can be made instantaneous. This is almost certainly solved by TBB, but I have no idea what infrastructure is used in the background to guarantee it. You also need to have some way out of `run_parsing_loop` otherwise you break [the forward progress rules](https://en.cppreference.com/w/cpp/language/memory_model#Forward_progress). An [`atomic_flag`](https://en.cppreference.com/w/cpp/atomic/atomic_flag) should help with that, though. – user4581301 Sep 19 '22 at 18:21
  • Updating internal state of queue is not a thread-safe operation. Modifying operations are thread-safe when they are either atomic or protected by mutex. – user7860670 Sep 19 '22 at 18:21
  • Mind you, setting the kill flag is not all that helpful if the queue is blocked waiting for a message. You're going to be better off sending a special terminate message. – user4581301 Sep 19 '22 at 18:31
  • Who is calling `run_parsing_loop` ? Your class is called `AccountThread` but there is no thread in sight. Also it's not callable from "outside" the class since it has a `while(true)` loop. Typical implementations spawn a thread in the constructor and have it run `while(!is_dead)`. Certainly looks like it's not the right tool for the job. Also you have undefined behavior – Nikos Athanasiou Sep 19 '22 at 19:06
  • @NikosAthanasiou it's called as so: `std::thread([&alice_account] {alice_account.run_parsing_loop()});` – JoeVictor Sep 19 '22 at 19:18
  • @user4581301 messages are sent by the network indefinitely. I could just terminate on next read. Not the cleanest implementation, but it also avoids the infinite loop problem i guess – JoeVictor Sep 19 '22 at 19:20
  • @JoeVictor There's your answer then. You need the mutex to synchronize access to the data (queue) between the thread running the loop and the thread that calls `add_message`. (at least there, there's multiple data races in the code as presented here) – Nikos Athanasiou Sep 19 '22 at 19:31

1 Answer

First things first: there's no condition_variable::wait without a mutex. The interface of wait requires a mutex. So regarding

I'm creating a program that seems to not need a mutex, only CV

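note that the mutex is needed to protect the shared state that the condition variable's predicate checks, and to make "check the state, then block" a single atomic step; without it, a notification can arrive between the check and the wait and be lost. If the notion of how you'd have a data race without the mutex doesn't immediately make sense, check Why do pthreads’ condition variable functions require a mutex.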
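
To make that concrete, here is a minimal sketch of the canonical wait/notify pattern. It is not taken from the question's code: the `Mailbox` type, its members, and `mailbox_round_trip` are illustrative names I'm assuming for the example. The writer changes the state under the mutex, and the waiter evaluates its predicate under the same mutex, so the notification cannot slip in between the check and the blocking.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical single-slot mailbox, for illustration only.
struct Mailbox {
    std::mutex m;
    std::condition_variable cv;
    bool full = false; // shared state guarded by m
    int value = 0;     // shared state guarded by m

    void put(int v) {
        {
            std::lock_guard<std::mutex> lock(m); // modify state under the mutex
            value = v;
            full = true;
        }
        cv.notify_one(); // notifying after unlocking is fine
    }

    int take() {
        std::unique_lock<std::mutex> lock(m);
        // wait() releases the mutex and blocks atomically; the predicate is
        // always evaluated with the mutex held, so put() cannot run between
        // "mailbox is empty" and "go to sleep".
        cv.wait(lock, [this] { return full; });
        assert(full); // postcondition of wait-with-predicate
        full = false;
        return value;
    }
};

// Runs one producer and one consumer; returns the value the consumer saw.
int mailbox_round_trip(int v) {
    Mailbox box;
    int received = 0;
    std::thread consumer([&] { received = box.take(); });
    std::thread producer([&] { box.put(v); });
    producer.join();
    consumer.join(); // join makes the write to `received` visible here
    return received;
}
```

Calling `mailbox_round_trip(42)` should hand the value from the producer to the consumer regardless of which thread happens to run first, which is exactly the guarantee that disappears if `put` skips the lock.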

Secondly, there are multiple pain points in the code you provide. Consider this version, where the problems are addressed; I'll explain the issues below:

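#include <iostream> // std::cout
#include <thread>   // std::thread (in addition to the includes in the question)

class AccountThread {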

public:
    AccountThread(const string& name) : name(name) 
    {
        consumer = std::thread(&AccountThread::run_parsing_loop, this); // 1
    }
    
    ~AccountThread()
    {
        kill_thread(); // 2
        consumer.join();
    }

    void add_message(const string& d) {
        {
            std::lock_guard lok(lock_mutex); // 3
            this->message_queue.push(d);
        }
        this->cv.notify_one();
    }

private:
    void run_parsing_loop() 
    {
        while (true) { // is_dead is only read under the lock below; reading it here would race with kill_thread
            std::unique_lock<std::mutex> mlock(lock_mutex);
            cv.wait(mlock, [this] { // 4
                return is_dead || !message_queue.empty();
            });

            if (this->is_dead) { break; }

            std::string message = this->message_queue.front();
            this->message_queue.pop();

            string parsingMsg = name + " is processing " + message + "\n";
            std::cout << parsingMsg;
        }
    }

    void kill_thread() {
        {
            std::lock_guard lock(lock_mutex);
            this->is_dead = true;
        }
        cv.notify_one(); // 5
    }

private:
    string name; // 6

    mutable condition_variable cv; // 7
    mutable mutex lock_mutex;
    std::thread consumer;
    queue<string> message_queue;

    bool is_dead{false}; // 8
};

Top to bottom, the problems noted in the numbered comments are:

  1. If you have a worker thread class, like AccountThread, it's easier to get right when the class provides the thread. This way only the relevant interface is exposed and you have better control over the lifetime and workings of the consumer.
  2. Case in point, when an AccountThread "dies" the worker should also die. In the example above I fix this dependency by killing the consumer thread inside the destructor.
  3. add_message caused a data race in your code. Since you intend to run the parsing loop in a different thread, it's wrong to simply push to the queue without having a critical section.
  4. It's cleaner to capture only this here, i.e. you probably don't need the reference to mlock captured, which the [&] default capture makes available.
  5. kill_thread was not correct. You need to notify the consumer thread, which is potentially waiting, that a change in state happened. To do that correctly you need to protect the state checked in the predicate with a lock.
  6. The initial version with const string &name is probably not something you want. Member const references don't extend the lifetime of temporaries, and the way your constructor is written can leave an instance with dangling state. Even if you take the typical precautions, such as overloading the constructor with an r-value reference version, you'll be depending on an external string staying alive longer than your AccountThread object. Better to use a value member.
  7. Remember the M&M rule: mutable and mutex go together, so that const member functions can still lock.
  8. You had undefined behavior. The is_dead member was used without being initialized.

Demo
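
For context, the reader loop the question describes (one network thread routing messages to a fixed pair of workers) could look roughly like the sketch below. Everything here is an assumption for illustration: `RecordingWorker` is a condensed stand-in for the class above that records messages instead of parsing them, and, unlike the version above, it drains its queue before stopping so the result can be inspected. `dispatch` and the `(account, payload)` feed format are hypothetical too.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Condensed, hypothetical stand-in for the worker class: records messages.
class RecordingWorker {
public:
    RecordingWorker() : consumer(&RecordingWorker::loop, this) {}
    ~RecordingWorker() { stop(); }

    void add_message(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(m);
            pending.push(std::move(msg));
        }
        cv.notify_one();
    }

    // Lets the queue drain, joins the thread, and returns what was processed.
    std::vector<std::string> finish() {
        stop();
        return processed;
    }

private:
    void stop() {
        {
            std::lock_guard<std::mutex> lock(m);
            done = true;
        }
        cv.notify_one();
        if (consumer.joinable()) consumer.join();
    }

    void loop() {
        for (;;) {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [this] { return done || !pending.empty(); });
            if (pending.empty()) return; // stop requested and queue drained
            std::string msg = std::move(pending.front());
            pending.pop();
            lock.unlock(); // do the slow work outside the critical section
            processed.push_back(std::move(msg));
        }
    }

    std::mutex m;
    std::condition_variable cv;
    std::queue<std::string> pending;
    std::vector<std::string> processed; // consumer-only until the join
    bool done = false;
    std::thread consumer; // declared last: starts after the members above exist
};

using Feed = std::vector<std::pair<std::string, std::string>>;

// Hypothetical reader loop: routes each (account, payload) to its worker and
// returns the per-account processing order as (Alice, Bob).
std::pair<std::vector<std::string>, std::vector<std::string>>
dispatch(const Feed& feed) {
    RecordingWorker alice, bob;
    for (const auto& item : feed) {
        (item.first == "Alice" ? alice : bob).add_message(item.second);
    }
    std::vector<std::string> a = alice.finish();
    std::vector<std::string> b = bob.finish();
    return std::make_pair(a, b);
}
```

Because each account has exactly one queue and one consumer, the per-account order is preserved even though the two workers may interleave arbitrarily with each other.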

All in all, I think the suggested changes point in the right direction. You can also check an implementation of a Go-like communication channel if you want more insight on how something like the TBB component you mention is implemented. Such a channel (or buffer queue) would simplify implementation to avoid manual usage of mutexes, CVs and alive states:

class AccountThread {
public:
    AccountThread(const string& name) : name(name) {
        consumer = std::thread(&AccountThread::run_parsing_loop, this);
    }
    
    ~AccountThread() {
        kill_thread();
        consumer.join();
    }

    void add_message(const string& d) { _data.push(d); }

private:
    void run_parsing_loop() {
        try {
            while (true) {
                // This pop waits until there's data or the channel is closed.
                auto message = _data.pop();
                // TODO: Implement parsing here
            }
        } catch (...) { 
            // Single exception thrown per thread lifetime
        }
    }

    void kill_thread() { _data.set(yap::BufferBehavior::Closed); }

private:
    string name;
    std::thread consumer;
    yap::BufferQueue<string> _data;
};

Demo2
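
If you're curious what sits underneath such a channel, the following is a minimal sketch of a closable blocking queue. It is explicitly not the yap or TBB API: `ClosableQueue`, its `pop(T&)` signature (returning false instead of throwing on close), and `drain_through_queue` are names and choices I'm assuming for illustration. It is the same mutex + CV + flag machinery as before, just packaged so the worker class no longer sees it.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical closable blocking queue (illustrative, not a library API).
template <typename T>
class ClosableQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            items_.push(std::move(value));
        }
        cv_.notify_one();
    }

    // Blocks until an item is available, or the queue is closed and empty.
    // Returns false only in the latter case.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return false; // closed and fully drained
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all(); // wake every waiting consumer
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> items_;
    bool closed_ = false;
};

// Pushes the inputs, closes the queue, and returns what the consumer popped.
std::vector<std::string> drain_through_queue(const std::vector<std::string>& inputs) {
    ClosableQueue<std::string> q;
    std::vector<std::string> seen;
    std::thread consumer([&] {
        std::string msg;
        while (q.pop(msg)) seen.push_back(msg);
    });
    for (const auto& s : inputs) q.push(s);
    q.close();
    consumer.join(); // after the join, `seen` is safe to read
    return seen;
}
```

Closing the queue doubles as the "terminate message" suggested in the comments: the consumer leaves its loop on its own once everything queued before the close has been handled.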

Nikos Athanasiou