1

I have 2 threads,
one thread creates a pipeline and responsible for it,
another thread creates a communication of that pipeline with outer world,
so I want to create the communication after the pipeline is initialized.
This code works, but:

  • mutex there have a warning: C26110 Caller failing to hold lock 'm' before calling function 'std::_Mutex_base::unlock', what is it about?
  • how I could rewrite it without manual mutex manipulation?

#include <iostream>
#include <mutex>

using namespace std::chrono_literals;

class Pipeline {
public:
    Pipeline() { std::cout << "Pipeline()\n"; };
    void init() { std::cout << "Pipeline::init()\n"; std::this_thread::sleep_for(3s); };
    void run() { while (true) { std::cout << "Pipeline::run()\n"; std::this_thread::sleep_for(1s); } };
};

class Connection {
public:
    Connection(Pipeline* p) { std::cout << "Connection()\n"; };
    void run() { while (true) { std::cout << "Connection::run()\n"; std::this_thread::sleep_for(1s); } };
};

void pipeline_func(Pipeline** pipeline, std::mutex& m) {
    *pipeline = new Pipeline();
    (*pipeline)->init();

    m.unlock();

    (*pipeline)->run(); // run indefinitely
};


void connection_func(Pipeline** pipeline, std::mutex& m) {
    m.lock();

    Connection connection(*pipeline);
    connection.run(); // run indefinitely
};


