I've written a small thread pool class and adjusted it to the recommendations given by pschill (https://codereview.stackexchange.com/questions/229613/c-standard-thread-threadpool).
I am now queuing several tasks until a thread can handle them.
The task is made of an enum class State, a std::mutex (for the state) and a std::packaged_task<R(Args...)>. The task is created by the calling instance (see code example) and its lifetime is ensured and tested. As soon as a thread is ready, the task is handled.
Up to here everything works just fine in every build-configuration.
The execution of a Debug x64 build (latest VCpp) gave me the expected value of 4. Using any other configuration results in a "random" value.
I've checked the lifetime of every object involved; everything is as expected.
Task execution
{
    // May be unnecessary, but added due to previous errors
    std::lock_guard<std::mutex> lock(m_statelock);
    m_state = State::INPROGESS;
}
m_task(args...);
{
    std::lock_guard<std::mutex> lock(m_statelock);
    m_state = State::DONE;
}
Queuing Task
template <typename R, typename ... Args>
void addTask(Task<R, Args...>& task, Args ... args) {
    // m_tasks is a std::deque<std::function<void()>>
    std::lock_guard<std::mutex> lock(m_tasklock);
    m_tasks.emplace_back([&]() -> void {
        task.execute(args...);
    });
    // Has own mutex
    m_task_avaiable.notify_one(); // std::condition_variable
}
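One detail of the lambda above that may be worth testing: `[&]` captures the parameter pack `args` by reference, and those parameters are local to `addTask`, so they no longer exist when a worker runs the job later. Below is a minimal sketch of a variant that copies the arguments into the closure instead. `MiniTask`, `MiniQueue` and `run_by_value_demo` are hypothetical stand-ins written for this illustration, not the real `frm::util` classes, and the job is run on the calling thread to keep the example small.

```cpp
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <utility>

// Hypothetical stand-in for Task<R, Args...>; only the packaged_task part is modelled.
template <typename R, typename... Args>
struct MiniTask {
    std::packaged_task<R(Args...)> m_task;
    explicit MiniTask(std::function<R(Args...)> f) : m_task(std::move(f)) {}
    void execute(Args... args) { m_task(args...); }
};

// Hypothetical stand-in for the pool's task queue.
struct MiniQueue {
    std::mutex m_tasklock;
    std::deque<std::function<void()>> m_tasks;

    template <typename R, typename... Args>
    void addTask(MiniTask<R, Args...>& task, Args... args) {
        std::lock_guard<std::mutex> lock(m_tasklock);
        // `task` is captured by reference (the caller guarantees its lifetime);
        // `args...` are copied into the closure so they outlive this call.
        m_tasks.emplace_back([&task, args...]() { task.execute(args...); });
    }
};

// Queues one task, runs it after addTask has returned, and returns its result.
int run_by_value_demo() {
    MiniQueue queue;
    MiniTask<int, int, int> task([](int a, int b) { return a + b; });
    std::future<int> result = task.m_task.get_future();
    queue.addTask(task, 2, 2);

    std::function<void()> job = std::move(queue.m_tasks.front());
    queue.m_tasks.pop_front();
    job(); // the stored copies of 2 and 2 are still valid here
    return result.get();
}
```

With the arguments stored by value, the result no longer depends on what happens to the stack frame of `addTask` after it returns.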
Thread Object Routine
[&]() -> void {
    while (true) {
        while (tpool.m_tasks.size() > 0) {
            {
                // Added due to a crash in the Release config when assigning
                // atomic<State> in Task
                std::lock_guard<std::mutex> lock(statelock);
                state = State::INPROGRESS;
            }
            std::function<void()> job;
            {
                std::lock_guard<std::mutex> lock(tpool.m_tasklock);
                if (tpool.m_tasks.size() > 0) {
                    job = std::move(tpool.m_tasks[0]);
                    tpool.m_tasks.pop_front();
                }
            }
            if (job) { job(); }
        }
        {
            std::lock_guard<std::mutex> lock(statelock);
            state = State::IDLE;
        }
        tpool.m_task_avaiable.wait(lock);
        {
            std::lock_guard<std::mutex> lock(statelock);
            if (state == State::STOP) {
                return; // To stop the thread and make it joinable
            }
        }
    }
}
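For comparison, here is a minimal sketch of a worker loop that only touches the queue while holding the queue mutex and waits on the condition variable with a predicate, so a notification sent before the wait starts is not lost. `MiniPool`, its members and `run_worker_demo` are hypothetical names for this illustration; the stop flag lives under the same mutex as the queue, which is an assumption and differs from the separate `statelock` used in the routine above.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical, simplified pool: one mutex guards both the queue and the stop flag.
struct MiniPool {
    std::mutex m_tasklock;
    std::condition_variable m_task_available;
    std::deque<std::function<void()>> m_tasks;
    bool m_stop = false;

    void workerLoop() {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_tasklock);
                // Sleeps until there is work or a stop request; the predicate
                // is always evaluated with the lock held.
                m_task_available.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
                if (m_tasks.empty()) { return; } // stop requested, nothing left to run
                job = std::move(m_tasks.front());
                m_tasks.pop_front();
            }
            job(); // run outside the lock so other threads can queue work
        }
    }

    void push(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(m_tasklock);
            m_tasks.push_back(std::move(job));
        }
        m_task_available.notify_one();
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_tasklock);
            m_stop = true;
        }
        m_task_available.notify_all();
    }
};

// Runs three jobs on one worker thread and returns how many were executed.
int run_worker_demo() {
    MiniPool pool;
    int counter = 0;
    std::thread worker([&pool] { pool.workerLoop(); });
    for (int i = 0; i < 3; ++i) {
        pool.push([&counter] { ++counter; });
    }
    pool.stop();
    worker.join(); // after the join, reading counter is safe
    return counter;
}
```

In this sketch the worker drains the remaining jobs before it honours the stop request, which keeps the shutdown order easy to reason about.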
Main
frm::util::ThreadPool pool(2);
frm::util::Task<int, int, int> t1([](int a, int b) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Yey!...\n";
    return a + b;
});
pool.addTask(t1, 2, 2);
// Direct access to member std::packaged_task<...> just for debugging
std::cout << t1.m_task.get_future().get() << '\n';
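In the snippet above, `get_future()` is called on the main thread after the task has been queued, so a worker may already be invoking the same `std::packaged_task`. A minimal sketch of the alternative ordering, where the future is obtained before any other thread can touch the task, is shown here. It uses a bare `std::packaged_task` and a plain `std::thread` instead of the pool, and `run_future_first_demo` is a hypothetical name.

```cpp
#include <future>
#include <thread>

// Obtains the future first, then lets another thread run the task.
int run_future_first_demo() {
    std::packaged_task<int(int, int)> task([](int a, int b) { return a + b; });

    // Taken before the worker exists, so only one thread accesses `task` here.
    std::future<int> result = task.get_future();

    std::thread worker([&task] { task(2, 2); });
    int value = result.get(); // blocks until the worker has stored the result
    worker.join();            // `task` must outlive the worker thread
    return value;
}
```

Whether this ordering changes anything in the real pool is untested; it only removes one concurrent access from the picture.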
If it helps, the dump of the task-object (x32) showing that a future was requested (byte 61):
class std::mutex 48 Byte
1 0x02 0x00 0x00 0x00
5 0xbc 0x78 0x9f 0x0f
9 0x00 0x00 0x00 0x00
13 0x00 0x00 0x00 0x00
17 0x00 0x00 0x00 0x00
21 0x00 0x00 0x00 0x00
25 0x00 0x00 0x00 0x00
29 0x00 0x00 0x00 0x00
33 0x00 0x00 0x00 0x00
37 0x00 0x00 0x00 0x00
41 0xff 0xff 0xff 0xff
45 0x00 0x00 0x00 0x00
struct std::atomic<enum frm::util::Task<int,int,int>::State> 4 Byte
49 0x02 0x00 0x00 0x00
class std::packaged_task<int __cdecl(int,int)> 12 Byte
53 0x50 0xbd 0x15 0x01
57 0x00 0x00 0x00 0x00
61 0x01 0x00 0x00 0x00
Is there any problem with std::future<T>, std::shared_future<T> or std::packaged_task<T> in VCpp19, or am I missing something?
I can show more code, but due to SO's rules and recommendations I've copied as little as possible.
This may also help: before I switched to addTask(Task<R, Args...>& task, Args ... args), the pointer version addTask(Task<R, Args...>* task, Args ... args) resulted in an error (not in x64 Debug) if a delay was added, like:
// Direct access to member std::packaged_task<...> just for debugging
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << t1.m_task.get_future().get() << '\n';