There is no such thing. You might be able to force something at the TCP stack (so, operating system, usually) level. E.g. disabling the network interface involved.
Note that most synchronous code can trivially be transformed into asynchronous code with the exact same blocking semantics using `asio::use_future`. That means you can use async deadlines, and those are supported by Beast out of the box (base your websocket on `beast::tcp_stream` instead of `asio::ip::tcp::socket`).
UPDATE
To the added code
Review/Simplify
I reduced the code, removing unneeded bits, adding some fixes, and adding a demonstration handler notification so we can test that it functions:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// Blocking websocket client: connects to `host:port`, then forwards every
/// complete message it receives to the `callback` signal.
///
/// Run() performs synchronous I/O in a loop on the calling thread and, after a
/// short pause, reconnects whenever an operation throws a system_error.
/// NOTE(review): Close() operates on `_ws` while Run() may be blocked in a
/// read on another thread; blocking operations offer no real cancellation, so
/// treat Close() as best-effort only.
class WsConnect {
    using tcp = net::ip::tcp;

    const std::string _host, _port;
    net::io_context _ioc;
    beast::flat_buffer _buffer;
    websocket::stream<tcp::socket> _ws{_ioc};
    enum class Status { DISCONNECTED, CONNECTED } _status{Status::DISCONNECTED};
    std::atomic_bool _running{true}; // SEHE

public:
    /// Invoked with the text of every complete message received.
    boost::signals2::signal<void(std::string)> callback; // SEHE

    WsConnect(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Connects when disconnected, then reads one message per iteration,
    /// until Close() clears `_running`.
    void Run() {
        while (_running)
            try {
                switch (_status) {
                case Status::DISCONNECTED: do_connect(); [[fallthrough]];
                case Status::CONNECTED: do_read();
                }
            } catch (boost::system::system_error const&) {
                // The caught exception's code() holds the error_code.
                _status = Status::DISCONNECTED;
                std::this_thread::sleep_for(50ms); // SEHE avoid tight reconnect loop
            }
    }

    /// Asks Run() to leave its loop and attempts a normal websocket close;
    /// any error from the close is captured in `ec` and ignored.
    void Close() {
        _running = false;
        beast::error_code ec;
        _ws.close(websocket::close_code::normal, ec);
    }

private:
    /// Resolves the host, connects the TCP layer and performs the websocket
    /// handshake. Throws boost::system::system_error on failure.
    void do_connect() {
        connect(beast::get_lowest_layer(_ws), tcp::resolver(_ioc).resolve(_host, _port));
        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));
        _ws.handshake(_host + ':' + _port, "/");
        _buffer.clear(); // SEHE this was missing

        // Record the successful connection; without this Run() would take the
        // DISCONNECTED branch again and reconnect after every single message.
        _status = Status::CONNECTED;
    }

    /// Reads until a complete message is buffered, publishes it through
    /// `callback`, then empties the buffer for the next message.
    void do_read() {
        do {
            _ws.read(_buffer);
        } while (!_ws.is_message_done());
        callback(beast::buffers_to_string(_buffer.cdata())); // SEHE just for demo
        _buffer.clear();
    }
};
#include <iomanip>
#include <iostream>
/// Demo message handler: prints the message, quoted and escaped, on its own
/// line and flushes standard output.
void handle_message(std::string_view msg) {
    std::ostream& out = std::cout;
    out << "Handling " << std::quoted(msg);
    out << std::endl;
}
int main() {
WsConnect conn("localhost", 8989);
std::thread([&] { conn.Run(); }).detach(); // bye!
boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);
std::this_thread::sleep_for(3s);
conn.Close(); // Bad idea...
}
Exercising it for demonstration:

Design Issues
I'd argue there are three design issues:
- blocking IO operations do not afford cancellation
- `HandlerThread` is not a thread
- `detach()`-ing threads makes them ungovernable by definition.
Here's a fix, naively only time-limiting operations like in your example:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// Websocket client whose operations are subject to a single expiry.
///
/// Every operation is started asynchronously with `net::use_future` and then
/// awaited immediately, so Run() still blocks its caller, while the expiry
/// set on the underlying `beast::tcp_stream` can time the operations out.
struct WsConnect {
    /// Invoked with the text of every complete message received.
    boost::signals2::signal<void(std::string)> callback;