int main() {
    Pipeline* pipeline = nullptr;

    std::mutex m;
    m.lock(); // lock there for stopping connection3_thread, intil mutex is unlocked in pipeline_thread

    std::thread pipelineTh(pipeline_func, &pipeline, std::ref(m));
    std::thread connectionTh(connection_func, &pipeline, std::ref(m));

    pipelineTh.join();
    connectionTh.join();

    m.unlock();
}
yalov
  • 576
  • 5
  • 15
  • 1
    The only alternative to mutexes are atomic objects. Sadly, historical records of endless quests for lock-free, atomic unicorn fairies, ended in tears more often than not. – Sam Varshavchik Aug 31 '23 at 11:48
  • In `pipeline_func` you unlock the mutex without locking it beforehand on this thread. – wohlstad Aug 31 '23 at 11:53
  • 1
    Without manual lock : `use std::scoped_lock lock{m}` And try to come up with a C++ design that at doesnt require double pointers. Something about your code says memory leask waiting to happen. Investigate, std::vector` , std::vector emplace_back (and the reference it returns). – Pepijn Kramer Aug 31 '23 at 11:58
  • Another option to synchronize pipelines eg. let one stage wait for a previous one is to use std::condition_variable – Pepijn Kramer Aug 31 '23 at 11:59
  • "so I want to create the communication after the pipeline is initialized." So do that. Why are you spawning threads for that, if you can just do it synchronously? This code is seriously messed up. Just initialize everything In a synchronous way. You then avoid this unsafe juggling with the mutex. For example now, if you don't start `pipelineTh` thread, then your code deadlocks. Very bad. – freakish Aug 31 '23 at 12:09
  • @PepijnKramer that is interesting to use vector for not having double-pointers even though I have one pipeline. I was trying to use std::condition_variable but it's still need mutex guard, and condition_variable itself do nothing. scoped_lock also to nothing, because on the common mutex I use lock/unlock logic on all 3 threads: main, pipeline, connection – yalov Aug 31 '23 at 12:13
  • @freakish because `.run()` in both threads run indefinitely, so there is 2 threads for them. and objects are also created in the responsible threads – yalov Aug 31 '23 at 12:20
  • 1
    @yalov yes, I understand that. That's why you should initialize everything before spawning threads, in the order you want it, and only then spawn threads to just call run(). You don't even need mutex at all in such setup (unless run() calls require it). – freakish Aug 31 '23 at 12:22
  • You say your code works, but when I compile it, `Connection::run()` never runs. So you are perhaps relying on undefined behavior by calling `unlock()` in a thread that doesn't have ownership. – VLL Aug 31 '23 at 12:23
  • @Yalov each pipeline stage can have its own condition variable. Don't expose it directly just use it in implementation fot get_next_update blocking call by another pipeline stage. – Pepijn Kramer Aug 31 '23 at 12:29
  • @VLL thanks, it's undefined behavior https://stackoverflow.com/questions/43487357/how-stdmutex-got-unlocked-in-different-thread – yalov Aug 31 '23 at 12:31

3 Answers3

1

If you want to avoid mutexes altogether I would recommend using atomics, however atomics can be difficult to use if you are dealing with multiple threads as they cannot be put into the standard vector class, so you would have to resort to the much dreaded manual heap allocations.

Other than that you could also use scoped locks that you could implement to facilitate the management of mutexes: How does scope-locking work?.

Additionally, I would like to point out that in your current code you are calling for m.lock() in your connection_func() function but you are not calling the unlock afterwards (within the same scope), so you may experience thread locking, so I would avoid writing code where the mutex lock is call within a function but the mutex is unlocked outside of it as it could lead to the above mentioned conditions. This is a good reason to use scoped locks, as you will be able to protect the critical section of code without having to worry about unlocking the mutex later.

Valdez
  • 46
  • 3
1

Your current code is relying on undefined behavior by calling unlock() in a thread that doesn't have ownership.

From mutex::unlock:

The mutex must be locked by the current thread of execution, otherwise, the behavior is undefined.

You can simplify the code by initializing the pipeline before launching the threads. Then you don't need a mutex at all. If initialization takes a long time, you could do it in a thread, and launch the two threads in that thread.

#include <iostream>
#include <memory>
#include <thread>

using namespace std::chrono_literals;

class Pipeline {
public:
    Pipeline() { std::cout << "Pipeline()\n"; };
    void init() { std::cout << "Pipeline::init()\n"; std::this_thread::sleep_for(3s); };
    void run() { while (true) { std::cout << "Pipeline::run()\n"; std::this_thread::sleep_for(1s); } };
};

class Connection {
public:
    Connection(Pipeline* p) { std::cout << "Connection()\n"; };
    void run() { while (true) { std::cout << "Connection::run()\n"; std::this_thread::sleep_for(1s); } };
};

void pipeline_func(std::shared_ptr<Pipeline> pipeline) {
    pipeline->run(); // run indefinitely
};

void connection_func(std::shared_ptr<Pipeline> pipeline) {
    Connection connection(pipeline.get());
    connection.run(); // run indefinitely
};

int main() {
    auto pipeline = std::make_shared<Pipeline>();
    pipeline->init();

    std::thread pipelineTh(pipeline_func, pipeline);
    std::thread connectionTh(connection_func, pipeline);

    pipelineTh.join();
    connectionTh.join();
}
yalov
  • 576
  • 5
  • 15
VLL
  • 9,634
  • 1
  • 29
  • 54
1

You are misusing std::mutex. It is meant for one purpose only—to ensure that when variables are shared between threads, every thread always sees the variables in some "consistent" state. It is not possible for one thread to unlock a mutex that was locked by a different thread.

The most primitive way to use it* looks like this:

std::mutex m;

some_type some_function(...) {
    // Don't access shared variables here. Don't even _look_ at them here.
    m.lock();
    // Now it's OK to access shared variables.
    ...
    // Ensure that shared variables are in the "consistent" state before 
    // unlocking.
    m.unlock();
    // Again, do NOT access shared variables while the mutex is unlocked.
    return ...something...
}

The mutex does two things:

  1. It never allows more than one thread to have it locked at the same time. If thread A calls m.lock() when thread B already has the mutex locked, then the m.lock() call will not return until after thread B calls m.unlock(). [Note: "mutex" is short for "MUTual EXclusion."]

  2. On multi-processor hardware, the lock() and unlock() functions ensure that changes in the shared data will be propagated from one processor to the other. Any change to any variable made by thread B before thread B unlocks the mutex is guaranteed to be seen by thread A after thread A subsequently locks the mutex.


Starting with C++20, there's a new synchronization object you can use: std::binary_semaphore. A binary semaphore is a lot like a mutex, except you can "unlock" it in a different thread from the thread that locked it.

As a rule of thumb, if you want other programmers to have the least difficulty reading and understanding your code, then you should use mutexes for protecting shared data (as above), and use semaphores for signalling changes in the program state between threads.


* It's better to use an RAII object to manage the locking and unlocking. For example:

std::mutex m;

some_type some_function(...) {
    // Don't access shared variables here. Don't even _look_ at them here.

    // The constructor of the `lock` object calls `m.lock()`.
    std::shared_lock<std::mutex> lock(m);

    // Now it's OK to access shared variables.
    ...
    // Ensure that shared variables are in the "consistent" state before 
    // leaving the scope of the `lock` object.
    return ...something...
}
// The destructor of the `lock` object guarantees to call `m.unlock()` upon
// exit from the scope (exit from some_function, in this case.) It will
// call `m.unlock()` even if we leave the scope because of an exception
// being thrown.
Solomon Slow
  • 25,130
  • 5
  • 37
  • 57