1

I'm implementing a threadPool with waitable tasks. The threadPool saves tasks in a threadsafe deque, here threadsafeQueue<function_wrapper>. Tasks are callable objects where almost all of the arguments are bound with std::bind except of the last. I started with the implementation from Anthony Williams' Concurrency In Action book (Listing 9.2). I have difficulties implementing the function_wrapper and the variadic template function submit so that when a task is popped out from the deque try_pop(task) and executed task() I still can set the last argument like task(myLastArgument).

This is Anthony Williams' function wrapper:

class function_wrapper
{
    struct impl_base {
        virtual void call() = 0;
        virtual ~impl_base() {}
    };

    std::unique_ptr<impl_base> impl;

    template<typename F>
    struct impl_type : impl_base
    {
        F f_;
        impl_type(F&& f)
        : f_(std::move(f))
        {}

        void call() { f_(); }
    };

public:

    template<typename F>
    function_wrapper(F&& f)
    : impl(new impl_type<F>(std::move(f)))
    {}

    void operator() () { impl->call(); }

    function_wrapper() = default;

    function_wrapper(function_wrapper&& other)
    : impl(std::move(other.impl))
    {}

    function_wrapper& operator=(function_wrapper&& other)
    {
        impl = std::move(other.impl);
        return *this;
    }

    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};

This is the abbreviated threadPool:

template <typename T>
class threadPool
{
private:
    threadsafeQueue<function_wrapper> workQueue_;   
    std::atomic_bool done_;
    std::vector<std::thread> threads_;

    void workerThread(unsigned long thread);

        T threadData_; 

    template<typename FunctionType> 
    auto submit(FunctionType&& f)
    {
        typedef typename std::result_of<FunctionType()>::type resultType;
        std::packaged_task<resultType()> task(std::move(f));
        std::future<resultType> result(task.get_future());
        workQueue_.push(std::move(task));

        return result;
    }
};


template <typename T>
void threadPool<T>::workerThread(unsigned long thread)
{
    while (!done_)
    {
        function_wrapper task;

        if(workQueue_.try_pop(task))
        {
            task();
        }
        else
        {
            std::this_thread::yield();
        }

    }
}

With the shown implementation I can submit callable objects to the pool. The variadic template function submit creates a std::packaged_task<resultType()> and puts it on the deque. The worker threads run workerThread(unsigned long thread) and do there work by popping out the tasks previously inserted by the main thread and executing them with task().

A simple example for a main function:

int main()
{
    threadPool<double> pool(5UL);

    auto fut = pool.submit(std::bind((void(*) (int,int))doTheWork,11,22));

    fut.wait();
}

What I would like to have is a submit call in my main function like:

auto fut = pool.submit(std::bind((void(*) (int,int))doTheWork,11,std::placeholders::_1));

The task execution inside my thread pool should then look like:

task(thread);

How do I have to change the variadic template function submit and the function_wrapper to achieve my goal?

Thanks in advance.

UPDATE:

I tried the function wrapper implementation from:

Return values for active objects

Now the function wrapper is moveable only and can take arguments, thanks to Yakk. But the problem now is that the new function wrapper can not handle std::bind() with std::placeholders. For example I can define a task with the new wrapper like

myWrapper<void(int,int)> task1 = doTheWork;
task1(66,77);

but I can not bind function arguments in a flexible way:

myWrapper<void()> task2 = std::bind(doTheWork, 66, 77); // ERROR 

With the error in VS2019:

error C2440:  'initializing': cannot convert from 'std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,int>' to 'myWrapper<void (void)>'
message :  No user-defined-conversion operator available that can perform this conversion, or the operator cannot be called

Or when I try it with std::placeholders:

myWrapper<void(int)> task2 = std::bind(doTheWork, 66, std::placeholders::_1); // ERROR
task2(77);

I get the following error in VS2019:

error C2338:  tuple index out of bounds
message :  see reference to class template instantiation 'std::tuple_element<0,std::tuple<>>' being compiled
message :  see reference to alias template instantiation 'std::tuple_element_t<0,std::tuple<>>' being compiled
message :  see reference to function template instantiation 'const tuple_element<_Index,std::tuple<_Rest...>>::type &&std::get(const std::tuple<_Rest...> &&) noexcept' being compiled
message :  see reference to class template instantiation 'std::result_of<const F &(void)>' being compiled
message :         with
message :         [
message :             F=std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,const std::_Ph<1> &>
message :         ]
message :  see reference to alias template instantiation 'std::result_of_t<const std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,const std::_Ph<1> &>&(void)>' being compiled
message :  see reference to class template instantiation 'std::integral_constant<bool,false>' being compiled
message :  see reference to class template instantiation 'std::disjunction<_Traits...>' being compiled
error C2440:  'initializing': cannot convert from 'std::_Binder<std::_Unforced,void (__cdecl *)(int,int),int,const std::_Ph<1> &>' to 'myWrapper<void (void)>'
message :  No user-defined-conversion operator available that can perform this conversion, or the operator cannot be called

However with std::function it´s possible:

std::function<void(int)> task3 = std::bind(doTheWork, 66, std::placeholders::_1);
task3(77);

Any hints how to implement a movable only function wrapper with flexible arguments? The aim is still to use it inside a thread pool where the function wrapper is pased to a std::packaged_task.

d.m.q.
  • 31
  • 4
  • `function_wrapper` looks like an attempt to reinvent `std::function`, except it hard-codes a single signature, the equivalent of `std::function`. Just drop `function_wrapper` and use `std::function` in its place. – Igor Tandetnik Aug 15 '19 at 14:47
  • @IgorTandetnik: Thanks for your comment. The ``function_wrapper`` is used because ``std::packaged_task<>`` are not copyable, just movable. There for ``std::function<>`` can not be used for the queue entries. ``std::function<>`` requires that the stored function objects are copy-constructible. – d.m.q. Aug 15 '19 at 14:58
  • Your virtual call method takes no argument. You say you want to pass an argument to your function. Have you tried - just passing an argument to your function? What happened? I'm really unclear on what specific problem you encountered. – Useless Aug 15 '19 at 15:12
  • Ah. Well, there are some ideas [here](https://stackoverflow.com/questions/25330716/move-only-version-of-stdfunction), but if you want to salvage `function_wrapper`, just make `call()` and `operator()` take a parameter. – Igor Tandetnik Aug 15 '19 at 15:12
  • @IgorTandetnik: Thanks for the hint. I implemented a moveable only function wrapper which takes parameter. But now I can not manage arguments in a flexible way. See Update of my question. – d.m.q. Aug 21 '19 at 13:59
  • I'm not sure I understand. Suppose you managed to implement the wrapper the way you want - how do you plan to use this facility? By what magic is `workerThread` supposed to know which task to call with one argument and which with two? Where would it obtain those extra arguments? – Igor Tandetnik Aug 21 '19 at 14:04
  • @IgorTandetnik: I want to push ``std::packaged_task``s on my threadsafe queue. The ``std::packaged_task``s wraps a callable function which in my case is the ``function_wrapper``, due to the movable only restriction. As shown in my example I want to bind arguments on this callable function. One argument should remain flexible. Because inside my thread pool each thread runs ``void threadPool::workerThread(unsigned long thread)``, tries to pop a task/callable function from the queue and executes it with its specific thread number (``unsigned long thread``), which is not known beforehand. – d.m.q. Aug 21 '19 at 16:09
  • Ah, I think I misunderstood the question. What error do you get from `myWrapper task2 = std::bind(doTheWork, 66, std::placeholders::_1);`? It feels like it should work. – Igor Tandetnik Aug 21 '19 at 20:49
  • @IgorTandetnik: Thanks for your comment. I updated my question and inserted the errors. – d.m.q. Aug 22 '19 at 07:55
  • The error message talks about `myWrapper`, not `myWrapper`. I suspect the code you show is not the code you actually compile. – Igor Tandetnik Aug 22 '19 at 14:35
  • @IgorTandetnik: You are right, thanks. The error is for ``myWrapper task2 = `` ... – d.m.q. Aug 23 '19 at 07:02

0 Answers0