    WsConnect(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Connects and reads messages, reconnecting 50ms after any other
    /// failure, until an operation reports `beast::error::timeout`.
    /// @param d expiry handed to the underlying `beast::tcp_stream`.
    void Run(std::chrono::steady_clock::duration d) {
        _ws.next_layer().expires_after(d);

        while (true) {
            try {
                do_connect();
                while (true)
                    do_read();
            } catch (boost::system::system_error const& failure) {
                auto const code = failure.code();
                std::cerr << "Error: " << code.message() << std::endl;
                if (code == beast::error::timeout)
                    return;
            }
            std::this_thread::sleep_for(50ms);
        }
    }

private:
    using tcp = net::ip::tcp;

    const std::string _host, _port;
    beast::flat_buffer _buffer;
    net::thread_pool _ioc{1}; // services the asynchronous operations
    websocket::stream<beast::tcp_stream> _ws{_ioc};

    /// Resolves the host, connects and performs the websocket handshake,
    /// blocking on each future; failures surface as exceptions from get().
    void do_connect() {
        auto const endpoints = tcp::resolver(_ioc).resolve(_host, _port);
        _ws.next_layer().async_connect(endpoints, net::use_future).get();

        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        auto const authority = _host + ':' + _port;
        _ws.async_handshake(authority, "/", net::use_future).get();
        _buffer.clear();
    }

    /// Reads until a complete message is buffered, publishes it through
    /// `callback`, then empties the buffer.
    void do_read() {
        for (;;) {
            _ws.async_read(_buffer, net::use_future).get();
            if (_ws.is_message_done())
                break;
        }
        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};
/// Demo message handler: echoes the message to standard output without adding
/// a newline, then flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling " << msg;
    std::cout.flush();
}
int main() {
WsConnect conn("localhost", 8989);
boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);
conn.Run(3s);
} // close implied by destructors
Note how the code got simpler, shorter, and we even print error information. It's achieved by using `use_future` together with `beast::tcp_stream::expires_after`:

Full Cancellation
To allow for /externally triggered/ cancellation (instead of a fixed deadline), we can cheat a little by using 2 threads, so one can be "clogged" doing blocking waits on the futures:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <atomic>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// Websocket client that runs its connect/read loop on an internal thread
/// pool and can be stopped from outside.
///
/// The pool has two threads: one ends up blocked inside do_run() waiting on
/// futures, the other services the asynchronous operations and the stop task.
struct BackgroundWs {
    /// Invoked with the text of every complete message received.
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Queues the blocking connect/read loop on the pool and returns.
    void Start() {
        post(_ws.get_executor(), [this] { do_run(); });
    }

    /// Requests shutdown and waits for the stop task; an exception thrown by
    /// that task is rethrown here by get().
    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

private:
    using tcp = net::ip::tcp;

    const std::string _host, _port;
    beast::flat_buffer _buffer;
    net::thread_pool _ioc{2};
    websocket::stream<tcp::socket> _ws{_ioc};
    // Written by the stop task on one pool thread while do_run() polls it on
    // the other, so it must be atomic to avoid a data race.
    std::atomic_bool _stopped{false};

    /// Connects and reads messages until a stop is requested, pausing 50ms
    /// after each failure before checking the stop flag and retrying.
    void do_run() {
        for (; !_stopped; std::this_thread::sleep_for(50ms))
            try {
                do_connect();
                for (;;)
                    do_read();
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
    }

    /// Schedules the stop task on the pool: sets the stop flag, cancels
    /// pending socket operations and closes the websocket.
    /// @return future that becomes ready once the task has run.
    std::future<void> do_stop() {
        return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                            _stopped = true;
                            _ws.next_layer().cancel();
                            _ws.close(websocket::normal);
                        }));
    }

    /// Resolves the host, connects and performs the websocket handshake,
    /// blocking on each future; failures surface as exceptions from get().
    void do_connect() {
        async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), net::use_future)
            .get();
        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));
        _ws.async_handshake(_host + ':' + _port, "/", net::use_future).get();
        _buffer.clear();
    }

    /// Reads until a complete message is buffered, publishes it through
    /// `callback`, then empties the buffer.
    void do_read() {
        do
            _ws.async_read(_buffer, net::use_future).get();
        while (!_ws.is_message_done());
        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};
