
I am trying to implement a simple thread pool using the Boost library.

Here is the code:

//boost::asio::io_service ioService;
//boost::thread_group pool;
//boost::asio::io_service::work* worker;

ThreadPool::ThreadPool(int poolSize /*= boost::thread::hardware_concurrency()*/)
{
    if (poolSize >= 1 && poolSize <= boost::thread::hardware_concurrency())
        threadAmount = poolSize;
    else
        threadAmount = 1;

    worker = NULL;
}

ThreadPool::~ThreadPool()
{
    if (worker != NULL && !ioService.stopped())
    {
        _shutdown();
        delete worker;
        worker = NULL;
    }
}

void ThreadPool::start()
{
    if (worker != NULL)
    {
        return;
    }

    worker = new boost::asio::io_service::work(ioService);

    for (int i = 0; i < threadAmount; ++i)
    {
        pool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    }
}

template<class F, class...Args>
void ThreadPool::execute(F f, Args&&... args)
{
    ioService.post(boost::bind(f, std::forward<Args>(args)...));
}

void ThreadPool::shutdown()
{
    pool.interrupt_all();
    _shutdown();
}

void ThreadPool::join_all()
{
// wait for all threads before continuing
// in other words - a barrier for all threads once they have finished all jobs,
// so they can be re-used in the future
}

void ThreadPool::_shutdown()
{
    ioService.reset();
    ioService.stop();
}

In my program I assign some tasks to the thread pool and then carry on with the main thread. At some point I need to wait for all threads to finish all tasks before I can proceed with the calculations. Is there any way to do this?

Thanks a lot.

  • `join()` all the threads. – Jesper Juhl May 10 '20 at 14:32
  • Yep. Also read the API documentation for the things you are using, so you have an overview of what operations these types support. As a new user here, please also take the [tour] and read [ask]. – Ulrich Eckhardt May 10 '20 at 14:34
  • @JesperJuhl I tried to use `thread_group.join_all()` but it 'locked' further execution of the main thread. – Levantail Yolo May 10 '20 at 15:16
  • @LevantailYolo Then I guess all your threads did not terminate (or at least didn't do so within the time you waited). – Jesper Juhl May 10 '20 at 15:19
  • `run` hangs because you use `work` without deleting it. When you destroy `work`, its destructor will inform the `io_service` that `run` can return once there are no pending handlers. – rafix07 May 10 '20 at 17:59
  • @rafix07 in case I delete `work`, all the threads that have finished their work exit. I need to re-use them in the future. E.g. I'm looking for a barrier for all threads in the pool (see the sketch after these comments). – Levantail Yolo May 11 '20 at 00:33
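To illustrate the pattern from the last two comments, here is a minimal sketch of such a barrier, reusing the members from the question (`ioService`, `pool`, `worker`, `threadAmount`); `wait_all` is a hypothetical name and error handling is omitted:

void ThreadPool::wait_all()
{
    delete worker;            // lets io_service::run() return once all posted jobs are done
    worker = NULL;

    pool.join_all();          // the barrier: blocks until every worker thread has returned

    ioService.reset();        // required before run() can be called again

    // re-arm the pool so new jobs can be posted afterwards
    worker = new boost::asio::io_service::work(ioService);
    for (int i = 0; i < threadAmount; ++i)
        pool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
}

Note that `boost::thread_group` keeps the joined thread objects around, so a pool that hits this barrier many times may want to remove them (e.g. with `remove_thread`) or manage its threads in a `std::vector` instead.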

1 Answer


As others have pointed out, the main culprit is the `work` instance.

I'd much simplify the interface (there's really no reason to split shutdown into `shutdown`, `_shutdown`, `join_all` and some random logic in the destructor as well). That just makes it hard to know what responsibility is where.

The interface should be a Pit Of Success - easy to use right, hard to use wrong.

At the same time it makes it much easier to implement it correctly.

Here's a first stab:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>

#include <algorithm>  // std::min, std::max
#include <functional> // std::bind
#include <memory>     // std::unique_ptr, std::make_unique

namespace ba = boost::asio;

struct ThreadPool {
    ThreadPool(unsigned poolSize = boost::thread::hardware_concurrency());
    ~ThreadPool();
    void start();

    template <typename F, typename... Args>
    void execute(F f, Args&&... args) {
        ioService.post(std::bind(f, std::forward<Args>(args)...));
    }
  private:
    unsigned threadAmount;
    ba::io_service ioService;
    boost::thread_group pool;
    std::unique_ptr<ba::io_service::work> work;
    void shutdown();
};

ThreadPool::ThreadPool(
    unsigned poolSize /*= boost::thread::hardware_concurrency()*/) {
    threadAmount = std::max(1u, poolSize);
    threadAmount = std::min(boost::thread::hardware_concurrency(), threadAmount);
}

ThreadPool::~ThreadPool() {
    shutdown();
}

void ThreadPool::start() {
    if (!work) {
        work = std::make_unique<ba::io_service::work>(ioService);

        for (unsigned i = 0; i < threadAmount; ++i) {
            pool.create_thread(
                boost::bind(&ba::io_service::run, &ioService));
        }
    }
}

void ThreadPool::shutdown() {
    work.reset();

    pool.interrupt_all();
    ioService.stop();

    pool.join_all();

    ioService.reset();
}

#include <chrono>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;

int main() {
    auto now = std::chrono::high_resolution_clock::now;
    auto s = now();
    {
        ThreadPool p(10);
        p.start();

        p.execute([] { std::this_thread::sleep_for(1s); });
        p.execute([] { std::this_thread::sleep_for(600ms); });
        p.execute([] { std::this_thread::sleep_for(400ms); });
        p.execute([] { std::this_thread::sleep_for(200ms); });
        p.execute([] { std::this_thread::sleep_for(10ms); });
    }
    std::cout << "Total elapsed: " << (now() - s) / 1.0s << "s\n";
}

Which on most multi-core systems will print something like it does on mine:

Total elapsed: 1.00064s

It looks like you had an error in calculating `threadAmount` where you'd take 1 if `poolSize` was more than `hardware_concurrency()`.
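A simple clamp expresses the intent more directly (this is what the constructor above does); note that `std::clamp` is C++17, and if `hardware_concurrency()` can report 0 on your platform you need a fallback first:

// clamp the requested size into [1, hardware_concurrency()]
unsigned hc = std::max(1u, boost::thread::hardware_concurrency()); // guard against a 0 report
threadAmount = std::clamp(poolSize, 1u, hc);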

To be honest, why have the bind in the implementation? It really doesn't add a lot; you can leave it up to the caller, and they can choose whether they use bind, and if so, whether it's `boost::bind`, `std::bind` or some other way of composing callables:

template <typename F>
void execute(F f) { ioService.post(f); }

You're missing exception handling around io_service::run calls (see Should the exception thrown by boost::asio::io_service::run() be caught?).
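A sketch of what that could look like in the `start()` loop above; the policy on a caught exception (log and resume, rethrow, shut the pool down) is up to you:

for (unsigned i = 0; i < threadAmount; ++i) {
    pool.create_thread([this] {
        for (;;) {
            try {
                ioService.run();
                break; // run() returned normally: no more work
            } catch (std::exception const& e) {
                // a posted handler threw; log it and keep this worker alive
                std::cerr << "worker caught: " << e.what() << '\n';
            }
        }
    });
}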

If you're using a recent Boost version, you can use the newer io_context and thread_pool interfaces, which greatly simplify things:

Live On Coliru

#include <boost/asio.hpp>
#include <algorithm> // std::clamp (C++17)
#include <thread>    // std::thread::hardware_concurrency

struct ThreadPool {
    ThreadPool(unsigned poolSize)
        : pool(std::clamp(poolSize, 1u, std::thread::hardware_concurrency()))
    { }

    template <typename F>
    void execute(F f) { post(pool, f); }
  private:
    boost::asio::thread_pool pool;
};

This still has 99% of the functionality¹, but in 10 LoC.
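If you still want an explicit barrier on that wrapper, one option (a sketch, not part of the original ten lines) is to forward `thread_pool::join()`; keep in mind that once `join()` returns the pool's threads are gone, so it cannot accept new work:

// blocks until all outstanding work has finished; the pool cannot be reused afterwards
void join() { pool.join(); }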

In fact, the class has become a trivial wrapper, so we could just write:

Live On Coliru

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
using C = std::chrono::high_resolution_clock;

static void sleep_for(C::duration d) { std::this_thread::sleep_for(d); }

int main() {
    auto s = C::now();
    {
        boost::asio::thread_pool pool;

        post(pool, [] { sleep_for(1s); });
        post(pool, [] { sleep_for(600ms); });
        // still can bind if you want
        post(pool, std::bind(sleep_for, 400ms));
        post(pool, std::bind(sleep_for, 200ms));
        post(pool, std::bind(sleep_for, 10ms));

        //pool.join(); // implicit in destructor
    }
    std::cout << "Total elapsed: " << (C::now() - s) / 1.0s << "s\n";
}

The main difference is the default pool size: it is 2 * hardware concurrency (but also calculated more safely, because not all platforms have a reliable `hardware_concurrency()`; it could be zero, for example).
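If you want a similarly defensive default for the io_service-based pool above, something along these lines works (the exact numbers Boost uses internally may differ):

// fall back to a small fixed count when hardware_concurrency() reports 0
unsigned hc = std::thread::hardware_concurrency();
unsigned defaultPoolSize = hc ? 2 * hc : 2;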


¹ It doesn't currently exercise interruption points

sehe
  • I admit, your example is a lot more readable and cleaner than mine, thanks for your effort. About `join_all`, I intend to keep this function to implement some kind of barrier for all threads. E.g. when the main thread calls `join_all`, all threads in the pool should finish all assigned jobs. After synchronizing, the main thread may continue its work. This is pretty similar to MPI_Barrier. – Levantail Yolo May 11 '20 at 15:15
  • But there is no way to join all the threads without effectively having shut down all threads. So you can have just `start()` and `stop()`? (Where the destructor just invokes `stop();`?) – sehe May 11 '20 at 15:17
  • I just realized that the `join_all` function name is misleading. As I wrote above, I'm simply looking for some kind of a barrier, to make it similar to [MPI_Barrier](https://www.mpich.org/static/docs/latest/www3/MPI_Barrier.html). – Levantail Yolo May 12 '20 at 13:30