0

I've written a small thread pool class and adjusted it to the recommendations given by pschill (https://codereview.stackexchange.com/questions/229613/c-standard-thread-threadpool). I've now queuing several tasks until a thread can handle it. The task is made of a enum class State, std::mutex(for state) and a std::packaged_task<R(Args...)>. The task is created by the calling instance (see code example) and the lifetime is ensured and tested. As soon as a thread is ready the task is handled. Up to here everything works just fine in every build-configuration.

The execution of a Debug x64 build (latest VCpp) gave me the expected value of 4. Using any other configuration results in a "random" value....

I've checked the lifetime of every object involved; just as expected.

Task execution

{
    // May unnecessary but added due to previous errors
    std::lock_guard<std::mutex> 
    m_state = State::INPROGESS;
}

m_task(args...);

{
    std::lock_guard<std::mutex> lock(m_statelock);
    m_state = State::DONE;
}

Queuing Task

template <typename R, typename ... Args>
void addTask(Task<R, Args...>& task, Args ... args) {
    // m_tasks is a std::deque<std::function<void()>>
    std::lock_guard<std::mutex> lock(m_tasklock);
    m_tasks.emplace_back([&]() -> void {
        task.execute(args...);
    });

    // Has own mutex
    m_task_avaiable.notify_one(); // std::condition_variable
}

Thread Object Routine

[&]() -> void {
    while (true) {
        while (tpool.m_tasks.size() > 0) {
            {
                // Added due to crash with Release-config at asgining
                // atomic<State> in Task
                std::lock_guard<std::mutex> lock(statelock);
                state = State::INPROGRESS;
            }

            std::function<void()> job;

            {
                std::lock_guard<std::mutex> lock(tpool.m_tasklock);
                if (tpool.m_tasks.size() > 0) {
                    job = std::move(tpool.m_tasks[0]);
                    tpool.m_tasks.pop_front();
                }
            }

            if (job) { job(); }
    }

        {
            std::lock_guard<std::mutex> lock(statelock);
            state = State::IDLE;
        }
        tpool.m_task_avaiable.wait(lock);
        {
            std::lock_guard<std::mutex> lock(statelock);
            if (state == State::STOP) {
                return; // To stop thread and make it joinable
            }

        }
    }
}

Main

frm::util::ThreadPool pool(2);

frm::util::Task<int, int, int> t1([](int a, int b) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Yey!...\n";
    return a + b;
});


pool.addTask(t1, 2, 2);

// Direct access to member std::packaged_task<...> just for debugging
std::cout << t1.m_task.get_future().get() << '\n';

If it helps, the dump of the task-object (x32) showing that a future was requested (byte 61):

 class std::mutex   48 Byte
    1   0x02 0x00 0x00 0x00
    5   0xbc 0x78 0x9f 0x0f
    9   0x00 0x00 0x00 0x00
   13   0x00 0x00 0x00 0x00
   17   0x00 0x00 0x00 0x00
   21   0x00 0x00 0x00 0x00
   25   0x00 0x00 0x00 0x00
   29   0x00 0x00 0x00 0x00
   33   0x00 0x00 0x00 0x00
   37   0x00 0x00 0x00 0x00
   41   0xff 0xff 0xff 0xff
   45   0x00 0x00 0x00 0x00
 struct std::atomic<enum frm::util::Task<int,int,int>::State>    4 Byte
   49   0x02 0x00 0x00 0x00
 class std::packaged_task<int __cdecl(int,int)>   12 Byte
   53   0x50 0xbd 0x15 0x01
   57   0x00 0x00 0x00 0x00
   61   0x01 0x00 0x00 0x00

This there any problem with std::future<T>, std::shared_future<T> or std::packaged_task<T> in VCpp19 or do I miss something?

I can show more code, but due to SO's rules and recommendations I've copied as few as possible.

May also helps:

Before using addTask(Task<R, Args...>& task, Args ... args) addTask(Task<R, Args...>* task, Args ... args) resulted in an error (not in x64 debug) if there was a delay added like:

// Direct access to member std::packaged_task<...> just for debugging
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << t1.m_task.get_future().get() << '\n';
Alphastrick
  • 131
  • 1
  • 9
  • 1
    See [this question](https://stackoverflow.com/questions/47496358/c-lambdas-how-to-capture-variadic-parameter-pack-from-the-upper-scope), it shows how to move parameters pack into lambda. – rafix07 Sep 27 '19 at 09:29

1 Answers1

3

You have undefined behaviour in the below code:

 m_tasks.emplace_back([&]() -> void {
      task.execute(args...);
   });

your lambda captures args by REFERENCE. emplace_back puts functor to m_tasks, then the enclosing scope ends, and when task is called you are access dangling reference.

You can pass it by value:

 m_tasks.emplace_back([=]() -> void {
    task.execute(args...);
});
rafix07
  • 20,001
  • 3
  • 20
  • 33