
I'm trying to stop multiple worker threads using a std::atomic_flag. Starting from the question "Issue using std::atomic_flag with worker thread", I have the following, which works:

#include <iostream>
#include <atomic>
#include <chrono>
#include <thread>

std::atomic_flag continueFlag;
std::thread t;

void work()
{
    while (continueFlag.test_and_set(std::memory_order_relaxed)) {
        std::cout << "work ";
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void start()
{
    continueFlag.test_and_set(std::memory_order_relaxed);
    t = std::thread(&work);
}

void stop()
{
    continueFlag.clear(std::memory_order_relaxed);
    t.join();
}

int main()
{
    std::cout << "Start" << std::endl;
    start();
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    std::cout << "Stop" << std::endl;
    stop();
    std::cout << "Stopped." << std::endl;

    return 0;
}

Trying to rewrite it for multiple worker threads:

#include <iostream>
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include <memory>

struct thread_data {
    std::atomic_flag continueFlag;
    std::thread thread;
};

std::vector<thread_data> threads;

void work(int threadNum, std::atomic_flag &continueFlag)
{
    while (continueFlag.test_and_set(std::memory_order_relaxed)) {
        std::cout << "work" << threadNum << " ";
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void start()
{
    const unsigned int numThreads = 2;

    for (int i = 0; i < numThreads; i++) {
        ////////////////////////////////////////////////////////////////////
        //PROBLEM SECTOR
        ////////////////////////////////////////////////////////////////////
        thread_data td;
        td.continueFlag.test_and_set(std::memory_order_relaxed);

        td.thread = std::thread(&work, i, td.continueFlag);

        threads.push_back(std::move(td));
        ////////////////////////////////////////////////////////////////////
        //PROBLEM SECTOR
        ////////////////////////////////////////////////////////////////////
    }
}

void stop()
{
    //Flag stop
    for (auto &data : threads) {
        data.continueFlag.clear(std::memory_order_relaxed);
    }
    //Join
    for (auto &data : threads) {
        data.thread.join();
    }
    threads.clear();
}

int main()
{
    std::cout << "Start" << std::endl;
    start();
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    std::cout << "Stop" << std::endl;
    stop();
    std::cout << "Stopped." << std::endl;

    return 0;
}

My issue is the "PROBLEM SECTOR" above, namely creating the threads. I cannot wrap my head around how to instantiate the threads and pass the variables to the worker thread.

The error right now points at the line threads.push_back(std::move(td)); and reads: Error C2280 'thread_data::thread_data(const thread_data &)': attempting to reference a deleted function.

Trying to use unique_ptr like this:

        auto td = std::make_unique<thread_data>();
        td->continueFlag.test_and_set(std::memory_order_relaxed);

        td->thread = std::thread(&work, i, td->continueFlag);

        threads.push_back(std::move(td));

This gives the error 'std::atomic_flag::atomic_flag(const std::atomic_flag &)': attempting to reference a deleted function, at the line td->thread = std::thread(&work, i, td->continueFlag);. Am I fundamentally misunderstanding the use of std::atomic_flag? Is it really both immovable and uncopyable?

Sheph
  • I haven't read the code, so this may be inapplicable. But `std::atomic_flag` is fairly low level, and a bit tricky to use. It seems to me that `std::atomic<bool>` is more appropriate here. It looks just like an ordinary `bool`: `if(my_flag) ...` and `my_flag = true;` and `my_flag = false;`. – Pete Becker Feb 27 '16 at 13:16
  • According to the standard, `std::atomic_flag` is guaranteed to be lock free. In a simple worker-thread test that increments a counter inside `while(continueFlag)`, the `std::atomic_flag` version is more than 100% faster than `std::atomic<bool>`. Granted, that is a synthetic example, but it still indicates that if you poll the flag often, the flag version is much better. I tend to poll in semi-critical sections to know whether the work has been cancelled. – Sheph Feb 27 '16 at 22:47
  • Unless you're on some really funky system, `std::atomic<bool>` will be lock free. I doubt very much that any actual difference in speed between that and `std::atomic_flag` will be noticeable. If the performance of your application is limited by the speed of checking an atomic variable, then it's not doing any real work and needs to be redesigned. – Pete Becker Feb 27 '16 at 22:59

2 Answers

Your first approach was actually closer to the truth. The problem is that it passed each thread, as a parameter, a reference to an object local to the for loop body. Once the loop iteration ended, that object went out of scope and was destroyed, leaving each thread with a reference to a destroyed object, resulting in undefined behavior.

It does not matter that you moved the object into the std::vector after creating the thread. The thread received a reference to the locally scoped object, and that's all it knew. End of story.

Moving the object into the vector first, and then passing to each thread a reference to the object in the std::vector will not work either. As soon as the vector internally reallocates, as part of its natural growth, you'll be in the same pickle.

What needs to happen is to have the entire threads array created first, before actually starting any std::threads. If the RAII principle is religiously followed, that means nothing more than a simple call to std::vector::resize().

Then, in a second loop, iterate over the fully-cooked threads array, and go and spawn off a std::thread for each element in the array.

Sam Varshavchik
  • I'm not sure if this is correct, but `threads.push_back(std::move(td));` should call the rvalue-reference overload of `push_back`, and therefore the default move constructor. So it shouldn't matter if the moved-from object goes out of scope. However, the problem I found was that `std::atomic_flag` is neither movable nor copyable (which makes sense after all). Your idea with resize() doesn't work either, since it **also** requires a move or copy, because the vector may have to relocate its elements. But constructing a fresh vector of the right size works. See my answer. – Sheph Feb 27 '16 at 22:23
  • @Sheph -- it most certainly matters, since in the original version, the thread receives a reference to the moved-from object, before it gets moved. No matter how you go about moving it, the thread doesn't care. It gets a reference to the moved-from object, and subsequently moving it has no effect on the reference, so each thread continues to attempt to access the object that gets destroyed. Undefined behavior. Your final solution is correct. – Sam Varshavchik Feb 27 '16 at 23:22

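I was almost there with my unique_ptr solution. I just needed to wrap the flag argument in std::ref(). std::thread copies its arguments, so without std::ref it tries to copy the atomic_flag; with std::ref, each thread gets a reference to the flag inside its heap-allocated thread_data, which does not move when the vector of pointers reallocates: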

std::vector<std::unique_ptr<thread_data>> threads;

void start()
{
    const unsigned int numThreads = 2;

    for (int i = 0; i < numThreads; i++) {
        auto td = std::make_unique<thread_data>();
        td->continueFlag.test_and_set(std::memory_order_relaxed);
        td->thread = std::thread(&work, i, std::ref(td->continueFlag));

        threads.push_back(std::move(td));
    }
}

However, inspired by Sam's answer above, I also found a way without pointers:

std::vector<thread_data> threads;

void start()
{
    const unsigned int numThreads = 2;

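    // Construct a new vector of the right size. resize() doesn't compile here:
    // it requires thread_data to be movable, and std::atomic_flag can be
    // neither copied nor moved. Move-assigning the whole vector only transfers
    // its buffer, so no element is ever moved.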
    threads = std::vector<thread_data>(numThreads);
    for (int i = 0; i < numThreads; i++) {
        auto& t = threads.at(i);
        t.continueFlag.test_and_set(std::memory_order_relaxed);
        t.thread = std::thread(&work, i, std::ref(t.continueFlag));
    }
}
Sheph