As others have pointed out, the main culprit is the work instance.
I'd simplify the interface considerably. There's really no reason to split shutdown across shutdown, _shutdown, join_all and some random logic in the destructor as well; that just makes it hard to know where each responsibility lives.
The interface should be a Pit of Success: easy to use right, hard to use wrong.
At the same time, a simpler interface is much easier to implement correctly.
Here's a first stab:
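#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <algorithm>  // std::min, std::max
#include <functional> // std::bind
#include <memory>     // std::unique_ptr, std::make_unique (C++14)
#include <utility>    // std::forward

namespace ba = boost::asio;

struct ThreadPool {
    ThreadPool(unsigned poolSize = boost::thread::hardware_concurrency());
    ~ThreadPool();

    void start();

    template <typename F, typename... Args>
    void execute(F f, Args&&... args) {
        ioService.post(std::bind(f, std::forward<Args>(args)...));
    }

  private:
    unsigned threadAmount;
    ba::io_service ioService;
    boost::thread_group pool;
    std::unique_ptr<ba::io_service::work> work;

    void shutdown();
};

ThreadPool::ThreadPool(
    unsigned poolSize /*= boost::thread::hardware_concurrency()*/)
    // cap at the hardware thread count, but never start fewer than one thread
    // (hardware_concurrency() returns 0 when it can't be determined)
    : threadAmount(std::max(1u, std::min(boost::thread::hardware_concurrency(), poolSize))) {}

ThreadPool::~ThreadPool() {
    shutdown();
}

void ThreadPool::start() {
    if (!work) {
        // the work object keeps run() from returning while the queue is empty
        work = std::make_unique<ba::io_service::work>(ioService);
        for (unsigned i = 0; i < threadAmount; ++i) {
            pool.create_thread([this] { ioService.run(); });
        }
    }
}

void ThreadPool::shutdown() {
    work.reset();         // run() now returns once all queued handlers have completed
    pool.interrupt_all(); // only affects tasks blocked at Boost.Thread interruption points
    pool.join_all();      // no ioService.stop() here: it would discard handlers not yet started
    ioService.reset();    // allow start() to be called again
}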
#include <chrono>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

int main() {
    auto now = std::chrono::high_resolution_clock::now;
    auto s = now();
    {
        ThreadPool p(10);
        p.start();
        p.execute([] { std::this_thread::sleep_for(1s); });
        p.execute([] { std::this_thread::sleep_for(600ms); });
        p.execute([] { std::this_thread::sleep_for(400ms); });
        p.execute([] { std::this_thread::sleep_for(200ms); });
        p.execute([] { std::this_thread::sleep_for(10ms); });
    } // leaving the scope runs ~ThreadPool(), which shuts the pool down
    std::cout << "Total elapsed: " << (now() - s) / 1.0s << "s\n";
}
On most multi-core systems this prints something like the following (from my machine):
Total elapsed: 1.00064s
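Why roughly one second? With enough worker threads every task gets its own thread, so the total is governed by the longest task. The sketch below is a hypothetical helper (not part of the pool, and unrelated to Boost) that simulates FIFO dispatch to whichever thread becomes free first, ignoring thread start-up and scheduling overhead, to show how the expected total depends on the thread count:

```cpp
#include <algorithm>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical helper: estimates total wall time (in ms) when tasks are
// dequeued in FIFO order by the first thread to become free.
// Ignores thread start-up and scheduling overhead.
long long simulateMakespan(const std::vector<long long>& taskMs, unsigned threads) {
    // min-heap holding the time at which each thread becomes free
    std::priority_queue<long long, std::vector<long long>, std::greater<long long>> freeAt;
    for (unsigned i = 0; i < std::max(1u, threads); ++i)
        freeAt.push(0);

    long long makespan = 0;
    for (long long d : taskMs) {
        long long start = freeAt.top(); // the earliest available thread takes the next task
        freeAt.pop();
        long long end = start + d;
        makespan = std::max(makespan, end);
        freeAt.push(end);
    }
    return makespan;
}
```

For the five tasks in main (1000, 600, 400, 200 and 10 ms) this gives 1000 ms for three or more threads, 1200 ms for two and 2210 ms for one. Since the pool caps its size at hardware_concurrency(), a dual-core machine would be expected to print about 1.2s rather than 1s.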
It looks like you had an error in calculating threadAmount, where you'd take 1 if poolSize was more than hardware_concurrency.
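The intended rule is "at most hardware_concurrency() threads, but never fewer than one". Applying std::max and std::min as two separate assignments that both start from poolSize lets the second silently overwrite the first, so it is safer to combine both bounds in one expression. Here is that rule as a hypothetical free function (threadCount is not part of the original code), so it can be checked in isolation:

```cpp
#include <algorithm>

// Hypothetical helper: how many threads to start for a requested pool size.
// hw is the value returned by hardware_concurrency(); 0 means "unknown".
unsigned threadCount(unsigned requested, unsigned hw) {
    // Cap at the hardware thread count first, then enforce a minimum of one,
    // so neither a request of 0 nor an unknown (0) hardware count yields 0 threads.
    return std::max(1u, std::min(requested, hw));
}
```

Note the design choice: when the hardware count is unknown this falls back to a single thread, which is safe but pessimistic.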
To be honest, why have the bind in the implementation at all? It doesn't add much. You can leave it up to the caller, who can choose whether to use bind and, if so, whether it's boost::bind, std::bind or some other way of composing callables:
template <typename F>
void execute(F f) { ioService.post(f); }
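To illustrate that design choice using only the standard library, here is a hypothetical single-threaded TaskQueue (a stand-in, not Boost.Asio) whose execute accepts any callable and does no binding itself; argument binding is entirely the caller's business:

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for the pool: stores callables and runs them later.
struct TaskQueue {
    template <typename F>
    void execute(F f) { tasks.emplace_back(std::move(f)); } // no bind here

    // Runs queued tasks in FIFO order, then clears the queue.
    // Tasks must not call execute() while the queue is being drained.
    void drain() {
        for (auto& t : tasks) t();
        tasks.clear();
    }

  private:
    std::vector<std::function<void()>> tasks;
};

// Example task that takes arguments; the caller binds them, not the queue.
inline void addTo(int& total, int amount) { total += amount; }
```

A caller can then write q.execute([&total] { addTo(total, 1); }) or q.execute(std::bind(addTo, std::ref(total), 10)) interchangeably, and the queue never needs to know which was used.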
You're missing exception handling around the io_service::run calls (see "Should the exception thrown by boost::asio::io_service::run() be caught?").
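An exception thrown by a handler propagates out of run() in the thread that executed it, and Asio allows that thread to call run() again afterwards without an intervening reset()/restart(). That makes a catch-and-resume loop a common remedy. A minimal sketch, generic over any object with a run() member (such as ba::io_service); the helper name runGuarded and its error callback are hypothetical:

```cpp
#include <exception>
#include <functional>

// Hypothetical helper: keeps calling svc.run() until it returns normally.
// Exceptions thrown by handlers propagate out of run(); we report them and
// re-enter run() so this thread keeps serving the queue.
template <typename Service>
void runGuarded(Service& svc, const std::function<void(const std::exception&)>& onError) {
    for (;;) {
        try {
            svc.run();
            return; // run() ran out of work (or was stopped): we're done
        } catch (const std::exception& e) {
            onError(e); // e.g. log it; the loop then resumes run()
        }
    }
}
```

In the first version, the thread body would become something like [this] { runGuarded(ioService, [](std::exception const& e) { std::cerr << e.what() << '\n'; }); } (which also needs <iostream>). Because only std::exception is caught, other exception types still end the thread; Boost.Thread's thread_interrupted, for instance, does not derive from std::exception, so interrupt_all() keeps working.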
If you're using a recent Boost version (1.66 or later), you can use the newer io_context and thread_pool interfaces, which greatly simplify things:
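#include <boost/asio.hpp>
#include <algorithm> // std::clamp (C++17), std::max
#include <thread>    // std::thread::hardware_concurrency
#include <utility>   // std::move

struct ThreadPool {
    ThreadPool(unsigned poolSize)
        // std::clamp requires lo <= hi, so guard against hardware_concurrency() returning 0
        : pool(std::clamp(poolSize, 1u, std::max(1u, std::thread::hardware_concurrency())))
    { }

    ~ThreadPool() { pool.join(); } // wait for queued work; ~thread_pool() alone calls stop() first

    template <typename F>
    void execute(F f) { post(pool, std::move(f)); }

  private:
    boost::asio::thread_pool pool;
};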
This still has 99% of the functionality¹, but in 10 LoC.
In fact, the class has become a trivial wrapper, so we could just write:
#include <boost/asio.hpp>
#include <chrono>
#include <functional> // std::bind
#include <iostream>
#include <thread>

using namespace std::chrono_literals;
using C = std::chrono::high_resolution_clock;

static void sleep_for(C::duration d) { std::this_thread::sleep_for(d); }

int main() {
    auto s = C::now();
    {
        boost::asio::thread_pool pool;
        post(pool, [] { sleep_for(1s); });
        post(pool, [] { sleep_for(600ms); });
        // still can bind if you want
        post(pool, std::bind(sleep_for, 400ms));
        post(pool, std::bind(sleep_for, 200ms));
        post(pool, std::bind(sleep_for, 10ms));
        pool.join(); // wait for all posted work: ~thread_pool() calls stop() before
                     // joining, which can discard tasks that haven't started yet
    }
    std::cout << "Total elapsed: " << (C::now() - s) / 1.0s << "s\n";
}
The main difference is the default pool size: it is 2 × hardware_concurrency(), and it is also calculated more safely, because not all platforms report a reliable hardware_concurrency() (it can return zero, for example).
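The default-size rule just described can be sketched as a hypothetical free function. The fallback of 2 threads when hardware_concurrency() reports 0 is my reading of the Asio implementation and should be treated as an assumption; check the thread_pool source of your Boost version:

```cpp
#include <thread>

// Hypothetical helper mirroring the described default: twice the hardware
// thread count, with a fallback when hardware_concurrency() reports 0.
// The fallback value of 2 is an assumption, not taken from this answer.
unsigned defaultPoolSize(unsigned hw = std::thread::hardware_concurrency()) {
    unsigned n = hw * 2;
    return n == 0 ? 2u : n;
}
```

Unlike the clamping in the wrappers, this never depends on the caller supplying a sensible size.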
¹ It doesn't currently exercise interruption points.