I want a WebSocket client that's capable of reading and writing asynchronously. Reading is working A-OK thanks to another StackOverflow user. The idea behind reading is that there's a thread that's always reading, permanently blocked.
My idea for the writing part is that the user of the class can Write() to a stringstream as much as they want, and eventually the WebSocket will pick it up and send it, clearing the stringstream out, ready for the user's next Write() calls.
The problem is that the boost::beast read operation is blocking. Of course there is an async read too, but it's still blocking - it just blocks another thread or the IOC forever, until some data comes.
I was trying to use boost::asio::deadline_timer to timeout read operation, and let writes happen, but the only examples I found close the socket entirely with socket.cancel(), and it ended up with unhandled exceptions for me. Probably because .cancel() was called on a websocket thread.
Maybe the design is flawed from the very beginning? Yes I can imagine just running two threads, one is stuck on read, and the other takes care of the writes + handling connection issues. But I don't really know how to separate these tasks into separate threads. I think that if I can't distinguish threads of a thread_pool, two threads of that thread_pool might eventually end up doing same thing - possibly both stuck on reading.
My other approach was to define two IOCs that I'd manually .run_for() for a couple of seconds, then switch if write operations are queued up. Of course it failed because the websocket is bound to one executor (a single IOC).
Here's the code that sums up the idea. Or on Coliru.
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
class WebsocketPayload {
using ErrorCallbackT = std::function<void(const boost::system::error_code&)>;
using MessageCallbackT = std::function<void(const boost::beast::flat_buffer&)>;
public:
ErrorCallbackT OnError;
MessageCallbackT OnMessage;
// Declare Boost stuff as last, its important for destruction order... unfortunately.
private:
const std::string _host, _port;
boost::beast::flat_buffer _readBuffer;
std::string _writeBuffer;
std::stringstream _writeQueue;
std::mutex _writeLock;
boost::asio::thread_pool _ioc{1};
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> _ws{_ioc};
boost::asio::steady_timer _timer{_ws.get_executor()};
std::chrono::milliseconds _tryReconnectTimeout;
bool _stop_requested = false;
enum class Status { INIT, CONNECTED, DISCONNECTED, STOPPED } _status{};
enum class Operation { READ, WRITE } _lastOperation;
public:
WebsocketPayload(const std::string& host, const std::string& port, uint64_t reconnectTimeoutMs)
: _host(host)
, _port(port)
, _tryReconnectTimeout(std::chrono::milliseconds(reconnectTimeoutMs))
{}
~WebsocketPayload() {
Stop();
}
void Start() {
boost::asio::post(_ws.get_executor(), [this] {
_stop_requested = false;
do_step_machine();
});
}
void Stop() {
do_stop();
}
void Write(const std::string& text) {
std::lock_guard<std::mutex> lock { _writeLock };
_writeQueue << text;
}
private:
void handle_error(const boost::system::error_code& ec) {
if (ec.failed()) {
_status = _stop_requested ? Status::STOPPED : Status::DISCONNECTED;
}
OnError(ec);
do_step_machine();
}
void handle_message(const boost::beast::flat_buffer& data) {
OnMessage(_readBuffer);
_readBuffer.clear();
do_step_machine();
}
void do_connect() {
using tcp = boost::asio::ip::tcp;
using error_code = boost::system::error_code;
using namespace boost;
async_connect(
_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
[this](error_code ec, tcp::endpoint /*ep*/) {
if (ec) return handle_error(ec);
_ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req) {
req.set(beast::http::field::user_agent,
BOOST_BEAST_VERSION_STRING " WsConnect");
}));
_ws.async_handshake(_host + ':' + _port, "/", [this](error_code ec) {
if (ec) return handle_error(ec);
_status = Status::CONNECTED;
_readBuffer.clear();
do_step_machine();
});
}
);
}
void do_read() {
using error_code = boost::system::error_code;
using namespace boost;
// Timeout if nothing comes??
_ws.async_read(_readBuffer, [this](error_code ec, size_t) {
if (ec) return handle_error(ec);
if (_ws.is_message_done()) {
return handle_message(_readBuffer);
} else {
do_read();
}
});
_lastOperation = Operation::READ;
}
void do_write() {
using error_code = boost::system::error_code;
using namespace boost;
if (_writeBuffer.empty()) {
std::lock_guard<std::mutex> lock { _writeLock };
_writeBuffer = _writeQueue.str();
}
_ws.async_write(asio::buffer(_writeBuffer), [this](error_code ec, size_t) {
if (ec) return handle_error(ec);
_writeBuffer.clear();
});
_lastOperation = Operation::WRITE;
}
void do_reconnect_delay() {
using error_code = boost::system::error_code;
_timer.expires_after(_tryReconnectTimeout);
_timer.async_wait([this](error_code ec) {
if (ec) return handle_error(ec);
_status = Status::INIT;
do_step_machine();
});
}
std::future<void> do_stop() {
using namespace boost;
return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
_stop_requested = true;
_ws.next_layer().cancel();
}));
}
void do_step_machine() {
switch(_status) {
case Status::INIT: return do_connect();
case Status::CONNECTED: {
switch (_lastOperation) {
case Operation::READ: return do_write();
case Operation::WRITE: return do_read();
};
case Status::DISCONNECTED: return do_reconnect_delay();
case Status::STOPPED: break;
};
}
}
};