/// Demo message handler: echoes the message to standard output without adding
/// a newline, then flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling " << msg;
    std::cout.flush();
}
/// Demo driver: runs the background client until the user types "Stop" (or
/// input ends), then waits for one more line before exiting.
int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
        client.Start();

        for (std::string line;;) {
            if (!(std::cout << "Enter 'Stop' to stop... "))
                break;
            if (!std::getline(std::cin, line))
                break;
            if (line == "Stop")
                break;
        }
        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}
Now, the thing runs, reconnecting happily until the user enters `Stop` on the terminal:

No Cheats, C++20
In C++20, using coroutines, you can have basically identical code that is truly asynchronous. This gets rid of the "cheating extra thread":
Live On Coliru
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// Websocket client implemented with C++20 coroutines on a one-thread pool.
///
/// The connect/read loop is a coroutine spawned on the websocket's executor,
/// and the stop task is dispatched to that same executor.
struct BackgroundWs {
    /// Invoked with the text of every complete message received.
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Spawns the connect/read coroutine, detached, and returns.
    void Start() {
        auto executor = _ws.get_executor();
        net::co_spawn(executor, do_run(), net::detached);
    }

    /// Requests shutdown and waits for the stop task; an exception thrown by
    /// that task is rethrown here by get().
    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

private:
    using tcp = net::ip::tcp;

    const std::string _host, _port;
    beast::flat_buffer _buffer;
    net::thread_pool _ioc{1};
    websocket::stream<tcp::socket> _ws{_ioc};
    bool _stopped{false};

    /// Connects and reads messages until a stop is requested, sleeping 50ms
    /// after each failure before checking the stop flag and retrying.
    net::awaitable<void> do_run() {
        while (!_stopped) {
            try {
                co_await do_connect();
                while (true)
                    co_await do_read();
            } catch (boost::system::system_error const& failure) {
                std::cerr << "Error: " << failure.code().message() << std::endl;
            }
            co_await async_sleep(50ms);
        }
    }

    /// Suspends the calling coroutine for `duration` using a steady timer.
    net::awaitable<void> async_sleep(auto duration) {
        net::steady_timer timer(_ws.get_executor(), duration);
        co_await timer.async_wait(net::use_awaitable);
    }

    /// Schedules the stop task on the executor: sets the stop flag, cancels
    /// pending socket operations and closes the websocket.
    /// @return future that becomes ready once the task has run.
    std::future<void> do_stop() {
        std::packaged_task<void()> task([this] {
            _stopped = true;
            _ws.next_layer().cancel();
            _ws.close(websocket::normal);
        });
        return dispatch(_ws.get_executor(), std::move(task));
    }

    /// Resolves the host, connects and performs the websocket handshake.
    net::awaitable<void> do_connect() {
        auto const endpoints = tcp::resolver(_ioc).resolve(_host, _port);
        co_await async_connect(_ws.next_layer(), endpoints, net::use_awaitable);

        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        auto const authority = _host + ':' + _port;
        co_await _ws.async_handshake(authority, "/", net::use_awaitable);
        _buffer.clear();
    }

    /// Reads until a complete message is buffered, publishes it through
    /// `callback`, then empties the buffer.
    net::awaitable<void> do_read() {
        for (;;) {
            co_await _ws.async_read(_buffer, net::use_awaitable);
            if (_ws.is_message_done())
                break;
        }
        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};
/// Demo message handler: echoes the message to standard output without adding
/// a newline, then flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling " << msg;
    std::cout.flush();
}
/// Demo driver: runs the background client until the user types "Stop" (or
/// input ends), then waits for one more line before exiting.
int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
        client.Start();

        for (std::string line;;) {
            if (!(std::cout << "Enter 'Stop' to stop... "))
                break;
            if (!std::getline(std::cin, line))
                break;
            if (line == "Stop")
                break;
        }
        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}
