
I'm using the secure WebSocket implementation from boost::beast, and I'd like to be able to receive a new message while the current one is still being handled. So I chose to try it with coroutines (the method that takes a yield context).

  1. This is my websocket object:
using Websocket = boost::beast::websocket::stream<
      boost::beast::ssl_stream<boost::beast::tcp_stream>>;

std::optional<Websocket> ws_;
  2. And this is how I call my websocket listener code:
ws_.reset();
boost::asio::spawn(ioc_,
    [=](const boost::asio::yield_context &yield) {
        this->startAndServeWs(yield);
  });
}
  3. And this is how my websocket handler method works. Notice that it is divided into two parts:

    First, the initialization part.

    Second, the websocket main loop, in which it is ready to read a new message.


So my question is whether the code below is suitable for fetching new messages from the server while the current message is being handled (in either sendPostFetchItems or sendPostDownloadNewVersion, both of which can take a while since they trigger an HTTP POST request and wait for the server's response). And if it isn't, can I assume that the new message will be queued, waiting for the next iteration once handling of the current message has completed?

My second question is about the catch statement: is this the proper way to retry the connection?

void Comm::startAndServeWs(const boost::asio::yield_context &yield) {
  try {
 
    // WebSocket init according to the SSL context and io_context.
    ws_.emplace(ioc_, ctx_);

    boost::beast::get_lowest_layer(ws_.value())
        .expires_after(std::chrono::seconds(30));

    boost::asio::ip::tcp::resolver resolver(io_context_);

    auto const results =
        resolver.async_resolve(ip_.value(), port_.value(), yield);

    auto ep = boost::beast::get_lowest_layer(ws_.value())
                  .async_connect(results, yield);

    // Set SNI Hostname (many hosts need this to handshake successfully)
    if (!SSL_set_tlsext_host_name(ws_.value().next_layer().native_handle(),
                                  ip_.value().c_str())) {
      // Throw a std::exception subclass so the catch block below sees it.
      throw std::runtime_error("Failed to set SNI Hostname");
    }

    // Update the host_ string. This will provide the value of the
    // Host HTTP header during the WebSocket handshake.
    // See https://tools.ietf.org/html/rfc7230#section-5.4
    auto address = ip_.value() + ':' + std::to_string(ep.port());

    // Perform the SSL handshake
    ws_.value().next_layer().async_handshake(
        boost::asio::ssl::stream_base::client, yield);

    // Turn off the timeout on the tcp_stream, because
    // the websocket stream has its own timeout system.
    boost::beast::get_lowest_layer(ws_.value()).expires_never();

    // Set suggested timeout settings for the websocket
    ws_.value().set_option(
        boost::beast::websocket::stream_base::timeout::suggested(
            boost::beast::role_type::client));

    // Set a decorator to change the User-Agent of the handshake
    ws_.value().set_option(boost::beast::websocket::stream_base::decorator(
        [](boost::beast::websocket::request_type &req) {
          req.set(boost::beast::http::field::user_agent, kWebsocketIdentifier);
        }));

    Log("trying to establish websocket in address {}", address);
            
    ws_.value().async_handshake(address, "/ws", yield);


//////////////////// here's the websocket main loop.

    for (;;) {
      boost::beast::flat_buffer buffer;
      // Read a message into our buffer
      ws_.value().async_read(buffer, yield);  // <--- the read I'm asking about
      
      Log("websocket response buffer = {}",boost::beast::make_printable(buffer.data()));

      try {
        nlohmann::json response = nlohmann::json::parse(
            boost::beast::buffers_to_string(buffer.data()));

        if (response["command"] == "fetchItems") {
          sendPostFetchItems();
        } else if (response["command"] == "getLogs") {
          sendPostDownloadNewVersion();
        }... 
    }

  } catch (std::exception &e) {
    Log("websocket reconnect failed. reason = {}", e.what());
    ws_.reset();
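    // Retry 10 seconds from now; expiry() + 10s would be relative to the
    // previous deadline, which may already be in the past.
    timer_websocket_.expires_after(boost::asio::chrono::seconds(10));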
    timer_websocket_.async_wait(
        [this]([[maybe_unused]] const boost::system::error_code &error) {
          boost::asio::spawn(io_context_,
                             [this](const boost::asio::yield_context &yield) {
                               this->startAndServeWs(yield);
                             });
        });
  }
}
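
To make the second half of my first question concrete, here is a minimal standard-library sketch of the behaviour I am assuming, with no Beast involved. `FakeConnection`, `serverSends` and `serveSequentially` are hypothetical names made up for illustration; in the real program any buffering would happen in the TCP stack and in Beast, not in a `std::deque`, so this only models the ordering, not the actual I/O.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for the connection: frames that have arrived from
// the server but have not been read by the loop yet.
struct FakeConnection {
  std::deque<std::string> unread;

  // Models the server sending a frame at any time, even mid-handler.
  void serverSends(const std::string &msg) { unread.push_back(msg); }

  // Models one read: returns false when nothing is waiting.
  bool read(std::string &out) {
    if (unread.empty()) return false;
    out = unread.front();
    unread.pop_front();
    return true;
  }
};

// The shape of my main loop: read one message, handle it to completion,
// and only then read again. Returns the order in which messages were handled.
std::vector<std::string> serveSequentially(
    FakeConnection &conn,
    const std::function<void(const std::string &)> &handler) {
  std::vector<std::string> handled;
  std::string msg;
  while (conn.read(msg)) {
    handler(msg);  // a slow handler delays the next read
    handled.push_back(msg);
  }
  return handled;
}
```

In this model, if "getLogs" arrives while the handler for "fetchItems" is still running, it simply waits in `unread` and is handled on the next iteration. That is the "queued" behaviour I am asking about.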