7

I want to create a thread pool for experimental purposes (and for the fun factor). It should be able to process a wide variety of tasks (so I can possibly use it in later projects).


In my thread pool class I'm going to need some sort of task queue. Since the Standard Library provides std::packaged_task since the C++11 standard, my queue will look like std::deque<std::packaged_task<?()> > task_queue, so the client can push std::packaged_tasks into the queue via some sort of public interface function (and then one of the threads in the pool will be notified with a condition variable to execute it, etc.).


My question is related to the template argument of the std::packaged_task<?()>s in the deque.

The function signature ?() should be able to deal with any type/number of parameters, because the client can do something like:

std::packaged_task<int()> t(std::bind(factorial, 342)); thread_pool.add_task(t);

So I don't have to deal with the type/number of parameters.

But what should the return value be? (hence the question mark)

  • If I make my whole thread pool class a template class, one instance of it will only be able to deal with tasks with a specific signature (like std::packaged_task<int()>).

    I want one thread pool object to be able to deal with any kind of task.

  • If I go with std::packaged_task<void()> and the function invoked returns an integer, or anything at all, then thats undefined behaviour.

krispet krispet
  • 1,648
  • 1
  • 14
  • 25
  • 1
    If you can use Boost, then maybe something like [described in this old answer](http://stackoverflow.com/a/12267138/440558)? – Some programmer dude Jun 26 '15 at 11:45
  • You can store something like `std::deque>`, a list of callables that will wake up threads, run tasks and so on. I think that somewhat more interesting question is *where* and *when* are you going to retrieve the result of the task? You can return a future or expect callbacks, for example. – lisyarus Jun 26 '15 at 11:47
  • @lisyarus, yes, that's what `packaged_task` does, and the OP is already using it. – Jonathan Wakely Jun 26 '15 at 12:08
  • http://stackoverflow.com/a/28180524/1774667 -- write a move-only variant of `std::function`. Args should be bound using a lambda or helper prior to entering the queue. Retval populates a `future`. `packaged_task()` returns `void`. `condition_variable`s for wake up workers. `std::future` or `std::thread` for worker thread lifetimes. – Yakk - Adam Nevraumont Jun 26 '15 at 12:23
  • it's sounds like not a `thread pool` but `scheduler` – Andriy Tylychko Jun 26 '15 at 13:07

2 Answers2

10

So the hard part is that packaged_task<R()> is move-only, otherwise you could just toss it into a std::function<void()>, and run those in your threads.

There are a few ways around this.

First, ridiculously, use a packaged_task<void()> to store a packaged_task<R()>. I'd advise against this, but it does work. ;) (what is the signature of operator() on packaged_task<R()>? What is the required signature for the objects you pass to packaged_task<void()>?)

Second, wrap your packaged_task<R()> in a shared_ptr, capture that in a lambda with signature void(), store that in a std::function<void()>, and done. This has overhead costs, but probably less than the first solution.

Finally, write your own move-only function wrapper. For the signature void() it is short:

struct task {
  template<class F,
    class dF=std::decay_t<F>,
    class=decltype( std::declval<dF&>()() )
  >
  task( F&& f ):
    ptr(
      new dF(std::forward<F>(f)),
      [](void* ptr){ delete static_cast<dF*>(ptr); }
    ),
    invoke([](void*ptr){
      (*static_cast<dF*>(ptr))();
    })
  {}
  void operator()()const{
    invoke( ptr.get() );
  }
  task(task&&)=default;
  task&operator=(task&&)=default;
  task()=default;
  ~task()=default;
  explicit operator bool()const{return static_cast<bool>(ptr);}
private:
  std::unique_ptr<void, void(*)(void*)> ptr;
  void(*invoke)(void*) = nullptr;
};

and simple. The above can store packaged_task<R()> for any type R, and invoke them later.

This has relatively minimal overhead -- it should be cheaper than std::function, at least the implementations I've seen -- except it does not do SBO (small buffer optimization) where it stores small function objects internally instead of on the heap.

You can improve the unique_ptr<> ptr container with a small buffer optimization if you want.

Yakk - Adam Nevraumont
  • 262,606
  • 27
  • 330
  • 524
  • Why is it that the std::packaged_task version works? I've tried it on 3 different machines with 2 compilers, and it actually does work. But it shouldn't. I mean, that should be undefined behaviour. How did you know that it works fine, and why do you advise against it? – krispet krispet Jun 26 '15 at 22:04
  • @krispet `packaged_task` has an `operator()` woth signature `void()`, because the return value comes out the future, not the `()`. `packaged_task` can store any callable, destroyable, movable object that can be invoked with signature `void()`, which includes `packaged_task`! Basically packaged task implocitly includes an implementation analogous to my `task` above, a move-only `std::function`. It has lots of other stuff to, which makes using it that way inefficient. – Yakk - Adam Nevraumont Jun 28 '15 at 11:20
  • @Yakk-AdamNevraumont Do you have a syntax for your second method ? I had a similar question and I wanted different threads or classes to wrap their work ( function to be called when thread runs ) as an object of that task class which can then be invoked as a callable, with the flexibility to call different functions , taking different parameters and hence a more generic task class which can then be a part of generic thread pool code. – Invictus Apr 20 '23 at 16:00
  • @Yakk-AdamNevraumont Also in your solution you assigned class T in ptr with value as `new dF(std::forward(f))` is this not expected to be a pointer while declaration of ptr is as `std::unique_ptr ptr;` taking with T being declared as void but passed a void* as it is a new operation. Or my understanding is wrong here ? – Invictus Apr 20 '23 at 16:40
  • @Invictus if you want a detailed description, I'd make a post about writing a move only function. The skeetch I wrote probably has a few glitches, but comments aren't a great place to chat about them. (And yes, function pointers need to be handled special, because you can't copy a function). – Yakk - Adam Nevraumont Apr 20 '23 at 23:08
4

I happen to have an implementation which does exactly that. My way of doing things is to wrap the std::packaged_task objects in a struct which abstracts away the return type. The method which submits a task into the thread pool returns a future on the result.

This kind of works, but due to the memory allocations required for each task it is not suitable for tasks which are very short and very frequent (I tried to use it to parallelize chunks of a fluid simulation and the overhead was way too high, in the order of several milliseconds for 324 tasks).

The key part is this structure:

struct abstract_packaged_task
{
    template <typename R>
    abstract_packaged_task(std::packaged_task<R> &&task):
        m_task((void*)(new std::packaged_task<R>(std::move(task)))),
        m_call_exec([](abstract_packaged_task *instance)mutable{
            (*(std::packaged_task<R>*)instance->m_task)();
        }),
        m_call_delete([](abstract_packaged_task *instance)mutable{
            delete (std::packaged_task<R>*)(instance->m_task);
        })
    {

    }

    abstract_packaged_task(abstract_packaged_task &&other);

    ~abstract_packaged_task();

    void operator()();

    void *m_task;
    std::function<void(abstract_packaged_task*)> m_call_exec;
    std::function<void(abstract_packaged_task*)> m_call_delete;
};

As you can see, it hides away the type dependencies by using lambdas with std::function and a void*. If you know the maximum size of all possibly occuring std::packaged_task objects (I have not checked whether the size has a dependency on R at all), you could try to further optimize this by removing the memory allocation.

The submission method into the thread pool then does this:

template <typename R>
std::future<R> submit_task(std::packaged_task<R()> &&task)
{
    assert(m_workers.size() > 0);
    std::future<R> result = task.get_future();
    {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_task_queue.emplace_back(std::move(task));
    }
    m_queue_wakeup.notify_one();
    return result;
}

where m_task_queue is an std::deque of abstract_packaged_task structs. m_queue_wakeup is a std::condition_variable to wake a worker thread up to pick up the task. The worker threads implementation is as simple as:

void ThreadPool::worker_impl()
{
    std::unique_lock<std::mutex> lock(m_queue_mutex, std::defer_lock);
    while (!m_terminated) {
        lock.lock();
        while (m_task_queue.empty()) {
            m_queue_wakeup.wait(lock);
            if (m_terminated) {
                return;
            }
        }
        abstract_packaged_task task(std::move(m_task_queue.front()));
        m_task_queue.pop_front();
        lock.unlock();

        task();
    }
}

You can take a look at the full source code and the corresponding header on my github.

Jonas Schäfer
  • 20,140
  • 5
  • 55
  • 69
  • 1
    Take `F&&` instead of `packaged_task`. Cast to `std::decay::type`. Replace `void*` with `unique_ptr`, remove dtor and `m_call_delete`. Replace `m_call_exec` type with `void(*)(abstract_packaged_task*)`. Result: simpler code, faster code, works on any callable. Class becomes movable and not copyable. – Yakk - Adam Nevraumont Jun 26 '15 at 13:54