Not Using C++20
This is conceptually identical, but more tedious as it requires explicit callback functions:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// Websocket client implemented as a callback-driven state machine on a
/// one-thread pool.
///
/// Each completion handler updates `_status` and calls do_step_machine(),
/// which starts the next asynchronous operation for the current state.
struct BackgroundWs {
    /// Invoked with the text of every complete message received.
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Queues the first step of the state machine on the pool and returns.
    void Start() {
        net::post(_ws.get_executor(), [this] {
            _stop_requested = false;
            do_step_machine();
        });
    }

    /// Requests shutdown and waits for the stop task; an exception thrown by
    /// that task is rethrown here by get().
    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

private:
    using tcp = net::ip::tcp;
    using error_code = boost::system::error_code;

    const std::string _host, _port;
    beast::flat_buffer _buffer;
    net::thread_pool _ioc{1};
    websocket::stream<tcp::socket> _ws{_ioc};
    net::steady_timer _timer{_ws.get_executor()};
    bool _stop_requested = false;
    enum class Status { INIT, CONNECTED, DISCONNECTED, STOPPED } _status{};

    /// Reports a failed operation and moves to STOPPED when a stop was
    /// requested, otherwise to DISCONNECTED, then advances the machine.
    void handle(error_code ec) {
        std::cerr << "Error: " << ec.message() << std::endl;
        if (ec.failed())
            _status = _stop_requested ? Status::STOPPED : Status::DISCONNECTED;
        do_step_machine();
    }

    /// Starts the operation that belongs to the current state.
    void do_step_machine() {
        // A stop request wins over whatever state a handler just set.
        // Otherwise a reconnect delay that elapses after Stop() would reset
        // the status to INIT and connect again.
        if (_stop_requested)
            _status = Status::STOPPED;

        switch (_status) {
        case Status::INIT: return do_connect();
        case Status::CONNECTED: return do_read();
        case Status::DISCONNECTED: return do_reconnect_delay(50ms);
        case Status::STOPPED: break;
        }
    }

    /// Waits `d`, then restarts the machine from INIT.
    void do_reconnect_delay(std::chrono::steady_clock::duration d) {
        _timer.expires_after(d);
        _timer.async_wait([this](error_code ec) {
            if (ec) return handle(ec);
            _status = Status::INIT;
            do_step_machine();
        });
    }

    /// Schedules the stop task on the executor: records the request, cancels
    /// a pending reconnect delay and pending socket operations, then closes
    /// the websocket.
    /// @return future that becomes ready once the task has run.
    std::future<void> do_stop() {
        return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                            _stop_requested = true;
                            // Cancel the timer first: the socket calls below
                            // may throw when no connection is open.
                            _timer.cancel();
                            _ws.next_layer().cancel();
                            _ws.close(websocket::normal);
                        }));
    }

    /// Resolves the host, connects, then performs the websocket handshake;
    /// on success the machine continues in state CONNECTED.
    void do_connect() {
        async_connect( //
            _ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
            [this](error_code ec, tcp::endpoint /*ep*/) {
                if (ec) return handle(ec);
                _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                    req.set(beast::http::field::user_agent,
                            BOOST_BEAST_VERSION_STRING " WsConnect");
                }));
                _ws.async_handshake(_host + ':' + _port, "/", [this](error_code ec) {
                    if (ec) return handle(ec);
                    _status = Status::CONNECTED;
                    _buffer.clear();
                    do_step_machine();
                });
            });
    }

    /// Reads until a complete message is buffered, publishes it through
    /// `callback`, empties the buffer and advances the machine.
    void do_read() {
        _ws.async_read(_buffer, [this](error_code ec, size_t) {
            if (ec) return handle(ec);
            if (_ws.is_message_done()) {
                callback(beast::buffers_to_string(_buffer.cdata()));
                _buffer.clear();
                do_step_machine();
            } else {
                do_read();
            }
        });
    }
};
/// Demo message handler: echoes the message to standard output without adding
/// a newline, then flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling " << msg;
    std::cout.flush();
}
/// Demo driver: runs the background client until the user types "Stop" (or
/// input ends), then waits for one more line before exiting.
int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
        client.Start();

        for (std::string line;;) {
            if (!(std::cout << "Enter 'Stop' to stop... "))
                break;
            if (!std::getline(std::cin, line))
                break;
            if (line == "Stop")
                break;
        }
        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}
Again, same behaviour:
