I'm implementing a threadPool with waitable tasks. The threadPool saves tasks in a threadsafe deque, here threadsafeQueue<function_wrapper>
. Tasks are callable objects where almost all of the arguments are bound with std::bind
except of the last. I started with the implementation from Anthony Williams' Concurrency In Action book (Listing 9.2). I have difficulties implementing the function_wrapper
and the variadic template function submit
so that when a task is popped out from the deque try_pop(task)
and executed task()
I still can set the last argument like task(myLastArgument)
.
This is Anthony Williams' function wrapper:
class function_wrapper
{
struct impl_base {
virtual void call() = 0;
virtual ~impl_base() {}
};
std::unique_ptr<impl_base> impl;
template<typename F>
struct impl_type : impl_base
{
F f_;
impl_type(F&& f)
: f_(std::move(f))
{}
void call() { f_(); }
};
public:
template<typename F>
function_wrapper(F&& f)
: impl(new impl_type<F>(std::move(f)))
{}
void operator() () { impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&& other)
: impl(std::move(other.impl))
{}
function_wrapper& operator=(function_wrapper&& other)
{
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
This is the abbreviated threadPool:
template <typename T>
class threadPool
{
private:
threadsafeQueue<function_wrapper> workQueue_;
std::atomic_bool done_;
std::vector<std::thread> threads_;
void workerThread(unsigned long thread);
T threadData_;
template<typename FunctionType>
auto submit(FunctionType&& f)
{
typedef typename std::result_of<FunctionType()>::type resultType;
std::packaged_task<resultType()> task(std::move(f));
std::future<resultType> result(task.get_future());
workQueue_.push(std::move(task));
return result;
}
};
template <typename T>
void threadPool<T>::workerThread(unsigned long thread)
{
while (!done_)
{
function_wrapper task;
if(workQueue_.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
}
With the shown implementation I can submit callable objects to the pool. The variadic template function submit
creates a std::packaged_task<resultType()>
and puts it on the deque. The worker threads run workerThread(unsigned long thread)
and do there work by popping out the tasks previously inserted by the main thread and executing them with task()
.
A simple example for a main function:
int main()
{
threadPool<double> pool(5UL);
auto fut = pool.submit(std::bind((void(*) (int,int))doTheWork,11,22));
fut.wait();
}
What I would like to have is a submit call in my main function like:
auto fut = pool.submit(std::bind((void(*) (int,int))doTheWork,11,std::placeholders::_1));
The task execution inside my thread pool should then look like:
task(thread);
How do I have to change the variadic template function submit
and the function_wrapper
to achieve my goal?
Thanks in advance.
UPDATE:
I tried the function wrapper implementation from:
Return values for active objects
Now the function wrapper is moveable only and can take arguments, thanks to Yakk. But the problem now is that the new function wrapper can not handle std::bind()
with std::placeholders
. For example I can define a task with the new wrapper like
myWrapper<void(int,int)> task1 = doTheWork;
task1(66,77);
but I can not bind function arguments in a flexible way:
myWrapper<void()> task2 = std::bind(doTheWork, 66, 77); // ERROR
With the error in VS2019:
error C2440: 'initializing': cannot convert from 'std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,int>' to 'myWrapper<void (void)>'
message : No user-defined-conversion operator available that can perform this conversion, or the operator cannot be called
Or when I try it with std::placeholders
:
myWrapper<void(int)> task2 = std::bind(doTheWork, 66, std::placeholders::_1); // ERROR
task2(77);
I get the following error in VS2019:
error C2338: tuple index out of bounds
message : see reference to class template instantiation 'std::tuple_element<0,std::tuple<>>' being compiled
message : see reference to alias template instantiation 'std::tuple_element_t<0,std::tuple<>>' being compiled
message : see reference to function template instantiation 'const tuple_element<_Index,std::tuple<_Rest...>>::type &&std::get(const std::tuple<_Rest...> &&) noexcept' being compiled
message : see reference to class template instantiation 'std::result_of<const F &(void)>' being compiled
message : with
message : [
message : F=std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,const std::_Ph<1> &>
message : ]
message : see reference to alias template instantiation 'std::result_of_t<const std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,const std::_Ph<1> &>&(void)>' being compiled
message : see reference to class template instantiation 'std::integral_constant<bool,false>' being compiled
message : see reference to class template instantiation 'std::disjunction<_Traits...>' being compiled
error C2440: 'initializing': cannot convert from 'std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,const std::_Ph<1> &>' to 'myWrapper<void (void)>'
message : No user-defined-conversion operator available that can perform this conversion, or the operator cannot be called
However with std::function
it´s possible:
std::function<void(int)> task3 = std::bind(doTheWork, 66, std::placeholders::_1);
task3(77);
Any hints how to implement a movable only function wrapper with flexible arguments? The aim is still to use it inside a thread pool where the function wrapper is pased to a std::packaged_task
.