It's more than a little weird to use a heartbeat... "sender" thread with async IO.
What's more, there is no synchronization on the socket object, so that's a data race which is Undefined Behavior.
Finally, this is unsafe:
std::string data = boost::asio::buffer_cast<const char*>(buf.data());
It assumes that the data() will be NUL-terminated (which isn't true).
Typical, Single Threaded ASIO
You would not spawn threads for timers, but use e.g. boost::asio::deadline_timer
or boost::asio::highresolution_timer
. It can wait asynchronously, so you can do other tasks on the IO service until it expires.
Similarly you can do the request/response reading/writing asynchronously. The only "complicating" factor is that asynchronous calls don't complete before returning, so you have to make sure the buffers live long enough (they should not be a local variable).
Now, you already have a logical "unit" of lifetime that practically JUMPS out of the code at you:

That just screams to be rewritten as
struct LifeTimeUnit {
boost::asio::ip::tcp::socket sock;
void process();
std::string read_data();
void write_data(std::string);
void hearbeatSender(sock);
};
Of course LifeTimeUnit
is a funny name, so let's think of a better one: Session
seems meaningful!
Now that we have a unit of lifetime, it can handsomely contain other things like buffers and the timer:
struct Session {
Session(tcp::socket&& s) : sock(std::move(s)) {}
void start() {
hb_wait();
req_loop();
}
void cancel() {
hbtimer.cancel();
sock.cancel(); // or shutdown() e.g.
}
private:
bool checked(error_code ec, std::string const& msg = "error") {
if (ec) {
std::clog << msg << ": " << ec.message() << "\n";
cancel();
}
return !ec.failed();;
}
void req_loop(error_code ec = {}) {
if (!checked(ec, "req_loop")) {
async_read_until(sock, buf, "\n",
[this](error_code ec, size_t xfr) { on_request(ec, xfr); });
}
}
void on_request(error_code ec, size_t n) {
if (checked(ec, "on_request")) {
request.resize(n);
buf.sgetn(request.data(), n);
response = "Case " + std::to_string(request.at(0) - '0') + "\n";
async_write(sock, buffer(response),
[this](error_code ec, size_t) { req_loop(ec); });
}
}
void hb_wait(error_code ec = {}) {
if (checked(ec, "hb_wait")) {
hbtimer.expires_from_now(2s);
hbtimer.async_wait([this](error_code ec) { hb_send(ec); });
}
}
void hb_send(error_code ec) {
if (checked(ec, "hb_send")) {
async_write(sock, buffer(hbmsg), [this](error_code ec, size_t) { hb_wait(ec); });
}
}
tcp::socket sock;
boost::asio::high_resolution_timer hbtimer { sock.get_executor() };
const std::string hbmsg = "HEARTBEAT\n";
boost::asio::streambuf buf;
std::string request, response;
};
The only public things are start()
(actually we don't have a need for cancel()
for now, but you know).
The main program can be much un-altered:
tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();
tcp::socket sock(io);
acceptor.accept(sock);
Session sess(std::move(sock));
sess.start(); // does both request loop and the heartbeat
io.run();
No more threads, perfect asynchrony! Using bash
and netcat
to test:
while sleep 4; do printf "%d request\n" {1..10}; done | netcat localhost 3333
Prints:
host 3333
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
HEARTBEAT
HEARTBEAT
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
^C
After stopping the client, the server exits with
on_request: End of file
hb_send: Operation canceled
Single-Thread / Multi-Session
A big advantage is that now you can accept multiple clients on a single server thread. In fact, thousands of them concurrently without a problem.
int main() {
boost::asio::thread_pool io(1);
try {
tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();
std::list<Session> sessions;
while (true) {
tcp::socket sock(io);
acceptor.accept(sock);
auto& sess = sessions.emplace_back(std::move(sock));
sess.start(); // does both request loop and the heartbeat
sessions.remove_if([](Session& s) { return !s.is_active(); });
}
io.join();
} catch (boost::system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.code().message() << "\n";
return e.code().value();
}
}
Note how we subtly changed our execution-context to a singleton thread pool.
This means we still run all sessions on a single thread, but that's a different thread than running main()
, meaning we can continue to accept connections.
To avoid ever-increasing sessions
list, we weed out the inactive ones using a trivially implemented is_active()
property.
Note that we can ALMOST force a shutdown by doing
for (auto& sess: sessions)
sess.cancel();
That's ALMOST, because it requires posting the cancel operations on the pool thread:
for (auto& sess: sessions)
post(io, [&sess] { sess.cancel(); });
This is to avoid racing with any tasks on the IO pool
Since only the main thread ever touches sessions
there is no need for locking.
Live On Coliru
Testing with
for a in 3 2 1; do (sleep $a; echo "$a request" | nc 127.0.0.1 3333)& done; time wait
Prints:
Case 1
Case 2
Case 3
HEARTBEAT
HEARTBEAT
...
Multi-Threading For The Win?
Now we could add multi-threading. The changes are mild:
- we want to associate the socket with a strand (see Why do I need strand per connection when using boost::asio?)
- note we already use
sock
's executor to run the timer
We have to take extra precautions to make all of the public interface in Session
thread-safe:
- post actions from
start()
and cancel()
on the strand
- make the
active
flag atomic_bool
next up, we simply increase the number of threads in the pool from 1
to, say 10
Note, in practice it rarely makes sense to use more threads than logical cores. Also, in this simple example everything is IO bound, so a single thread probably already serves as well. This is just for demonstration
Live On Coliru
boost::asio::thread_pool io(10);
try {
tcp::acceptor acceptor(io, tcp::v4());
acceptor.set_option(tcp::acceptor::reuse_address(true));
acceptor.bind({{}, 3333});
acceptor.listen();
std::list<Session> sessions;
while (true) {
tcp::socket sock(make_strand(io)); // NOTE STRAND!
// ...
// ...
io.join();
And the changes in Session
:
void start() {
active = true;
post(sock.get_executor(), [this]{
hb_wait();
req_loop();
});
}
void cancel() {
post(sock.get_executor(), [this]{
hbtimer.cancel();
sock.cancel(); // or shutdown() e.g.
active = false;
});
}
// ....
std::atomic_bool active {false};
}